@@ -42,6 +42,13 @@
-->
<Bug code="EI,EI2" />

<!--
Dead local stores are generated by javac and also are used as a way to attach @SuppressWarnings
to a single statement instead of a class or method. Other analyzers catch dead code
and respect @SuppressWarnings("unused") so we do not need this.
-->
<Bug code="DLS" />
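
For context, the pattern this exclusion keeps usable looks roughly like the sketch below (illustrative only; the class and names are not from this PR): a value is stored into a local that is never read, solely so that @SuppressWarnings can be scoped to a single statement, and SpotBugs would otherwise report it as a dead local store.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class DeadStoreSuppressionSketch {
  void fireAndForget(Runnable task) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    // The local below is never read; it exists only so @SuppressWarnings can target this
    // one statement rather than the enclosing method or class. SpotBugs' DLS detector
    // would flag the dead store, which is why the filter excludes DLS.
    @SuppressWarnings("unused")
    Object ignoredFuture = executor.submit(task);
    executor.shutdown();
  }
}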

<!--
Beam DoFns are invoked via reflection by looking at the annotations. To spotbugs, these methods
seem to be uncallable because they are on anonymous classes.
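
As an illustration of the reflection point above (a minimal sketch with assumed names, not code from this PR): the @ProcessElement method on an anonymous DoFn has no direct Java caller because the runner discovers and invokes it via the annotation, which is why static analysis considers it uncallable.

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

class AnonymousDoFnSketch {
  static PCollection<Integer> wordLengths(PCollection<String> words) {
    return words.apply(
        ParDo.of(
            new DoFn<String, Integer>() {
              // Invoked reflectively by the runner; no direct caller appears in the code,
              // so the method looks uncallable to SpotBugs.
              @ProcessElement
              public void processElement(@Element String word, OutputReceiver<Integer> out) {
                out.output(word.length());
              }
            }));
  }
}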
@@ -199,8 +199,11 @@ public BoundedReadFromUnboundedSource<T> withMaxNumRecords(long maxNumRecords) {
* Returns a new {@link BoundedReadFromUnboundedSource} that reads a bounded amount of data from
* the given {@link UnboundedSource}. The bound is specified as an amount of time to read for.
* Each split of the source will read for this much time.
*
* @param maxReadTime upper bound of how long to read from the unbounded source; disabled if
* null
*/
public BoundedReadFromUnboundedSource<T> withMaxReadTime(Duration maxReadTime) {
public BoundedReadFromUnboundedSource<T> withMaxReadTime(@Nullable Duration maxReadTime) {
return new BoundedReadFromUnboundedSource<>(source, Long.MAX_VALUE, maxReadTime);
}
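
A minimal usage sketch for the annotated method (the pipeline, source, and class names here are illustrative assumptions, not part of this change): supplying a Duration caps how long each split reads, while passing null leaves the read unbounded in time, as the new @param documents.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

class MaxReadTimeSketch {
  // 'source' stands in for any UnboundedSource implementation.
  static <T> PCollection<T> readForAtMostFiveMinutes(
      Pipeline pipeline, UnboundedSource<T, ?> source) {
    BoundedReadFromUnboundedSource<T> boundedRead =
        Read.from(source)
            .withMaxNumRecords(Long.MAX_VALUE)
            // Passing null here instead of a Duration would disable the time bound.
            .withMaxReadTime(Duration.standardMinutes(5));
    return pipeline.apply(boundedRead);
  }
}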

@@ -26,6 +26,7 @@
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.beam.sdk.annotations.Experimental;
@@ -35,7 +36,6 @@
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.kafka.common.serialization.Deserializer;
import org.checkerframework.checker.nullness.qual.Nullable;

@@ -47,7 +47,6 @@
@Experimental(Kind.SOURCE_SINK)
@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class ConfluentSchemaRegistryDeserializerProvider<T> implements DeserializerProvider<T> {
private final SerializableFunction<Void, SchemaRegistryClient> schemaRegistryClientProviderFn;
@@ -112,21 +111,23 @@ public static <T> ConfluentSchemaRegistryDeserializerProvider<T> of(
@Nullable Map<String, ?> schemaRegistryConfigs) {
return new ConfluentSchemaRegistryDeserializerProvider(
(SerializableFunction<Void, SchemaRegistryClient>)
input ->
new CachedSchemaRegistryClient(
schemaRegistryUrl, schemaRegistryCacheCapacity, schemaRegistryConfigs),
input -> {
@SuppressWarnings("nullness") // confluent library is not annotated
CachedSchemaRegistryClient client =
new CachedSchemaRegistryClient(
schemaRegistryUrl, schemaRegistryCacheCapacity, schemaRegistryConfigs);
return client;
},
schemaRegistryUrl,
subject,
version);
}

@Override
public Deserializer<T> getDeserializer(Map<String, ?> configs, boolean isKey) {
ImmutableMap<String, Object> csrConfig =
ImmutableMap.<String, Object>builder()
.putAll(configs)
.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl)
.build();
@SuppressWarnings("unchecked")
Map<String, Object> csrConfig = new HashMap<>((Map<String, Object>) configs);
csrConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
Deserializer<T> deserializer =
(Deserializer<T>)
new ConfluentSchemaRegistryDeserializer(getSchemaRegistryClient(), getAvroSchema());
@@ -160,7 +161,6 @@ private SchemaRegistryClient getSchemaRegistryClient() {

@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
class ConfluentSchemaRegistryDeserializer extends KafkaAvroDeserializer {
Schema readerSchema;
@@ -32,16 +32,14 @@
* the latest offset consumed so far.
*/
@DefaultCoder(AvroCoder.class)
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class KafkaCheckpointMark implements UnboundedSource.CheckpointMark {

private List<PartitionMark> partitions;

@AvroIgnore
private Optional<KafkaUnboundedReader<?, ?>> reader; // Present when offsets need to be committed.

@SuppressWarnings("initialization") // Avro will set the fields by breaking abstraction
private KafkaCheckpointMark() {} // for Avro

public KafkaCheckpointMark(
@@ -80,6 +78,7 @@ public static class PartitionMark implements Serializable {
private long nextOffset;
private long watermarkMillis = MIN_WATERMARK_MILLIS;

@SuppressWarnings("initialization")
private PartitionMark() {} // for Avro

public PartitionMark(String topic, int partition, long offset, long watermarkMillis) {
@@ -46,7 +46,6 @@

/** A {@link PTransform} that commits offsets of {@link KafkaRecord}. */
@SuppressWarnings({
"nullness", // TODO(https://github.com/apache/beam/issues/20497)
"rawtypes" // TODO(https://github.com/apache/beam/issues/20447)
})
public class KafkaCommitOffset<K, V>
@@ -49,10 +49,12 @@
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.Preconditions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
@@ -78,6 +80,7 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.DateTimeUtils;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
@@ -90,7 +93,6 @@
*/
@SuppressWarnings({
"rawtypes", // TODO(https://github.com/apache/beam/issues/20447)
"nullness", // TODO(https://github.com/apache/beam/issues/20497)
// TODO(https://github.com/apache/beam/issues/21230): Remove when new version of
// errorprone is released (2.11.0)
"unused"
@@ -156,11 +158,12 @@ static void ensureEOSSupport() {

@Override
public PCollection<Void> expand(PCollection<ProducerRecord<K, V>> input) {
String topic = Preconditions.checkStateNotNull(spec.getTopic());

int numShards = spec.getNumShards();
if (numShards <= 0) {
try (Consumer<?, ?> consumer = openConsumer(spec)) {
numShards = consumer.partitionsFor(spec.getTopic()).size();
numShards = consumer.partitionsFor(topic).size();
LOG.info(
"Using {} shards for exactly-once writer, matching number of partitions "
+ "for topic '{}'",
@@ -299,8 +302,9 @@ public void processElement(
long nextId = MoreObjects.firstNonNull(nextIdState.read(), 0L);
long minBufferedId = MoreObjects.firstNonNull(minBufferedIdState.read(), Long.MAX_VALUE);

String sinkGroupId = Preconditions.checkStateNotNull(spec.getSinkGroupId());
ShardWriterCache<K, V> cache =
(ShardWriterCache<K, V>) CACHE_BY_GROUP_ID.getUnchecked(spec.getSinkGroupId());
(ShardWriterCache<K, V>) CACHE_BY_GROUP_ID.getUnchecked(sinkGroupId);
ShardWriter<K, V> writer = cache.removeIfPresent(shard);
if (writer == null) {
writer = initShardWriter(shard, writerIdState, nextId);
@@ -433,7 +437,7 @@ private static class ShardMetadata {
public final long sequenceId;

@JsonProperty("id")
public final String writerId;
public final @Nullable String writerId;

private ShardMetadata() { // for json deserializer
sequenceId = -1;
@@ -477,6 +481,7 @@ void beginTxn() {

Future<RecordMetadata> sendRecord(
TimestampedValue<ProducerRecord<K, V>> record, Counter sendCounter) {
String topic = Preconditions.checkStateNotNull(spec.getTopic());
try {
Long timestampMillis =
spec.getPublishTimestampFunction() != null
@@ -485,10 +490,11 @@ Future<RecordMetadata> sendRecord(
.getMillis()
: null;

@SuppressWarnings("nullness") // Kafka library not annotated
Future<RecordMetadata> result =
producer.send(
new ProducerRecord<>(
spec.getTopic(),
topic,
null,
timestampMillis,
record.getValue().key(),
@@ -502,6 +508,7 @@
}

void commitTxn(long lastRecordId, Counter numTransactions) throws IOException {
String topic = Preconditions.checkStateNotNull(spec.getTopic());
try {
// Store id in consumer group metadata for the partition.
// NOTE: Kafka keeps this metadata for 24 hours since the last update. This limits
@@ -510,7 +517,7 @@ void commitTxn(long lastRecordId, Counter numTransactions) throws IOException {
ProducerSpEL.sendOffsetsToTransaction(
producer,
ImmutableMap.of(
new TopicPartition(spec.getTopic(), shard),
new TopicPartition(topic, shard),
new OffsetAndMetadata(
0L,
JSON_MAPPER.writeValueAsString(new ShardMetadata(lastRecordId, writerId)))),
@@ -533,6 +540,7 @@ private ShardWriter<K, V> initShardWriter(

String producerName = String.format("producer_%d_for_%s", shard, spec.getSinkGroupId());
Producer<K, V> producer = initializeExactlyOnceProducer(spec, producerName);
String topic = Preconditions.checkStateNotNull(spec.getTopic());

// Fetch latest committed metadata for the partition (if any). Checks committed sequence ids.
try {
@@ -542,7 +550,7 @@
OffsetAndMetadata committed;

try (Consumer<?, ?> consumer = openConsumer(spec)) {
committed = consumer.committed(new TopicPartition(spec.getTopic(), shard));
committed = consumer.committed(new TopicPartition(topic, shard));
}

long committedSeqId = -1;
@@ -665,6 +673,7 @@ private static class ShardWriterCache<K, V> {
TimeUnit.MILLISECONDS);
}

@Nullable
ShardWriter<K, V> removeIfPresent(int shard) {
return cache.asMap().remove(shard);
}
@@ -693,33 +702,35 @@ void insert(int shard, ShardWriter<K, V> writer) {
* partitions for a topic rather than for fetching messages.
*/
private static Consumer<?, ?> openConsumer(WriteRecords<?, ?> spec) {
return spec.getConsumerFactoryFn()
.apply(
ImmutableMap.of(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
spec.getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
ConsumerConfig.GROUP_ID_CONFIG,
spec.getSinkGroupId(),
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
ByteArrayDeserializer.class));
SerializableFunction<Map<String, Object>, ? extends Consumer<?, ?>> consumerFactoryFn =
Preconditions.checkArgumentNotNull(spec.getConsumerFactoryFn());

Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
Preconditions.checkArgumentNotNull(
spec.getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)));
if (spec.getSinkGroupId() != null) {
consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, spec.getSinkGroupId());
}
consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

return consumerFactoryFn.apply(consumerConfig);
}

private static <K, V> Producer<K, V> initializeExactlyOnceProducer(
WriteRecords<K, V> spec, String producerName) {

Map<String, Object> producerConfig = new HashMap<>(spec.getProducerConfig());
producerConfig.putAll(
ImmutableMap.of(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
spec.getKeySerializer(),
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
spec.getValueSerializer(),
ProducerSpEL.ENABLE_IDEMPOTENCE_CONFIG,
true,
ProducerSpEL.TRANSACTIONAL_ID_CONFIG,
producerName));
if (spec.getKeySerializer() != null) {
producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, spec.getKeySerializer());
}
if (spec.getValueSerializer() != null) {
producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, spec.getValueSerializer());
}
producerConfig.put(ProducerSpEL.ENABLE_IDEMPOTENCE_CONFIG, true);
producerConfig.put(ProducerSpEL.TRANSACTIONAL_ID_CONFIG, producerName);

Producer<K, V> producer =
spec.getProducerFactoryFn() != null