From 945c9f76ec928afe5237a880a0239d4e12a74b74 Mon Sep 17 00:00:00 2001 From: lightzhao <40714172+lightzhao@users.noreply.github.com> Date: Wed, 28 Dec 2022 14:43:01 +0800 Subject: [PATCH] [Bug][KafkaSource]Failed to parse offset format (#3810) * When the kafka source is started in offset mode, when the topicName has a "-", the exception will be resolved, causing the assignment to fail * fix code style. * update change-log Co-authored-by: zhaoliang01 --- docs/en/connector-v2/source/kafka.md | 1 + .../seatunnel/kafka/source/KafkaSource.java | 6 ++-- .../seatunnel/kafka/KafkaStartOffsetTest.java | 36 +++++++++++++++++++ 3 files changed, 41 insertions(+), 2 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/KafkaStartOffsetTest.java diff --git a/docs/en/connector-v2/source/kafka.md b/docs/en/connector-v2/source/kafka.md index 96af5808101d..61b96936d861 100644 --- a/docs/en/connector-v2/source/kafka.md +++ b/docs/en/connector-v2/source/kafka.md @@ -210,3 +210,4 @@ source { - [Improve] Support setting read starting offset or time at startup config ([3157](https://github.com/apache/incubator-seatunnel/pull/3157)) - [Improve] Support for dynamic discover topic & partition in streaming mode ([3125](https://github.com/apache/incubator-seatunnel/pull/3125)) +- [Bug] Fixed the problem that parsing the offset format failed when the startup mode was offset([3810](https://github.com/apache/incubator-seatunnel/pull/3810)) diff --git a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java index 3cc2a998bb20..4783d2893bb7 100644 --- a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java +++ b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java @@ -137,9 +137,11 @@ public void prepare(Config config) throws PrepareFailException { Map specificStartOffsets = new HashMap<>(); ObjectNode jsonNodes = JsonUtils.parseObject(offsetsJson); jsonNodes.fieldNames().forEachRemaining(key -> { - String[] topicAndPartition = key.split("-"); + int splitIndex = key.lastIndexOf("-"); + String topic = key.substring(0, splitIndex); + String partition = key.substring(splitIndex + 1); long offset = jsonNodes.get(key).asLong(); - TopicPartition topicPartition = new TopicPartition(topicAndPartition[0], Integer.valueOf(topicAndPartition[1])); + TopicPartition topicPartition = new TopicPartition(topic, Integer.valueOf(partition)); specificStartOffsets.put(topicPartition, offset); }); this.metadata.setSpecificStartOffsets(specificStartOffsets); diff --git a/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/KafkaStartOffsetTest.java b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/KafkaStartOffsetTest.java new file mode 100644 index 000000000000..9585a5ab330b --- /dev/null +++ b/seatunnel-connectors-v2/connector-kafka/src/test/java/org/apache/seatunnel/connectors/seatunnel/kafka/KafkaStartOffsetTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.kafka; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class KafkaStartOffsetTest { + + @Test + void getTopicNameAndPartition(){ + String topicName = "my-topic-test"; + int partIndex = 1; + String key = "my-topic-test-1"; + int splitIndex = key.lastIndexOf("-"); + String topic = key.substring(0, splitIndex); + String partition = key.substring(splitIndex + 1); + Assertions.assertEquals(topic, topicName); + Assertions.assertEquals(Integer.valueOf(partition), partIndex); + } +}