Update script.py #34

Open · wants to merge 1 commit into master

200 changes: 175 additions & 25 deletions in Data Engineering Introduction/script.py
@@ -1,34 +1,184 @@
import os
import sys
import csv
import requests
import logging
import boto3
from botocore.exceptions import ClientError
from bs4 import BeautifulSoup

# Configure logging for better feedback
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Configuration ---
# It's highly recommended to use environment variables for sensitive credentials
# rather than hardcoding them or importing from a local file that might be committed.
# For example:
# AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
# AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
# If you must use a file for local development, ensure 'auth.py' is in .gitignore.
try:
    from auth import ACCESS_KEY, SECRET_KEY
except ImportError:
    logging.error("auth.py not found or ACCESS_KEY/SECRET_KEY not defined. "
                  "Please ensure your AWS credentials are set as environment variables "
                  "or defined in auth.py (and auth.py is in .gitignore).")
    # Fallback to environment variables if auth.py is not available
    ACCESS_KEY = os.getenv('AWS_ACCESS_KEY_ID')
    SECRET_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
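
# Example of the assumed local setup for the fallback above (placeholder values, not part of this PR):
#   export AWS_ACCESS_KEY_ID=...
#   export AWS_SECRET_ACCESS_KEY=...
# With these set, the os.getenv() calls pick up the credentials when auth.py is absent.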

# Define constants for better readability and easier modification
SOURCE_URL = 'http://127.0.0.1:5500/'
CSV_FILE_NAME = 'data.csv'
S3_BUCKET_NAME = 'aws-data-engineering-csv-etl-pipeline-demo'
S3_OBJECT_KEY = 'data.csv' # The name of the file in the S3 bucket

def fetch_and_parse_data(url: str) -> list[dict]:
    """
    Fetches web content from the given URL and parses it to extract article data.

    Args:
        url (str): The URL of the web page to scrape.

    Returns:
        list[dict]: A list of dictionaries, where each dictionary represents an article
        with 'headline', 'summary', and 'link'. Returns an empty list on failure.
    """
    logging.info(f"Attempting to fetch data from: {url}")
    try:
        response = requests.get(url, timeout=10)  # Add a timeout for robustness
        response.raise_for_status()  # Raise an HTTPError for bad responses (4xx or 5xx)
    except requests.exceptions.Timeout:
        logging.error(f"Request timed out when fetching from {url}")
        return []
    except requests.exceptions.RequestException as e:
        logging.error(f"Error fetching data from {url}: {e}")
        return []

    soup = BeautifulSoup(response.text, 'lxml')
    articles_data = []

    articles = soup.find_all('article')
    if not articles:
        logging.warning("No 'article' tags found on the page. Check HTML structure or URL.")
        return []

    for i, article in enumerate(articles):
        try:
            headline_tag = article.h2.a
            summary_tag = article.p

            headline = headline_tag.text.strip() if headline_tag else 'N/A'
            summary = summary_tag.text.strip() if summary_tag else 'N/A'
            link = headline_tag["href"].strip() if headline_tag and "href" in headline_tag.attrs else 'N/A'

            articles_data.append({
                'headline': headline,
                'summary': summary,
                'link': link
            })
        except AttributeError as e:
            logging.warning(f"Could not parse article {i+1} due to missing elements: {e}")
            continue  # Skip to the next article if elements are missing

    logging.info(f"Successfully extracted {len(articles_data)} articles.")
    return articles_data
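
# Illustrative shape of the returned data (placeholder values, not real output):
#   [{'headline': 'Example headline', 'summary': 'Example summary', 'link': 'https://example.com/article'}]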

def write_data_to_csv(data: list[dict], file_name: str):
    """
    Writes a list of dictionaries to a CSV file.

    Args:
        data (list[dict]): The data to write, where each dict is a row.
        file_name (str): The name of the CSV file to create/overwrite.
    """
    if not data:
        logging.warning("No data to write to CSV. Skipping CSV creation.")
        return

    logging.info(f"Writing data to CSV file: {file_name}")
    try:
        with open(file_name, 'w', newline='', encoding='utf-8') as csv_file:
            fieldnames = ['headline', 'summary', 'link']  # Define headers explicitly
            writer = csv.DictWriter(csv_file, fieldnames=fieldnames)

            writer.writeheader()
            writer.writerows(data)
        logging.info(f"Data successfully written to {file_name}")
    except IOError as e:
        logging.error(f"Error writing to CSV file {file_name}: {e}")

def upload_to_s3(file_path: str, bucket_name: str, object_key: str, access_key: str, secret_key: str):
    """
    Uploads a file to an AWS S3 bucket.

    Args:
        file_path (str): The path to the file to upload.
        bucket_name (str): The name of the S3 bucket.
        object_key (str): The key (path) for the object in the S3 bucket.
        access_key (str): AWS Access Key ID.
        secret_key (str): AWS Secret Access Key.
    """
    if not os.path.exists(file_path):
        logging.error(f"File not found at {file_path}. Cannot upload to S3.")
        return

    if not access_key or not secret_key:
        logging.error("AWS credentials not provided. Cannot upload to S3.")
        return

    logging.info(f"Attempting to upload {file_path} to S3 bucket '{bucket_name}' as '{object_key}'")
    try:
        s3_client = boto3.client(
            's3',
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
        )

        # Check if bucket exists, create if not (handle potential race conditions or permissions)
        try:
            s3_client.head_bucket(Bucket=bucket_name)
            logging.info(f"Bucket '{bucket_name}' already exists.")
        except ClientError as e:
            error_code = int(e.response['Error']['Code'])
            if error_code == 404:
                logging.info(f"Bucket '{bucket_name}' does not exist. Creating it now.")
                s3_client.create_bucket(Bucket=bucket_name)
                logging.info(f"Bucket '{bucket_name}' created successfully.")
            else:
                logging.error(f"Error checking or creating bucket '{bucket_name}': {e}")
                return
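
        # Note (assumption, not part of this PR's diff): when the client is configured for a
        # region other than us-east-1, create_bucket generally needs an explicit location, e.g.:
        #   s3_client.create_bucket(
        #       Bucket=bucket_name,
        #       CreateBucketConfiguration={'LocationConstraint': 'eu-west-1'},
        #   )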

        with open(file_path, "rb") as f:
            s3_client.upload_fileobj(f, bucket_name, object_key)
        logging.info('File uploaded to S3 successfully!')
    except ClientError as e:
        logging.error(f"AWS S3 client error: {e}")
    except Exception as e:
        logging.error(f"An unexpected error occurred during S3 upload: {e}")

def main():
    """
    Main function to orchestrate the web scraping, CSV writing, and S3 upload process.
    """
    logging.info("Starting ETL pipeline...")

    # 1. Fetch and parse data
    articles_data = fetch_and_parse_data(SOURCE_URL)
    if not articles_data:
        logging.error("No data extracted. Exiting.")
        sys.exit(1)  # Exit with an error code

    # 2. Write data to CSV
    write_data_to_csv(articles_data, CSV_FILE_NAME)
    if not os.path.exists(CSV_FILE_NAME):
        logging.error("CSV file was not created. Exiting.")
        sys.exit(1)

    # 3. Upload to AWS S3
    upload_to_s3(CSV_FILE_NAME, S3_BUCKET_NAME, S3_OBJECT_KEY, ACCESS_KEY, SECRET_KEY)

    logging.info('ETL Task Completed Successfully!')

if __name__ == '__main__':
    main()
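
# How this might be run locally (assumed workflow, inferred from the constants above):
#   1. Serve the page to scrape at http://127.0.0.1:5500/ (SOURCE_URL), e.g. with a local dev server.
#   2. Provide AWS credentials via auth.py (kept out of version control) or the environment variables above.
#   3. Run: python script.py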