Skip to content

Commit

Permalink
[BugFix] Fix SDK write failure for non-default tables (#298)
Browse files Browse the repository at this point in the history
Signed-off-by: Zaorang Yang <zaorangy@gmail.com>
  • Loading branch information
zaorangyang authored and banmoy committed Dec 4, 2023
1 parent cdc8c78 commit 3348eb1
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -414,7 +414,7 @@ protected TableRegion getCacheRegion(String uniqueKey, String database, String t
synchronized (regions) {
region = regions.get(uniqueKey);
if (region == null) {
StreamLoadTableProperties tableProperties = properties.getTableProperties(uniqueKey);
StreamLoadTableProperties tableProperties = properties.getTableProperties(uniqueKey, database, table);
LabelGenerator labelGenerator = labelGeneratorFactory.create(database, table);
region = new BatchTableRegion(uniqueKey, database, table, this, tableProperties,
streamLoader, labelGenerator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,7 @@ public Future<StreamLoadResponse> send(TableRegion region) {
log.warn("Stream load not start");
}
if (begin(region)) {
StreamLoadTableProperties tableProperties = properties.getTableProperties(region.getUniqueKey());
return executorService.submit(() -> send(tableProperties, region));
return executorService.submit(() -> sendToSR(region));
} else {
region.fail(new StreamLoadFailException("Transaction start failed, db : " + region.getDatabase()));
}
Expand All @@ -168,8 +167,7 @@ public Future<StreamLoadResponse> send(TableRegion region, int delayMs) {
log.warn("Stream load not start");
}
if (begin(region)) {
StreamLoadTableProperties tableProperties = properties.getTableProperties(region.getUniqueKey());
return executorService.schedule(() -> send(tableProperties, region), delayMs, TimeUnit.MILLISECONDS);
return executorService.schedule(() -> sendToSR(region), delayMs, TimeUnit.MILLISECONDS);
} else {
region.fail(new StreamLoadFailException("Transaction start failed, db : " + region.getDatabase()));
}
Expand Down Expand Up @@ -278,7 +276,7 @@ protected void initDefaultHeaders(StreamLoadProperties properties) {
.toArray(Header[]::new);
}

protected StreamLoadResponse send(StreamLoadTableProperties tableProperties, TableRegion region) {
protected StreamLoadResponse sendToSR(TableRegion region) {
try {
String host = getAvailableHost();
String sendUrl = getSendUrl(host, region.getDatabase(), region.getTable());
Expand All @@ -289,7 +287,7 @@ protected StreamLoadResponse send(StreamLoadTableProperties tableProperties, Tab
httpPut.setEntity(region.getHttpEntity());

httpPut.setHeaders(defaultHeaders);

StreamLoadTableProperties tableProperties = region.getProperties();
for (Map.Entry<String, String> entry : tableProperties.getProperties().entrySet()) {
httpPut.removeHeaders(entry.getKey());
httpPut.addHeader(entry.getKey(), entry.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.starrocks.data.load.stream.StarRocksVersion;
import com.starrocks.data.load.stream.StreamLoadUtils;

import java.io.Serializable;
import java.util.Arrays;
Expand Down Expand Up @@ -166,12 +165,15 @@ public StreamLoadTableProperties getDefaultTableProperties() {
return defaultTableProperties;
}

public StreamLoadTableProperties getTableProperties(String database, String table) {
return getTableProperties(StreamLoadUtils.getTableUniqueKey(database, table));
}

public StreamLoadTableProperties getTableProperties(String uniqueKey) {
return tablePropertiesMap.getOrDefault(uniqueKey, defaultTableProperties);
public StreamLoadTableProperties getTableProperties(String uniqueKey, String database, String table) {
StreamLoadTableProperties tableProperties = tablePropertiesMap.getOrDefault(uniqueKey, defaultTableProperties);
if (!tableProperties.getDatabase().equals(database) || !tableProperties.getDatabase().equals(table)) {
StreamLoadTableProperties.Builder tablePropertiesBuilder = StreamLoadTableProperties.builder();
tablePropertiesBuilder = tablePropertiesBuilder.copyFrom(tableProperties).database(database).table(table);
return tablePropertiesBuilder.build();
} else {
return tableProperties;
}
}

public Map<String, StreamLoadTableProperties> getTablePropertiesMap() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,9 @@ public class StreamLoadTableProperties implements Serializable {
private final String table;
private final StreamLoadDataFormat dataFormat;
private final Map<String, String> properties;

private final boolean enableUpsertDelete;
private final long chunkLimit;
private final int maxBufferRows;
private final String columns;

private StreamLoadTableProperties(Builder builder) {
this.database = builder.database;
Expand All @@ -47,7 +46,6 @@ private StreamLoadTableProperties(Builder builder) {
? StreamLoadUtils.getTableUniqueKey(database, table)
: builder.uniqueKey;

this.enableUpsertDelete = builder.enableUpsertDelete;
this.dataFormat = builder.dataFormat == null
? StreamLoadDataFormat.JSON
: builder.dataFormat;
Expand All @@ -59,8 +57,11 @@ private StreamLoadTableProperties(Builder builder) {
}
this.maxBufferRows = builder.maxBufferRows;
this.properties = builder.properties;
this.columns = builder.columns;
}

public String getColumns() {return columns; }

public String getUniqueKey() {
return uniqueKey;
}
Expand All @@ -73,10 +74,6 @@ public String getTable() {
return table;
}

public boolean isEnableUpsertDelete() {
return enableUpsertDelete;
}

public StreamLoadDataFormat getDataFormat() {
return dataFormat;
}
Expand All @@ -102,8 +99,6 @@ public static class Builder {
private String database;
private String table;
private String columns;

private boolean enableUpsertDelete;
private StreamLoadDataFormat dataFormat;
private long chunkLimit;
private int maxBufferRows = Integer.MAX_VALUE;
Expand All @@ -114,6 +109,25 @@ private Builder() {

}

// This function does not copy the uniqueKey and properties attributes because the uniqueKey
// is generated in the StreamLoadTableProperties constructor.
// The properties only contains three elements(database,table,columns), which are automatically
// populated during the build process.
// TODO: StreamLoadProperties.headers hold properties common to multiple tables, while
// StreamLoadTableProperties.properties hold the specific properties of an individual table.
// This should be taken into consideration during the refactoring.
public Builder copyFrom(StreamLoadTableProperties streamLoadTableProperties) {
// TODO: datbase, table, columns are private propertis for an individual table.
// We may not copy thers private propertis.
database(streamLoadTableProperties.getDatabase());
table(streamLoadTableProperties.getTable());
columns(streamLoadTableProperties.getColumns());
streamLoadDataFormat(streamLoadTableProperties.getDataFormat());
chunkLimit(streamLoadTableProperties.getChunkLimit());
maxBufferRows(streamLoadTableProperties.getMaxBufferRows());
return this;
}

public Builder uniqueKey(String uniqueKey) {
this.uniqueKey = uniqueKey;
return this;
Expand All @@ -134,11 +148,6 @@ public Builder columns(String columns) {
return this;
}

public Builder enableUpsertDelete(boolean enableUpsertDelete) {
this.enableUpsertDelete = enableUpsertDelete;
return this;
}

public Builder streamLoadDataFormat(StreamLoadDataFormat dataFormat) {
this.dataFormat = dataFormat;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ protected TableRegion getCacheRegion(String uniqueKey, String database, String t
synchronized (regions) {
region = regions.get(uniqueKey);
if (region == null) {
StreamLoadTableProperties tableProperties = properties.getTableProperties(uniqueKey);
StreamLoadTableProperties tableProperties = properties.getTableProperties(uniqueKey, database, table);
LabelGenerator labelGenerator = labelGeneratorFactory.create(database, table);
region = new TransactionTableRegion(uniqueKey, database, table, this,
tableProperties, streamLoader, labelGenerator, maxRetries, retryIntervalInMs);
Expand Down

0 comments on commit 3348eb1

Please sign in to comment.