Skip to content

Conversation

sushmullur
Copy link

Summary

Adds support for Beam's EmptyMatchTreatment parameter in SMB read
operations, allowing pipelines to handle empty directories gracefully
instead of failing.

Changes

  • Added withEmptyMatchTreatment() method to AvroSortedBucketIO.Read
    and ParquetTypeSortedBucketIO.Read
  • Updated SortedBucketSource to pass through the parameter to
    FileSystems.match() calls
  • Maintains full backward compatibility (defaults to DISALLOW behavior)
  • Added tests

Usage Example

import org.apache.beam.sdk.io.fs.EmptyMatchTreatment

val read = AvroSortedBucketIO
  .read[MyRecord](new TupleTag[MyRecord]("input"))
  .from("/path/to/potentially/empty/directory")
  .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW)

Testing

  • All existing tests pass
  • Added new test cases for the EmptyMatchTreatment functionality
  • Verified backward compatibility with existing code

Closes #5759

Copy link
Contributor

@clairemcginty clairemcginty left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey, thanks for the PR! :)

I think this will need a few more changes to get it working end-to-end - it's more involved than it seems since the scio-smb implementation bypasses a lot of the usual Beam filesystem abstractions. Off the top of my head:

  • BucketedInput#getOrSampleByteSize would need to be set up to handle empty input
  • MultiSourceKeyGroupReader would need to be updated to skip empty directories when instantiating bucketedInputs

Adding tests in SortedBucketSourceTest/SortedBucketTransformTest should reveal these issues (and anything else I haven't thought of). Let me know if you need more pointers!

@sushmullur sushmullur force-pushed the feature/smb-empty-match-treatment branch 3 times, most recently from d27f5e8 to 0e63b5d Compare September 20, 2025 21:20
This change adds comprehensive EmptyMatchTreatment support for Sort Merge Bucket (SMB) operations:

API-level changes:
- Add EmptyMatchTreatment parameter to all SMB IO read operations (JSON, TensorFlow, ParquetAvro, ParquetType)
- Update BucketedInput.of() methods to accept EmptyMatchTreatment parameter
- Maintain backward compatibility with existing code

End-to-end implementation:
- Fix BucketedInput#getOrSampleByteSize to handle empty directories gracefully when EmptyMatchTreatment.ALLOW is used
- Enhance MultiSourceKeyGroupReader to filter out sources without valid metadata, preventing failures with empty directories
- Add comprehensive tests for both API and integration functionality

This addresses the SMB filesystem abstraction bypasses by ensuring EmptyMatchTreatment works throughout the entire SMB pipeline.
@sushmullur sushmullur force-pushed the feature/smb-empty-match-treatment branch from 0e63b5d to c408578 Compare September 20, 2025 21:21
@sushmullur
Copy link
Author

Thanks for catching that! Made the changes

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support EmptyMatchTreatment param for SMB reads

2 participants