Skip to content

Commit

Permalink
[BugFix]support table function on compute nodes (#34577)
Browse files Browse the repository at this point in the history
Signed-off-by: abc982627271 <liuxuefen@starrocks.com>
  • Loading branch information
abc982627271 authored Nov 14, 2023
1 parent ab2b465 commit 24a6d0e
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
import com.starrocks.rpc.BackendServiceClient;
import com.starrocks.rpc.PGetFileSchemaRequest;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.sql.ast.ImportColumnDesc;
import com.starrocks.system.Backend;
import com.starrocks.system.ComputeNode;
import com.starrocks.thrift.TBrokerFileStatus;
import com.starrocks.thrift.TBrokerRangeDesc;
import com.starrocks.thrift.TBrokerScanRange;
Expand Down Expand Up @@ -287,13 +288,21 @@ private List<Column> getFileSchema() throws DdlException {
return Lists.newArrayList();
}
TNetworkAddress address;
List<Long> backendIds = GlobalStateMgr.getCurrentSystemInfo().getBackendIds(true);
if (backendIds.isEmpty()) {
throw new DdlException("Failed to send proxy request. No alive backends");
List<Long> nodeIds = GlobalStateMgr.getCurrentSystemInfo().getBackendIds(true);
if (RunMode.getCurrentRunMode() == RunMode.SHARED_DATA) {
nodeIds.addAll(GlobalStateMgr.getCurrentSystemInfo().getComputeNodeIds(true));
}
Collections.shuffle(backendIds);
Backend be = GlobalStateMgr.getCurrentSystemInfo().getBackend(backendIds.get(0));
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
if (nodeIds.isEmpty()) {
if (RunMode.getCurrentRunMode() == RunMode.SHARED_NOTHING) {
throw new DdlException("Failed to send proxy request. No alive backends");
} else {
throw new DdlException("Failed to send proxy request. No alive backends or compute nodes");
}
}

Collections.shuffle(nodeIds);
ComputeNode node = GlobalStateMgr.getCurrentSystemInfo().getBackendOrComputeNode(nodeIds.get(0));
address = new TNetworkAddress(node.getHost(), node.getBrpcPort());

PGetFileSchemaResult result;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,37 @@

package com.starrocks.catalog;

import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.system.Backend;
import com.starrocks.system.SystemInfoService;
import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TableFunctionTableTest {
@Test
public void testNormal() {
Map<String, String> properties = new HashMap<>();
Map<String, String> properties = new HashMap<>();

@Before
public void setUp() {
properties.put("path", "fake://some_bucket/some_path/*");
properties.put("format", "ORC");
properties.put("columns_from_path", "col_path1, col_path2, col_path3");
}

@Test
public void testNormal() {
Assertions.assertDoesNotThrow(() -> {
TableFunctionTable table = new TableFunctionTable(properties);
List<Column> schema = table.getFullSchema();
Expand All @@ -40,4 +56,73 @@ public void testNormal() {
Assertions.assertEquals(new Column("col_path3", ScalarType.createDefaultString(), true), schema.get(4));
});
}

@Test
public void testGetFileSchema(@Mocked GlobalStateMgr globalStateMgr,
@Mocked SystemInfoService systemInfoService) throws Exception {
new Expectations() {
{
globalStateMgr.getCurrentSystemInfo();
result = systemInfoService;
minTimes = 0;

systemInfoService.getBackendIds(anyBoolean);
result = new ArrayList<>();
minTimes = 0;
}
};

TableFunctionTable t = new TableFunctionTable(properties);

Method method = TableFunctionTable.class.getDeclaredMethod("getFileSchema", null);
method.setAccessible(true);

try {
method.invoke(t, null);
} catch (Exception e) {
Assert.assertTrue(e.getCause().getMessage().contains("Failed to send proxy request. No alive backends"));
}

new MockUp<RunMode>() {
@Mock
public RunMode getCurrentRunMode() {
return RunMode.SHARED_DATA;
}
};

try {
method.invoke(t, null);
} catch (Exception e) {
Assert.assertTrue(e.getCause().getMessage().
contains("Failed to send proxy request. No alive backends or compute nodes"));
}

Backend backend = new Backend(1L, "192.168.1.1", 9050);
backend.setBrpcPort(8050);

List<Long> nodeList = new ArrayList<>();
nodeList.add(1L);

new Expectations() {
{
systemInfoService.getBackendIds(anyBoolean);
result = nodeList;
minTimes = 0;

systemInfoService.getComputeNodeIds(anyBoolean);
result = new ArrayList<>();
minTimes = 0;

systemInfoService.getBackendOrComputeNode(anyLong);
result = backend;
minTimes = 0;
}
};

try {
method.invoke(t, null);
} catch (Exception e) {
Assert.assertFalse(false);
}
}
}

0 comments on commit 24a6d0e

Please sign in to comment.