1 change: 1 addition & 0 deletions spark-sample-apps/hive-connection-test/cr/s3_secrets.yaml
```diff
@@ -8,4 +8,5 @@ stringData:
   AWS_ACCESS_KEY_ID: {"minioaccesskey"}
   AWS_SECRET_ACCESS_KEY: {miniosecretkey}
   S3_ENDPOINT_URL: {minio url}
+  BUCKET_NAME: {bucket name}
```
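The secret now carries the bucket name alongside the MinIO credentials, so every consumer resolves its S3 location from a single source. As a quick wiring check, something like the sketch below could run in any pod that surfaces these keys as environment variables; the `check_s3_wiring` helper and the `head_bucket` probe are illustrative assumptions, not part of this change:

```python
import os

import boto3


def check_s3_wiring() -> None:
    """Fail fast if the s3-secrets values did not reach the environment."""
    endpoint = os.getenv("S3_ENDPOINT_URL")
    bucket = os.getenv("BUCKET_NAME")
    if not endpoint or not bucket:
        raise RuntimeError("s3-secrets not injected: S3_ENDPOINT_URL or BUCKET_NAME missing")

    s3 = boto3.client(
        "s3",
        endpoint_url=endpoint,
        aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    )
    # head_bucket raises ClientError if the bucket is absent or unreadable.
    s3.head_bucket(Bucket=bucket)
    print(f"Bucket '{bucket}' is reachable at {endpoint}")
```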

```diff
@@ -17,7 +17,6 @@ spec:
     "spark.kubernetes.submission.requestTimeout": "60000000"
     "spark.kubernetes.driver.connectionTimeout": "60000000"
     "spark.kubernetes.driver.requestTimeout": "60000000"
-    "spark.sql.warehouse.dir": "s3a://hive/warehouse"
     "spark.sql.hive.metastore.version": "3.1.3"
     "spark.sql.hive.metastore.jars": "path"
     "spark.sql.hive.metastore.jars.path": "/opt/spark/hivejars/*"
@@ -85,6 +84,13 @@ spec:
           secretKeyRef:
             name: s3-secrets
             key: S3_ENDPOINT_URL
+      - name: BUCKET_NAME
+        valueFrom:
+          secretKeyRef:
+            name: s3-secrets
+            key: BUCKET_NAME
+      - name: S3_PREFIX
+        value: "warehouse/mysparkdb2.db/"
     securityContext:
       seccompProfile:
         type: RuntimeDefault
@@ -107,6 +113,13 @@ spec:
           secretKeyRef:
             name: s3-secrets
             key: AWS_SECRET_ACCESS_KEY
+      - name: BUCKET_NAME
+        valueFrom:
+          secretKeyRef:
+            name: s3-secrets
+            key: BUCKET_NAME
+      - name: DB_NAME
+        value: "mysparkdb2"
   executor:
     cores: 1
     instances: 1
@@ -135,4 +148,10 @@ spec:
           secretKeyRef:
             name: s3-secrets
             key: AWS_SECRET_ACCESS_KEY
-
+      - name: BUCKET_NAME
+        valueFrom:
+          secretKeyRef:
+            name: s3-secrets
+            key: BUCKET_NAME
+      - name: DB_NAME
+        value: "mysparkdb2"
```
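In this manifest the driver and executor both read `BUCKET_NAME` from the `s3-secrets` secret, while `DB_NAME` and `S3_PREFIX` are inline literals that have to agree with each other, since `S3_PREFIX` embeds the database name. A helper along these lines could derive the prefix from `DB_NAME` so the two values cannot drift; the function and its fallback default are a sketch, not code from this PR:

```python
import os


def warehouse_prefix() -> str:
    """Return the S3 prefix for the job's database.

    Prefers an explicit S3_PREFIX (mirroring the manifest), otherwise
    derives the prefix from DB_NAME so the two settings stay consistent.
    """
    explicit = os.getenv("S3_PREFIX")
    if explicit:
        return explicit
    db_name = os.getenv("DB_NAME", "mysparkdb2")  # manifest default, assumed
    return f"warehouse/{db_name}.db/"
```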
12 changes: 7 additions & 5 deletions spark-sample-apps/hive-connection-test/delete_s3.py
```diff
@@ -7,6 +7,12 @@ def main():
     aws_access_key = os.getenv("AWS_ACCESS_KEY_ID")
     aws_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
     s3_endpoint_url = os.getenv("S3_ENDPOINT_URL")
+    bucket_name = os.getenv("BUCKET_NAME")
+    prefix = os.getenv("S3_PREFIX", "warehouse/mysparkdb2.db/")
+
+    if not all([aws_access_key, aws_secret_key, s3_endpoint_url, bucket_name]):
+        print("Missing required environment variables for S3 connection.")
+        exit(1)
 
     s3_w = boto3.client(
         "s3",
@@ -16,10 +22,6 @@ def main():
         aws_secret_access_key=aws_secret_key,
     )
 
-    # Define the S3 path
-    bucket_name = "hive"
-    prefix = "warehouse/mysparkdb2.db/"
-
     # Delete S3 files one by one (avoids Content-MD5 error)
     response = s3_w.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
     if "Contents" in response:
@@ -32,4 +34,4 @@ def main():
 
 
 if __name__ == "__main__":
-    main()
\ No newline at end of file
+    main()
```
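One caveat: a single `list_objects_v2` call returns at most 1,000 keys, so the loop above clears only the first page of a large warehouse directory. A paginated variant would look roughly like the sketch below; the `delete_prefix` helper is hypothetical, but it keeps the same one-key-at-a-time deletion that sidesteps the Content-MD5 issue:

```python
import os

import boto3


def delete_prefix(bucket: str, prefix: str) -> int:
    """Delete every object under prefix, one key at a time."""
    s3 = boto3.client(
        "s3",
        endpoint_url=os.getenv("S3_ENDPOINT_URL"),
        aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
        aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY"),
    )
    deleted = 0
    # The paginator walks past the 1,000-key ceiling of a single list call.
    for page in s3.get_paginator("list_objects_v2").paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            s3.delete_object(Bucket=bucket, Key=obj["Key"])
            deleted += 1
    return deleted
```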
18 changes: 14 additions & 4 deletions spark-sample-apps/hive-connection-test/spark_hive_test.py
```diff
@@ -1,14 +1,20 @@
 from pyspark.sql import SparkSession
 
-
+import os
 def main():
+    # 1. Read the BUCKET_NAME environment variable
+    bucket_name = os.getenv("BUCKET_NAME")
+    database_name = os.getenv("DB_NAME")
+    if not bucket_name:
+        raise ValueError("BUCKET_NAME environment variable not set")
+
     spark = (
         SparkSession.builder.appName("Spark-hive-test")
+        .config("spark.sql.warehouse.dir", f"s3a://{bucket_name}/warehouse/")
         .enableHiveSupport()
         .getOrCreate()
     )
 
-    database_name = "mysparkdb2"
     table_name = "mytable20"
 
     # Ensure the database exists before dropping it
@@ -24,13 +30,17 @@ def main():
     # Create the database and table again
     spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")
 
+    # 2. Use the environment variable to construct the S3 LOCATION path
+    table_location = f"s3a://{bucket_name}/warehouse/{database_name}.db/{table_name}"
+    print(f"Creating table at location: {table_location}")
+
     spark.sql(
         f"""
         CREATE TABLE IF NOT EXISTS {database_name}.{table_name} (
             id INT,
             name STRING
         ) USING PARQUET
-        LOCATION 's3a://hive/warehouse/{database_name}.db/{table_name}'
+        LOCATION '{table_location}'
         """
     )
 
@@ -48,6 +58,6 @@ def main():
 
     spark.stop()
 
 
 if __name__ == "__main__":
-    main()
\ No newline at end of file
+    main()
```
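Since the job now takes its configuration from the environment, it can be smoke-tested outside the cluster by exporting the same variables the SparkApplication injects. A sketch, assuming a local PySpark installation whose `fs.s3a.*` settings already point at a reachable MinIO endpoint; the values below are placeholders:

```python
import os

# Placeholders standing in for the values the SparkApplication injects.
os.environ.setdefault("BUCKET_NAME", "hive")
os.environ.setdefault("DB_NAME", "mysparkdb2")

import spark_hive_test  # the module under test

# Runs the full drop/recreate cycle against the configured bucket.
spark_hive_test.main()
```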