Make Endpoint.predict method async #1998

Open
igonro opened this issue Mar 3, 2023 · 2 comments
Labels
api: vertex-ai Issues related to the googleapis/python-aiplatform API. type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design.

Comments

@igonro

igonro commented Mar 3, 2023

Problem

I want to request predictions from my image classifier endpoint. Since there is a limit of 1.5 MB per request, if I want to get predictions for several images I have to batch them like this:

import base64
import time
from io import BytesIO

from PIL import Image
from pympler.asizeof import asizeof


def sync_predict(endpoint, instances):
    return endpoint.predict(instances=instances).predictions


def make_sync_predictions(image_list, endpoint, max_mbs=1.5):
    predictions = []
    batch, batch_mbs = [], 0.0
    for i, image_path in enumerate(image_list):
        image = Image.open(image_path).convert("RGB")
        buffer = BytesIO()
        image.save(buffer, format="JPEG")
        enc_image = base64.b64encode(buffer.getvalue())
        b64_image = str(enc_image.decode("utf-8"))
        image_mbs = asizeof(b64_image) / (1000**2)

        if batch_mbs + image_mbs >= max_mbs:
            predictions += sync_predict(endpoint=endpoint, instances=batch)
            batch, batch_mbs = [{"data": {"b64": b64_image}}], image_mbs
        else:
            batch.append({"data": {"b64": b64_image}})
            batch_mbs += image_mbs

        if i == len(image_list) - 1:
            predictions += sync_predict(endpoint=endpoint, instances=batch)

    image_names = [image.name for image in image_list]
    return zip(image_names, predictions)


start = time.perf_counter()
result = make_sync_predictions(image_list=test_images, endpoint=endpoint)
print(f"Predictions: {[item for item in result]}")
print(f"Total elapsed time: {time.perf_counter() - start} s")

But obviously, this way I cannot benefit from having multiple replicas, for example a model deployed with min_replica_count=2. So I changed it to this:

import asyncio
import base64
import time
from io import BytesIO

from PIL import Image
from pympler.asizeof import asizeof


async def async_predict(endpoint, batch, queue):
    image_names, instances = batch["image_names"], batch["instances"]
    response = await endpoint.predict(instances=instances) # this doesn't work because it isn't async
    for image_name, prediction in zip(image_names, response.predictions):
        await queue.put((image_name, prediction))


async def make_async_predictions(image_list, endpoint, max_mbs=1.5):
    batches = []
    batch_names, batch_data, batch_mbs = [], [], 0.0
    for i, image_path in enumerate(image_list):
        image = Image.open(image_path).convert("RGB")
        buffer = BytesIO()
        image.save(buffer, format="JPEG")
        enc_image = base64.b64encode(buffer.getvalue())
        b64_image = str(enc_image.decode("utf-8"))
        image_mbs = asizeof(b64_image) / (1000**2)

        if batch_mbs + image_mbs >= max_mbs:
            batches.append({"image_names": batch_names, "instances": batch_data})
            batch_names, batch_data, batch_mbs = [image_path.name], [{"data": {"b64": b64_image}}], image_mbs
        else:
            batch_names.append(image_path.name)
            batch_data.append({"data": {"b64": b64_image}})
            batch_mbs += image_mbs

        if i == len(image_list) - 1:
            batches.append({"image_names": batch_names, "instances": batch_data})

    predictions_queue = asyncio.Queue()
    predictors = [asyncio.create_task(async_predict(endpoint, batch, predictions_queue)) for batch in batches]
    await asyncio.gather(*predictors)
    result = []
    while not predictions_queue.empty():
        item = await predictions_queue.get()
        result.append(item)
        predictions_queue.task_done()

    return result


start = time.perf_counter()
result = await make_async_predictions(image_list=test_images, endpoint=endpoint)
print(f"Predictions: {[item for item in result]}")
print(f"Total elapsed time: {time.perf_counter() - start} s")

Workaround

I can solve this by changing the endpoint.predict line to:

response = await asyncio.get_event_loop().run_in_executor(None, endpoint.predict, instances)
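For reference, here is a minimal sketch of how this workaround slots into the async_predict coroutine above; the functools.partial wrapper and the shared ThreadPoolExecutor (to cap the number of concurrent blocking calls) are just one way to set it up:

import asyncio
import functools
from concurrent.futures import ThreadPoolExecutor

# A shared executor keeps the number of concurrent blocking predict calls bounded;
# the pool size here is an arbitrary choice for illustration.
executor = ThreadPoolExecutor(max_workers=4)


async def async_predict(endpoint, batch, queue):
    image_names, instances = batch["image_names"], batch["instances"]
    loop = asyncio.get_running_loop()
    # endpoint.predict is synchronous, so run it in a worker thread;
    # functools.partial passes instances as a keyword argument.
    response = await loop.run_in_executor(
        executor, functools.partial(endpoint.predict, instances=instances)
    )
    for image_name, prediction in zip(image_names, response.predictions):
        await queue.put((image_name, prediction))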

But I think there should be an async_predict method, or maybe a sync: bool parameter that makes the call blocking or non-blocking depending on its value.
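For illustration, the call site could then look something like this (hypothetical method name and parameter, not the actual SDK API):

# Option 1: a dedicated coroutine method (hypothetical name)
response = await endpoint.async_predict(instances=batch)

# Option 2: a sync flag on the existing method (hypothetical parameter)
# that returns an awaitable when sync=False
response = await endpoint.predict(instances=batch, sync=False)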

@product-auto-label product-auto-label bot added the api: vertex-ai Issues related to the googleapis/python-aiplatform API. label Mar 3, 2023
@jaycee-li jaycee-li added the type: feature request ‘Nice-to-have’ improvement, new feature or different behavior or design. label Mar 14, 2023
@trevdoz

trevdoz commented Nov 15, 2023

Any update on this? I see there is a predict_async method, but when I use it on an open-source LLM deployed to Vertex AI it only returns a single prediction even though I'm passing multiple instances.

@ishaan-jaff

@trevdoz I'm the maintainer of LiteLLM; it lets you benefit from having multiple deployments using the LiteLLM load balancer (compatible with Vertex AI and 100+ LLMs).

I'd love to help here if you're trying to use multiple deployments; let me know if there's something missing in LiteLLM for your use case.

Here's an example of using it with GPT-3.5 on Azure + OpenAI:

import os

from litellm import Router

model_list = [{ # list of model deployments 
    "model_name": "gpt-3.5-turbo", # model alias 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "azure/chatgpt-v-2", # actual model name
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    }
}, {
    "model_name": "gpt-3.5-turbo", 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "azure/chatgpt-functioncalling", 
        "api_key": os.getenv("AZURE_API_KEY"),
        "api_version": os.getenv("AZURE_API_VERSION"),
        "api_base": os.getenv("AZURE_API_BASE")
    }
}, {
    "model_name": "gpt-3.5-turbo", 
    "litellm_params": { # params for litellm completion/embedding call 
        "model": "gpt-3.5-turbo", 
        "api_key": os.getenv("OPENAI_API_KEY"),
    }
}]

router = Router(model_list=model_list)

# openai.ChatCompletion.create replacement
response = await router.acompletion(model="gpt-3.5-turbo", 
                messages=[{"role": "user", "content": "Hey, how's it going?"}])

print(response)
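Because all three deployments share the "gpt-3.5-turbo" model_name alias, the Router load-balances acompletion calls across them.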
