Skip to content

Commit

Permalink
fix: remove merge problems
Browse files Browse the repository at this point in the history
  • Loading branch information
JoanFM committed Jan 2, 2023
1 parent 98d036e commit 808eaf4
Show file tree
Hide file tree
Showing 12 changed files with 34 additions and 222 deletions.
2 changes: 1 addition & 1 deletion jina/clients/base/grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from jina.serve.stream import RequestStreamer
from jina.types.request.data import Request

if TYPE_CHECKING: # pragma: no cover
if TYPE_CHECKING: # pragma: no cover
from jina.clients.base import CallbackFnType, InputType


Expand Down
8 changes: 4 additions & 4 deletions jina/orchestrate/flow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def __init__(
compression: Optional[str] = None,
cors: Optional[bool] = False,
deployments_addresses: Optional[str] = '{}',
deployments_disable_reduce: Optional[str] = '[]',
deployments_no_reduce: Optional[str] = '[]',
deployments_metadata: Optional[str] = '{}',
description: Optional[str] = None,
disable_auto_volume: Optional[bool] = False,
Expand Down Expand Up @@ -224,7 +224,7 @@ def __init__(
:param compression: The compression mechanism used when sending requests from the Head to the WorkerRuntimes. For more details, check https://grpc.github.io/grpc/python/grpc.html#compression.
:param cors: If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access.
:param deployments_addresses: JSON dictionary with the input addresses of each Deployment
:param deployments_disable_reduce: list JSON disabling the built-in merging mechanism for each Deployment listed
:param deployments_no_reduce: list JSON disabling the built-in merging mechanism for each Deployment listed
:param deployments_metadata: JSON dictionary with the request metadata for each Deployment
:param description: The description of this HTTP server. It will be used in automatics docs such as Swagger UI.
:param docker_kwargs: Dictionary of kwargs arguments that will be passed to Docker SDK when starting the docker '
Expand Down Expand Up @@ -404,7 +404,7 @@ def __init__(
:param cors: If set, a CORS middleware is added to FastAPI frontend to allow cross-origin access.
:param deployments_addresses: JSON dictionary with the input addresses of each Deployment
<<<<<<< HEAD
:param deployments_disable_reduce: list JSON disabling the built-in merging mechanism for each Deployment listed
:param deployments_no_reduce: list JSON disabling the built-in merging mechanism for each Deployment listed
:param deployments_metadata: JSON dictionary with the request metadata for each Deployment
=======
:param deployments_metadata: JSON dictionary with the request metadata for each Deployment
Expand Down Expand Up @@ -658,7 +658,7 @@ def _add_gateway(
args.graph_conditions = json.dumps(graph_conditions)
args.deployments_addresses = json.dumps(deployments_addresses)
args.deployments_metadata = json.dumps(deployments_metadata)
args.deployments_disable_reduce = json.dumps(deployments_disabled_reduce)
args.deployments_no_reduce = json.dumps(deployments_disabled_reduce)
self._deployment_nodes[GATEWAY_NAME] = Deployment(args, needs)

def _get_deployments_metadata(self) -> Dict[str, Dict[str, str]]:
Expand Down
2 changes: 1 addition & 1 deletion jina/parsers/deprecated.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
'host_in': 'host',
'https': 'tls',
'disable_reduce': 'no_reduce',
'deployments_disable_reduce': 'deployments_no_reduce',
'deployments_no_reduce': 'deployments_no_reduce',
}


Expand Down
15 changes: 15 additions & 0 deletions jina/parsers/orchestrate/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,18 @@ def mixin_pod_runtime_args_parser(arg_group, pod_type='worker'):
default=None,
help='A parent directory for storing all snapshots.',
)


def mixin_hub_pull_options_parser(parser):
"""Add the arguments for hub pull options to the parser
:param parser: the parser configure
"""

gp = add_arg_group(parser, title='Pull')
gp.add_argument(
'--force-update',
'--force',
action='store_true',
default=False,
help='If set, always pull the latest Hub Executor bundle even it exists on local',
)
13 changes: 0 additions & 13 deletions jina/parsers/orchestrate/runtimes/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,19 +169,6 @@ def mixin_gateway_protocol_parser(parser):
:param parser: the parser configure
"""

parser.add_argument(
'--deployments-addresses',
type=str,
help='JSON dictionary with the input addresses of each Deployment',
default='{}',
)

parser.add_argument(
'--deployments-metadata',
type=str,
help='JSON dictionary with the request metadata for each Deployment',
default='{}',
)
from jina.enums import GatewayProtocolType

parser.add_argument(
Expand Down
2 changes: 1 addition & 1 deletion jina/proto/build-proto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ SRC_NAME="${MODULE}.proto"

COMP_PROTO_OUT_NAME="${MODULE}_pb2.py"
COMP_GRPC_OUT_NAME="${MODULE}_pb2_grpc.py"
OUT_FOLDER="${2:-pb}/"
OUT_FOLDER="${2:-pb2}/"

VER_FILE=../__init__.py

Expand Down
129 changes: 4 additions & 125 deletions jina/proto/docarray.proto
Original file line number Diff line number Diff line change
@@ -1,131 +1,10 @@
syntax = "proto3";
import "google/protobuf/struct.proto";

package docarray;
option go_package = "github.com/jina-ai/jina-raft/docarray";

/**
* Represents a (quantized) dense n-dim array
/*
this file is just a placeholder for the DA coming from docarray dependency
*/
message DenseNdArrayProto {
// the actual array data, in bytes
bytes buffer = 1;

// the shape (dimensions) of the array
repeated uint32 shape = 2;

// the data type of the array
string dtype = 3;
}

/**
* Represents a general n-dim array, can be either dense or sparse
*/
message NdArrayProto {
oneof content {
DenseNdArrayProto dense = 1; // dense representation of the ndarray
SparseNdArrayProto sparse = 2; // sparse representation of the ndarray
}

// the name of the ndarray class
string cls_name = 3;

google.protobuf.Struct parameters = 4;
}

/**
* Represents a sparse ndarray
*/
message SparseNdArrayProto {
// A 2-D int64 tensor of shape [N, ndims], which specifies the indices of the elements in the sparse tensor that contain nonzero values (elements are zero-indexed)
DenseNdArrayProto indices = 1;

// A 1-D tensor of any type and shape [N], which supplies the values for each element in indices.
DenseNdArrayProto values = 2;

// A 1-D int64 tensor of shape [ndims], which specifies the shape of the sparse tensor.
repeated uint32 shape = 3;
}

/**
* Represents the relevance model to `ref_id`
*/
message NamedScoreProto {
float value = 1; // value
string op_name = 2; // the name of the operator/score function
string description = 3; // text description of the score
string ref_id = 4; // the score is computed between doc `id` and `ref_id`
}


/**
* Represents a Document
*/
message DocumentProto {
// A hexdigest that represents a unique document ID
string id = 1;

oneof content {
// the raw binary content of this document, which often represents the original document when comes into jina
bytes blob = 2;

// the ndarray of the image/audio/video document
NdArrayProto tensor = 3;

// a text document
string text = 4;
}

// the depth of the recursive chunk structure
uint32 granularity = 5;

// the width of the recursive match structure
uint32 adjacency = 6;

// the parent id from the previous granularity
string parent_id = 7;

// The weight of this document
float weight = 8;

// a uri of the document could be: a local file path, a remote url starts with http or https or data URI scheme
string uri = 9;

// modality, an identifier to the modality this document belongs to. In the scope of multi/cross modal search
string modality = 10;

// mime type of this document, for buffer content, this is required; for other contents, this can be guessed
string mime_type = 11;

// the offset of the doc
float offset = 12;

// the position of the doc, could be start and end index of a string; could be x,y (top, left) coordinate of an image crop; could be timestamp of an audio clip
repeated float location = 13;

// list of the sub-documents of this document (recursive structure)
repeated DocumentProto chunks = 14;

// the matched documents on the same level (recursive structure)
repeated DocumentProto matches = 15;

// the embedding of this document
NdArrayProto embedding = 16;

// a structured data value, consisting of field which map to dynamically typed values.
google.protobuf.Struct tags = 17;

// Scores performed on the document, each element corresponds to a metric
map<string, NamedScoreProto> scores = 18;

// Evaluations performed on the document, each element corresponds to a metric
map<string, NamedScoreProto> evaluations = 19;

// system-defined meta attributes represented in a structured data value.
google.protobuf.Struct _metadata = 20;

}

message DocumentArrayProto {
repeated DocumentProto docs = 1; // a list of Documents
}

}
6 changes: 3 additions & 3 deletions jina/serve/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ def __init__(
graph_conditions = json.loads(runtime_args.graph_conditions)
deployments_addresses = json.loads(runtime_args.deployments_addresses)
deployments_metadata = json.loads(runtime_args.deployments_metadata)
deployments_disable_reduce = json.loads(runtime_args.deployments_disable_reduce)
deployments_no_reduce = json.loads(runtime_args.deployments_no_reduce)

self.streamer = GatewayStreamer(
graph_representation=graph_description,
executor_addresses=deployments_addresses,
graph_conditions=graph_conditions,
deployments_metadata=deployments_metadata,
deployments_disable_reduce=deployments_disable_reduce,
deployments_no_reduce=deployments_no_reduce,
timeout_send=timeout_send,
retries=self.runtime_args.retries,
compression=self.runtime_args.compression,
Expand All @@ -106,7 +106,7 @@ def __init__(
executor_addresses=deployments_addresses,
graph_conditions=graph_conditions,
deployments_metadata=deployments_metadata,
deployments_disable_reduce=deployments_disable_reduce,
deployments_no_reduce=deployments_no_reduce,
timeout_send=self.runtime_args.timeout_send,
retries=self.runtime_args.retries,
compression=self.runtime_args.compression,
Expand Down
67 changes: 0 additions & 67 deletions jina/serve/networking.py
Original file line number Diff line number Diff line change
Expand Up @@ -776,43 +776,6 @@ def __init__(
)
self._deployment_address_map = {}

def send_request(
self,
request: Request,
deployment: str,
head: bool = False,
shard_id: Optional[int] = None,
polling_type: PollingType = PollingType.ANY,
endpoint: Optional[str] = None,
metadata: Optional[Dict[str, str]] = None,
timeout: Optional[float] = None,
retries: Optional[int] = -1,
) -> List[asyncio.Task]:
"""Send a single message to target via one or all of the pooled connections, depending on polling_type. Convenience function wrapper around send_request.
:param request: a single request to send
:param deployment: name of the Jina deployment to send the message to
:param head: If True it is send to the head, otherwise to the worker pods
:param shard_id: Send to a specific shard of the deployment, ignored for polling ALL
:param polling_type: defines if the message should be send to any or all pooled connections for the target
:param endpoint: endpoint to target with the request
:param metadata: metadata to send with the request
:param timeout: timeout for sending the requests
:param retries: number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)
:return: list of asyncio.Task items for each send call
"""
return self.send_requests(
requests=[request],
deployment=deployment,
head=head,
shard_id=shard_id,
polling_type=polling_type,
endpoint=endpoint,
metadata=metadata,
timeout=timeout,
retries=retries,
)


def send_requests(
self,
requests: List[Request],
Expand Down Expand Up @@ -894,36 +857,6 @@ def send_discover_endpoint(
)
return None

def send_request_once(
self,
request: Request,
deployment: str,
metadata: Optional[Dict[str, str]] = None,
head: bool = False,
shard_id: Optional[int] = None,
timeout: Optional[float] = None,
retries: Optional[int] = -1,
) -> asyncio.Task:
"""Send msg to target via only one of the pooled connections
:param request: request to send
:param deployment: name of the Jina deployment to send the message to
:param metadata: metadata to send with the request
:param head: If True it is send to the head, otherwise to the worker pods
:param shard_id: Send to a specific shard of the deployment, ignored for polling ALL
:param timeout: timeout for sending the requests
:param retries: number of retries per gRPC call. If <0 it defaults to max(3, num_replicas)
:return: asyncio.Task representing the send call
"""
return self.send_requests_once(
[request],
deployment=deployment,
metadata=metadata,
head=head,
shard_id=shard_id,
timeout=timeout,
retries=retries,
)

def send_requests_once(
self,
requests: List[Request],
Expand Down
4 changes: 2 additions & 2 deletions jina/serve/runtimes/gateway/graph/topology_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ def __init__(
graph_representation: Dict,
graph_conditions: Dict = {},
deployments_metadata: Dict = {},
deployments_disable_reduce: List[str] = [],
deployments_no_reduce: List[str] = [],
timeout_send: Optional[float] = 1.0,
retries: Optional[int] = -1,
logger: Optional[JinaLogger] = None,
Expand Down Expand Up @@ -373,7 +373,7 @@ def __init__(
floating=node_name in floating_deployment_set,
filter_condition=condition,
metadata=metadata,
reduce=node_name not in deployments_disable_reduce,
reduce=node_name not in deployments_no_reduce,
timeout_send=timeout_send,
retries=retries,
logger=self.logger,
Expand Down
2 changes: 0 additions & 2 deletions jina/serve/runtimes/worker/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,6 @@ async def _async_setup_grpc_server(self):
self._data_request_handler._executor, self._grpc_server
)

for service in service_names:
self._health_servicer.set(service, health_pb2.HealthCheckResponse.SERVING)
reflection.enable_server_reflection(service_names, self._grpc_server)
bind_addr = f'0.0.0.0:{self.args.port}'
self.logger.debug(f'start listening on {bind_addr}')
Expand Down
Loading

0 comments on commit 808eaf4

Please sign in to comment.