I noticed that new batches are triggered even if all shards are empty and avoidEmptyBatches is set to true. This leads to ProvisionedThroughputExceededExceptions (I don't understand why at the moment). I thought this issue would resolve the problem, but it persists even after upgrading the kinesis-sql library to the newest version.
As a workaround, we thought avoidEmptyBatches would mitigate this problem by avoiding too many API calls when no data is available in the Kinesis stream.
But the exceptions still come up and we can see in the logs that new batches are triggered, always for the same sequence number, even if there is no new data at all:
21/01/08 08:19:31 INFO KinesisSource: Purging Committed Entries. ThresholdBatchId = 43986
21/01/08 08:19:31 INFO KinesisSource: End Offset is {"metadata":{"streamName":"viewprogress","batchId":"44086"},"shardId-000000000000":{"iteratorType":"AFTER_SEQUENCE_NUMBER","iteratorPosition":"49613635859865357079366475818029505419698299042659303426"}}
21/01/08 08:19:31 INFO KinesisSource: Processing 1 shards from Stream(ShardInfo(shardId-000000000000,AFTER_SEQUENCE_NUMBER,49613635859865357079366475818029505419698299042659303426))
21/01/08 08:19:31 INFO KinesisSource: GetBatch generating RDD of offset range: ShardInfo(shardId-000000000000,AFTER_SEQUENCE_NUMBER,49613635859865357079366475818029505419698299042659303426)
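To check this over a longer stretch of the driver log, something like the following sketch could be used. It assumes the "End Offset is" lines keep exactly the format shown above. The names end_offsets and stalled_batches are made up for illustration and are not part of kinesis-sql.

```python
import json
import re

# Matches the JSON payload of a KinesisSource "End Offset is {...}" log line.
END_OFFSET_RE = re.compile(r"KinesisSource: End Offset is (\{.*\})\s*$")


def end_offsets(log_lines):
    """Yield (batch_id, {shard_id: iterator_position}) for each End Offset line."""
    for line in log_lines:
        match = END_OFFSET_RE.search(line)
        if match is None:
            continue  # not an End Offset line
        payload = json.loads(match.group(1))
        batch_id = int(payload["metadata"]["batchId"])
        positions = {
            shard: info["iteratorPosition"]
            for shard, info in payload.items()
            if shard != "metadata"
        }
        yield batch_id, positions


def stalled_batches(log_lines):
    """Return the batch ids whose shard positions equal the previous batch's."""
    stalled = []
    previous = None
    for batch_id, positions in end_offsets(log_lines):
        if previous is not None and positions == previous:
            stalled.append(batch_id)
        previous = positions
    return stalled
```

Feeding the driver log lines into stalled_batches returns every batch id that was triggered although no shard position had advanced since the previous batch.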
This is the code we use to create the stream:
watchlist_events_stream = (
    spark.readStream.format("kinesis")
    .option("streamName", KINESIS_STREAM_NAME)
    .option("endpointUrl", KINESIS_ENDPOINT_URL)
    .option(
        "awsstsrolearn",
        f"{ASSUME_ROLE_ARN_PREFIX}{ACCOUNT_ID_DICT[args.stage]}{ASSUME_ROLE_ARN_SUFFIX}",
    )
    .option("awsstssessionname", SESSION_NAME)
    .option("kinesis.client.avoidEmptyBatches", "true")
    .load()
)
Any idea what could go wrong here?