Skip to content

Commit

Permalink
feat: Document by Document streaming for gRPC protocol (#5921)
Browse files Browse the repository at this point in the history
Co-authored-by: Joan Fontanals <joan.martinez@jina.ai>
  • Loading branch information
alaeddine-13 and JoanFM authored Jul 25, 2023
1 parent 8141d99 commit c1377d9
Show file tree
Hide file tree
Showing 34 changed files with 1,985 additions and 676 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ jobs:
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/unit/serve/runtimes/test_helper.py
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/deployment_http_composite
pytest --suppress-no-test-exit-code --force-flaky --min-passes 1 --max-runs 5 --cov=jina --cov-report=xml --timeout=600 -v -s --ignore-glob='tests/integration/hub_usage/dummyhub*' tests/integration/docarray_v2/test_streaming.py
echo "flag it as jina for codeoverage"
echo "codecov_flag=jina" >> $GITHUB_OUTPUT
timeout-minutes: 30
Expand Down
2 changes: 1 addition & 1 deletion Dockerfiles/protogen-3.21.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.7-slim
FROM --platform=linux/amd64 python:3.7-slim

RUN apt-get update && apt-get install --no-install-recommends -y

Expand Down
2 changes: 1 addition & 1 deletion Dockerfiles/protogen.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM python:3.7-slim
FROM --platform=linux/amd64 python:3.7-slim

RUN apt-get update && apt-get install --no-install-recommends -y

Expand Down
6 changes: 3 additions & 3 deletions docs/concepts/serving/executor/add-endpoints.md
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ Streaming endpoints receive one Document as input and yields one Document at a t
```{admonition} Note
:class: note

Streaming endpoints are only supported for HTTP protocol and for Deployment.
Streaming endpoints are only supported for HTTP and gRPC protocols and for Deployment.
```

A streaming endpoint has the following signature:
Expand All @@ -284,7 +284,7 @@ class MyExecutor(Executor):
with Deployment(
uses=MyExecutor,
port=12345,
protocol='http',
protocol='http', # or 'grpc'
cors=True,
include_gateway=False,
) as dep:
Expand All @@ -296,7 +296,7 @@ Jina offers a standard python client for using the streaming endpoint:

```python
from jina import Client
client = Client(port=12345, protocol='http', cors=True, asyncio=True)
client = Client(port=12345, protocol='http', cors=True, asyncio=True) # or protocol='grpc'
async for doc in client.stream_doc(
on='/hello', inputs=MyDocument(text='hello world'), return_type=DocList[MyDocument]
):
Expand Down
51 changes: 50 additions & 1 deletion jina/clients/base/grpc.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import asyncio
import json
import threading
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Type

import grpc
from grpc import RpcError

from jina._docarray import Document
from jina.clients.base import BaseClient
from jina.clients.base.stream_rpc import StreamRpc
from jina.clients.base.unary_rpc import UnaryRpc
from jina.clients.helper import callback_exec
from jina.excepts import BadClientInput, BadServerFlow, InternalNetworkError
from jina.logging.profile import ProgressBar
from jina.proto import jina_pb2, jina_pb2_grpc
Expand Down Expand Up @@ -211,6 +213,53 @@ async def _handle_error_and_metadata(self, err):
else:
raise BadServerFlow(msg) from err

async def stream_doc_endpoint(
self,
request: jina_pb2.SingleDocumentRequestProto,
timeout: Optional[float] = None,
):
"""
Use the stream_doc stub to send one document and stream documents back from the Executor
:param request: The request to be sent
:param timeout: defines timeout for sending request
:yields: response document
"""
async with get_grpc_channel(
f'{self.args.host}:{self.args.port}',
asyncio=True,
tls=self.args.tls,
aio_tracing_client_interceptors=self.aio_tracing_client_interceptors(),
) as channel:
stub = jina_pb2_grpc.JinaSingleDocumentRequestRPCStub(channel)
try:
async for response in stub.stream_doc(request, timeout=timeout):
callback_exec(
response=response,
logger=self.logger,
)
yield response
except (grpc.aio.AioRpcError, InternalNetworkError) as err:
await self._handle_error_and_metadata(err)

async def _get_streaming_results(
self,
on: str,
inputs: 'Document',
parameters: Optional[Dict] = None,
return_type: Type[Document] = Document,
timeout: Optional[int] = None,
**kwargs,
):
request_proto = jina_pb2.SingleDocumentRequestProto(
header=jina_pb2.HeaderProto(exec_endpoint=on), document=inputs.to_protobuf()
)
async for response in self.stream_doc_endpoint(
request=request_proto, timeout=timeout
):
yield return_type.from_protobuf(response.document)


def client_grpc_options(
backoff_multiplier: float,
Expand Down
2 changes: 1 addition & 1 deletion jina/clients/base/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,11 @@ def _result_handler(result):

callback_exec(
response=resp,
logger=self.logger,
on_error=on_error,
on_done=on_done,
on_always=on_always,
continue_on_error=self.continue_on_error,
logger=self.logger,
)
if self.show_progress:
p_bar.update()
Expand Down
2 changes: 1 addition & 1 deletion jina/clients/base/stream_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ async def stream_rpc_with_retry(self):
):
callback_exec(
response=resp,
logger=self.logger,
on_error=self.on_error,
on_done=self.on_done,
on_always=self.on_always,
continue_on_error=self.continue_on_error,
logger=self.logger,
)
if self.show_progress:
self.p_bar.update()
Expand Down
2 changes: 1 addition & 1 deletion jina/clients/base/unary_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,11 @@ async def _with_retry(req: 'Request'):
def _result_handler(resp):
callback_exec(
response=resp,
logger=self.logger,
on_error=self.on_error,
on_done=self.on_done,
on_always=self.on_always,
continue_on_error=self.continue_on_error,
logger=self.logger,
)
return resp

Expand Down
2 changes: 1 addition & 1 deletion jina/clients/base/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,11 @@ def _request_handler(
):
callback_exec(
response=response,
logger=self.logger,
on_error=on_error,
on_done=on_done,
on_always=on_always,
continue_on_error=self.continue_on_error,
logger=self.logger,
)
if self.show_progress:
p_bar.update()
Expand Down
10 changes: 5 additions & 5 deletions jina/clients/helper.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""Helper functions for clients in Jina."""

from functools import wraps
from typing import Callable
from typing import Callable, Optional

from jina.excepts import BadClientCallback, BadServer
from jina.helper import get_rich_console
Expand Down Expand Up @@ -57,11 +57,11 @@ def _arg_wrapper(*args, **kwargs):

def callback_exec(
response,
on_done: Callable,
on_error: Callable,
on_always: Callable,
continue_on_error: bool,
logger: JinaLogger,
on_done: Optional[Callable] = None,
on_error: Optional[Callable] = None,
on_always: Optional[Callable] = None,
continue_on_error: bool = False,
) -> None:
"""Execute the callback with the response.
Expand Down
5 changes: 4 additions & 1 deletion jina/proto/docarray_v1/docarray.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ package docarray;
*/
message DocumentArrayProto {

}
}
message DocumentProto {

}
26 changes: 26 additions & 0 deletions jina/proto/docarray_v1/jina.proto
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,22 @@ message DataRequestProto {
DataContentProto data = 4; // container for docs and groundtruths
}


/**
* Represents a Single Document DataRequest
*/
message SingleDocumentRequestProto {

HeaderProto header = 1; // header contains meta info defined by the user

google.protobuf.Struct parameters = 2; // extra kwargs that will be used in executor

repeated RouteProto routes = 3; // status info on every routes

docarray.DocumentProto document = 4; // the document in this request

}

message DataRequestProtoWoData {

HeaderProto header = 1; // header contains meta info defined by the user
Expand Down Expand Up @@ -162,6 +178,16 @@ service JinaSingleDataRequestRPC {
}
}

/**
* jina gRPC service for DataRequests.
* This is used to send requests to Executors when a list of requests is not needed
*/
service JinaSingleDocumentRequestRPC {
// Used for streaming one document to the Executors
rpc stream_doc (SingleDocumentRequestProto) returns (stream SingleDocumentRequestProto) {
}
}

/**
* jina streaming gRPC service.
*/
Expand Down
Loading

0 comments on commit c1377d9

Please sign in to comment.