ProvisionedThroughputExceededException and new batches are triggered even if avoidEmptyBatches=true #97
@itsvikramagr Sorry for pushing this, but we use this connector in production and need some advice on how to avoid those LimitExceeded errors on "empty" streams. After some retries, our jobs break, and the only thing we can do at the moment is to restart them. Do you have any idea why we get all these errors and how to fix them?
@juliankeppel - I would have to look into the complete logs to give a better suggestion. When do you see the ProvisionedThroughputExceededExceptions? Most likely Kinesis is throttling us when we make getRecords calls in the executors. Does it also happen when the streams are non-empty? Are there multiple readers on the same Kinesis stream?
In your streaming job, what is your trigger interval? avoidEmptyBatches has no effect only when there is some resharding, or when we reached shardEnd in one of the shards in the previous micro-batch (https://github.com/qubole/kinesis-sql/blob/master/src/main/scala/org/apache/spark/sql/kinesis/KinesisSource.scala#L178), which might not be true in your case. Can you check that you are using the latest connector?
@itsvikramagr It happens only for empty streams. There is one more consumer, but since it happens only for empty streams, this is probably not a problem of too many read requests. We have different jobs, consuming different streams, with different trigger intervals between 5 seconds and 15 minutes. The problem is the same for all of them when the source streams are empty. There is no resharding going on; we have a stable number of shards all the time. I think we have reached shardEnd in this case. But I don't understand why the connector is firing so many read requests when it has reached shardEnd and no new records are currently coming in. And why the LimitExceeded only appears in these situations and not when there are records in the stream (which is the normal case).
Due to existing logic around shardEnd filtering, even with 'avoidEmptyBatches' one empty micro-batch can be created. In that micro-batch, shards that have reached their end are filtered out from further processing. We could certainly fix the logic, but it would be a breaking change from the existing behavior (of filtering shardEnd), so I had avoided that.
We have to look into the logs to understand that. My guess is that we are making too many getRecords calls, too frequently. The logs will have more details.
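The throttling pattern suspected above (too many getRecords calls in quick succession) is the classic case for client-side retries with exponential backoff. A minimal, hypothetical Python sketch of that idea — the `Throttled` exception and `fetch_records` function are stand-ins for illustration, not part of the connector:

```python
import random
import time


class Throttled(Exception):
    """Stand-in for ProvisionedThroughputExceededException."""


def call_with_backoff(fn, max_retries=5, base_delay=0.05):
    """Retry fn() with exponential backoff plus jitter when throttled."""
    for attempt in range(max_retries):
        try:
            return fn()
        except Throttled:
            if attempt == max_retries - 1:
                raise
            # Double the delay each attempt; jitter avoids synchronized retries.
            delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
            time.sleep(delay)


# Simulated getRecords call that is throttled twice, then succeeds.
calls = {"n": 0}

def fetch_records():
    calls["n"] += 1
    if calls["n"] < 3:
        raise Throttled()
    return ["record-1", "record-2"]

print(call_with_backoff(fetch_records))  # → ['record-1', 'record-2']
```

With backoff in place, a burst of throttled reads degrades into a short pause instead of a job failure; the trade-off is slightly higher end-to-end latency under sustained throttling.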
I attached a log from one of our jobs. I realized that I misunderstood what it does. But I restarted the job due to some experiments, and now I don't see any of the ReadLimitExceeded errors anymore. I can't explain why, because I changed nothing in the code or configuration. Anyway, the problem remains.
I can see that empty batches are created. There is a check for new records before we start a new batch, and that check is always returning true. You can find the following lines in your log.
https://github.com/qubole/kinesis-sql/blob/master/src/main/scala/org/apache/spark/sql/kinesis/KinesisSource.scala#L131 seems to be the reason for it. I think that even though there are no new records, the ProvisionedThroughputExceededException might be a side-effect of creating empty batches. In the executors, we make multiple getRecords calls to read data. We can add delays between getRecords requests by using the following config: kinesis.executor.addIdleTimeBetweenReads = true
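The effect of an idle time between reads is essentially client-side pacing: Kinesis allows at most 5 GetRecords calls per second per shard, so spacing consecutive calls keeps a consumer under that cap. A minimal, hypothetical Python sketch of such pacing — the `PacedReader` helper is an illustration, not the connector's actual implementation:

```python
import time


class PacedReader:
    """Enforce a minimum interval between consecutive read calls."""

    def __init__(self, min_interval_s=0.2):  # 0.2 s ≈ Kinesis's 5 reads/sec/shard cap
        self.min_interval_s = min_interval_s
        self._last = 0.0

    def read(self, fetch):
        # Sleep just long enough to honor the minimum gap since the last read.
        wait = self.min_interval_s - (time.monotonic() - self._last)
        if wait > 0:
            time.sleep(wait)
        self._last = time.monotonic()
        return fetch()


reader = PacedReader(min_interval_s=0.05)
start = time.monotonic()
results = [reader.read(lambda: "batch") for _ in range(4)]
elapsed = time.monotonic() - start
# Four reads with a 50 ms minimum gap take at least ~150 ms in total.
```

Pacing trades a little latency per micro-batch for staying under the per-shard read limit, which is usually the right trade when the stream is mostly empty anyway.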
I noticed that new batches are triggered even if all shards are empty and avoidEmptyBatches is set to true. This leads to ProvisionedThroughputExceededExceptions (I don't understand why at the moment). I thought this issue would resolve the problem, but it persists even after upgrading the kinesis-sql library to the newest version. As a workaround, we thought avoidEmptyBatches would mitigate this problem by avoiding too many API calls if no data is available in the Kinesis stream. But the exceptions still come up, and we can see in the logs that new batches are triggered, always for the same sequence number, even if there is no new data at all:
This is the code we use to create the stream:
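(The original snippet did not survive formatting. As a hypothetical pyspark sketch of this kind of stream creation with the qubole/kinesis-sql source — the stream name, endpoint, and credentials handling here are placeholders, not the reporter's actual values:)

```python
# Hypothetical configuration sketch for the qubole/kinesis-sql source.
# Stream name and endpoint URL are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kinesis-read").getOrCreate()

stream = (
    spark.readStream
    .format("kinesis")
    .option("streamName", "my-stream")  # placeholder
    .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")  # placeholder
    .option("startingposition", "TRIM_HORIZON")
    # The option at the center of this issue:
    .option("avoidEmptyBatches", "true")
    .load()
)
```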
Any idea what could go wrong here?