From 3348eb17251ac3ffb009f8ab1d4ee5b41b137dbf Mon Sep 17 00:00:00 2001 From: zaorangyang Date: Thu, 26 Oct 2023 20:48:51 +0800 Subject: [PATCH] [BugFix] Fix SDK write failure for non-default tables (#298) Signed-off-by: Zaorang Yang --- .../load/stream/DefaultStreamLoadManager.java | 2 +- .../data/load/stream/DefaultStreamLoader.java | 10 ++--- .../properties/StreamLoadProperties.java | 16 ++++---- .../properties/StreamLoadTableProperties.java | 37 ++++++++++++------- .../load/stream/v2/StreamLoadManagerV2.java | 2 +- 5 files changed, 38 insertions(+), 29 deletions(-) diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoadManager.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoadManager.java index 1eda052c..9228d4d8 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoadManager.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoadManager.java @@ -414,7 +414,7 @@ protected TableRegion getCacheRegion(String uniqueKey, String database, String t synchronized (regions) { region = regions.get(uniqueKey); if (region == null) { - StreamLoadTableProperties tableProperties = properties.getTableProperties(uniqueKey); + StreamLoadTableProperties tableProperties = properties.getTableProperties(uniqueKey, database, table); LabelGenerator labelGenerator = labelGeneratorFactory.create(database, table); region = new BatchTableRegion(uniqueKey, database, table, this, tableProperties, streamLoader, labelGenerator); diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java index f0d39671..e6898c8e 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/DefaultStreamLoader.java @@ -153,8 +153,7 @@ public Future send(TableRegion region) { log.warn("Stream load not start"); } if (begin(region)) { - StreamLoadTableProperties tableProperties = properties.getTableProperties(region.getUniqueKey()); - return executorService.submit(() -> send(tableProperties, region)); + return executorService.submit(() -> sendToSR(region)); } else { region.fail(new StreamLoadFailException("Transaction start failed, db : " + region.getDatabase())); } @@ -168,8 +167,7 @@ public Future send(TableRegion region, int delayMs) { log.warn("Stream load not start"); } if (begin(region)) { - StreamLoadTableProperties tableProperties = properties.getTableProperties(region.getUniqueKey()); - return executorService.schedule(() -> send(tableProperties, region), delayMs, TimeUnit.MILLISECONDS); + return executorService.schedule(() -> sendToSR(region), delayMs, TimeUnit.MILLISECONDS); } else { region.fail(new StreamLoadFailException("Transaction start failed, db : " + region.getDatabase())); } @@ -278,7 +276,7 @@ protected void initDefaultHeaders(StreamLoadProperties properties) { .toArray(Header[]::new); } - protected StreamLoadResponse send(StreamLoadTableProperties tableProperties, TableRegion region) { + protected StreamLoadResponse sendToSR(TableRegion region) { try { String host = getAvailableHost(); String sendUrl = getSendUrl(host, region.getDatabase(), region.getTable()); @@ -289,7 +287,7 @@ protected StreamLoadResponse send(StreamLoadTableProperties tableProperties, Tab httpPut.setEntity(region.getHttpEntity()); httpPut.setHeaders(defaultHeaders); - + StreamLoadTableProperties tableProperties = region.getProperties(); for (Map.Entry entry : tableProperties.getProperties().entrySet()) { httpPut.removeHeaders(entry.getKey()); httpPut.addHeader(entry.getKey(), entry.getValue()); diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java index 51ea478d..e255eba0 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadProperties.java @@ -22,7 +22,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.starrocks.data.load.stream.StarRocksVersion; -import com.starrocks.data.load.stream.StreamLoadUtils; import java.io.Serializable; import java.util.Arrays; @@ -166,12 +165,15 @@ public StreamLoadTableProperties getDefaultTableProperties() { return defaultTableProperties; } - public StreamLoadTableProperties getTableProperties(String database, String table) { - return getTableProperties(StreamLoadUtils.getTableUniqueKey(database, table)); - } - - public StreamLoadTableProperties getTableProperties(String uniqueKey) { - return tablePropertiesMap.getOrDefault(uniqueKey, defaultTableProperties); + public StreamLoadTableProperties getTableProperties(String uniqueKey, String database, String table) { + StreamLoadTableProperties tableProperties = tablePropertiesMap.getOrDefault(uniqueKey, defaultTableProperties); + if (!tableProperties.getDatabase().equals(database) || !tableProperties.getDatabase().equals(table)) { + StreamLoadTableProperties.Builder tablePropertiesBuilder = StreamLoadTableProperties.builder(); + tablePropertiesBuilder = tablePropertiesBuilder.copyFrom(tableProperties).database(database).table(table); + return tablePropertiesBuilder.build(); + } else { + return tableProperties; + } } public Map getTablePropertiesMap() { diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java index 4c48f8e8..bffd4fa8 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/properties/StreamLoadTableProperties.java @@ -34,10 +34,9 @@ public class StreamLoadTableProperties implements Serializable { private final String table; private final StreamLoadDataFormat dataFormat; private final Map properties; - - private final boolean enableUpsertDelete; private final long chunkLimit; private final int maxBufferRows; + private final String columns; private StreamLoadTableProperties(Builder builder) { this.database = builder.database; @@ -47,7 +46,6 @@ private StreamLoadTableProperties(Builder builder) { ? StreamLoadUtils.getTableUniqueKey(database, table) : builder.uniqueKey; - this.enableUpsertDelete = builder.enableUpsertDelete; this.dataFormat = builder.dataFormat == null ? StreamLoadDataFormat.JSON : builder.dataFormat; @@ -59,8 +57,11 @@ private StreamLoadTableProperties(Builder builder) { } this.maxBufferRows = builder.maxBufferRows; this.properties = builder.properties; + this.columns = builder.columns; } + public String getColumns() {return columns; } + public String getUniqueKey() { return uniqueKey; } @@ -73,10 +74,6 @@ public String getTable() { return table; } - public boolean isEnableUpsertDelete() { - return enableUpsertDelete; - } - public StreamLoadDataFormat getDataFormat() { return dataFormat; } @@ -102,8 +99,6 @@ public static class Builder { private String database; private String table; private String columns; - - private boolean enableUpsertDelete; private StreamLoadDataFormat dataFormat; private long chunkLimit; private int maxBufferRows = Integer.MAX_VALUE; @@ -114,6 +109,25 @@ private Builder() { } + // This function does not copy the uniqueKey and properties attributes because the uniqueKey + // is generated in the StreamLoadTableProperties constructor. + // The properties only contains three elements(database,table,columns), which are automatically + // populated during the build process. + // TODO: StreamLoadProperties.headers hold properties common to multiple tables, while + // StreamLoadTableProperties.properties hold the specific properties of an individual table. + // This should be taken into consideration during the refactoring. + public Builder copyFrom(StreamLoadTableProperties streamLoadTableProperties) { + // TODO: datbase, table, columns are private propertis for an individual table. + // We may not copy thers private propertis. + database(streamLoadTableProperties.getDatabase()); + table(streamLoadTableProperties.getTable()); + columns(streamLoadTableProperties.getColumns()); + streamLoadDataFormat(streamLoadTableProperties.getDataFormat()); + chunkLimit(streamLoadTableProperties.getChunkLimit()); + maxBufferRows(streamLoadTableProperties.getMaxBufferRows()); + return this; + } + public Builder uniqueKey(String uniqueKey) { this.uniqueKey = uniqueKey; return this; @@ -134,11 +148,6 @@ public Builder columns(String columns) { return this; } - public Builder enableUpsertDelete(boolean enableUpsertDelete) { - this.enableUpsertDelete = enableUpsertDelete; - return this; - } - public Builder streamLoadDataFormat(StreamLoadDataFormat dataFormat) { this.dataFormat = dataFormat; return this; diff --git a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java index 39425bf6..2dd2f982 100644 --- a/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java +++ b/starrocks-stream-load-sdk/src/main/java/com/starrocks/data/load/stream/v2/StreamLoadManagerV2.java @@ -440,7 +440,7 @@ protected TableRegion getCacheRegion(String uniqueKey, String database, String t synchronized (regions) { region = regions.get(uniqueKey); if (region == null) { - StreamLoadTableProperties tableProperties = properties.getTableProperties(uniqueKey); + StreamLoadTableProperties tableProperties = properties.getTableProperties(uniqueKey, database, table); LabelGenerator labelGenerator = labelGeneratorFactory.create(database, table); region = new TransactionTableRegion(uniqueKey, database, table, this, tableProperties, streamLoader, labelGenerator, maxRetries, retryIntervalInMs);