Reorganize logging jars [was: experimental: spark with delta] #556
Conversation
Thank you very much for your help with this image; I confirm that it works perfectly. In my proof of concept I needed to adjust the configuration so that Spark can write to S3: because Delta Lake detects by default that the file system does not support mutual exclusion, and because of certain limitations with concurrent writes, I had to explicitly set the LogStore class and the S3A file system implementation. With those settings in place the job no longer throws the errors I saw before.
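For clarity, these are the two extra settings in question, shown here in isolation exactly as they are applied to the session builder in the attached spark_test.py below:

    from pyspark.sql import SparkSession

    # The extra settings mentioned above; the complete builder is in spark_test.py below.
    builder = SparkSession.builder.appName("MyApp") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")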
So I attach the final YAML file that I used, which includes the small adjustments needed to make it work. As you can see, I am using a connection reference that points to an S3Bucket resource, and this works great. One further comment: if you plan to officially ship a version that is compatible with Delta Lake, it may make sense to install the PIP dependencies by default, especially for those who want to work with PySpark; for Scala I think it would work out of the box. Thanks again!

apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: spark-delta-test-3
  namespace: infrastructure
spec:
  version: "1.0"
  sparkImage:
    custom: docker.stackable.tech/sandbox/spark-k8s:3.5.0-stackable0.0.0-delta3.1.0
    productVersion: "3.5.0"
  mode: cluster
  mainApplicationFile: local:///stackable/spark/jobs/spark_test.py
  s3connection:
    reference: spark-s3-bucket
  deps:
    requirements:
      - importlib-metadata
      - delta-spark==3.1.0
  sparkConf:
    spark.hadoop.fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
  logFileDirectory:
    s3:
      prefix: sparklogs/
      bucket:
        reference: spark-s3-bucket
  driver:
    config:
      volumeMounts:
        - mountPath: /stackable/spark/jobs
          name: script
  executor:
    replicas: 1
    config:
      volumeMounts:
        - mountPath: /stackable/spark/jobs
          name: script
  volumes:
    - name: script
      configMap:
        name: spark-delta-test-script
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: spark-delta-test-script
  namespace: infrastructure
data:
  spark_test.py: |
    from datetime import datetime, date
    from pyspark.sql import Row, SparkSession
    from delta import *

    def main():
        builder = SparkSession.builder.appName("MyApp") \
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
            .config("spark.hadoop.delta.enableFastS3AListFrom", "true")
        spark = configure_spark_with_delta_pip(builder).getOrCreate()

        df = spark.createDataFrame([
            Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
            Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
            Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
        ])

        location = "s3a://lab-dataplatform-heidelberg-production/lab/spark-delta-test"
        df.write.format("delta").mode("overwrite").save(location)

    if __name__ == "__main__":
        main()
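For reference, a minimal read-back sketch (not part of the job above; it assumes the same Delta/S3A session settings and the same bucket path) that could be run as a second job to verify the written table:

    from pyspark.sql import SparkSession
    from delta import configure_spark_with_delta_pip

    # Hypothetical verification script: reads back the Delta table written by spark_test.py above.
    builder = SparkSession.builder.appName("DeltaReadCheck") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

    spark = configure_spark_with_delta_pip(builder).getOrCreate()

    location = "s3a://lab-dataplatform-heidelberg-production/lab/spark-delta-test"
    df = spark.read.format("delta").load(location)
    df.show()           # should display the three sample rows
    print(df.count())   # expected: 3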
For a deeper analysis of the root cause see the follow-up issue: stackabletech/spark-k8s-operator#354
Closing, as an operator bugfix makes it unnecessary.
Description
Part of stackabletech/spark-k8s-operator#354
Definition of Done Checklist