Realtime Inference Writes #558

Closed · wants to merge 7 commits
Conversation

@jgreer013 (Contributor) commented Sep 26, 2024

Updated the inference engines to write each conversation to file as soon as it is available. Writing occurs in a separate thread so inference does not stall.

Towards OPE-320
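The general idea, sketched below. This is illustrative only: the helper names and the queue-plus-writer-thread design are assumptions, not necessarily the PR's exact implementation.

```python
import json
import queue
import threading

# Illustrative sketch: a single background writer thread drains a queue of
# finished conversations and appends them to a JSONL file, so the inference
# loop never blocks on disk I/O. All names here are hypothetical.
_write_queue: queue.Queue = queue.Queue()
_STOP = object()  # sentinel used to shut the writer down

def _writer_loop(output_filepath: str) -> None:
    while True:
        record = _write_queue.get()
        if record is _STOP:
            break
        with open(output_filepath, "a", encoding="utf-8") as f:
            f.write(json.dumps(record) + "\n")

def start_writer(output_filepath: str) -> threading.Thread:
    thread = threading.Thread(target=_writer_loop, args=(output_filepath,), daemon=True)
    thread.start()
    return thread

def save_conversation_nonblocking(conversation_dict: dict) -> None:
    # Called from the inference loop; enqueues and returns immediately.
    _write_queue.put(conversation_dict)
```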


linear bot commented Sep 26, 2024

OPE-320 FR: Update `lema.infer` to support batch inference

  1. Read prompts from files
  2. Write responses to files

@jgreer013 jgreer013 changed the title Updated inference engines to write conversations to file once they have them. Writing occurs in a separate thread so inference does not stall. Realtime Async Inference Writes Sep 26, 2024
@jgreer013 jgreer013 requested a review from a team September 26, 2024 18:05
@@ -184,6 +185,11 @@ async def _query_api(
response_json, conversation
)
await asyncio.sleep(remote_params.politeness_policy)
if generation_config.output_filepath:
Collaborator:
You can write to the file before sleeping here.

One additional note: this approach will not preserve ordering as each request may complete at different times. Likely OK for now since we're writing the entire conversation (including the input), but something we should keep in mind as a potential fix for later.

Contributor Author:

Done.

Contributor:
@jgreer013 it would be good to open an issue and log a comment with the OPE number. I think saving the results out of order will cause a lot of confusion, so it would be good to fix it.

@taenin even if we log the conversation, it's not trivial to build a unique id from the conversation content and join it with any metadata the user has. It's a decently big burden on the user.

@taenin taenin requested a review from oelachqar September 26, 2024 18:21
@jgreer013 jgreer013 requested a review from taenin September 26, 2024 21:24
@taenin taenin requested a review from nikg4 September 26, 2024 22:38
@jgreer013 jgreer013 changed the title Realtime Async Inference Writes Realtime Inference Writes Sep 27, 2024
conversation: A single conversation to save.
output_filepath: The filepath to where the conversation should be saved.
"""
return self._save_conversation(conversation, output_filepath)
Collaborator:
(Not an expert on this.) Found asyncio.to_thread() (https://docs.python.org/3/library/asyncio-task.html#running-in-threads), which can be used to make IO-bound functions non-blocking.

It's only available in Python 3.9+ though, while we also support 3.8. There exist workarounds like this: https://github.com/playht/pyht/blob/c8cc319d9d6df818154f1337fa64e0c8385aea9f/pyht/async_client.py#L33

Also, I don't have context on the importance of supporting v3.8 in oumi and whether it can be dropped.

Contributor Author:
I actually tried using asyncio.create_task, which is hypothetically what we'd want, but I kept running into errors when calling it from non-async functions (complaining that no event loop was running).

Then when I tried creating my own event loop, I ran into other issues related to the parent caller being on a different loop from the child task (even when I explicitly told it to use the same one).

While I believe there's a way to do it, I suspect it'll wind up violating best practices / being an anti-pattern compared to simply making all the engines async.
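For illustration, a rough sketch of the failure mode described above (the coroutine and function names here are hypothetical):

```python
import asyncio

async def _save_conversation_async(conversation: dict, output_filepath: str) -> None:
    ...  # write the conversation to disk

def save_from_sync_code(conversation: dict, output_filepath: str) -> None:
    # asyncio.create_task only works inside a running event loop; calling it
    # from plain synchronous code raises "RuntimeError: no running event loop".
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running here, so create_task would fail. Starting a
        # fresh loop with asyncio.run works in isolation, but nesting it
        # under a caller that already owns a loop raises errors too.
        asyncio.run(_save_conversation_async(conversation, output_filepath))
        return
    asyncio.create_task(_save_conversation_async(conversation, output_filepath))
```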

Collaborator:
Any luck with wrapping _save_conversation() in asyncio.to_thread(...) from the _async method? E.g., asyncio.to_thread(_save_conversation, conversation, output_filepath).
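A minimal sketch of that suggestion (class and parameter names are assumptions; asyncio.to_thread requires Python 3.9+, with loop.run_in_executor as the 3.8-compatible fallback):

```python
import asyncio

class EngineSketch:
    def _save_conversation(self, conversation: dict, output_filepath: str) -> None:
        ...  # blocking jsonlines append happens here

    async def _query_api(self, conversation: dict, output_filepath: str) -> None:
        # Offload the blocking file write to a worker thread so the event
        # loop keeps servicing other in-flight requests (Python 3.9+).
        await asyncio.to_thread(self._save_conversation, conversation, output_filepath)
        # Python 3.8 fallback:
        # await asyncio.get_running_loop().run_in_executor(
        #     None, self._save_conversation, conversation, output_filepath
        # )
```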

@@ -68,22 +68,30 @@ def _read_conversations(self, input_filepath: str) -> List[Conversation]:
conversations.append(conversation)
return conversations

def _save_conversations(
self, conversations: List[Conversation], output_filepath: str
def _save_conversation(
Contributor Author:
This can actually be problematic - asyncio.run should only be used at the top level, and nested calls wind up throwing errors. Embedding this into the class winds up breaking things higher up the hierarchy.

Collaborator:
We should probably wrap top-level entry points (e.g., main functions) in asyncio.run: https://docs.python.org/3/library/asyncio-task.html#coroutines

@taenin WDYT? (You're the most familiar with the inference implementation.) This is probably out of scope for this PR, but it would be good to set it up properly.
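Roughly what that could look like (sketch only; the function names are placeholders rather than oumi's actual entry points):

```python
import asyncio

async def _infer_async(config) -> None:
    ...  # drive the async inference engine end to end

def main() -> None:
    config = ...  # parse CLI args / load the inference config
    # asyncio.run owns the event loop for the whole program, so nothing
    # deeper in the call stack needs to create or nest its own loop.
    asyncio.run(_infer_async(config))

if __name__ == "__main__":
    main()
```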

Collaborator:
I need to give this some thought over the weekend

for conversation in conversations:
json_obj = conversation.model_dump()
writer.write(json_obj)
with jsonlines.open(output_filepath, mode="a") as writer:
Contributor:
Can multiple threads use this function? How do you prevent them from concurrently writing to this file?

for conversation in conversations:
json_obj = conversation.model_dump()
writer.write(json_obj)
with jsonlines.open(output_filepath, mode="a") as writer:
Contributor:
Opening the file + seek to the end for every conversation is not ideal...
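One way both concerns could be addressed, sketched with assumed names: keep a single jsonlines writer open for the lifetime of the run and serialize appends with a lock.

```python
import threading
import jsonlines

class ConversationWriter:
    """Hypothetical helper: one persistent jsonlines writer guarded by a lock."""

    def __init__(self, output_filepath: str):
        self._writer = jsonlines.open(output_filepath, mode="a")
        self._lock = threading.Lock()

    def write(self, conversation) -> None:
        # Serializing appends prevents concurrent threads from interleaving
        # records, and reusing the open writer avoids reopening the file and
        # seeking to the end for every conversation.
        with self._lock:
            self._writer.write(conversation.model_dump())

    def close(self) -> None:
        self._writer.close()
```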


taenin added a commit that referenced this pull request Oct 3, 2024
This PR is based on #558

This PR updates how writes are done for inference.

- If all requests are successful, the final written file will have all responses in the same order as the provided input for all engines.
- During inference, all requests are written to a `/scratch` directory containing a file with the same name. There is no guarantee of order on the values written to this file (this truly only matters for the InferenceEngines that leverage parallelism).

Non-parallel engines will write to disk in-line (blocking). I've benchmarked this for medium sized files (100s of MB): appending a line of text to these files takes on average 1.788e-05 seconds.
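For illustration, a hedged sketch of how the ordered final file could be produced from the unordered scratch file, assuming each scratch record is tagged with the index of its input prompt (that field is an assumption, not something stated in the PR):

```python
import jsonlines

def finalize_output(scratch_filepath: str, output_filepath: str) -> None:
    # Read the unordered scratch records, sort them by the input index they
    # were tagged with, and write the final file in input order.
    with jsonlines.open(scratch_filepath, mode="r") as reader:
        records = list(reader)
    records.sort(key=lambda record: record["input_index"])  # assumed field
    with jsonlines.open(output_filepath, mode="w") as writer:
        for record in records:
            writer.write(record)
```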

taenin commented Oct 3, 2024

Closing this as the related PR #574 has been submitted.

@taenin taenin closed this Oct 3, 2024