Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import grpc
from google.protobuf import text_format

from apache_beam import coders
from apache_beam.internal import pickler
from apache_beam.portability.api import beam_fn_api_pb2_grpc
from apache_beam.portability.api import beam_job_api_pb2
from apache_beam.portability.api import beam_job_api_pb2_grpc
Expand All @@ -55,7 +57,11 @@ class UniversalLocalRunner(runner.PipelineRunner):
the runner and worker(s).
"""

def __init__(self, use_grpc=True, use_subprocesses=False):
def __init__(
self,
use_grpc=True,
use_subprocesses=False,
runner_api_address=None):
if use_subprocesses and not use_grpc:
raise ValueError("GRPC must be used with subprocesses")
super(UniversalLocalRunner, self).__init__()
Expand All @@ -65,6 +71,7 @@ def __init__(self, use_grpc=True, use_subprocesses=False):
self._job_service = None
self._job_service_lock = threading.Lock()
self._subprocess = None
self._runner_api_address = runner_api_address

def __del__(self):
# Best effort to not leave any dangling processes around.
Expand All @@ -79,7 +86,10 @@ def cleanup(self):
def _get_job_service(self):
with self._job_service_lock:
if not self._job_service:
if self._use_subprocesses:
if self._runner_api_address:
self._job_service = beam_job_api_pb2_grpc.JobServiceStub(
grpc.insecure_channel(self._runner_api_address))
elif self._use_subprocesses:
self._job_service = self._start_local_runner_subprocess_job_service()

elif self._use_grpc:
Expand Down Expand Up @@ -137,11 +147,23 @@ def _start_local_runner_subprocess_job_service(self):
return job_service

def run_pipeline(self, pipeline):
# Java has different expectations about coders
# (windowed in Fn API, but *un*windowed in runner API), whereas the
# FnApiRunner treats them consistently, so we must guard this.
# See also BEAM-2717.
proto_pipeline, proto_context = pipeline.to_runner_api(return_context=True)
if self._runner_api_address:
for pcoll in proto_pipeline.components.pcollections.values():
if pcoll.coder_id not in proto_context.coders:
coder = coders.registry.get_coder(pickler.loads(pcoll.coder_id))
pcoll.coder_id = proto_context.coders.get_id(coder)
proto_context.coders.populate_map(proto_pipeline.components.coders)

job_service = self._get_job_service()
prepare_response = job_service.Prepare(
beam_job_api_pb2.PrepareJobRequest(
job_name='job',
pipeline=pipeline.to_runner_api()))
pipeline=proto_pipeline))
run_response = job_service.Run(beam_job_api_pb2.RunJobRequest(
preparation_id=prepare_response.preparation_id))
return PipelineResult(job_service, run_response.job_id)
Expand Down