Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// ColocationGroupProcNode.java
package org.apache.doris.proc;

import org.apache.doris.catalog.ColocationGroup;
import org.apache.doris.catalog.ColocationGroupMgr;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Proc Node for /colocation_group/{group_id}
* 展示指定Colocation Group的详细信息
*/
public class ColocationGroupProcNode implements ProcNode {

// 列名定义(Proc接口返回的表头)
public static final List<String> TITLE_NAMES = new ArrayList<String>() {
{
add("GroupId");
add("GroupName");
add("DbId");
add("TableName");
add("ReplicaNum");
add("DistributionCol");
add("IsStable");
add("CreateTime");
add("LastUpdateTime");
}
};

private final long groupId;

// 构造函数:接收group_id参数
public ColocationGroupProcNode(long groupId) {
this.groupId = groupId;
}

@Override
public ProcResult fetchResult() throws AnalysisException {
// 1. 权限校验(Doris标准权限检查)
if (!ConnectContext.get().getCurrentUser().hasGlobalPriv(PrivPredicate.ADMIN)) {
throw new AnalysisException("Require ADMIN privilege to access colocation group proc");
}

// 2. 获取Colocation Group管理器
ColocationGroupMgr cgMgr = Env.getCurrentEnv().getColocationGroupMgr();
ColocationGroup cg = cgMgr.getColocationGroupById(groupId);

// 3. 校验Group是否存在
if (cg == null) {
throw new AnalysisException("Colocation group with id " + groupId + " not found");
}

// 4. 构造返回结果
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);

// 封装Colocation Group核心信息
List<String> row = new ArrayList<>();
row.add(String.valueOf(cg.getId()));
row.add(cg.getName());
row.add(String.valueOf(cg.getDbId()));
row.add(String.join(",", cg.getTableIds().stream()
.map(tableId -> Env.getCurrentEnv().getTable(tableId).getName())
.toList()));
row.add(String.valueOf(cg.getReplicaNum()));
row.add(cg.getDistributionCol());
row.add(String.valueOf(cg.isStable()));
row.add(cg.getCreateTime().toString());
row.add(cg.getLastUpdateTime().toString());

result.addRow(row);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.resource.Tag;
import org.apache.doris.common.cloud.CloudReplica;//New addition: Incorporation of cloud environment category

import com.google.common.collect.ImmutableList;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// ColocationGroupProcNode.java
package org.apache.doris.proc;

import org.apache.doris.catalog.ColocationGroup;
import org.apache.doris.catalog.ColocationGroupMgr;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Proc Node for /colocation_group/{group_id}
* 展示指定Colocation Group的详细信息
*/
public class ColocationGroupProcNode implements ProcNode {

// 列名定义(Proc接口返回的表头)
public static final List<String> TITLE_NAMES = new ArrayList<String>() {
{
add("GroupId");
add("GroupName");
add("DbId");
add("TableName");
add("ReplicaNum");
add("DistributionCol");
add("IsStable");
add("CreateTime");
add("LastUpdateTime");
}
};

private final long groupId;

// 构造函数:接收group_id参数
public ColocationGroupProcNode(long groupId) {
this.groupId = groupId;
}

@Override
public ProcResult fetchResult() throws AnalysisException {
// 1. 权限校验(Doris标准权限检查)
if (!ConnectContext.get().getCurrentUser().hasGlobalPriv(PrivPredicate.ADMIN)) {
throw new AnalysisException("Require ADMIN privilege to access colocation group proc");
}

// 2. 获取Colocation Group管理器
ColocationGroupMgr cgMgr = Env.getCurrentEnv().getColocationGroupMgr();
ColocationGroup cg = cgMgr.getColocationGroupById(groupId);

// 3. 校验Group是否存在
if (cg == null) {
throw new AnalysisException("Colocation group with id " + groupId + " not found");
}

// 4. 构造返回结果
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);

// 封装Colocation Group核心信息
List<String> row = new ArrayList<>();
row.add(String.valueOf(cg.getId()));
row.add(cg.getName());
row.add(String.valueOf(cg.getDbId()));
row.add(String.join(",", cg.getTableIds().stream()
.map(tableId -> Env.getCurrentEnv().getTable(tableId).getName())
.toList()));
row.add(String.valueOf(cg.getReplicaNum()));
row.add(cg.getDistributionCol());
row.add(String.valueOf(cg.isStable()));
row.add(cg.getCreateTime().toString());
row.add(cg.getLastUpdateTime().toString());

result.addRow(row);
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,35 @@ public static void destroy() {
INSTANCE = null;
}

// 在ProcService类中找到init()方法,添加以下代码
private void init() {
// ... 原有代码 ...

// 1. 创建Colocation Group根目录
ProcDir colocationGroupDir = new ProcDir();
// 注册到根Proc目录(/colocation_group)
rootProcDir.register("colocation_group", colocationGroupDir);

// 2. 为/colocation_group目录添加子节点处理器(处理{group_id}参数)
colocationGroupDir.setChildCreator(new ProcNodeCreator() {
@Override
public ProcNode create(List<String> parts) throws AnalysisException {
// 校验参数格式:必须是数字类型的group_id
if (parts.size() != 1) {
throw new AnalysisException("Invalid path: /colocation_group/{group_id}, require exactly one numeric group id");
}
long groupId;
try {
groupId = Long.parseLong(parts.get(0));
} catch (NumberFormatException e) {
throw new AnalysisException("Group id must be a number, got: " + parts.get(0));
}
// 创建并返回具体的ColocationGroupProcNode
return new ColocationGroupProcNode(groupId);
}
});

// ... 原有代码 ...
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// TestColocationGroupProcNode.java
package org.apache.doris.proc;

import org.apache.doris.catalog.ColocationGroup;
import org.apache.doris.catalog.ColocationGroupMgr;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.utframe.UtFrameUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

public class TestColocationGroupProcNode {

private long testGroupId;

@Before
public void setUp() throws Exception {
// 初始化测试环境
UtFrameUtils.createMinimalDorisCluster();
ConnectContext ctx = UtFrameUtils.createDefaultCtx();
ConnectContext.setCurrent(ctx);

// 创建测试用Colocation Group
ColocationGroupMgr cgMgr = Env.getCurrentEnv().getColocationGroupMgr();
ColocationGroup testGroup = new ColocationGroup(1000L, "test_cg", 100L,
List.of(10001L), 3, "id", true);
cgMgr.addColocationGroup(testGroup);
testGroupId = testGroup.getId();
}

@Test
public void testFetchResult() throws AnalysisException {
// 测试存在的Group ID
ColocationGroupProcNode procNode = new ColocationGroupProcNode(testGroupId);
ProcResult result = procNode.fetchResult();

// 验证返回结果
Assert.assertEquals(ColocationGroupProcNode.TITLE_NAMES, result.getNames());
Assert.assertEquals(1, result.getRows().size());

List<String> row = result.getRows().get(0);
Assert.assertEquals(String.valueOf(testGroupId), row.get(0));
Assert.assertEquals("test_cg", row.get(1));
Assert.assertEquals("100", row.get(2));

// 测试不存在的Group ID
try {
ColocationGroupProcNode invalidProcNode = new ColocationGroupProcNode(9999L);
invalidProcNode.fetchResult();
Assert.fail("Should throw AnalysisException for invalid group id");
} catch (AnalysisException e) {
Assert.assertTrue(e.getMessage().contains("not found"));
}
}
}