Commit f7071ef

Enable prism by default (where supported) (#34612)
* Enable prism by default
* Clean up fallback code
* Clean up fallback code
* Add missing base case
* Add missing base case
* Fix tests
* Fix tests
* exclude unsupported state
* Exclude locally materialized results
* Fix fallback/dupe execution
* Fix some snippet tests
* Disable argparse abbreviation
* Some test fixes for prism switch
* lint
* row fixes
* Make row comp logic default
* Add more yaml examples
* Fix dataframes tests
* Examples fixes
* type hints
* Some more transform test fixes
* more test fixes
* More generic error checks
* Reshuffle tests
* enrichment error catching
* more test fixes
* Scope out external transforms
* simplify test
* Fix more tests
* Clean up test
* correct runner mode
* ML tests
* ib collect test
* Make sure assertions don't fire in incorrect order
* ML test fixes
* typing
* More fixes
* some more fixes
* Another error fix
* Temporarily set log level to debug
* yapf
* More error regex fixes
* Fix some error messages/metric tests
* more generic tests
* Upgrade logging to warning to see what is happening
* Some more patches
* Wait until finish for test pipelines
* fix test_always
* A few more small fixes
* Temporarily update logging
* clean up merge
* Some more exclusions
* regex issues
* Propagate original failure when using instruction cache
* Batching fix
* Trigger some postcommits
* fmt
* More test fixes
* Fix a few more tests
* linting
* Bump workflow timeout (#35420)
* Bump workflow timeout
* Update beam_PostCommit_Python_Xlang_Gcp_Direct.yml
* linting/fixes
* linting
* Avoid problem with temp file getting deleted
* Avoid problem with temp file getting deleted
* minor cleanup
* Some more minor fixes
* Fix test
* Fix another test with questionable assumptions
* Don't wait on tmpfile being destroyed
* More regex checks
* Add more postcommits, clean up some error messages
* Clean up a few tests
* simplify test
* Clean up test errors
1 parent def8664 commit f7071ef

47 files changed: 378 additions and 399 deletions
Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 1
+  "modification": 2
 }

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 1
+  "modification": 2
 }

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 8
+  "modification": 9
 }

Lines changed: 1 addition & 1 deletion
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run",
-  "modification": 5
+  "modification": 6
 }
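
The four JSON hunks above make the same kind of edit: each increments the integer `modification` field, which, per the file's own comment, is a trivial change made so that the associated test suite runs. A minimal sketch of that bump, assuming the file is a JSON object with an integer `modification` key as shown in the hunks; the helper name `bump_modification` is hypothetical and not part of the Beam repository:

```python
import json


def bump_modification(text: str) -> str:
  """Return the JSON text with its "modification" counter incremented by one."""
  data = json.loads(text)
  data["modification"] = int(data.get("modification", 0)) + 1
  # Two-space indentation is an assumption about the file's layout.
  return json.dumps(data, indent=2) + "\n"


original = (
    '{\n'
    '  "comment": "Modify this file in a trivial way to cause this test suite to run",\n'
    '  "modification": 1\n'
    '}\n')
updated = bump_modification(original)
print(updated)
```

Only the counter changes; the `comment` field is carried over untouched.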

.github/workflows/beam_PostCommit_Python_Dependency.yml

Lines changed: 1 addition & 1 deletion
@@ -60,7 +60,7 @@ jobs:
         job_name: ['beam_PostCommit_Python_Dependency']
         job_phrase: ['Run Python PostCommit Dependency']
         python_version: ['3.9','3.12']
-    timeout-minutes: 120
+    timeout-minutes: 180
     if: |
       github.event_name == 'workflow_dispatch' ||
       github.event_name == 'pull_request_target' ||

.github/workflows/beam_PostCommit_Python_Xlang_Gcp_Direct.yml

Lines changed: 2 additions & 2 deletions
@@ -57,7 +57,7 @@ jobs:
       (github.event_name == 'schedule' && github.repository == 'apache/beam') ||
       github.event.comment.body == 'Run Python_Xlang_Gcp_Direct PostCommit'
     runs-on: [self-hosted, ubuntu-20.04, highmem]
-    timeout-minutes: 100
+    timeout-minutes: 160
     name: ${{ matrix.job_name }} (${{ matrix.job_phrase }})
     strategy:
       matrix:
@@ -98,4 +98,4 @@ jobs:
           commit: '${{ env.prsha || env.GITHUB_SHA }}'
           comment_mode: ${{ github.event_name == 'issue_comment' && 'always' || 'off' }}
           files: '**/pytest*.xml'
-          large_files: true
+          large_files: true

CHANGES.md

Lines changed: 2 additions & 0 deletions
@@ -65,6 +65,7 @@
 
 * New highly anticipated feature X added to Python SDK ([#X](https://github.com/apache/beam/issues/X)).
 * New highly anticipated feature Y added to Java SDK ([#Y](https://github.com/apache/beam/issues/Y)).
+* [Python] Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues.
 
 ## I/Os
 
@@ -85,6 +86,7 @@
 
 ## Breaking Changes
 
+* [Python] Prism runner now enabled by default for most Python pipelines using the direct runner ([#34612](https://github.com/apache/beam/pull/34612)). This may break some tests, see https://github.com/apache/beam/pull/34612 for details on how to handle issues.
 * X behavior was changed ([#X](https://github.com/apache/beam/issues/X)).
 * Go: The pubsubio.Read transform now accepts ReadOptions as a value type instead of a pointer, and requires exactly one of Topic or Subscription to be set (they are mutually exclusive). Additionally, the ReadOptions struct now includes a Topic field for specifying the topic directly, replacing the previous topic parameter in the Read function signature ([#35369])(https://github.com/apache/beam/pull/35369).
 * SQL: The `ParquetTable` external table provider has changed its handling of the `LOCATION` property. To read from a directory, the path must now end with a trailing slash (e.g., `LOCATION '/path/to/data/'`). Previously, a trailing slash was not required. This change was made to enable support for glob patterns and single-file paths ([#35582])(https://github.com/apache/beam/pull/35582).

sdks/python/apache_beam/coders/coders_test.py

Lines changed: 1 addition & 1 deletion
@@ -267,7 +267,7 @@ def test_numpy_int(self):
     # this type is not supported as the key
     import numpy as np
 
-    with self.assertRaises(TypeError):
+    with self.assertRaisesRegex(Exception, "Unable to deterministically"):
       with TestPipeline() as p:
         indata = p | "Create" >> beam.Create([(a, int(a))
                                               for a in np.arange(3)])
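
The hunk above loosens the assertion from a specific exception class to `Exception` plus a message pattern. One plausible reason (an assumption here, not something the diff states) is that a runner executing user code in a separate worker may re-raise the failure as a different exception type, while the original message text survives. A self-contained sketch of that situation using only `unittest`; `run_in_worker` is a hypothetical stand-in, not a Beam API:

```python
import unittest


def run_in_worker(fn):
  """Hypothetical runner: re-raises any user error as RuntimeError."""
  try:
    return fn()
  except Exception as exc:
    # The original type is lost, but its message is embedded in the new one.
    raise RuntimeError(f"worker failed: {type(exc).__name__}: {exc}") from exc


def bad_format():
  # Raises TypeError: not all arguments converted during string formatting
  return "no placeholders" % 5


class WrappedErrorTest(unittest.TestCase):
  def test_original_type_is_not_what_propagates(self):
    # assertRaises(TypeError) would fail here: RuntimeError is what escapes.
    with self.assertRaises(RuntimeError):
      run_in_worker(bad_format)

  def test_message_check_survives_wrapping(self):
    with self.assertRaisesRegex(Exception, "not all arguments converted"):
      run_in_worker(bad_format)


suite = unittest.defaultTestLoader.loadTestsFromTestCase(WrappedErrorTest)
outcome = unittest.TextTestRunner(verbosity=0).run(suite)
```

Matching on the message keeps the test meaningful across runners, at the cost of coupling it to the exact error wording.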

sdks/python/apache_beam/examples/snippets/snippets_test.py

Lines changed: 7 additions & 6 deletions
@@ -307,8 +307,8 @@ def test_bad_types(self):
     # When running this pipeline, you'd get a runtime error,
     # possibly on a remote machine, possibly very late.
 
-    with self.assertRaises(TypeError):
-      p.run()
+    with self.assertRaisesRegex(Exception, "not all arguments converted"):
+      p.run().wait_until_finish()
 
     # To catch this early, we can assert what types we expect.
     with self.assertRaises(typehints.TypeCheckError):
@@ -372,8 +372,8 @@ def process(self, element):
     # When running this pipeline, you'd get a runtime error,
     # possibly on a remote machine, possibly very late.
 
-    with self.assertRaises(TypeError):
-      p.run()
+    with self.assertRaisesRegex(Exception, "not all arguments converted"):
+      p.run().wait_until_finish()
 
     # To catch this early, we can annotate process() with the expected types.
     # Beam will then use these as type hints and perform type checking before
@@ -439,12 +439,13 @@ def test_runtime_checks_off(self):
 
   def test_runtime_checks_on(self):
     # pylint: disable=expression-not-assigned
-    with self.assertRaises(typehints.TypeCheckError):
+    with self.assertRaisesRegex(Exception, "According to type-hint"):
       # [START type_hints_runtime_on]
       p = TestPipeline(options=PipelineOptions(runtime_type_check=True))
       p | beam.Create(['a']) | beam.Map(lambda x: 3).with_output_types(str)
-      p.run()
+      result = p.run()
       # [END type_hints_runtime_on]
+      result.wait_until_finish()
 
   def test_deterministic_key(self):
     with TestPipeline() as p:
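
These hunks add `wait_until_finish()` after `p.run()`. If `run()` hands back a result handle before the pipeline has finished (an assumption about why the change was needed, not something the diff states), a failure can only surface once the caller waits on that handle. The following analogy uses `concurrent.futures` from the standard library rather than Beam; `Future.result()` plays the role of `wait_until_finish()`:

```python
from concurrent.futures import ThreadPoolExecutor


def failing_job():
  # Raises TypeError inside the worker thread.
  return "no placeholders" % 5


with ThreadPoolExecutor(max_workers=1) as pool:
  # submit() returns a handle immediately, like an asynchronous run().
  handle = pool.submit(failing_job)
  # Reaching this line means submit() itself did not raise.
  submitted_without_error = True

  try:
    # result() blocks until the job ends, then re-raises the job's error.
    handle.result()
    error_message = None
  except TypeError as exc:
    error_message = str(exc)

print(error_message)
```

An `assertRaises` block that only wraps the submission step would exit without seeing the error, which is why the wait has to happen inside the assertion context.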

sdks/python/apache_beam/examples/snippets/transforms/elementwise/enrichment_test.py

Lines changed: 28 additions & 4 deletions
@@ -60,28 +60,52 @@ def validate_enrichment_with_vertex_ai_legacy():
   return expected
 
 
+def std_out_to_dict(stdout_lines, row_key):
+  output_dict = {}
+  for stdout_line in stdout_lines:
+    # parse the stdout in a dictionary format so that it can be
+    # evaluated/compared as one. This allows us to compare without
+    # considering the order of the stdout or the order that the fields of the
+    # row are arranged in.
+    fmtd = '{\"' + stdout_line[4:-1].replace('=', '\": ').replace(
+        ', ', ', \"').replace('\"\'', '\'') + "}"
+    stdout_dict = eval(fmtd)  # pylint: disable=eval-used
+    output_dict[stdout_dict[row_key]] = stdout_dict
+  return output_dict
+
+
 @mock.patch('sys.stdout', new_callable=StringIO)
 class EnrichmentTest(unittest.TestCase):
   def test_enrichment_with_bigtable(self, mock_stdout):
     enrichment_with_bigtable()
     output = mock_stdout.getvalue().splitlines()
     expected = validate_enrichment_with_bigtable()
-    self.assertEqual(output, expected)
+
+    self.assertEqual(len(output), len(expected))
+    self.assertEqual(
+        std_out_to_dict(output, 'sale_id'),
+        std_out_to_dict(expected, 'sale_id'))
 
   def test_enrichment_with_vertex_ai(self, mock_stdout):
     enrichment_with_vertex_ai()
     output = mock_stdout.getvalue().splitlines()
     expected = validate_enrichment_with_vertex_ai()
 
-    for i in range(len(expected)):
-      self.assertEqual(set(output[i].split(',')), set(expected[i].split(',')))
+    self.assertEqual(len(output), len(expected))
+    self.assertEqual(
+        std_out_to_dict(output, 'user_id'),
+        std_out_to_dict(expected, 'user_id'))
 
   def test_enrichment_with_vertex_ai_legacy(self, mock_stdout):
     enrichment_with_vertex_ai_legacy()
     output = mock_stdout.getvalue().splitlines()
     expected = validate_enrichment_with_vertex_ai_legacy()
     self.maxDiff = None
-    self.assertEqual(output, expected)
+
+    self.assertEqual(len(output), len(expected))
+    self.assertEqual(
+        std_out_to_dict(output, 'entity_id'),
+        std_out_to_dict(expected, 'entity_id'))
 
 
 if __name__ == '__main__':
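
The `std_out_to_dict` helper added in this hunk turns each printed `Row(...)` line into a dictionary keyed by one field, so that neither line order nor field order affects the comparison. The sketch below applies the same string rewriting to made-up sample lines, but swaps `eval` for `ast.literal_eval`, which only accepts Python literals. That substitution, the name `rows_to_dict`, and the sample data are assumptions for illustration, not part of the commit:

```python
import ast


def rows_to_dict(stdout_lines, row_key):
  """Index printed Row(...) lines by the value of `row_key`."""
  output = {}
  for line in stdout_lines:
    body = line[4:-1]  # drop the leading "Row(" and the trailing ")"
    # Quote the top-level field names so the text becomes a dict literal.
    # The last replace removes the stray quote added before nested-dict keys.
    literal = '{"' + body.replace('=', '": ').replace(
        ', ', ', "').replace('"\'', '\'') + '}'
    row = ast.literal_eval(literal)
    output[row[row_key]] = row
  return output


printed = [
    "Row(sale_id=2, product={'product_id': '7', 'product_name': 'pixel 5'})",
    "Row(sale_id=1, product={'product_id': '3', 'product_name': 'pen'})",
]
# The same rows, with both the line order and the field order changed.
expected = [
    "Row(product={'product_id': '3', 'product_name': 'pen'}, sale_id=1)",
    "Row(product={'product_id': '7', 'product_name': 'pixel 5'}, sale_id=2)",
]
same = rows_to_dict(printed, 'sale_id') == rows_to_dict(expected, 'sale_id')
print(same)
```

The rewriting is purely textual, so a field value that itself contains `=` or `, ` would break the parse; it suits fixed test output rather than arbitrary rows.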
