
Conversation


@devin-ai-integration bot commented on Dec 3, 2025

Summary

Updates the ETL pipeline to output data in Parquet format instead of CSV. The changes include:

  • Parquet format with Snappy compression: Replaced CSV output with Parquet using Snappy compression via pyarrow
  • Partitioning by region: Data is now partitioned using Hive-style partitioning (region=<value>/)
  • Timestamp-based file naming: Files are named with the current timestamp (YYYYMMDD_HHMMSS.parquet)

The new S3 path structure will look like: auto_oem/etl/vehicle_sales_deduped/region=West/20241203_150000.parquet
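
For context, a minimal sketch of what the new writer does, assuming a boto3 S3 client, an in-memory buffer per partition, and the signature shown here; the actual implementation in load_data_to_s3.py may differ in details such as the prefix argument and whether the region column is kept inside the file:

```python
import io
from datetime import datetime

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq


def df_to_s3_parquet(df: pd.DataFrame, s3_client, bucket: str,
                     prefix: str = "auto_oem/etl/vehicle_sales_deduped") -> None:
    """Write one Snappy-compressed Parquet file per region to S3 (sketch only)."""
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    for region, region_df in df.groupby("region"):
        # Serialize the partition to an in-memory Parquet buffer with Snappy compression.
        buffer = io.BytesIO()
        table = pa.Table.from_pandas(region_df)
        pq.write_table(table, buffer, compression="snappy")
        # Hive-style partition path: <prefix>/region=<value>/<timestamp>.parquet
        # (the region value is also encoded in the key, mirroring the example path above).
        key = f"{prefix}/region={region}/{timestamp}.parquet"
        s3_client.put_object(Bucket=bucket, Key=key, Body=buffer.getvalue())
```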

Updates since last revision

Added a comprehensive test suite (tests/test_load_data_to_s3.py) with 7 test cases verifying that:

  • Output files are valid Parquet format
  • Snappy compression is applied to all columns
  • Data is partitioned by region with Hive-style paths (region=<value>/)
  • Each partition contains only rows for its region
  • Filenames follow the YYYYMMDD_HHMMSS.parquet format
  • All input rows are preserved across partitions
  • The S3 client is configured correctly

Run tests locally with: pytest tests/test_load_data_to_s3.py -v
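
As an illustration, the partitioning test could look roughly like the sketch below; the mocked S3 client, the sample data, and the df_to_s3_parquet import and signature are assumptions, not the exact contents of tests/test_load_data_to_s3.py:

```python
from unittest.mock import MagicMock

import pandas as pd

from load_data_to_s3 import df_to_s3_parquet  # import path assumed


def test_region_partitioning():
    df = pd.DataFrame({
        "vin": ["A1", "B2", "C3"],
        "region": ["West", "East", "West"],
    })
    mock_s3 = MagicMock()

    df_to_s3_parquet(df, mock_s3, bucket="test-bucket")

    # One object per distinct region, under Hive-style region=<value>/ prefixes.
    keys = [call.kwargs["Key"] for call in mock_s3.put_object.call_args_list]
    assert any("region=West/" in key for key in keys)
    assert any("region=East/" in key for key in keys)
    assert all(key.endswith(".parquet") for key in keys)
```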

Review & Testing Checklist for Human

  • Run the pipeline end-to-end with real database and S3 credentials - the unit tests mock S3 and don't verify actual upload behavior
  • Investigate the Snyk security check failure - determine whether it's related to the new pyarrow dependency or is pre-existing
  • Verify that downstream consumers are updated to handle the new S3 path structure (changed from a single CSV file to partitioned Parquet directories)
  • Confirm that pyarrow installs correctly in the Docker build environment

Recommended test plan: run docker-compose up -d postgres, then run python main.py with valid AWS credentials, and verify that the S3 bucket contains the expected partitioned Parquet files; a spot-check snippet is sketched below.
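
A possible spot-check after the end-to-end run, assuming boto3 credentials are configured; the bucket name below is a placeholder, not the real bucket:

```python
import boto3

s3 = boto3.client("s3")
response = s3.list_objects_v2(
    Bucket="your-etl-bucket",  # placeholder: substitute the real bucket name
    Prefix="auto_oem/etl/vehicle_sales_deduped/",
)
for obj in response.get("Contents", []):
    # Expect keys like .../region=West/20241203_150000.parquet
    print(obj["Key"])
```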

Notes

  • The old df_to_s3 function was replaced with df_to_s3_parquet - this is a breaking change if any other code references the old function
  • The code assumes the DataFrame contains a 'region' column (this comes from the dealerships table in the SQL query)
  • Added pytest to requirements.txt for running tests

Link to Devin run: https://app.devin.ai/sessions/7ed04133f4b0417497cab5462536443c
Requested by: Abhay Aggarwal (@abhay-codeium)

- Update load_data_to_s3.py to output Parquet format with Snappy compression
- Partition output by region using Hive-style partitioning (region=<value>)
- Name files using current timestamp (YYYYMMDD_HHMMSS.parquet)
- Add pyarrow dependency to requirements.txt
- Update main.py to use new df_to_s3_parquet function

Co-Authored-By: Abhay Aggarwal <abhay.aggarwal@codeium.com>
@devin-ai-integration (Author) commented:

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

@abhay-codeium commented:

Can you add tests so we can verify that the outputs are correct?

…gion partitioning

- Add pytest to requirements.txt
- Create tests/test_load_data_to_s3.py with 7 test cases:
  - test_parquet_format_output: Verifies output is valid Parquet format
  - test_snappy_compression: Verifies Snappy compression is applied
  - test_region_partitioning: Verifies data is partitioned by region
  - test_partition_data_correctness: Verifies each partition contains correct data
  - test_timestamp_filename: Verifies filename uses YYYYMMDD_HHMMSS format
  - test_all_data_preserved: Verifies all rows are preserved across partitions
  - test_connect_to_s3_returns_client: Verifies S3 connection function

Co-Authored-By: Abhay Aggarwal <abhay.aggarwal@codeium.com>
@devin-ai-integration (Author) commented:

Added comprehensive tests to verify the Parquet output. The test suite includes 7 test cases that verify:

  1. Parquet format: Output files are valid Parquet format
  2. Snappy compression: All columns use Snappy compression
  3. Region partitioning: Data is correctly partitioned by region with Hive-style paths (region=<value>/)
  4. Partition data correctness: Each partition contains only data for that region
  5. Timestamp filename: Filenames use the expected YYYYMMDD_HHMMSS.parquet format
  6. Data preservation: All rows from input are preserved across partitions
  7. S3 connection: S3 client is properly configured

All tests pass locally. You can run them with: pytest tests/test_load_data_to_s3.py -v
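
For reference, one way the compression test can verify Snappy at the column-chunk level is by inspecting the Parquet footer with pyarrow; the file path below is illustrative only:

```python
import pyarrow.parquet as pq

# Read the footer and confirm every column chunk reports SNAPPY compression.
metadata = pq.ParquetFile("region=West/20241203_150000.parquet").metadata
for rg in range(metadata.num_row_groups):
    for col in range(metadata.num_columns):
        assert metadata.row_group(rg).column(col).compression == "SNAPPY"
```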
