17243 rate limits part deux #27
base: master
Conversation
This demanded some pretty major surgery. Superficially, some methods are moved from DataSet to TweetFetcher, so tests and other things using that interface need to be updated.

But more critically, it now makes sense to stage ALL the tweet data in Elasticsearch. Originally, we kept Tweet objects on DataSets as they came in and did metadata extraction on those as part of the data set pipeline process. However, we're now fetching tweets one user at a time -- and those fetches may be widely separated in time, since TweetFetcher will ultimately be a delayed job subject to Twitter rate limits. This means that tweets can't be accumulated on DataSet, but they ARE all available in Elasticsearch. Therefore it makes sense to reuse our existing extractor features, which rehydrate tweets from Elasticsearch, and update them to also handle the case where only one DataSet is fed into an extractor. This results in a small change to the Extractor API (extractors are now initialized with one or more DataSets instead of with Tweets). On the bright side, when we improve extractor performance on Elasticsearch, that work should automatically apply to all cases.

Next up, providing for delayed jobs and rate limiting!
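As an illustrative sketch of that API change (the extractor class name and call sites below are assumptions; the PR only states that extractors now take one or more DataSets instead of Tweets):

```ruby
# Illustrative sketch only -- "SomeExtractor" and these call sites are
# assumptions; the PR states just that extractors now take DataSets.
#
# Before: an extractor was built from Tweet objects held on the DataSet.
#   extractor = SomeExtractor.new(tweets)
#
# After: an extractor is initialized with one or more DataSets and rehydrates
# the tweets it needs from Elasticsearch.
#   extractor = SomeExtractor.new([data_set])        # single DataSet
#   extractor = SomeExtractor.new(cohort.data_sets)  # several DataSets at once
```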
This sets the backoff number for a TweetFetcher based on how much is in the queue right now. This will in turn allow us to schedule jobs in a way that respects rate limits. Add an index to TweetFetcher to make the select count fast.
At heart, this:

* Installs DelayedJob
* Adds a TweetFetchingJob
* Delegates tweet-fetching to said job
* Moves logic from TweetFetcher to TweetFetchingJob as needed
* Makes sure the existing tests pass

In addition, during code review, it came up that it would be good to have TweetFetcher take all user IDs related to a data set and be able to delegate to multiple potential tweet-fetching back ends, while presenting a stable interface to DataSet. The current TweetFetcher#ingest can do that without changing DataSet or TweetFetchingJob, e.g. with the addition of a parameter that defaults to TweetFetchingJob, or other logic to determine fetcher type (e.g. TweetFetcher checks an environment variable to choose a handler, and the internals of #ingest are rewritten accordingly).
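A minimal sketch of that delegation idea, assuming delayed_job's `Delayed::Job.enqueue` API; the keyword argument, the environment variable, and the InlineTweetFetchingJob back end are hypothetical -- only TweetFetcher, TweetFetchingJob, and #ingest come from the discussion above:

```ruby
# Hypothetical sketch: DataSet keeps calling TweetFetcher#ingest, and the
# choice of back end stays an internal detail of TweetFetcher.
class TweetFetcher
  # Enqueue one fetch job per user id, delegating the actual fetching to
  # whichever job class the caller (or the environment) selects.
  def ingest(user_ids, job_class: default_job_class)
    user_ids.each do |user_id|
      Delayed::Job.enqueue(job_class.new(user_id))
    end
  end

  private

  # Pick a handler without DataSet needing to know anything about it.
  def default_job_class
    case ENV["TWEET_FETCH_HANDLER"]               # hypothetical environment variable
    when "inline" then InlineTweetFetchingJob     # hypothetical synchronous back end
    else TweetFetchingJob
    end
  end
end
```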
This really should be using the native delayed_job retry functions and the #error hook, but I just can't get those to work; exceptions I throw in spec don't get caught by #error.
I'm testing this out on the dev server with your real cohort data and I've found two bugs: one I'll deal with, and one that's inside your scope. Mine: sometimes it double-processes the same user id, leading to errors. Yours: with large data sets, the number of tweets returned by Elasticsearch is too large to process with the current logic (it exceeds the scroll window) -- this should be addressed as part of your efficiency work.
I added a few thoughts. Much cleaner, but I got confused in a few places I think.
# In addition, since this looks at all enqueued TweetFetchers and not just
# those belonging to a given DataSet, when we are fetching data for multiple
# DataSets at once, they will cooperate to avoid hitting the window.
[2*window - (2.0**(-2*enqueued/limit))*4*window, 0].max
Is this assuming that the queue will only consist of TweetFetchingJobs? It's conceivable we'll use this infrastructure for other things in the future.
I haven't traced through how this works fully, so this might be moot, but depending solely on the number of jobs in the queue frightens me a little. Is it possible jobs can end up waiting longer because there are jobs after them in the queue?
You're doing this as an exponential rather than a linear backoff so it's faster for small cohorts, correct? I think that's beneficial as long as the size of cohorts themselves is distributed something like an exponential. But most of the cohorts we'll be dealing with are n=1000, and an exponential backoff is unboundedly bad relative to maximal throughput as cohorts grow to infinity. This might all be irrelevant given the limits, but how long will we end up waiting between i=997, 998, and 999? I think this exponential needs to be capped or made linear, or we store the n for the last RATE_LIMIT_WINDOW somewhere and have all requests call a throttle function.
This is assuming that the queue will consist only of TweetFetchingJobs. We can enforce that if we need to in the future by updating `self.enqueued`. I'll comment it. I am not inclined to prematurely optimize around job types that don't exist.
The exponential makes it fast for small cohorts, yes, and fairly fast for medium-sized ones, while stretching it out for long ones. In the 997-999 range, we're waiting 1-2 seconds between jobs.
It builds in a cap of about twice the rate-limiting window (here, about 30 minutes). But I actually think capping is a problem, now that I think about it -- if we had, say, a 10K cohort, we'd end up stacking most of that cohort at almost 30 minutes out, and then we'd end up rate-limiting there.
I've just convinced myself we can't have all 3 of [cap on length of time, rate limit avoidance, arbitrary cohort sizes]. Right now we have the first two. I could do the second two instead, e.g. via a linear function (or better yet a relu), but I can't do all 3.
I can also periodically check the rate-limit-status endpoint and use data from there to reschedule everything, but I couldn't figure out a safe way to do that.
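To put rough numbers on that, here's a small sketch that evaluates the backoff formula from the diff above, assuming a 900-second (15-minute) rate-limit window and a limit of 900 requests per window; both values are assumptions for illustration, not the actual configuration:

```ruby
# Backoff formula from the diff above, with window and limit as floats so the
# division stays fractional. The values are assumptions, not real config.
window = 900.0 # seconds in one rate-limit window (assumed)
limit  = 900.0 # requests allowed per window (assumed)

backoff = ->(enqueued) { [2 * window - (2.0**(-2 * enqueued / limit)) * 4 * window, 0].max }

[0, 450, 500, 997, 998, 9_999].each do |n|
  spacing = backoff.call(n + 1) - backoff.call(n)
  puts format("enqueued=%5d  backoff=%7.1f s  spacing=%5.2f s", n, backoff.call(n), spacing)
end
# Roughly:
#   enqueued=    0  backoff ≈    0 s                   (small queues run immediately)
#   enqueued=  450  backoff ≈    0 s                   (backoff stays zero up to limit/2 jobs)
#   enqueued=  500  backoff ≈  133 s   spacing ≈ 2.6 s
#   enqueued=  997  backoff ≈ 1025 s   spacing ≈ 1.2 s (the ~1-2 s between jobs noted above)
#   enqueued= 9999  backoff ≈ 1800 s                   (approaches the 2*window cap, ~30 min)
```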
# Stop execution of jobs; restart it after the rate limiting window has
# elapsed.
stop_cmd = "RAILS_ENV=#{Rails.env} #{Rails.root}/bin/delayed_job stop"
start_cmd = "RAILS_ENV=#{Rails.env} #{Rails.configuration.delayed_job_command} | at now + #{Rails.configuration.rate_limit_window} minutes"
This is more complex and less portable than modifying the `run_at`s in the `delayed_jobs` table. Does it have advantages?
Modifying `run_at` won't affect workers which have already reserved jobs (they can reserve multiple jobs at a time, and they check the value of `run_at` at the time of reservation).

That said, if this is a graceful exit the workers are going to finish their queue before stopping, so I should check that behavior.
(Honestly the whole rate-limiting handling has been very challenging -- Twitter says "just check the endpoint periodically" but I couldn't think of a strategy that seemed sound for that, and even when I do that I need to stop everything once we hit the rate-limiting window anyway -- so if you have better ideas I am happy to hear them.)
Spitballing:
The fastest way to fulfill an arbitrary number of requests is to immediately use up all allowed requests in this 15 minute window, wait for window reset, and then immediately use up all allowed requests in the next 15 minute window, etc. To me, that suggests a single queue that maintains a running tally and that can be paused and resumed at any moment rather than maintaining a schedule.

The architecture that falls out of that is a single, long running coordinator process that handles throttling the requests that get submitted to it. This can be a single worker running off a single queue. That worker doesn't actually run the requests. Instead, it creates request jobs that run out of a different queue on a different set of workers. If there are no requests left in this window's budget, the coordinator process sleeps until the window resets. Requests can still happen in parallel, but no request jobs get created if there is no budget for them.
I dunno. Does that sound reasonable? Things are way easier for me to reason about if there's a single choke point to handle throttling than trying to make decentralized workers coordinate.
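A rough sketch of that coordinator idea, assuming delayed_job named queues; the class names, queue names, and budget constant below are hypothetical, not part of this PR:

```ruby
# Hypothetical coordinator: a single job on its own queue doles out the
# per-window budget, while the actual fetches run on a separate queue.
class FetchCoordinatorJob
  RATE_LIMIT_WINDOW = 15 * 60 # seconds (Twitter's window)
  REQUEST_BUDGET    = 900     # assumed requests allowed per window

  def initialize(user_ids)
    @user_ids = user_ids
  end

  def perform
    @user_ids.each_slice(REQUEST_BUDGET) do |batch|
      # Spend this window's whole budget immediately on the fetching queue...
      batch.each do |user_id|
        Delayed::Job.enqueue(TweetFetchingJob.new(user_id), queue: "tweet_fetching")
      end
      # ...then sleep until the window resets before enqueuing more
      # (no sleep after a trailing partial batch).
      sleep(RATE_LIMIT_WINDOW) if batch.size == REQUEST_BUDGET
    end
  end
end

# Run the coordinator itself on a single-worker queue so there is exactly one
# choke point doing the throttling:
#   Delayed::Job.enqueue(FetchCoordinatorJob.new(user_ids), queue: "coordinator")
```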
This answers the feedback from #25 (which I'm going to close in favor of this PR) and completes the background processing, thus fully addressing the scope of https://cyber.harvard.edu/projectmanagement/issues/17243 .