Skip to content

Commit

Permalink
[Enhancement] Support publish version for lake table (StarRocks#7642)
Browse files Browse the repository at this point in the history
  • Loading branch information
sduzh authored Jun 23, 2022
1 parent ff88445 commit 12f2a5a
Show file tree
Hide file tree
Showing 25 changed files with 597 additions and 285 deletions.
3 changes: 2 additions & 1 deletion be/src/gen_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ set(SRC_FILES
${GEN_CPP_DIR}/segment.pb.cc
${GEN_CPP_DIR}/persistent_index.pb.cc
${GEN_CPP_DIR}/tablet_schema.pb.cc
${GEN_CPP_DIR}/starlake.pb.cc
${GEN_CPP_DIR}/lake_types.pb.cc
${GEN_CPP_DIR}/lake_service.pb.cc
#$${GEN_CPP_DIR}/opcode/functions.cc
#$${GEN_CPP_DIR}/opcode/vector-functions.cc
#$${GEN_CPP_DIR}/opcode/opcode-registry-init.cc
Expand Down
19 changes: 6 additions & 13 deletions be/src/service/service_be/lake_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <brpc/controller.h>

#include "common/status.h"
#include "gen_cpp/lake_types.pb.h"
#include "runtime/exec_env.h"
#include "storage/lake/tablet.h"
#include "storage/lake/tablet_manager.h"
Expand Down Expand Up @@ -94,32 +95,25 @@ void LakeServiceImpl::publish_version(::google::protobuf::RpcController* control
::starrocks::lake::PublishVersionResponse* response,
::google::protobuf::Closure* done) {
brpc::ClosureGuard guard(done);
(void)controller;
auto cntl = static_cast<brpc::Controller*>(controller);

if (!request->has_base_version()) {
response->mutable_status()->set_status_code(TStatusCode::INVALID_ARGUMENT);
response->mutable_status()->add_error_msgs("missing base version");
cntl->SetFailed("missing base version");
return;
}
if (!request->has_new_version()) {
response->mutable_status()->set_status_code(TStatusCode::INVALID_ARGUMENT);
response->mutable_status()->add_error_msgs("missing new version");
cntl->SetFailed("missing new version");
return;
}
if (request->txn_ids_size() == 0) {
response->mutable_status()->set_status_code(TStatusCode::INVALID_ARGUMENT);
response->mutable_status()->add_error_msgs("missing txn_ids");
cntl->SetFailed("missing txn_ids");
return;
}
if (request->tablet_ids_size() == 0) {
response->mutable_status()->set_status_code(TStatusCode::INVALID_ARGUMENT);
response->mutable_status()->add_error_msgs("missing tablet_ids");
cntl->SetFailed("missing tablet_ids");
return;
}

// Will not update status code since here, only failed_tablets will be updated.
response->mutable_status()->set_status_code(TStatusCode::OK);

// TODO move the execution to TaskWorkerPool
for (const auto& tablet_id : request->tablet_ids()) {
auto res = _env->lake_tablet_manager()->get_tablet(tablet_id);
Expand All @@ -144,7 +138,6 @@ void LakeServiceImpl::abort_txn(::google::protobuf::RpcController* controller,

// TODO: move the execution to TaskWorkerPool
// This rpc never fail.
response->mutable_status()->set_status_code(TStatusCode::OK);
for (const auto& tablet_id : request->tablet_ids()) {
auto tablet = _env->lake_tablet_manager()->get_tablet(tablet_id);
if (!tablet.ok()) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/service/service_be/lake_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#pragma once

#include "gen_cpp/starlake.pb.h"
#include "gen_cpp/lake_service.pb.h"

namespace starrocks {
class ExecEnv;
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/lake/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
#include "fmt/format.h"
#include "fs/fs.h"
#include "gen_cpp/AgentService_types.h"
#include "gen_cpp/lake_types.pb.h"
#include "gen_cpp/olap_file.pb.h"
#include "gen_cpp/starlake.pb.h"
#include "gutil/strings/util.h"
#include "storage/lake/group_assigner.h"
#include "storage/lake/metadata_iterator.h"
Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/lake/tablet_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

#include <memory>

#include "gen_cpp/starlake.pb.h"
#include "gen_cpp/lake_types.pb.h"

namespace starrocks::lake {

Expand Down
2 changes: 1 addition & 1 deletion be/src/storage/lake/txn_log.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

#include <memory>

#include "gen_cpp/starlake.pb.h"
#include "gen_cpp/lake_types.pb.h"

namespace starrocks::lake {

Expand Down
38 changes: 17 additions & 21 deletions be/test/service/lake_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "service/service_be/lake_service.h"

#include <brpc/controller.h>
#include <gtest/gtest.h>

#include "runtime/exec_env.h"
Expand Down Expand Up @@ -78,47 +79,51 @@ class LakeServiceTest : public testing::Test {
};

TEST_F(LakeServiceTest, test_publish_version_missing_tablet_ids) {
brpc::Controller cntl;
lake::PublishVersionRequest request;
lake::PublishVersionResponse response;
request.set_base_version(1);
request.set_new_version(2);
request.add_txn_ids(1000);
_lake_service.publish_version(nullptr, &request, &response, nullptr);
ASSERT_NE(TStatusCode::OK, response.status().status_code());
ASSERT_EQ("missing tablet_ids", response.status().error_msgs(0));
_lake_service.publish_version(&cntl, &request, &response, nullptr);
ASSERT_TRUE(cntl.Failed());
ASSERT_EQ("missing tablet_ids", cntl.ErrorText());
}

TEST_F(LakeServiceTest, test_publish_version_missing_txn_ids) {
brpc::Controller cntl;
lake::PublishVersionRequest request;
lake::PublishVersionResponse response;
request.set_base_version(1);
request.set_new_version(2);
request.add_tablet_ids(_tablet_id);
_lake_service.publish_version(nullptr, &request, &response, nullptr);
ASSERT_NE(TStatusCode::OK, response.status().status_code());
ASSERT_EQ("missing txn_ids", response.status().error_msgs(0));
_lake_service.publish_version(&cntl, &request, &response, nullptr);
ASSERT_TRUE(cntl.Failed());
ASSERT_EQ("missing txn_ids", cntl.ErrorText());
}

TEST_F(LakeServiceTest, test_publish_version_missing_base_version) {
brpc::Controller cntl;
lake::PublishVersionRequest request;
lake::PublishVersionResponse response;
request.set_new_version(2);
request.add_tablet_ids(_tablet_id);
request.add_txn_ids(1000);
_lake_service.publish_version(nullptr, &request, &response, nullptr);
ASSERT_NE(TStatusCode::OK, response.status().status_code());
ASSERT_EQ("missing base version", response.status().error_msgs(0));
_lake_service.publish_version(&cntl, &request, &response, nullptr);
ASSERT_TRUE(cntl.Failed());
ASSERT_EQ("missing base version", cntl.ErrorText());
}

TEST_F(LakeServiceTest, test_publish_version_missing_new_version) {
brpc::Controller cntl;
lake::PublishVersionRequest request;
lake::PublishVersionResponse response;
request.set_base_version(1);
request.add_tablet_ids(_tablet_id);
request.add_txn_ids(1000);
_lake_service.publish_version(nullptr, &request, &response, nullptr);
ASSERT_NE(TStatusCode::OK, response.status().status_code());
ASSERT_EQ("missing new version", response.status().error_msgs(0));
_lake_service.publish_version(&cntl, &request, &response, nullptr);
ASSERT_TRUE(cntl.Failed());
ASSERT_EQ("missing new version", cntl.ErrorText());
}

TEST_F(LakeServiceTest, test_publish_version_for_write) {
Expand Down Expand Up @@ -154,7 +159,6 @@ TEST_F(LakeServiceTest, test_publish_version_for_write) {
request.add_tablet_ids(_tablet_id);
request.add_txn_ids(1000);
_lake_service.publish_version(nullptr, &request, &response, nullptr);
ASSERT_EQ(TStatusCode::OK, response.status().status_code()) << response.status().error_msgs(0);
ASSERT_EQ(0, response.failed_tablets_size());
}
// Publish txn 1001
Expand All @@ -166,7 +170,6 @@ TEST_F(LakeServiceTest, test_publish_version_for_write) {
request.add_tablet_ids(_tablet_id);
request.add_txn_ids(1001);
_lake_service.publish_version(nullptr, &request, &response, nullptr);
ASSERT_EQ(TStatusCode::OK, response.status().status_code()) << response.status().error_msgs(0);
ASSERT_EQ(0, response.failed_tablets_size());
}

Expand Down Expand Up @@ -197,7 +200,6 @@ TEST_F(LakeServiceTest, test_publish_version_for_write) {
request.add_tablet_ids(_tablet_id);
request.add_txn_ids(1001);
_lake_service.publish_version(nullptr, &request, &response, nullptr);
ASSERT_EQ(TStatusCode::OK, response.status().status_code()) << response.status().error_msgs(0);
ASSERT_EQ(0, response.failed_tablets_size());
}
// Send publish version request again with an non-exist tablet
Expand All @@ -210,7 +212,6 @@ TEST_F(LakeServiceTest, test_publish_version_for_write) {
request.add_tablet_ids(9999);
request.add_txn_ids(1001);
_lake_service.publish_version(nullptr, &request, &response, nullptr);
ASSERT_EQ(TStatusCode::OK, response.status().status_code()) << response.status().error_msgs(0);
ASSERT_EQ(1, response.failed_tablets_size());
ASSERT_EQ(9999, response.failed_tablets(0));
}
Expand All @@ -223,7 +224,6 @@ TEST_F(LakeServiceTest, test_publish_version_for_write) {
request.add_tablet_ids(_tablet_id);
request.add_txn_ids(1111);
_lake_service.publish_version(nullptr, &request, &response, nullptr);
ASSERT_EQ(TStatusCode::OK, response.status().status_code()) << response.status().error_msgs(0);
ASSERT_EQ(1, response.failed_tablets_size());
ASSERT_EQ(_tablet_id, response.failed_tablets(0));
}
Expand All @@ -238,7 +238,6 @@ TEST_F(LakeServiceTest, test_publish_version_for_write) {
request.add_tablet_ids(_tablet_id);
request.add_txn_ids(1001);
_lake_service.publish_version(nullptr, &request, &response, nullptr);
ASSERT_EQ(TStatusCode::OK, response.status().status_code()) << response.status().error_msgs(0);
ASSERT_EQ(0, response.failed_tablets_size());
}
}
Expand Down Expand Up @@ -274,9 +273,6 @@ TEST_F(LakeServiceTest, test_abort) {
request.add_txn_ids(1000);
request.add_txn_ids(1001);
_lake_service.abort_txn(nullptr, &request, &response, nullptr);
ASSERT_TRUE(response.has_status());
ASSERT_TRUE(response.status().has_status_code());
ASSERT_EQ(TStatusCode::OK, response.status().status_code()) << response.status().error_msgs(0);
}

ASSIGN_OR_ABORT(auto tablet, _tablet_mgr->get_tablet(_tablet_id));
Expand Down
20 changes: 20 additions & 0 deletions fe/fe-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -801,6 +801,26 @@ under the License.
<skip>${skip.plugin}</skip>
</configuration>
</execution>
<execution>
<id>gen_proto_starlake</id>
<phase>generate-sources</phase>
<goals>
<!-- DO NOT use goal 'java', it will terminate the VM after done -->
<goal>exec</goal>
</goals>
<configuration>
<executable>${java.home}/bin/java</executable>
<arguments>
<argument>-jar</argument>
<argument>
${settings.localRepository}/com/baidu/jprotobuf/${jprotobuf.version}/jprotobuf-${jprotobuf.version}-jar-with-dependencies.jar
</argument>
<argument>--java_out=${basedir}/target/generated-sources/proto</argument>
<argument>${starrocks.home}/gensrc/proto/lake_service.proto</argument>
</arguments>
<skip>${skip.plugin}</skip>
</configuration>
</execution>
</executions>
</plugin>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,11 +323,10 @@ private TOlapTableLocationParam createLocation(OlapTable table) throws UserExcep
Multimap<Long, Long> allBePathsMap = HashMultimap.create();
for (Long partitionId : partitionIds) {
Partition partition = table.getPartition(partitionId);
boolean useStarOS = partition.isUseStarOS();
int quorum = table.getPartitionInfo().getQuorumNum(partition.getId());
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
for (Tablet tablet : index.getTablets()) {
if (useStarOS) {
if (table.isLakeTable()) {
locationParam.addToTablets(new TTabletLocation(
tablet.getId(), Lists.newArrayList(((LakeTablet) tablet).getPrimaryBackendId())));
} else {
Expand Down
38 changes: 26 additions & 12 deletions fe/fe-core/src/main/java/com/starrocks/rpc/BackendServiceProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public class BackendServiceProxy {

private RpcClient rpcClient;
// TODO(zc): use TNetworkAddress,
private Map<TNetworkAddress, PBackendService> serviceMap;
private Map<TNetworkAddress, PBackendService> backendServiceMap;
private Map<TNetworkAddress, LakeService> lakeServiceMap;

static {
int javaRuntimeVersion = JdkUtils.getJavaVersionAsInteger(System.getProperty("java.version"));
Expand All @@ -76,23 +77,36 @@ private BackendServiceProxy() {
rpcOptions.setMaxIdleSize(Config.brpc_connection_pool_size);
rpcOptions.setMaxWait(Config.brpc_idle_wait_max_time);
rpcClient = new RpcClient(rpcOptions);
serviceMap = Maps.newHashMap();
backendServiceMap = Maps.newHashMap();
}

public static BackendServiceProxy getInstance() {
return SingletonHolder.INSTANCE;
}

protected synchronized PBackendService getProxy(TNetworkAddress address) {
PBackendService service = serviceMap.get(address);
protected synchronized PBackendService getBackendService(TNetworkAddress address) {
PBackendService service = backendServiceMap.get(address);
if (service != null) {
return service;
}
ProtobufRpcProxy<PBackendService> proxy = new ProtobufRpcProxy(rpcClient, PBackendService.class);
proxy.setHost(address.getHostname());
proxy.setPort(address.getPort());
service = proxy.proxy();
serviceMap.put(address, service);
backendServiceMap.put(address, service);
return service;
}

public synchronized LakeService getLakeService(TNetworkAddress address) {
LakeService service = lakeServiceMap.get(address);
if (service != null) {
return service;
}
ProtobufRpcProxy<LakeService> proxy = new ProtobufRpcProxy(rpcClient, LakeService.class);
proxy.setHost(address.getHostname());
proxy.setPort(address.getPort());
service = proxy.proxy();
lakeServiceMap.put(address, service);
return service;
}

Expand All @@ -102,7 +116,7 @@ public Future<PExecPlanFragmentResult> execPlanFragmentAsync(
final PExecPlanFragmentRequest pRequest = new PExecPlanFragmentRequest();
pRequest.setRequest(tRequest);
try {
final PBackendService service = getProxy(address);
final PBackendService service = getBackendService(address);
return service.execPlanFragmentAsync(pRequest);
} catch (NoSuchElementException e) {
try {
Expand All @@ -112,7 +126,7 @@ public Future<PExecPlanFragmentResult> execPlanFragmentAsync(
} catch (InterruptedException interruptedException) {
// do nothing
}
final PBackendService service = getProxy(address);
final PBackendService service = getBackendService(address);
return service.execPlanFragmentAsync(pRequest);
} catch (NoSuchElementException noSuchElementException) {
LOG.warn("Execute plan fragment retry failed, address={}:{}",
Expand Down Expand Up @@ -141,7 +155,7 @@ public Future<PCancelPlanFragmentResult> cancelPlanFragmentAsync(
qid.lo = queryId.lo;
pRequest.queryId = qid;
try {
final PBackendService service = getProxy(address);
final PBackendService service = getBackendService(address);
return service.cancelPlanFragmentAsync(pRequest);
} catch (NoSuchElementException e) {
// retry
Expand All @@ -151,7 +165,7 @@ public Future<PCancelPlanFragmentResult> cancelPlanFragmentAsync(
} catch (InterruptedException interruptedException) {
// do nothing
}
final PBackendService service = getProxy(address);
final PBackendService service = getBackendService(address);
return service.cancelPlanFragmentAsync(pRequest);
} catch (NoSuchElementException noSuchElementException) {
LOG.warn("Cancel plan fragment retry failed, address={}:{}",
Expand All @@ -168,7 +182,7 @@ public Future<PCancelPlanFragmentResult> cancelPlanFragmentAsync(
public Future<PFetchDataResult> fetchDataAsync(
TNetworkAddress address, PFetchDataRequest request) throws RpcException {
try {
PBackendService service = getProxy(address);
PBackendService service = getBackendService(address);
return service.fetchDataAsync(request);
} catch (Throwable e) {
LOG.warn("fetch data catch a exception, address={}:{}",
Expand All @@ -180,7 +194,7 @@ public Future<PFetchDataResult> fetchDataAsync(
public Future<PTriggerProfileReportResult> triggerProfileReportAsync(
TNetworkAddress address, PTriggerProfileReportRequest request) throws RpcException {
try {
final PBackendService service = getProxy(address);
final PBackendService service = getBackendService(address);
return service.triggerProfileReport(request);
} catch (Throwable e) {
LOG.warn("fetch data catch a exception, address={}:{}",
Expand All @@ -192,7 +206,7 @@ public Future<PTriggerProfileReportResult> triggerProfileReportAsync(
public Future<PProxyResult> getInfo(
TNetworkAddress address, PProxyRequest request) throws RpcException {
try {
final PBackendService service = getProxy(address);
final PBackendService service = getBackendService(address);
return service.getInfo(request);
} catch (Throwable e) {
LOG.warn("failed to get info, address={}:{}", address.getHostname(), address.getPort(), e);
Expand Down
Loading

0 comments on commit 12f2a5a

Please sign in to comment.