Realtime Inference Writes #558
Conversation
Updated inference engines to write conversations to file once they have them. Writing occurs in a separate thread so inference does not stall.

Towards OPE-320 (FR: Update `lema.infer` to support batch inference).
```diff
@@ -184,6 +185,11 @@ async def _query_api(
                 response_json, conversation
             )
             await asyncio.sleep(remote_params.politeness_policy)
+            if generation_config.output_filepath:
```
You can write to the file before sleeping here.
One additional note: this approach will not preserve ordering as each request may complete at different times. Likely OK for now since we're writing the entire conversation (including the input), but something we should keep in mind as a potential fix for later.
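A possible shape for that later fix (a sketch only; `append_with_index` and the `_input_index` field are hypothetical names, not part of this PR): tag each written record with the position of its request in the input batch, so a post-processing pass can sort the file back into input order.

```python
import jsonlines

def append_with_index(conversation, index: int, output_filepath: str) -> None:
    """Hypothetical helper: record where this request sat in the input
    batch so a later pass can restore the original ordering."""
    record = conversation.model_dump()
    record["_input_index"] = index  # position in the input batch
    with jsonlines.open(output_filepath, mode="a") as writer:
        writer.write(record)
```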
Done.
@jgreer013 it would be good to open an issue and log a comment with the OPE number. I think saving the results out of order will cause a lot of confusion, so it would be good to fix it.

@taenin even if we log the conversation, it's not trivial to build a unique id from the conversation content and join it with any metadata the user has. It's a decently big burden on the user.
```python
            conversation: A single conversation to save.
            output_filepath: The filepath to where the conversation should be saved.
        """
        return self._save_conversation(conversation, output_filepath)
```
(Not an expert on this :) ) I found the function `asyncio.to_thread()` (https://docs.python.org/3/library/asyncio-task.html#running-in-threads), which can be used to make IO-bound functions non-blocking. It's only available in Python 3.9+, though, while we also support 3.8. There are workarounds like this one: https://github.com/playht/pyht/blob/c8cc319d9d6df818154f1337fa64e0c8385aea9f/pyht/async_client.py#L33. Also, I don't have context on how important supporting v3.8 is in oumi and whether it can be dropped.
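For reference, that workaround amounts to reimplementing `to_thread` on top of `loop.run_in_executor`, which does exist in 3.8. A minimal sketch under that assumption (essentially what CPython 3.9's `asyncio.to_thread` does; `to_thread_compat` is a hypothetical name):

```python
import asyncio
import contextvars
import functools

async def to_thread_compat(func, *args, **kwargs):
    """Backport-style to_thread for Python 3.8: run a blocking function
    in the default executor without blocking the event loop."""
    loop = asyncio.get_running_loop()
    ctx = contextvars.copy_context()  # propagate context vars to the thread
    call = functools.partial(ctx.run, func, *args, **kwargs)
    return await loop.run_in_executor(None, call)
```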
I actually tried using `asyncio.create_task`, which is hypothetically what we'd want, but I kept running into errors when calling it from non-async functions (complaining that no event loop was running). Then, when I tried creating my own event loop, I ran into other issues related to the parent caller being on a different loop from the child task (even when I explicitly told it to use the same one). While I believe there's a way to do it, I suspect it would wind up violating best practices / being an anti-pattern compared to simply making all the engines async.
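A minimal reproduction of that first failure mode (names are illustrative, not from the PR):

```python
import asyncio

async def _save():
    await asyncio.sleep(0)  # stand-in for the real write

def sync_caller():
    coro = _save()
    try:
        # create_task requires an already-running event loop;
        # plain sync code has none, so this raises RuntimeError.
        asyncio.create_task(coro)
    except RuntimeError as err:
        print(err)  # "no running event loop"
    finally:
        coro.close()  # suppress the "coroutine was never awaited" warning

sync_caller()
```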
Any luck with wrapping `_save_conversation()` in `asyncio.to_thread(...)` from the `_async` method? I.e., `asyncio.to_thread(_save_conversation, conversation, output_filepath)`.
```diff
@@ -68,22 +68,30 @@ def _read_conversations(self, input_filepath: str) -> List[Conversation]:
             conversations.append(conversation)
         return conversations

-    def _save_conversations(
-        self, conversations: List[Conversation], output_filepath: str
+    def _save_conversation(
```
How about moving all the heavy lifting into `_save_conversation_async()` (see https://pypi.org/project/aiofiles/)? You could then invoke the `async` function from the sync one: https://stackoverflow.com/questions/55647753/call-async-function-from-sync-function-while-the-synchronous-function-continues. Related example: https://github.com/fgka/py-yaas/blob/c1edc33cbf2258f7d5ba3bd0cde9797480fa09cd/code/core/src/yaas_caching/file.py#L275
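A rough sketch of what that async variant could look like, assuming the third-party `aiofiles` package and a pydantic-style `model_dump_json()` on the conversation object (both assumptions, not verified against this codebase):

```python
import aiofiles  # third-party: pip install aiofiles

async def _save_conversation_async(conversation, output_filepath: str) -> None:
    # Append one JSON line per conversation without blocking the event loop.
    async with aiofiles.open(output_filepath, mode="a") as f:
        await f.write(conversation.model_dump_json() + "\n")
```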
This can actually be problematic: `asyncio.run` should only be used at the top level, and nested calls wind up throwing errors. Embedding it in the class winds up breaking things higher up the call hierarchy.
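The failure mode, for the record (a minimal reproduction, not code from this PR):

```python
import asyncio

async def outer():
    # Raises: RuntimeError: asyncio.run() cannot be called
    # from a running event loop
    asyncio.run(asyncio.sleep(0))

asyncio.run(outer())
```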
We should probably wrap top-level entry points (e.g., `main` functions) in `asyncio.run`: https://docs.python.org/3/library/asyncio-task.html#coroutines. @taenin WDYT? (You're the most familiar with the inference implementation.) This is probably out of scope for this PR, but it would be good to set it up properly.
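I.e., something along these lines (a sketch; `run_inference` is a hypothetical name):

```python
import asyncio

async def run_inference() -> None:
    ...  # drive the async engines end to end

def main() -> None:
    # The single top-level entry into asyncio; everything below stays async.
    asyncio.run(run_inference())

if __name__ == "__main__":
    main()
```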
I need to give this some thought over the weekend.
```diff
-        for conversation in conversations:
-            json_obj = conversation.model_dump()
-            writer.write(json_obj)
+        with jsonlines.open(output_filepath, mode="a") as writer:
```
Can multiple threads use this function? How do you prevent them from concurrently writing to this file?
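One common way to make the append thread-safe (a sketch; the module-level lock and function name are hypothetical, not this PR's code): serialize writers behind a `threading.Lock` so two threads can't interleave partial lines.

```python
import threading
import jsonlines

_write_lock = threading.Lock()  # hypothetical: guards the output file

def save_conversation(conversation, output_filepath: str) -> None:
    # Only one thread appends at a time, so lines can't interleave.
    with _write_lock:
        with jsonlines.open(output_filepath, mode="a") as writer:
            writer.write(conversation.model_dump())
```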
```diff
-        for conversation in conversations:
-            json_obj = conversation.model_dump()
-            writer.write(json_obj)
+        with jsonlines.open(output_filepath, mode="a") as writer:
```
Opening the file and seeking to the end for every conversation is not ideal...
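One way to avoid the reopen-per-record cost (a sketch, not the PR's code): hold a single append-mode writer open for the lifetime of the run and close it when inference finishes.

```python
import jsonlines

class ConversationSink:
    """Hypothetical: one long-lived appending writer instead of
    reopening the output file for every conversation."""

    def __init__(self, output_filepath: str):
        self._writer = jsonlines.open(output_filepath, mode="a")

    def write(self, conversation) -> None:
        self._writer.write(conversation.model_dump())

    def close(self) -> None:
        self._writer.close()
```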
This PR is based on #558. This PR updates how writes are done for inference.
- If all requests are successful, the final written file will have all responses in the same order as the provided input, for all engines.
- During inference, all requests are written to a `/scratch` directory containing a file with the same name. There is no guarantee of order for the values written to this file (this truly only matters for the InferenceEngines that leverage parallelism).
- Non-parallel engines will write to disk in-line (blocking). I've benchmarked this for medium-sized files (100s of MB): appending a line of text takes on average 1.788e-05 seconds.
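A sketch of the finalize step that strategy implies (assuming, hypothetically, that each scratch record carries the index of its input request; none of these names come from the PR):

```python
import jsonlines
from pathlib import Path

def finalize(scratch_filepath: str, output_filepath: str) -> None:
    # Sort scratch records back into input order, write the final file,
    # then discard the scratch file.
    with jsonlines.open(scratch_filepath) as reader:
        records = sorted(reader, key=lambda r: r["_input_index"])
    with jsonlines.open(output_filepath, mode="w") as writer:
        for record in records:
            record.pop("_input_index", None)  # drop the bookkeeping field
            writer.write(record)
    Path(scratch_filepath).unlink()
```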
Closing this as the related PR #574 is submitted.