Skip to content
2 changes: 1 addition & 1 deletion .github/trigger_files/beam_PostCommit_Python.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"modification": 2
"modification": 4
}

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 9
"modification": 11
}
10 changes: 0 additions & 10 deletions sdks/python/apache_beam/io/external/xlang_jdbcio_it_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.typehints.schemas import LogicalType
from apache_beam.typehints.schemas import MillisInstant
from apache_beam.utils.timestamp import Timestamp

# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
Expand Down Expand Up @@ -242,10 +240,6 @@ def test_xlang_jdbc_write_read(self, database):

config = self.jdbc_configs[database]

# Register MillisInstant logical type to override the mapping from Timestamp
# originally handled by MicrosInstant.
LogicalType.register_logical_type(MillisInstant)

with TestPipeline() as p:
p.not_use_test_runner_api = True
_ = (
Expand Down Expand Up @@ -356,10 +350,6 @@ def custom_row_equals(expected, actual):
classpath=config['classpath'],
))

# Register MillisInstant logical type to override the mapping from Timestamp
# originally handled by MicrosInstant.
LogicalType.register_logical_type(MillisInstant)

# Run read pipeline with custom schema
with TestPipeline() as p:
p.not_use_test_runner_api = True
Expand Down
17 changes: 15 additions & 2 deletions sdks/python/apache_beam/io/jdbc.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@

# pytype: skip-file

import contextlib
import datetime
import typing

Expand Down Expand Up @@ -257,6 +258,17 @@ def __init__(
)


@contextlib.contextmanager
def enforce_millis_instant_for_timestamp():
old_registry = LogicalType._known_logical_types
LogicalType._known_logical_types = old_registry.copy()
try:
LogicalType.register_logical_type(MillisInstant)
yield
finally:
LogicalType._known_logical_types = old_registry


class ReadFromJdbc(ExternalTransform):
"""A PTransform which reads Rows from the specified database via JDBC.

Expand Down Expand Up @@ -352,8 +364,9 @@ def __init__(

dataSchema = None
if schema is not None:
# Convert Python schema to Beam Schema proto
schema_proto = typing_to_runner_api(schema).row_type.schema
with enforce_millis_instant_for_timestamp():
# Convert Python schema to Beam Schema proto
schema_proto = typing_to_runner_api(schema).row_type.schema
# Serialize the proto to bytes for transmission
dataSchema = schema_proto.SerializeToString()

Expand Down
1 change: 1 addition & 0 deletions sdks/python/apache_beam/typehints/row_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ def __init__(
schema_id=schema_id,
schema_options=schema_options,
field_options=field_options,
field_descriptions=field_descriptions,
**kwargs)
user_type = named_tuple_from_schema(schema, **kwargs)

Expand Down
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/typehints/schema_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def generate_new_id(self):
"schemas.")

def add(self, typing, schema):
if not schema.id:
if schema.id:
Copy link
Collaborator Author

@shunping shunping Jun 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know why we change this line in #28169 to disable the look up of schema registry.

Let's see if this breaks anythong.

Copy link
Collaborator Author

@shunping shunping Jun 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Abacn, do you know which test workflow covers this test?

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Local tests passed with ./gradlew --info sdk:java:io:google-cloud-platform:test --tests 'org.apache.beam.sdk.io.gcp.bigquery.providers.*'.

self.by_id[schema.id] = (typing, schema)

def get_typing_by_id(self, unique_id):
Expand Down
28 changes: 19 additions & 9 deletions sdks/python/apache_beam/typehints/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,9 +142,11 @@ def named_fields_to_schema(
schema_options: Optional[Sequence[Tuple[str, Any]]] = None,
field_options: Optional[Dict[str, Sequence[Tuple[str, Any]]]] = None,
schema_registry: SchemaTypeRegistry = SCHEMA_REGISTRY,
field_descriptions: Optional[Dict[str, str]] = None,
):
schema_options = schema_options or []
field_options = field_options or {}
field_descriptions = field_descriptions or {}

if isinstance(names_and_types, dict):
names_and_types = names_and_types.items()
Expand All @@ -158,7 +160,8 @@ def named_fields_to_schema(
option_to_runner_api(option_tuple)
for option_tuple in field_options.get(name, [])
],
) for (name, type) in names_and_types
description=field_descriptions.get(name, None))
for (name, type) in names_and_types
],
options=[
option_to_runner_api(option_tuple) for option_tuple in schema_options
Expand Down Expand Up @@ -616,6 +619,13 @@ def schema_from_element_type(element_type: type) -> schema_pb2.Schema:
if isinstance(element_type, row_type.RowTypeConstraint):
return named_fields_to_schema(element_type._fields)
elif match_is_named_tuple(element_type):
if hasattr(element_type, row_type._BEAM_SCHEMA_ID):
# if the named tuple's schema is in registry, we just use it instead of
# regenerating one.
schema_id = getattr(element_type, row_type._BEAM_SCHEMA_ID)
schema = SCHEMA_REGISTRY.get_schema_by_id(schema_id)
if schema is not None:
return schema
return named_tuple_to_schema(element_type)
else:
raise TypeError(
Expand Down Expand Up @@ -1017,15 +1027,15 @@ def representation_type(cls):
def language_type(cls):
return decimal.Decimal

def to_representation_type(self, value):
# type: (decimal.Decimal) -> bytes

return DecimalLogicalType().to_representation_type(value)

def to_language_type(self, value):
# type: (bytes) -> decimal.Decimal
# from language type (decimal.Decimal) to representation type
# (the type corresponding to the coder used in DecimalLogicalType)
def to_representation_type(self, value: decimal.Decimal) -> decimal.Decimal:
return value

return DecimalLogicalType().to_language_type(value)
# from representation type (the type corresponding to the coder used in
# DecimalLogicalType) to language type
def to_language_type(self, value: decimal.Decimal) -> decimal.Decimal:
return value

@classmethod
def argument_type(cls):
Expand Down
Loading