I noticed that new batches are triggered even when all shards are empty and avoidEmptyBatches is set to true. This leads to ProvisionedThroughputExceededExceptions (I don't yet understand why). I thought this issue would resolve the problem, but it persists even after upgrading the kinesis-sql library to the newest version.
As a workaround, we expected avoidEmptyBatches to mitigate this problem by avoiding unnecessary API calls when no data is available in the Kinesis stream.
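For reference, the check we expected avoidEmptyBatches to perform can be sketched as follows. This is a minimal illustration, assuming offsets are tracked as a per-shard map from shard ID to the last sequence number read; the function name `should_trigger_batch` is hypothetical and this is not the kinesis-sql implementation.

```python
from typing import Dict

# Hypothetical representation: shard ID -> last sequence number read.
ShardPositions = Dict[str, str]


def should_trigger_batch(previous: ShardPositions, current: ShardPositions) -> bool:
    """Return True only if at least one shard has moved past its previous position.

    Illustrative sketch of the expected avoidEmptyBatches behaviour,
    not the library's actual logic.
    """
    for shard_id, position in current.items():
        # A changed position, or a shard not seen before, counts as new data.
        if previous.get(shard_id) != position:
            return True
    return False


# Unchanged position, as in the log excerpt in this report: no batch expected.
prev = {"shardId-000000000000": "49613635859865357079366475818029505419698299042659303426"}
curr = dict(prev)
print(should_trigger_batch(prev, curr))  # False
```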
But the exceptions still occur, and the logs show that new batches are triggered, always for the same sequence number, even when there is no new data at all:
```
1/01/08 08:19:31 INFO KinesisSource: Purging Committed Entries. ThresholdBatchId = 43986
21/01/08 08:19:31 INFO KinesisSource: End Offset is {"metadata":{"streamName":"viewprogress","batchId":"44086"},"shardId-000000000000":{"iteratorType":"AFTER_SEQUENCE_NUMBER","iteratorPosition":"49613635859865357079366475818029505419698299042659303426"}}
21/01/08 08:19:31 INFO KinesisSource: Processing 1 shards from Stream(ShardInfo(shardId-000000000000,AFTER_SEQUENCE_NUMBER,49613635859865357079366475818029505419698299042659303426))
21/01/08 08:19:31 INFO KinesisSource: GetBatch generating RDD of offset range: ShardInfo(shardId-000000000000,AFTER_SEQUENCE_NUMBER,49613635859865357079366475818029505419698299042659303426)
```
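To confirm the symptom across a longer log, the "End Offset is {...}" lines can be parsed and consecutive batches compared. This is a small diagnostic sketch that assumes the log format shown above; the helper names are hypothetical, and the second log line in the usage example is a made-up next batch with the same iterator position.

```python
import json
import re
from typing import Dict, List, Tuple

# Matches the JSON payload of KinesisSource "End Offset is {...}" log lines.
END_OFFSET_RE = re.compile(r"KinesisSource: End Offset is (\{.*\})\s*$")


def parse_end_offsets(log_lines: List[str]) -> List[Tuple[str, Dict[str, str]]]:
    """Return (batchId, {shardId: iteratorPosition}) for each End Offset line."""
    results = []
    for line in log_lines:
        match = END_OFFSET_RE.search(line)
        if not match:
            continue
        offset = json.loads(match.group(1))
        batch_id = offset["metadata"]["batchId"]
        # Every key except "metadata" is a shard entry.
        positions = {
            shard_id: info["iteratorPosition"]
            for shard_id, info in offset.items()
            if shard_id != "metadata"
        }
        results.append((batch_id, positions))
    return results


def batches_without_progress(offsets: List[Tuple[str, Dict[str, str]]]) -> List[str]:
    """Return batch IDs whose shard positions equal those of the previous batch."""
    stalled = []
    for (_, prev), (batch_id, curr) in zip(offsets, offsets[1:]):
        if prev == curr:
            stalled.append(batch_id)
    return stalled


line = ('21/01/08 08:19:31 INFO KinesisSource: End Offset is {"metadata":{"streamName":"viewprogress","batchId":"44086"},'
        '"shardId-000000000000":{"iteratorType":"AFTER_SEQUENCE_NUMBER",'
        '"iteratorPosition":"49613635859865357079366475818029505419698299042659303426"}}')
# Hypothetical next batch with the same iterator position.
next_line = line.replace('"batchId":"44086"', '"batchId":"44087"')
offsets = parse_end_offsets([line, next_line])
print(batches_without_progress(offsets))  # ['44087']
```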
This is the code we use to create the stream:
```python
watchlist_events_stream = (
    spark.readStream.format("kinesis")
    .option("streamName", KINESIS_STREAM_NAME)
    .option("endpointUrl", KINESIS_ENDPOINT_URL)
    .option(
        "awsstsrolearn",
        f"{ASSUME_ROLE_ARN_PREFIX}{ACCOUNT_ID_DICT[args.stage]}{ASSUME_ROLE_ARN_SUFFIX}",
    )
    .option("awsstssessionname", SESSION_NAME)
    # Expected to skip micro-batches when no shard has new records.
    .option("kinesis.client.avoidEmptyBatches", "true")
    .load()
)
```
Any idea what could go wrong here?