Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix]support table function on compute nodes #34577

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
import com.starrocks.rpc.BackendServiceClient;
import com.starrocks.rpc.PGetFileSchemaRequest;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.sql.ast.ImportColumnDesc;
import com.starrocks.system.Backend;
import com.starrocks.system.ComputeNode;
import com.starrocks.thrift.TBrokerFileStatus;
import com.starrocks.thrift.TBrokerRangeDesc;
import com.starrocks.thrift.TBrokerScanRange;
Expand Down Expand Up @@ -287,13 +288,21 @@ private List<Column> getFileSchema() throws DdlException {
return Lists.newArrayList();
}
TNetworkAddress address;
List<Long> backendIds = GlobalStateMgr.getCurrentSystemInfo().getBackendIds(true);
if (backendIds.isEmpty()) {
throw new DdlException("Failed to send proxy request. No alive backends");
List<Long> nodeIds = GlobalStateMgr.getCurrentSystemInfo().getBackendIds(true);
if (RunMode.getCurrentRunMode() == RunMode.SHARED_DATA) {
nodeIds.addAll(GlobalStateMgr.getCurrentSystemInfo().getComputeNodeIds(true));
}
Collections.shuffle(backendIds);
Backend be = GlobalStateMgr.getCurrentSystemInfo().getBackend(backendIds.get(0));
address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
if (nodeIds.isEmpty()) {
if (RunMode.getCurrentRunMode() == RunMode.SHARED_NOTHING) {
throw new DdlException("Failed to send proxy request. No alive backends");
} else {
throw new DdlException("Failed to send proxy request. No alive backends or compute nodes");
}
}

Collections.shuffle(nodeIds);
ComputeNode node = GlobalStateMgr.getCurrentSystemInfo().getBackendOrComputeNode(nodeIds.get(0));
address = new TNetworkAddress(node.getHost(), node.getBrpcPort());

PGetFileSchemaResult result;
try {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code reviews can vary in depth and focus depending on the context of the code change and what the reviewers are looking for. Here’s a summary of potential suggestions for this particular diff snippet review:

  1. Consistent Naming:

    • The renaming from Backend to ComputeNode might be implying that Backend has been either replaced or extended by ComputeNode. It's crucial to verify everywhere in the codebase that this change is consistently applied and documented if it is part of a larger refactoring.
  2. Exception Message Clarity:

    • The new exception message "Failed to send proxy request. No alive backends" could potentially be made more clear, as you're not just looking for "backends" anymore due to the shared-nothing addition but also "compute nodes." Adjusting the messages to reflect the current state of the system could reduce debugging time in the future.
  3. Feature Toggles or Runtime Configuration:

    • Adding conditional logic based on RunMode.getCurrentRunMode() seems like it might be introducing runtime configuration control flow, which can be both powerful and risky. Ensure there is enough test coverage for both branches (shared-nothing and otherwise) and that these runtime changes are well-documented.
  4. Error Handling and Retry Logic:

    • The adjustment adds new exception paths which could benefit from more sophisticated error handling. Depending on how critical this getFileSchema function is within the system, adding retry logic or better fallbacks might be worthwhile.
  5. Thread Safety and Concurrency:

    • Collections.shuffle(nodeIds); implies a mutation of the nodeIds list. If nodeIds is shared across threads, this operation could introduce race conditions. Ensure that either nodeIds is confined to a single thread or is otherwise safely published between threads.
  6. Code Duplication:

    • There appears to be some code duplication between the cases for handling when nodeIds is empty and when looking for backends or compute nodes. If similar logic is used in multiple places, consider abstracting it into a method to promote reuse and simplify future updates.
  7. Unit Testing:

    • With logic being changed/added (especially around conditions like RunMode.getCurrentRunMode()), unit tests need to be updated or added to cover the new scenarios to ensure that the behavior is as expected.
  8. Performance Implications:

    • By adding additional backend and compute node checks (as well as potentially making requests to each), there can be performance implications. Verify that this does not significantly impact the performance, especially in large deployments.
  9. Documentation and Comments:

    • There aren't any comments in the provided diff. Contextual comments explaining why decisions were made (such as the introduction of different node types and run modes) can be incredibly valuable, particularly when unusual or non-obvious logic is introduced.
  10. Dependency on Global State:

    • The overall logic relies heavily on the global state (GlobalStateMgr). Be aware that such reliance makes unit testing more difficult and may increase coupling, which can lead to issues with maintainability.

Without the broader context or understanding of prior and future pieces of work, these are top-level suggestions. Changes connected with databases, distributed systems, or resilient service design typically involve deeper considerations, which would be reviewed as part of a more thorough code base analysis.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,37 @@

package com.starrocks.catalog;

import com.starrocks.common.DdlException;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.system.SystemInfoService;
import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TableFunctionTableTest {
@Test
public void testNormal() {
Map<String, String> properties = new HashMap<>();

Map<String, String> properties = new HashMap<>();

@Before
public void setUp() {
properties.put("path", "fake://some_bucket/some_path/*");
properties.put("format", "ORC");
properties.put("columns_from_path", "col_path1, col_path2, col_path3");
}

@Test
public void testNormal() {
Assertions.assertDoesNotThrow(() -> {
TableFunctionTable table = new TableFunctionTable(properties);
List<Column> schema = table.getFullSchema();
Expand All @@ -40,4 +56,43 @@ public void testNormal() {
Assertions.assertEquals(new Column("col_path3", ScalarType.createDefaultString(), true), schema.get(4));
});
}

@Test
public void testGetFileSchema(@Mocked GlobalStateMgr globalStateMgr,
@Mocked SystemInfoService systemInfoService) {
new Expectations() {
{
globalStateMgr.getCurrentSystemInfo();
result = systemInfoService;
minTimes = 0;


systemInfoService.getBackendIds(anyBoolean);
result = new ArrayList<>();
minTimes = 0;
}
};

try {
new TableFunctionTable(properties);
} catch (DdlException e) {
Assert.assertTrue(e.getMessage().contains("Failed to send proxy request. No alive backends"));
}


new MockUp<RunMode>() {
@Mock
public RunMode getCurrentRunMode() {
return RunMode.SHARED_DATA;
}
};

try {
new TableFunctionTable(properties);
} catch (DdlException e) {
Assert.assertTrue(e.getMessage().
contains("Failed to send proxy request. No alive backends or compute nodes"));
}

}
}