1 change: 1 addition & 0 deletions sdks/python/apache_beam/coders/coder_impl.pxd
@@ -82,6 +82,7 @@ cdef class FastPrimitivesCoderImpl(StreamCoderImpl):
cdef object requires_deterministic_step_label
cdef bint warn_deterministic_fallback
cdef bint force_use_dill
cdef bint use_relative_filepaths

@cython.locals(dict_value=dict, int_value=libc.stdint.int64_t,
unicode_value=unicode)
14 changes: 11 additions & 3 deletions sdks/python/apache_beam/coders/coder_impl.py
@@ -57,6 +57,7 @@

from apache_beam.coders import observable
from apache_beam.coders.avro_record import AvroRecord
from apache_beam.internal import cloudpickle
from apache_beam.internal import cloudpickle_pickler
from apache_beam.typehints.schemas import named_tuple_from_schema
from apache_beam.utils import proto_utils
@@ -377,12 +378,14 @@ def __init__(
self,
fallback_coder_impl,
requires_deterministic_step_label=None,
force_use_dill=False):
force_use_dill=False,
use_relative_filepaths=True):
self.fallback_coder_impl = fallback_coder_impl
self.iterable_coder_impl = IterableCoderImpl(self)
self.requires_deterministic_step_label = requires_deterministic_step_label
self.warn_deterministic_fallback = True
self.force_use_dill = force_use_dill
self.use_relative_filepaths = use_relative_filepaths

@staticmethod
def register_iterable_like_type(t):
@@ -560,8 +563,13 @@ def encode_type(self, t, stream):
return self.encode_type_2_67_0(t, stream)

if t not in _pickled_types:
_pickled_types[t] = cloudpickle_pickler.dumps(
t, config=cloudpickle_pickler.NO_DYNAMIC_CLASS_TRACKING_CONFIG)
config = cloudpickle.CloudPickleConfig(
id_generator=None,
skip_reset_dynamic_type_state=True,
filepath_interceptor=cloudpickle.get_relative_path)
if not self.use_relative_filepaths:
config.filepath_interceptor = None
_pickled_types[t] = cloudpickle_pickler.dumps(t, config=config)
stream.write(_pickled_types[t], True)

def decode_type(self, stream):
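For context, a minimal sketch (not part of the PR) of how the new use_relative_filepaths flag selects the cloudpickle configuration used by encode_type. The helper name pickle_type is hypothetical; the CloudPickleConfig fields, the get_relative_path interceptor, and the dumps call are the ones that appear in the diff above.

from apache_beam.internal import cloudpickle
from apache_beam.internal import cloudpickle_pickler

def pickle_type(t, use_relative_filepaths=True):
  # With the interceptor set, filepaths baked into pickled code objects and
  # dynamic functions are rewritten to relative paths, so the encoded bytes
  # do not depend on the absolute location of the source tree.
  config = cloudpickle.CloudPickleConfig(
      id_generator=None,
      skip_reset_dynamic_type_state=True,
      filepath_interceptor=(
          cloudpickle.get_relative_path if use_relative_filepaths else None))
  return cloudpickle_pickler.dumps(t, config=config)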
39 changes: 30 additions & 9 deletions sdks/python/apache_beam/coders/coders.py
@@ -927,16 +927,24 @@ def _create_impl(self):

class DeterministicFastPrimitivesCoderV2(FastCoder):
"""Throws runtime errors when encoding non-deterministic values."""
def __init__(self, coder, step_label):
def __init__(self, coder, step_label, update_compatibility_version=None):
self._underlying_coder = coder
self._step_label = step_label
self._use_relative_filepaths = True
self._version_tag = "v2_69"
from apache_beam.transforms.util import is_v1_prior_to_v2
# Versions prior to 2.69.0 did not use relative filepaths.
if update_compatibility_version and is_v1_prior_to_v2(
v1=update_compatibility_version, v2="2.69.0"):
self._version_tag = ""
self._use_relative_filepaths = False

def _create_impl(self):

return coder_impl.FastPrimitivesCoderImpl(
self._underlying_coder.get_impl(),
requires_deterministic_step_label=self._step_label,
force_use_dill=False)
force_use_dill=False,
use_relative_filepaths=self._use_relative_filepaths)

def is_deterministic(self):
# type: () -> bool
@@ -962,6 +970,9 @@ def to_runner_api_parameter(self, context):
google.protobuf.wrappers_pb2.BytesValue(value=serialize_coder(self)),
())

def version_tag(self):
return self._version_tag


class DeterministicFastPrimitivesCoder(FastCoder):
"""Throws runtime errors when encoding non-deterministic values."""
@@ -993,11 +1004,8 @@ def to_type_hint(self):
return Any


def _should_force_use_dill():
from apache_beam.coders import typecoders
def _should_force_use_dill(update_compat_version):
from apache_beam.transforms.util import is_v1_prior_to_v2
update_compat_version = typecoders.registry.update_compatibility_version

if not update_compat_version:
return False

@@ -1016,9 +1024,22 @@


def _update_compatible_deterministic_fast_primitives_coder(coder, step_label):
if _should_force_use_dill():
""" Returns the update compatible version of DeterministicFastPrimitivesCoder
The differences are in how "special types" e.g. NamedTuples, Dataclasses are
deterministically encoded.

- In SDK version <= 2.67.0 dill is used to encode "special types"
- In SDK version 2.68.0 cloudpickle is used to encode "special types" with
absolute filepaths in code objects and dynamic functions.
- In SDK version 2.69.0 cloudpickle is used to encode "special types" with
relative filepaths in code objects and dynamic functions.
"""
from apache_beam.coders import typecoders
update_compat_version = typecoders.registry.update_compatibility_version
if _should_force_use_dill(update_compat_version):
return DeterministicFastPrimitivesCoder(coder, step_label)
return DeterministicFastPrimitivesCoderV2(coder, step_label)
return DeterministicFastPrimitivesCoderV2(
coder, step_label, update_compat_version)


class FastPrimitivesCoder(FastCoder):
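To summarize the selection logic above, here is a hedged sketch of which deterministic coder variant a given update_compatibility_version resolves to. The helper is illustrative only; it assumes, per the docstring in this PR, that _should_force_use_dill returns True for versions prior to 2.68.0 (its elided body is not shown in the diff), and it reuses the is_v1_prior_to_v2 check from the diff.

from apache_beam.transforms.util import is_v1_prior_to_v2

def describe_deterministic_coder(update_compat_version):
  # Illustrative helper (not in the PR): maps an update compatibility version
  # to the coder variant chosen by
  # _update_compatible_deterministic_fast_primitives_coder.
  if update_compat_version and is_v1_prior_to_v2(
      v1=update_compat_version, v2="2.68.0"):
    # Assumed from the docstring: <= 2.67.0 falls back to the dill-based coder.
    return "DeterministicFastPrimitivesCoder (dill)"
  if update_compat_version and is_v1_prior_to_v2(
      v1=update_compat_version, v2="2.69.0"):
    # 2.68.0: cloudpickle, but absolute filepaths (version_tag == "").
    return "DeterministicFastPrimitivesCoderV2 (cloudpickle, absolute filepaths)"
  # No pinned version, or >= 2.69.0: cloudpickle with relative filepaths
  # (version_tag == "v2_69").
  return "DeterministicFastPrimitivesCoderV2 (cloudpickle, relative filepaths)"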
57 changes: 50 additions & 7 deletions sdks/python/apache_beam/coders/coders_test_common.py
@@ -23,6 +23,7 @@
import enum
import logging
import math
import os
import pickle
import subprocess
import sys
@@ -248,12 +249,21 @@ def test_memoizing_pickle_coder(self):
@parameterized.expand([
param(compat_version=None),
param(compat_version="2.67.0"),
param(compat_version="2.68.0"),
])
def test_deterministic_coder(self, compat_version):
""" Test in process determinism for all special deterministic types

- In SDK version <= 2.67.0 dill is used to encode "special types"
- In SDK version 2.68.0 cloudpickle is used to encode "special types" with
absolute filepaths in code objects and dynamic functions.
- In SDK version >=2.69.0 cloudpickle is used to encode "special types"
with relative filepaths in code objects and dynamic functions.
"""

typecoders.registry.update_compatibility_version = compat_version
coder = coders.FastPrimitivesCoder()
if not dill and compat_version:
if not dill and compat_version == "2.67.0":
with self.assertRaises(RuntimeError):
coder.as_deterministic_coder(step_label="step")
self.skipTest('Dill not installed')
@@ -283,7 +293,7 @@ def test_deterministic_coder(self, compat_version):
# Skip this check when cloudpickle is used. Dill monkey-patches the __reduce__
# method of anonymous named tuples (MyNamedTuple), which are otherwise not
# pickleable. Since the test is parameterized, the type gets clobbered.
if compat_version:
if compat_version == "2.67.0":
self.check_coder(
deterministic_coder, [MyNamedTuple(1, 2), MyTypedNamedTuple(1, 'a')])

@@ -324,8 +334,18 @@ def test_deterministic_coder(self, compat_version):
@parameterized.expand([
param(compat_version=None),
param(compat_version="2.67.0"),
param(compat_version="2.68.0"),
])
def test_deterministic_map_coder_is_update_compatible(self, compat_version):
""" Test in process determinism for map coder including when a component
coder uses DeterministicFastPrimitivesCoder for "special types".

- In SDK version <= 2.67.0 dill is used to encode "special types"
- In SDK version 2.68.0 cloudpickle is used to encode "special types" with
absolute filepaths in code objects and dynamic functions.
- In SDK version >=2.69.0 cloudpickle is used to encode "special types"
with relative filepaths in code objects and dynamic functions.
"""
typecoders.registry.update_compatibility_version = compat_version
values = [{
MyTypedNamedTuple(i, 'a'): MyTypedNamedTuple('a', i)
@@ -335,7 +355,7 @@ def test_deterministic_map_coder_is_update_compatible(self, compat_version):
coder = coders.MapCoder(
coders.FastPrimitivesCoder(), coders.FastPrimitivesCoder())

if not dill and compat_version:
if not dill and compat_version == "2.67.0":
with self.assertRaises(RuntimeError):
coder.as_deterministic_coder(step_label="step")
self.skipTest('Dill not installed')
@@ -344,8 +364,8 @@ def test_deterministic_map_coder_is_update_compatible(self, compat_version):

assert isinstance(
deterministic_coder._key_coder,
coders.DeterministicFastPrimitivesCoderV2
if not compat_version else coders.DeterministicFastPrimitivesCoder)
coders.DeterministicFastPrimitivesCoderV2 if compat_version
in (None, "2.68.0") else coders.DeterministicFastPrimitivesCoder)

self.check_coder(deterministic_coder, *values)

@@ -681,11 +701,20 @@ def test_param_windowed_value_coder(self):
@parameterized.expand([
param(compat_version=None),
param(compat_version="2.67.0"),
param(compat_version="2.68.0"),
])
def test_cross_process_encoding_of_special_types_is_deterministic(
self, compat_version):
"""Test cross-process determinism for all special deterministic types"""
if compat_version:
"""Test cross-process determinism for all special deterministic types

- In SDK version <= 2.67.0 dill is used to encode "special types"
- In SDK version 2.68.0 cloudpickle is used to encode "special types" with
absolute filepaths in code objects and dynamic functions.
- In SDK version 2.69.0 cloudpickle is used to encode "special types" with
relative filepaths in code objects and dynamic functions.
"""
is_using_dill = compat_version == "2.67.0"
if is_using_dill:
pytest.importorskip("dill")

if sys.executable is None:
@@ -785,6 +814,7 @@ def run_subprocess():
deterministic_coder = coder.as_deterministic_coder("step")

for test_name in results1:

data1 = results1[test_name]
data2 = results2[test_name]

@@ -799,6 +829,19 @@ def run_subprocess():
logging.warning("Could not decode %s data due to %s", test_name, e)
continue

if test_name == "named_tuple_simple" and not is_using_dill:
# The absence of a compat_version means we are using the most recent
# implementation of the coder, which uses relative paths.
should_have_relative_path = not compat_version
named_tuple_type = type(decoded1)
self.assertEqual(
os.path.isabs(named_tuple_type._make.__code__.co_filename),
not should_have_relative_path)
self.assertEqual(
os.path.isabs(
named_tuple_type.__getnewargs__.__globals__['__file__']),
not should_have_relative_path)

self.assertEqual(
decoded1, decoded2, f"Cross-process decoding differs for {test_name}")
self.assertIsInstance(