Commit 6322617
[SR-3215] Bugfix: kafka_default_offsets does not take effect in routine load (StarRocks#3)
wyb authored Sep 5, 2021
1 parent 286ea6c commit 6322617
Showing 5 changed files with 154 additions and 43 deletions.
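For context, the kind of job definition affected by this bug looks like the sketch below. It is assembled from the property names exercised by this commit's tests (kafka_broker_list, kafka_topic, kafka_partitions, property.kafka_default_offsets); the surrounding CREATE ROUTINE LOAD grammar is assumed rather than shown in this diff. Before the fix, a job that listed partitions but relied on the default offset silently started from OFFSET_END:

    // Hypothetical example, not from the commit: kafka_partitions given without
    // kafka_offsets, with the default supplied via property.kafka_default_offsets.
    String createRoutineLoad =
            "CREATE ROUTINE LOAD db1.job1 ON table1 " +
            "FROM KAFKA (" +
            "\"kafka_broker_list\" = \"127.0.0.1:8080\"," +
            "\"kafka_topic\" = \"topic1\"," +
            "\"kafka_partitions\" = \"1,2\"," +
            "\"property.kafka_default_offsets\" = \"100\"" +
            ")";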
AlterRoutineLoadStmt.java
@@ -33,7 +33,7 @@
 import java.util.Optional;
 
 /**
- * ALTER ROUTINE LOAD db.label
+ * ALTER ROUTINE LOAD FOR db.label
  * PROPERTIES(
  * ...
  * )
CreateRoutineLoadStmt.java
@@ -427,34 +427,44 @@ private void checkKafkaProperties() throws AnalysisException {
             throw new AnalysisException(KAFKA_TOPIC_PROPERTY + " is a required property");
         }
 
+        // check custom kafka property before check partitions,
+        // because partitions can use kafka_default_offsets property
+        analyzeCustomProperties(dataSourceProperties, customKafkaProperties);
+
         // check partitions
         String kafkaPartitionsString = dataSourceProperties.get(KAFKA_PARTITIONS_PROPERTY);
         if (kafkaPartitionsString != null) {
-            analyzeKafkaPartitionProperty(kafkaPartitionsString, this.kafkaPartitionOffsets);
+            analyzeKafkaPartitionProperty(kafkaPartitionsString, customKafkaProperties, kafkaPartitionOffsets);
         }
 
         // check offset
         String kafkaOffsetsString = dataSourceProperties.get(KAFKA_OFFSETS_PROPERTY);
         if (kafkaOffsetsString != null) {
-            analyzeKafkaOffsetProperty(kafkaOffsetsString, this.kafkaPartitionOffsets);
+            analyzeKafkaOffsetProperty(kafkaOffsetsString, kafkaPartitionOffsets);
         }
-
-        // check custom kafka property
-        analyzeCustomProperties(this.dataSourceProperties, this.customKafkaProperties);
     }
 
     public static void analyzeKafkaPartitionProperty(String kafkaPartitionsString,
+                                                     Map<String, String> customKafkaProperties,
                                                      List<Pair<Integer, Long>> kafkaPartitionOffsets)
             throws AnalysisException {
         kafkaPartitionsString = kafkaPartitionsString.replaceAll(" ", "");
         if (kafkaPartitionsString.isEmpty()) {
             throw new AnalysisException(KAFKA_PARTITIONS_PROPERTY + " could not be a empty string");
         }
-        String[] kafkaPartionsStringList = kafkaPartitionsString.split(",");
-        for (String s : kafkaPartionsStringList) {
+
+        // get kafka default offset if set
+        Long kafkaDefaultOffset = null;
+        if (customKafkaProperties.containsKey(KAFKA_DEFAULT_OFFSETS)) {
+            kafkaDefaultOffset = getKafkaOffset(customKafkaProperties.get(KAFKA_DEFAULT_OFFSETS));
+        }
+
+        String[] kafkaPartitionsStringList = kafkaPartitionsString.split(",");
+        for (String s : kafkaPartitionsStringList) {
             try {
-                kafkaPartitionOffsets.add(Pair.create(getIntegerValueFromString(s, KAFKA_PARTITIONS_PROPERTY),
-                        KafkaProgress.OFFSET_END_VAL));
+                kafkaPartitionOffsets.add(
+                        Pair.create(getIntegerValueFromString(s, KAFKA_PARTITIONS_PROPERTY),
+                                kafkaDefaultOffset == null ? KafkaProgress.OFFSET_END_VAL : kafkaDefaultOffset));
             } catch (AnalysisException e) {
                 throw new AnalysisException(KAFKA_PARTITIONS_PROPERTY
                         + " must be a number string with comma-separated");
@@ -475,25 +485,31 @@ public static void analyzeKafkaOffsetProperty(String kafkaOffsetsString,
         }
 
         for (int i = 0; i < kafkaOffsetsStringList.length; i++) {
-            // defined in librdkafka/rdkafkacpp.h
-            // OFFSET_BEGINNING: -2
-            // OFFSET_END: -1
-            try {
-                kafkaPartitionOffsets.get(i).second = getLongValueFromString(kafkaOffsetsStringList[i],
-                        KAFKA_OFFSETS_PROPERTY);
-                if (kafkaPartitionOffsets.get(i).second < 0) {
-                    throw new AnalysisException("Can not specify offset smaller than 0");
-                }
-            } catch (AnalysisException e) {
-                if (kafkaOffsetsStringList[i].equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
-                    kafkaPartitionOffsets.get(i).second = KafkaProgress.OFFSET_BEGINNING_VAL;
-                } else if (kafkaOffsetsStringList[i].equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
-                    kafkaPartitionOffsets.get(i).second = KafkaProgress.OFFSET_END_VAL;
-                } else {
-                    throw e;
-                }
-            }
+            kafkaPartitionOffsets.get(i).second = getKafkaOffset(kafkaOffsetsStringList[i]);
         }
     }
 
+    // Get kafka offset from string
+    // defined in librdkafka/rdkafkacpp.h
+    // OFFSET_BEGINNING: -2
+    // OFFSET_END: -1
+    public static long getKafkaOffset(String offsetStr) throws AnalysisException {
+        long offset = -1;
+        try {
+            offset = getLongValueFromString(offsetStr, "kafka offset");
+            if (offset < 0) {
+                throw new AnalysisException("Can not specify offset smaller than 0");
+            }
+        } catch (AnalysisException e) {
+            if (offsetStr.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
+                offset = KafkaProgress.OFFSET_BEGINNING_VAL;
+            } else if (offsetStr.equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
+                offset = KafkaProgress.OFFSET_END_VAL;
+            } else {
+                throw e;
+            }
+        }
+        return offset;
+    }
+
     public static void analyzeCustomProperties(Map<String, String> dataSourceProperties,
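To make the new helper's contract concrete, here is a minimal, self-contained sketch of the rule getKafkaOffset() centralizes (illustrative only, not part of the commit; plain JDK types stand in for AnalysisException and the KafkaProgress constants):

    public class KafkaOffsetDemo {
        static final long OFFSET_BEGINNING_VAL = -2; // librdkafka OFFSET_BEGINNING
        static final long OFFSET_END_VAL = -1;       // librdkafka OFFSET_END

        static long getKafkaOffset(String offsetStr) {
            try {
                long offset = Long.parseLong(offsetStr);
                if (offset < 0) {
                    // numeric but negative: rejected, mirroring the FE check
                    throw new IllegalArgumentException("Can not specify offset smaller than 0");
                }
                return offset;
            } catch (NumberFormatException e) {
                // not numeric: only the two librdkafka symbolic names are accepted
                if (offsetStr.equalsIgnoreCase("OFFSET_BEGINNING")) {
                    return OFFSET_BEGINNING_VAL;
                }
                if (offsetStr.equalsIgnoreCase("OFFSET_END")) {
                    return OFFSET_END_VAL;
                }
                throw new IllegalArgumentException("invalid kafka offset: " + offsetStr);
            }
        }

        public static void main(String[] args) {
            System.out.println(getKafkaOffset("OFFSET_BEGINNING")); // -2
            System.out.println(getKafkaOffset("OFFSET_END"));       // -1
            System.out.println(getKafkaOffset("100"));              // 100
        }
    }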
@@ -510,6 +526,11 @@ public static void analyzeCustomProperties(Map<String, String> dataSourceProperties,
             }
             // can be extended in the future which other prefix
         }
+
+        // check kafka_default_offsets
+        if (customKafkaProperties.containsKey(KAFKA_DEFAULT_OFFSETS)) {
+            getKafkaOffset(customKafkaProperties.get(KAFKA_DEFAULT_OFFSETS));
+        }
     }
 
     private static int getIntegerValueFromString(String valueString, String propertyName) throws AnalysisException {
RoutineLoadDataSourceProperties.java
@@ -104,12 +104,12 @@ private void checkKafkaProperties() throws AnalysisException {
         // check partitions
         final String kafkaPartitionsString = properties.get(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY);
         if (kafkaPartitionsString != null) {
-
             if (!properties.containsKey(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)) {
                 throw new AnalysisException("Partition and offset must be specified at the same time");
             }
 
-            CreateRoutineLoadStmt.analyzeKafkaPartitionProperty(kafkaPartitionsString, kafkaPartitionOffsets);
+            CreateRoutineLoadStmt.analyzeKafkaPartitionProperty(kafkaPartitionsString, Maps.newHashMap(),
+                    kafkaPartitionOffsets);
         } else {
             if (properties.containsKey(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY)) {
                 throw new AnalysisException("Missing kafka partition info");
@@ -135,7 +135,7 @@ public String toString() {
         StringBuilder sb = new StringBuilder();
         sb.append("type: ").append(type);
         sb.append(", kafka partition offsets: ").append(kafkaPartitionOffsets);
-        sb.append(", custome properties: ").append(customKafkaProperties);
+        sb.append(", custom properties: ").append(customKafkaProperties);
         return sb.toString();
     }
 }
KafkaRoutineLoadJob.java
@@ -34,6 +34,7 @@
 import com.starrocks.catalog.Catalog;
 import com.starrocks.catalog.Database;
 import com.starrocks.catalog.Table;
+import com.starrocks.common.AnalysisException;
 import com.starrocks.common.Config;
 import com.starrocks.common.DdlException;
 import com.starrocks.common.ErrorCode;
@@ -82,8 +83,8 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
     private List<Integer> customKafkaPartitions = Lists.newArrayList();
     // current kafka partitions is the actually partition which will be fetched
     private List<Integer> currentKafkaPartitions = Lists.newArrayList();
-    // optional, user want to set default offset when new partiton add or offset not set.
-    private String kafkaDefaultOffSet = "";
+    // optional, user want to set default offset when new partition add or offset not set.
+    private Long kafkaDefaultOffSet = null;
     // kafka properties, property prefix will be mapped to kafka custom parameters, which can be extended in the future
     private Map<String, String> customProperties = Maps.newHashMap();
     private Map<String, String> convertedCustomProperties = Maps.newHashMap();
@@ -148,7 +149,12 @@ private void convertCustomProperties(boolean rebuild) throws DdlException {
             }
         }
         if (convertedCustomProperties.containsKey(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS)) {
-            kafkaDefaultOffSet = convertedCustomProperties.remove(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS);
+            try {
+                kafkaDefaultOffSet = CreateRoutineLoadStmt.getKafkaOffset(
+                        convertedCustomProperties.remove(CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS));
+            } catch (AnalysisException e) {
+                throw new DdlException(e.getMessage());
+            }
         }
     }
 
@@ -417,16 +423,7 @@ private void updateNewPartitionProgress() {
         for (Integer kafkaPartition : currentKafkaPartitions) {
             if (!((KafkaProgress) progress).containsPartition(kafkaPartition)) {
                 // if offset is not assigned, start from OFFSET_END
-                long beginOffSet = KafkaProgress.OFFSET_END_VAL;
-                if (!kafkaDefaultOffSet.isEmpty()) {
-                    if (kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_BEGINNING)) {
-                        beginOffSet = KafkaProgress.OFFSET_BEGINNING_VAL;
-                    } else if (kafkaDefaultOffSet.equalsIgnoreCase(KafkaProgress.OFFSET_END)) {
-                        beginOffSet = KafkaProgress.OFFSET_END_VAL;
-                    } else {
-                        beginOffSet = KafkaProgress.OFFSET_END_VAL;
-                    }
-                }
+                long beginOffSet = kafkaDefaultOffSet == null ? KafkaProgress.OFFSET_END_VAL : kafkaDefaultOffSet;
                 ((KafkaProgress) progress).addPartitionOffset(Pair.create(kafkaPartition, beginOffSet));
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(new LogBuilder(LogKey.ROUTINE_LOAD_JOB, id)
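The deleted branch above is the second half of the bug: the old string-typed kafkaDefaultOffSet recognized only the two symbolic names, so a numeric default fell into the final else and newly discovered partitions still started from OFFSET_END. A compact old-vs-new sketch (illustrative only, not from the commit; constants inlined):

    public class NewPartitionOffsetDemo {
        static final long OFFSET_BEGINNING_VAL = -2;
        static final long OFFSET_END_VAL = -1;

        // Old behavior: the raw default-offset string only matched the symbolic
        // names, so a numeric default such as "100" fell through to OFFSET_END_VAL.
        static long beginOffsetOld(String kafkaDefaultOffSet) {
            long beginOffSet = OFFSET_END_VAL;
            if (!kafkaDefaultOffSet.isEmpty()) {
                if (kafkaDefaultOffSet.equalsIgnoreCase("OFFSET_BEGINNING")) {
                    beginOffSet = OFFSET_BEGINNING_VAL;
                } else if (kafkaDefaultOffSet.equalsIgnoreCase("OFFSET_END")) {
                    beginOffSet = OFFSET_END_VAL;
                } else {
                    beginOffSet = OFFSET_END_VAL; // "100" is silently lost here: the bug
                }
            }
            return beginOffSet;
        }

        // New behavior: the default is parsed once (via getKafkaOffset) into a Long,
        // so a numeric value survives; null still means "start from OFFSET_END".
        static long beginOffsetNew(Long kafkaDefaultOffSet) {
            return kafkaDefaultOffSet == null ? OFFSET_END_VAL : kafkaDefaultOffSet;
        }

        public static void main(String[] args) {
            System.out.println(beginOffsetOld("100")); // -1: the default was ignored
            System.out.println(beginOffsetNew(100L));  // 100: the default is applied
        }
    }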
CreateRoutineLoadStmtTest.java
@@ -24,7 +24,9 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.starrocks.common.AnalysisException;
+import com.starrocks.common.Pair;
 import com.starrocks.common.UserException;
+import com.starrocks.load.routineload.KafkaProgress;
 import com.starrocks.load.routineload.LoadDataSourceType;
 import mockit.Injectable;
 import mockit.Mock;
@@ -141,4 +143,95 @@ public void analyze(Analyzer analyzer1) {
Assert.assertEquals("+08:00", createRoutineLoadStmt.getTimezone());
}

@Test
public void testKafkaOffset(@Injectable Analyzer analyzer) throws UserException {
new MockUp<StatementBase>() {
@Mock
public void analyze(Analyzer analyzer1) {
return;
}
};

String jobName = "job1";
String dbName = "db1";
String tableNameString = "table1";
String kafkaDefaultOffsetsKey = "property." + CreateRoutineLoadStmt.KAFKA_DEFAULT_OFFSETS;

// load property
List<String> partitionNameString = Lists.newArrayList();
partitionNameString.add("p1");
PartitionNames partitionNames = new PartitionNames(false, partitionNameString);
ColumnSeparator columnSeparator = new ColumnSeparator(",");
List<ParseNode> loadPropertyList = new ArrayList<>();
loadPropertyList.add(columnSeparator);
loadPropertyList.add(partitionNames);

// 1. kafka_offsets
// 1 -> OFFSET_BEGINNING, 2 -> OFFSET_END
Map<String, String> customProperties = getCustomProperties();
customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2");
customProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "OFFSET_BEGINNING,OFFSET_END");
LabelName labelName = new LabelName(dbName, jobName);
CreateRoutineLoadStmt createRoutineLoadStmt = new CreateRoutineLoadStmt(
labelName, tableNameString, loadPropertyList, Maps.newHashMap(),
LoadDataSourceType.KAFKA.name(), customProperties);
createRoutineLoadStmt.analyze(analyzer);
List<Pair<Integer, Long>> partitionOffsets = createRoutineLoadStmt.getKafkaPartitionOffsets();
Assert.assertEquals(2, partitionOffsets.size());
Assert.assertEquals(KafkaProgress.OFFSET_BEGINNING_VAL, (long) partitionOffsets.get(0).second);
Assert.assertEquals(KafkaProgress.OFFSET_END_VAL, (long) partitionOffsets.get(1).second);

// 2. no kafka_offsets and property.kafka_default_offsets
// 1,2 -> OFFSET_END
customProperties = getCustomProperties();
customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2");
labelName = new LabelName(dbName, jobName);
createRoutineLoadStmt =
new CreateRoutineLoadStmt(labelName, tableNameString, loadPropertyList, Maps.newHashMap(),
LoadDataSourceType.KAFKA.name(), customProperties);
createRoutineLoadStmt.analyze(analyzer);
partitionOffsets = createRoutineLoadStmt.getKafkaPartitionOffsets();
Assert.assertEquals(2, partitionOffsets.size());
Assert.assertEquals(KafkaProgress.OFFSET_END_VAL, (long) partitionOffsets.get(0).second);
Assert.assertEquals(KafkaProgress.OFFSET_END_VAL, (long) partitionOffsets.get(1).second);

// 3. property.kafka_default_offsets
// 1,2 -> 10
customProperties = getCustomProperties();
customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2");
customProperties.put(kafkaDefaultOffsetsKey, "10");
labelName = new LabelName(dbName, jobName);
createRoutineLoadStmt =
new CreateRoutineLoadStmt(labelName, tableNameString, loadPropertyList, Maps.newHashMap(),
LoadDataSourceType.KAFKA.name(), customProperties);
createRoutineLoadStmt.analyze(analyzer);
partitionOffsets = createRoutineLoadStmt.getKafkaPartitionOffsets();
Assert.assertEquals(2, partitionOffsets.size());
Assert.assertEquals(10, (long) partitionOffsets.get(0).second);
Assert.assertEquals(10, (long) partitionOffsets.get(1).second);

// 4. both kafka_offsets and property.kafka_default_offsets
// 1 -> OFFSET_BEGINNING, 2 -> OFFSET_END, 3 -> 11
customProperties = getCustomProperties();
customProperties.put(CreateRoutineLoadStmt.KAFKA_PARTITIONS_PROPERTY, "1,2,3");
customProperties.put(CreateRoutineLoadStmt.KAFKA_OFFSETS_PROPERTY, "OFFSET_BEGINNING,OFFSET_END,11");
customProperties.put(kafkaDefaultOffsetsKey, "10");
labelName = new LabelName(dbName, jobName);
createRoutineLoadStmt =
new CreateRoutineLoadStmt(labelName, tableNameString, loadPropertyList, Maps.newHashMap(),
LoadDataSourceType.KAFKA.name(), customProperties);
createRoutineLoadStmt.analyze(analyzer);
partitionOffsets = createRoutineLoadStmt.getKafkaPartitionOffsets();
Assert.assertEquals(3, partitionOffsets.size());
Assert.assertEquals(KafkaProgress.OFFSET_BEGINNING_VAL, (long) partitionOffsets.get(0).second);
Assert.assertEquals(KafkaProgress.OFFSET_END_VAL, (long) partitionOffsets.get(1).second);
Assert.assertEquals(11, (long) partitionOffsets.get(2).second);
}

private Map<String, String> getCustomProperties() {
Map<String, String> customProperties = Maps.newHashMap();
customProperties.put(CreateRoutineLoadStmt.KAFKA_TOPIC_PROPERTY, "topic1");
customProperties.put(CreateRoutineLoadStmt.KAFKA_BROKER_LIST_PROPERTY, "127.0.0.1:8080");
return customProperties;
}
}
