feat: Added s3 support for mediaWiki ETL! #44
Conversation
Walkthrough

The changes introduce S3-compatible storage integration using MinIO for the MediaWiki ETL workflow. Configuration templates and Docker Compose are updated to support MinIO, and a new S3 client is added for storing and retrieving intermediate data. The ETL activities and workflow are modified to persist and load data via S3, and new dependencies for AWS SDK libraries are included.

Changes
Sequence Diagram(s)

sequenceDiagram
participant Extractor as extract_mediawiki
participant S3 as S3Client
participant Transformer as transform_mediawiki_data
participant Loader as load_mediawiki_data
Extractor->>S3: store_extracted_data()
Transformer->>S3: store_transformed_data()
Transformer->>Extractor: return transformed_data_key
Loader->>S3: get_data_by_key(transformed_data_key)
S3-->>Loader: Return transformed data
Loader->>Loader: Load data into target
Actionable comments posted: 4
🔭 Outside diff range comments (1)
hivemind_etl/mediawiki/workflows.py (1)
61-82: 💡 Verification agent
🧩 Analysis chain
Verify error handling for S3 operations.
While the code structure looks good, S3 operations can fail for various reasons (network issues, permissions, etc.). The workflow has retry policies, but you might want to consider specific error handling for S3-related failures.
🏁 Script executed:
```bash
#!/bin/bash
# Check if there are specific S3 error handling mechanisms in the activities
echo "Checking S3 error handling in activities.py..."
rg -A 3 -B 3 "except.*boto|S3|AWS" hivemind_etl/mediawiki/activities.py

echo "Checking for S3 exception types in S3 client implementation..."
rg -A 2 "except" hivemind_etl/storage/s3_client.py
```

Length of output: 2692
Add Comprehensive S3 Failure Handling
While we already catch specific `ClientError` codes (404 bucket-not-found, `NoSuchKey`) in `S3Client`, there are other common S3/AWS failure modes that should be handled or surfaced to Temporal so your retry policies can take effect:

• hivemind_etl/storage/s3_client.py
  – In `store_transformed_data` and `get_data_by_key`, add catches for:
    • AccessDenied (403)
    • Throttling (`Throttling` or `ThrottlingException`)
    • Internal errors (500, `ServiceUnavailable`)
    • Endpoint/network errors (`EndpointConnectionError`)
  – For retryable errors, either let the exception bubble up (so the activity fails and Temporal retries) or implement a backoff/retry loop.
  – For non-recoverable errors, log with clear messages and fail fast.

• hivemind_etl/mediawiki/activities.py
  – The broad `except Exception` around your S3 calls can swallow root causes.
  – Replace it with a narrower `except ClientError as e:` where you log the AWS error code/message and then `raise` to allow Temporal to retry per your `RetryPolicy`.
  – Only catch and handle truly recoverable errors locally; let everything else propagate.

This will ensure you don't silently drop permissions/network errors, and that Temporal's retry policies actually trigger on S3 failures.
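As a rough illustration only (not the repository's actual code; the function name and `s3_client`/`transformed_data` arguments are placeholders), the narrowed activity-side handling described above could look roughly like this:

```python
import logging

from botocore.exceptions import ClientError, EndpointConnectionError

# Hypothetical sketch: the goal is narrowing exception handling and re-raising so
# Temporal's RetryPolicy can take effect instead of a broad `except Exception`.
RETRYABLE_CODES = {"Throttling", "ThrottlingException", "ServiceUnavailable", "InternalError", "SlowDown"}


def store_with_clear_errors(s3_client, community_id: str, transformed_data) -> str:
    try:
        return s3_client.store_transformed_data(community_id, transformed_data)
    except EndpointConnectionError:
        # Network/endpoint failures are retryable: log and re-raise for Temporal.
        logging.exception("Could not reach the S3 endpoint")
        raise
    except ClientError as e:
        code = e.response.get("Error", {}).get("Code", "Unknown")
        if code in RETRYABLE_CODES:
            logging.warning("Retryable S3 error (%s); re-raising for Temporal retry", code)
            raise
        # AccessDenied and other non-recoverable errors: fail fast with a clear message.
        logging.error("Non-recoverable S3 error (%s): %s", code, e)
        raise
```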
🧹 Nitpick comments (3)
hivemind_etl/storage/s3_client.py (1)
46-59: Optional: explicitly set `use_ssl` instead of overloading `verify`

You already derive `self.secure` from `AWS_SECURE`; consider passing `use_ssl=self.secure, verify=self.secure,` to make the intention clear and avoid certificate-validation attempts when using http://.
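For illustration, a minimal sketch of that client construction; the parameter names below are assumptions about how `S3Client` wires its configuration, not the repository's confirmed field names:

```python
import boto3


def build_s3_client(endpoint_url: str, access_key: str, secret_key: str,
                    region: str, secure: bool):
    """Sketch only: mirrors the review's suggestion, not the project's real code."""
    return boto3.client(
        "s3",
        endpoint_url=endpoint_url,
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key,
        region_name=region,
        use_ssl=secure,   # state the SSL intent explicitly
        verify=secure,    # avoid certificate validation against plain-HTTP MinIO
    )
```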
hivemind_etl/mediawiki/activities.py (1)
112-121: Avoid repeated S3 client instantiation & propagate key downstream

`S3Client()` is instantiated on every activity; that's unnecessary overhead and complicates unit testing. Pass a client instance from the workflow context or make `S3Client` a lightweight singleton.

Example:

`s3_client = s3_client or S3Client()`

Also ensure the returned `transformed_key` is injected into the args for `load_mediawiki_data` (otherwise `transformed_data_key` will be missing).
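One possible shape for the singleton approach, sketched under the assumption that `S3Client` takes no constructor arguments (`get_s3_client` is a hypothetical helper, not existing project code):

```python
from functools import lru_cache

from hivemind_etl.storage.s3_client import S3Client


@lru_cache(maxsize=1)
def get_s3_client() -> S3Client:
    """Return a shared S3Client so activities don't rebuild it on every call.

    Unit tests can call get_s3_client.cache_clear() before patching S3Client.
    """
    return S3Client()
```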
docker-compose.dev.yml (1)
21-27: Do not store credentials in source-controlled compose files

Hard-coding `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` inside the compose file risks accidental exposure. It is safer to reference only `${...}` placeholders and keep defaults in `.env`, e.g.:

```yaml
environment:
  - AWS_ENDPOINT_URL=${AWS_ENDPOINT_URL}
  - AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
  - AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting
📒 Files selected for processing (7)
- .env.example (1 hunks)
- .gitignore (1 hunks)
- docker-compose.dev.yml (2 hunks)
- hivemind_etl/mediawiki/activities.py (5 hunks)
- hivemind_etl/mediawiki/workflows.py (3 hunks)
- hivemind_etl/storage/s3_client.py (1 hunks)
- requirements.txt (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
hivemind_etl/mediawiki/activities.py (3)
hivemind_etl/storage/s3_client.py (3)
- S3Client (13-145)
- store_transformed_data (94-110)
- get_data_by_key (112-122)

hivemind_etl/website/website_etl.py (1)
- transform (71-99)

hivemind_etl/mediawiki/etl.py (1)
- transform (48-96)
🪛 Ruff (0.11.9)
hivemind_etl/storage/s3_client.py
120-120: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
⏰ Context from checks skipped due to timeout of 90000ms (1)
- GitHub Check: ci / build-push / Build + Push Image
🔇 Additional comments (8)
requirements.txt (1)
12-13: Good addition of AWS SDK dependencies for S3 integration.

Adding `boto3` and `botocore` with minimum version constraints provides the necessary libraries for S3 integration. This aligns well with the PR's goal of adding S3 support for MediaWiki ETL.

Note that `boto3` already includes `botocore` as a dependency, so specifying both is technically redundant but can be useful for version pinning.

.gitignore (1)
168-171: Appropriate additions to .gitignore for S3 storage.

The additions properly exclude:
- `dump_*` files - temporary or output dump files
- `minio_data/` - local S3-compatible storage data directory
- `dumps/*` - all files in the dumps directory

These exclusions prevent committing large data files to the repository, which is a good practice.
.env.example (1)
33-38: Well-structured AWS S3 configuration environment variables.

The AWS environment variables follow standard naming conventions and include all the necessary parameters for S3 configuration:
- Endpoint URL for custom/MinIO endpoints
- Access credentials
- Bucket name
- Region
- Secure connection flag
This provides a good template for users setting up the application.
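As a sketch of how these settings might be read by the S3 client: only `AWS_ENDPOINT_URL`, `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, and `AWS_SECURE` are named elsewhere in this review; the bucket and region variable names below are assumptions.

```python
import os

# Sketch only: AWS_BUCKET_NAME and AWS_REGION are assumed names, not confirmed by the PR.
endpoint_url = os.getenv("AWS_ENDPOINT_URL")
access_key = os.getenv("AWS_ACCESS_KEY_ID")
secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
bucket_name = os.getenv("AWS_BUCKET_NAME")
region = os.getenv("AWS_REGION", "us-east-1")
secure = os.getenv("AWS_SECURE", "false").lower() == "true"
```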
hivemind_etl/mediawiki/workflows.py (3)
49-51: Clear documentation of S3 storage implementation.

The updated comment correctly indicates that the extraction step now stores data in S3 rather than just extracting it, which helps other developers understand the workflow's data flow.
61-63: Good adaptation of transformation step for S3 integration.

The comment and return value handling are updated to reflect that transformed data is now stored in S3, with the activity returning a key reference instead of the actual data objects.
72-74: Proper data flow implementation for S3 integration.

The workflow now:
- Stores the transformed data key in the mediawiki_platform dictionary
- Updates the comment to clarify that data is loaded from S3
- Passes the necessary reference information to the load activity
This approach properly decouples the data storage from the workflow logic.
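For orientation, the described hand-off could be wired roughly as below in a Temporal Python workflow. The activity names come from the sequence diagram; the class name, timeout, and overall structure are placeholders, not the repository's actual workflow code.

```python
from datetime import timedelta

from temporalio import workflow


@workflow.defn
class MediaWikiETLWorkflowSketch:
    """Sketch only: illustrates the key hand-off described above."""

    @workflow.run
    async def run(self, mediawiki_platform: dict) -> None:
        # The transform activity stores its output in S3 and returns only the object key.
        transformed_data_key = await workflow.execute_activity(
            "transform_mediawiki_data",
            mediawiki_platform,
            start_to_close_timeout=timedelta(hours=1),
        )
        # Pass the key along so the load activity can fetch the data back from S3.
        mediawiki_platform["transformed_data_key"] = transformed_data_key
        await workflow.execute_activity(
            "load_mediawiki_data",
            mediawiki_platform,
            start_to_close_timeout=timedelta(hours=1),
        )
```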
hivemind_etl/storage/s3_client.py (1)
112-123: Preserve original stack-trace when re-raising & correct return typing
`raise` without `from` hides the originating exception context; Ruff flagged this (B904).
Additionally, `get_data_by_key` can return either a `dict` or a `list` (for transformed docs), so the return type should be `Any | None`.

```diff
- raise
+ raise e from None
```

and

```diff
-def get_data_by_key(self, key: str) -> Dict[str, Any]:
+from typing import Any, Optional
+
+def get_data_by_key(self, key: str) -> Optional[Any]:
```

Consider updating the docstring accordingly.
🧰 Tools
🪛 Ruff (0.11.9)
120-120: Within an `except` clause, raise exceptions with `raise ... from err` or `raise ... from None` to distinguish them from errors in exception handling (B904)
docker-compose.dev.yml (1)
132-140: Future-dated MinIO image tag looks suspicious
`minio/minio:RELEASE.2025-04-22T22-12-26Z` references a future release date. Pinning to a non-existent tag will break `docker compose pull`. Prefer the latest stable or an existing SHA-pinned tag, e.g.:

`image: minio/minio:RELEASE.2024-04-19T02-46-22Z`

or simply `minio/minio:latest` for dev.
Actionable comments posted: 0
♻️ Duplicate comments (1)
hivemind_etl/storage/s3_client.py (1)
66-71: ⚠️ Potential issue
Fix bucket creation for `us-east-1` region

AWS requires omitting `CreateBucketConfiguration` when the region is `us-east-1`; passing it causes an `InvalidLocationConstraint` (400) error. MinIO (used in dev) might ignore this field, but the AWS backend will fail.

```diff
- self.s3_client.create_bucket(
-     Bucket=self.bucket_name,
-     CreateBucketConfiguration={"LocationConstraint": self.region},
- )
+ if self.region == "us-east-1":
+     self.s3_client.create_bucket(Bucket=self.bucket_name)
+ else:
+     self.s3_client.create_bucket(
+         Bucket=self.bucket_name,
+         CreateBucketConfiguration={"LocationConstraint": self.region},
+     )
```
🧹 Nitpick comments (3)
hivemind_etl/storage/s3_client.py (3)
120-120: Improve exception handling with explicit chaining

When re-raising exceptions inside an except block, it's best practice to use explicit exception chaining with `raise ... from` to maintain the exception context.

```diff
- raise ValueError(f"No data found for key: {key}")
+ raise ValueError(f"No data found for key: {key}") from None
```

🧰 Tools
🪛 Ruff (0.11.9)
120-120: Within an `except` clause, raise exceptions with `raise ... from err` or `raise ... from None` to distinguish them from errors in exception handling (B904)
112-123: Missing method to retrieve latest data by prefix

The class lacks a method to retrieve the latest stored data for a given prefix. This functionality would be valuable for the ETL workflow to retrieve the most recently extracted or transformed data.

Would you like me to provide an implementation for a `get_latest_data` method that correctly retrieves the latest object by timestamp? This method should:
- List all objects with a given prefix
- Sort by last modified date (descending)
- Return the most recent object's data
🧰 Tools
🪛 Ruff (0.11.9)
120-120: Within an `except` clause, raise exceptions with `raise ... from err` or `raise ... from None` to distinguish them from errors in exception handling (B904)
13-76: Consider adding documentation for the class and logging successful operations

The class would benefit from docstring documentation explaining its purpose and usage. Additionally, consider adding logging for successful store operations to maintain consistent logging throughout the class.

```diff
 class S3Client:
+    """
+    Client for interacting with S3-compatible storage services.
+
+    This class handles operations for storing and retrieving data in an S3 bucket
+    for the ETL pipeline, managing extracted and transformed data by community ID.
+    It requires AWS credentials and connection details provided via environment variables.
+    """
     def __init__(self):
```

And for store operations:

```diff
         )
+        logging.info(f"Successfully stored extracted data at key: {key}")
         return key
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting
📒 Files selected for processing (1)
hivemind_etl/storage/s3_client.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.11.9)
hivemind_etl/storage/s3_client.py
120-120: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: ci / test / Test
- GitHub Check: ci / lint / Lint
Actionable comments posted: 3
🔭 Outside diff range comments (1)
hivemind_etl/storage/s3_client.py (1)
125-125: 🛠️ Refactor suggestion
Add missing `get_latest_data` method

Based on the previous review comments, a `get_latest_data` method was planned but is missing from this implementation.

Here's an implementation that correctly returns the latest data by using the `LastModified` timestamp:

```python
def get_latest_data(self, community_id: str, activity_type: str) -> Dict[str, Any]:
    """Get the latest data for a community_id and activity_type."""
    prefix = f"{community_id}/{activity_type}/"
    try:
        response = self.s3_client.list_objects_v2(
            Bucket=self.bucket_name,
            Prefix=prefix,
        )
        if "Contents" not in response or len(response["Contents"]) == 0:
            logging.error(f"No data found for prefix: {prefix}")
            return None

        latest_key = max(
            response["Contents"],
            key=lambda obj: obj["LastModified"],
        )["Key"]
        logging.info(f"Found latest key: {latest_key}")
        return self.get_data_by_key(latest_key)
    except ClientError as e:
        logging.error(f"Error listing objects with prefix {prefix}: {str(e)}")
        raise
```
♻️ Duplicate comments (1)
hivemind_etl/storage/s3_client.py (1)
70-73: ⚠️ Potential issue
Fix bucket creation for `us-east-1` region

AWS S3 requires omitting `CreateBucketConfiguration` when the region is `us-east-1`. Although the previous review comment was marked as fixed, the code still doesn't handle this special case.

```diff
- self.s3_client.create_bucket(
-     Bucket=self.bucket_name,
-     CreateBucketConfiguration={"LocationConstraint": self.region},
- )
+ if self.region == "us-east-1":
+     self.s3_client.create_bucket(Bucket=self.bucket_name)
+ else:
+     self.s3_client.create_bucket(
+         Bucket=self.bucket_name,
+         CreateBucketConfiguration={"LocationConstraint": self.region},
+     )
```
🧹 Nitpick comments (1)
hivemind_etl/storage/s3_client.py (1)
122-122: Use exception chaining with `raise ... from e`

To properly maintain the exception chain for debugging, use the `raise ... from e` syntax.

```diff
- raise ValueError(f"No data found for key: {key}")
+ raise ValueError(f"No data found for key: {key}") from e
```

🧰 Tools
🪛 Ruff (0.11.9)
122-122: Within an `except` clause, raise exceptions with `raise ... from err` or `raise ... from None` to distinguish them from errors in exception handling (B904)
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Cache: Disabled due to data retention organization setting
Knowledge Base: Disabled due to data retention organization setting
📒 Files selected for processing (1)
hivemind_etl/storage/s3_client.py (1 hunks)
🧰 Additional context used
🪛 Ruff (0.11.9)
hivemind_etl/storage/s3_client.py
122-122: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling
(B904)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: ci / lint / Lint
- GitHub Check: ci / test / Test
Summary by CodeRabbit
New Features
Chores
- Added new dependencies for AWS S3 integration (`boto3`, `botocore`).
- Updated `.gitignore` to exclude MinIO data and dump files.