9 changes: 8 additions & 1 deletion .env.example
@@ -28,4 +28,11 @@ POSTGRES_PWD=
POSTGRES_SEEDS=
POSTGRES_USER=

PROXY_URL=
PROXY_URL=

AWS_ENDPOINT_URL=
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_S3_BUCKET=
AWS_REGION=
AWS_SECURE=
5 changes: 4 additions & 1 deletion .gitignore
@@ -165,4 +165,7 @@ main.ipynb

*.xml

dump_*
dump_*

minio_data/
dumps/*
41 changes: 41 additions & 0 deletions docker-compose.dev.yml
@@ -13,8 +13,17 @@ services:
condition: service_healthy
redis:
condition: service_healthy
minio:
condition: service_healthy
networks:
- temporal-network
environment:
- AWS_ENDPOINT_URL=http://minio:9000
- AWS_ACCESS_KEY_ID=minioadmin
- AWS_SECRET_ACCESS_KEY=minioadmin
- AWS_S3_BUCKET=hivemind-etl
- AWS_REGION=us-east-1
- AWS_SECURE=false

temporal:
image: temporalio/auto-setup:1.25.2.0
@@ -120,6 +129,38 @@ services:
networks:
- temporal-network

minio:
image: minio/minio:RELEASE.2025-04-22T22-12-26Z
ports:
- "9000:9000" # API
- "9001:9001" # Console
environment:
MINIO_ROOT_USER: ${AWS_ACCESS_KEY_ID:-minioadmin}
MINIO_ROOT_PASSWORD: ${AWS_SECRET_ACCESS_KEY:-minioadmin}
volumes:
- ./minio_data:/data
command: server /data --console-address ":9001"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
networks:
- temporal-network

minio-healthcheck:
image: curlimages/curl:8.11.0
entrypoint: ["/bin/sh", "-c", "--", "while true; do sleep 30; done;"]
depends_on:
- minio
healthcheck:
test: ["CMD", "curl", "-f", "http://minio:9000/minio/health/live"]
interval: 10s
timeout: 2s
retries: 5
networks:
- temporal-network

networks:
temporal-network:
driver: bridge
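
For local development, a quick way to confirm that the MinIO service defined above is reachable is a small boto3 check run from the host. This is only a sketch: the `localhost:9000` endpoint, `minioadmin` credentials, and `us-east-1` region are the compose-file defaults shown above and may differ if your `.env` overrides them.

```python
import boto3

# Connect to the MinIO instance started by docker-compose.dev.yml.
# Endpoint and credentials below are the compose-file defaults (assumption).
s3 = boto3.client(
    "s3",
    endpoint_url="http://localhost:9000",  # API port published by the minio service
    aws_access_key_id="minioadmin",
    aws_secret_access_key="minioadmin",
    region_name="us-east-1",
)

# A successful list_buckets() call confirms connectivity and credentials.
print([bucket["Name"] for bucket in s3.list_buckets()["Buckets"]])
```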
65 changes: 53 additions & 12 deletions hivemind_etl/mediawiki/activities.py
@@ -6,6 +6,7 @@
with workflow.unsafe.imports_passed_through():
from hivemind_etl.mediawiki.module import ModulesMediaWiki
from hivemind_etl.mediawiki.etl import MediawikiETL
from hivemind_etl.storage.s3_client import S3Client
from llama_index.core import Document


@@ -53,7 +54,9 @@ async def get_hivemind_mediawiki_platforms(

@activity.defn
async def extract_mediawiki(mediawiki_platform: dict[str, Any]) -> None:
"""Extract data from MediaWiki API URL."""
"""
Extract data from MediaWiki API URL
"""
try:
community_id = mediawiki_platform["community_id"]
api_url = mediawiki_platform["base_url"]
@@ -69,7 +72,8 @@ async def extract_mediawiki(mediawiki_platform: dict[str, Any]) -> None:
platform_id=platform_id,
)
mediawiki_etl.extract(api_url=api_url)
logging.info(f"Completed extraction for community {community_id}")

logging.info(f"Completed extraction for community {community_id}!")
except Exception as e:
community_id = mediawiki_platform["community_id"]
logging.error(f"Error in extraction for community {community_id}: {str(e)}")
@@ -79,9 +83,20 @@ async def extract_mediawiki(mediawiki_platform: dict[str, Any]) -> None:
@activity.defn
async def transform_mediawiki_data(
mediawiki_platform: dict[str, Any],
) -> list[Document]:
"""Transform the extracted MediaWiki data."""
) -> str:
"""
Transform the extracted MediaWiki data and store in S3.

Parameters
----------
mediawiki_platform : dict[str, Any]
The platform configuration

Returns
-------
str
The S3 key where the transformed data is stored
"""
community_id = mediawiki_platform["community_id"]
platform_id = mediawiki_platform["platform_id"]
try:
@@ -93,25 +108,51 @@ async def transform_mediawiki_data(
namespaces=namespaces,
platform_id=platform_id,
)
result = mediawiki_etl.transform()
logging.info(f"Completed transformation for community {community_id}")
return result

# Transform the data extracted earlier and stored in S3
documents = mediawiki_etl.transform()

s3_client = S3Client()
# Store transformed data in S3
transformed_key = s3_client.store_transformed_data(community_id, documents)

logging.info(
f"Completed transformation for community {community_id} and stored in S3 with key: {transformed_key}"
)
return transformed_key
except Exception as e:
logging.error(f"Error in transformation for community {community_id}: {str(e)}")
raise


@activity.defn
async def load_mediawiki_data(mediawiki_platform: dict[str, Any]) -> None:
"""Load the transformed MediaWiki data into the database."""
async def load_mediawiki_data(
mediawiki_platform: dict[str, Any],
) -> None:
"""
Load the transformed MediaWiki data into the database.

Parameters
----------
mediawiki_platform : dict[str, Any]
The platform configuration
"""
community_id = mediawiki_platform["community_id"]
platform_id = mediawiki_platform["platform_id"]
namespaces = mediawiki_platform["namespaces"]
transformed_data_key = mediawiki_platform["transformed_data_key"]

try:
documents_dict = mediawiki_platform["documents"]
# temporal had converted them to dicts, so we need to convert them back to Document objects
documents = [Document.from_dict(doc) for doc in documents_dict]
# Get transformed data from S3
s3_client = S3Client()
transformed_data = s3_client.get_data_by_key(transformed_data_key)
if not transformed_data:
raise ValueError(
f"No transformed data found in S3 for community {community_id}"
)

# Convert dict data back to Document objects
documents = [Document.from_dict(doc) for doc in transformed_data]

logging.info(f"Starting data load for community {community_id}")
mediawiki_etl = MediawikiETL(
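The new load activity depends on llama_index `Document` objects surviving a JSON round trip through S3 (`to_dict()` on the way in, `from_dict()` on the way out). A minimal, standalone sketch of that round trip, with made-up text and metadata purely for illustration:

```python
import json

from llama_index.core import Document

# Illustrative document; the text and metadata here are invented.
doc = Document(text="MediaWiki page body", metadata={"title": "Main Page"})

# store_transformed_data() serializes documents like this before uploading to S3 ...
payload = json.dumps([doc.to_dict()])

# ... and load_mediawiki_data() reverses it after get_data_by_key().
restored = [Document.from_dict(d) for d in json.loads(payload)]
assert restored[0].text == doc.text
```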
11 changes: 6 additions & 5 deletions hivemind_etl/mediawiki/workflows.py
@@ -46,7 +46,8 @@ async def run(self, platform_id: str | None = None) -> None:
"namespaces": platform["namespaces"],
"platform_id": platform["platform_id"],
}
# Extract data from MediaWiki

# Extract data from MediaWiki and store in S3
await workflow.execute_activity(
extract_mediawiki,
mediawiki_platform,
@@ -57,8 +58,8 @@ async def run(self, platform_id: str | None = None) -> None:
),
)

# Transform the extracted data
documents = await workflow.execute_activity(
# Transform the extracted data and store in S3
transformed_data_key = await workflow.execute_activity(
transform_mediawiki_data,
mediawiki_platform,
start_to_close_timeout=timedelta(hours=6),
@@ -68,8 +69,8 @@ async def run(self, platform_id: str | None = None) -> None:
),
)

mediawiki_platform["documents"] = documents
# Load the transformed data
mediawiki_platform["transformed_data_key"] = transformed_data_key
# Load the transformed data from S3
await workflow.execute_activity(
load_mediawiki_data,
mediawiki_platform,
124 changes: 124 additions & 0 deletions hivemind_etl/storage/s3_client.py
@@ -0,0 +1,124 @@
import os
import json
import logging
from datetime import datetime, timezone
from typing import Any, Dict, List

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
from llama_index.core import Document


class S3Client:
def __init__(self):
# Get AWS S3 environment variables
self.endpoint_url = os.getenv("AWS_ENDPOINT_URL")
self.access_key = os.getenv("AWS_ACCESS_KEY_ID")
self.secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
self.bucket_name = os.getenv("AWS_S3_BUCKET")
self.region = os.getenv("AWS_REGION")
self.secure = os.getenv("AWS_SECURE", "true").lower() == "true"

# Check each required variable and log if missing
missing_vars = []
if not self.endpoint_url:
missing_vars.append("AWS_ENDPOINT_URL")
if not self.access_key:
missing_vars.append("AWS_ACCESS_KEY_ID")
if not self.secret_key:
missing_vars.append("AWS_SECRET_ACCESS_KEY")
if not self.bucket_name:
missing_vars.append("AWS_S3_BUCKET")
if not self.region:
missing_vars.append("AWS_REGION")

if missing_vars:
error_msg = (
f"Missing required environment variables: {', '.join(missing_vars)}"
)
logging.error(error_msg)
raise ValueError(error_msg)

logging.info(
f"Initializing S3 client with endpoint: {self.endpoint_url}, "
f"bucket: {self.bucket_name}, region: {self.region}, secure: {self.secure}"
)

# Configure S3 client
config = Config(
signature_version="s3v4",
region_name=self.region,
)

self.s3_client = boto3.client(
"s3",
endpoint_url=self.endpoint_url,
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
config=config,
verify=self.secure,
)

# Ensure bucket exists
try:
self.s3_client.head_bucket(Bucket=self.bucket_name)
logging.info(f"Successfully connected to bucket: {self.bucket_name}")
except ClientError as e:
if e.response["Error"]["Code"] == "404":
logging.info(f"Creating bucket: {self.bucket_name}")
self.s3_client.create_bucket(
Bucket=self.bucket_name,
CreateBucketConfiguration={"LocationConstraint": self.region},
)
logging.info(f"Successfully created bucket: {self.bucket_name}")
else:
logging.error(f"Error accessing bucket {self.bucket_name}: {str(e)}")
raise

def _get_key(self, community_id: str, activity_type: str, timestamp: str) -> str:
"""Generate a unique S3 key for the data."""
return f"{community_id}/{activity_type}/{timestamp}.json"

def store_extracted_data(self, community_id: str, data: Dict[str, Any]) -> str:
"""Store extracted data in S3."""
timestamp = datetime.now(tz=timezone.utc).isoformat()
key = self._get_key(community_id, "extracted", timestamp)

self.s3_client.put_object(
Bucket=self.bucket_name,
Key=key,
Body=json.dumps(data),
ContentType="application/json",
)
return key

def store_transformed_data(
self, community_id: str, documents: List[Document]
) -> str:
"""Store transformed documents in S3."""
timestamp = datetime.now(tz=timezone.utc).isoformat()
key = self._get_key(community_id, "transformed", timestamp)

# Convert Documents to dict for JSON serialization
docs_data = [doc.to_dict() for doc in documents]

self.s3_client.put_object(
Bucket=self.bucket_name,
Key=key,
Body=json.dumps(docs_data),
ContentType="application/json",
)
return key

def get_data_by_key(self, key: str) -> Dict[str, Any]:
"""Get data from S3 using a specific key."""
try:
obj = self.s3_client.get_object(Bucket=self.bucket_name, Key=key)
return json.loads(obj["Body"].read().decode("utf-8"))
except ClientError as e:
if e.response["Error"]["Code"] == "NoSuchKey":
logging.error(f"No data found for key: {key}")
raise ValueError(f"No data found for key: {key}")
logging.error(f"Error retrieving data for key {key}: {str(e)}")
raise
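
Taken together, the activities use `S3Client` roughly as in the sketch below. The community id and document contents are placeholders, and the environment variables are assumed to be set as in `.env.example` (for example, pointing at the local MinIO service):

```python
from llama_index.core import Document

from hivemind_etl.storage.s3_client import S3Client

# Requires AWS_ENDPOINT_URL, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,
# AWS_S3_BUCKET and AWS_REGION in the environment (see .env.example).
s3_client = S3Client()

# Placeholder community id and document, for illustration only.
docs = [Document(text="example page", metadata={"community": "demo"})]

# transform_mediawiki_data() returns this key to the workflow ...
key = s3_client.store_transformed_data("demo-community", docs)

# ... and load_mediawiki_data() later fetches the stored dicts by that key.
raw = s3_client.get_data_by_key(key)
documents = [Document.from_dict(d) for d in raw]
print(key, len(documents))
```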
2 changes: 2 additions & 0 deletions requirements.txt
@@ -9,3 +9,5 @@ pydantic==2.9.2
motor>=3.6, <4.0.0
tc-temporal-backend==1.0.0
wikiteam3-fork-proxy==1.0.0
boto3>=1.38.19
botocore>=1.38.19