Skip to content

Commit 9659401

Browse files
author
gituser
committed
Merge branch '1.10_test_4.2.x' into 1.10_release_4.2.x
2 parents e4e7412 + a326319 commit 9659401

File tree

5 files changed

+5
-5
lines changed

5 files changed

+5
-5
lines changed

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/deserialization/DtKafkaDeserializationSchemaWrapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
6666
if (endPartition.contains(record.partition())) {
6767
return null;
6868
}
69-
if (specificEndOffsets != null) {
69+
if (specificEndOffsets != null && !specificEndOffsets.isEmpty()) {
7070
Long endOffset = specificEndOffsets.get(topicPartition);
7171
if (endOffset != null && record.offset() >= endOffset - 1) {
7272
endPartition.add(record.partition());

kafka/kafka-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public void run(SourceFunction.SourceContext<Row> sourceContext) throws Exceptio
112112
MetricGroup consumerMetricGroup,
113113
boolean useMetrics) throws Exception {
114114

115-
final OffsetMap offsetMap = seekOffset(props, topic);
115+
final OffsetMap offsetMap = sampleSize > 0 ? seekOffset(props, topic) : new OffsetMap();
116116
Map<KafkaTopicPartition, Long> rebuild;
117117

118118
rebuild =

kafka09/kafka09-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer09.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public void run(SourceFunction.SourceContext<Row> sourceContext) throws Exceptio
113113
MetricGroup consumerMetricGroup,
114114
boolean useMetrics) throws Exception {
115115

116-
final OffsetMap offsetMap = seekOffset(props, topic);
116+
final OffsetMap offsetMap = sampleSize > 0 ? seekOffset(props, topic) : new OffsetMap();
117117
Map<KafkaTopicPartition, Long> rebuild;
118118

119119
rebuild =

kafka10/kafka10-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer010.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public void run(SourceContext<Row> sourceContext) throws Exception {
111111
OffsetCommitMode offsetCommitMode,
112112
MetricGroup consumerMetricGroup,
113113
boolean useMetrics) throws Exception {
114-
final OffsetMap offsetMap = seekOffset(props, topic);
114+
final OffsetMap offsetMap = sampleSize > 0 ? seekOffset(props, topic) : new OffsetMap();
115115
Map<KafkaTopicPartition, Long> rebuild;
116116

117117
rebuild =

kafka11/kafka11-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaConsumer011.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public void run(SourceContext<Row> sourceContext) throws Exception {
111111
OffsetCommitMode offsetCommitMode,
112112
MetricGroup consumerMetricGroup,
113113
boolean useMetrics) throws Exception {
114-
final OffsetMap offsetMap = seekOffset(props, topic);
114+
final OffsetMap offsetMap = sampleSize > 0 ? seekOffset(props, topic) : new OffsetMap();
115115
Map<KafkaTopicPartition, Long> rebuild;
116116

117117
rebuild =

0 commit comments

Comments
 (0)