Reorganize logging jars [was: experimental: spark with delta] #556


Closed
razvan wants to merge 5 commits

Conversation

@razvan (Member) commented Feb 9, 2024

Description

Part of stackabletech/spark-k8s-operator#354

Definition of Done Checklist

  • Not all of these items are applicable to every PR; the author should update this template to leave in only the boxes that are relevant
  • Please make sure all of these things are done and tick the boxes
- [ ] Changes are OpenShift compatible
- [ ] All added packages (via microdnf or otherwise) have a comment on why they are added
- [ ] Things not downloaded from Red Hat repositories should be mirrored in the Stackable repository and downloaded from there
- [ ] All packages should have (if available) signatures/hashes verified
- [ ] Does your change affect an SBOM? Make sure to update all SBOMs

@ruslanguns

Thank you very much for your help with this image; I can confirm that it works perfectly.

In my case, for my proof of concept I needed to adjust the configuration so that Spark could write to S3. By default, Delta Lake detects that the file system does not support mutual exclusion, and there are limitations around concurrent writes, so I had to explicitly set the LogStore class and the file system implementation. After that, the errors I had been seeing went away.
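
For reference, these are the two settings in question, excerpted from the PySpark script attached below (a minimal sketch assuming Delta 3.1.0; the full script also configures the Delta SQL extension and catalog):

from pyspark.sql import SparkSession

builder = (
    SparkSession.builder.appName("MyApp")
    # Delta's default LogStore assumes a file system with mutual exclusion,
    # which S3 does not provide; S3SingleDriverLogStore supports a single
    # concurrent writer per table.
    .config("spark.delta.logStore.class",
            "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore")
    # Explicitly select the S3A file system implementation for s3a:// paths.
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
)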

I admit it took me some work to find the solution, as this is not explained anywhere in the Delta Lake documentation, at least not in a way that was clear and direct for me.

So I am attaching the final YAML file that I used, which includes the small adjustments needed to make it work. As you can see, I am using a connection reference that points to an S3Bucket resource, and this works great.

One more comment: if you plan to officially release a version that is compatible with Delta Lake, I think it may make sense to install the PIP dependencies by default, especially for those who want to work with PySpark; for Scala I think it would work out of the box.
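
For reference, this is the per-application workaround from the manifest below that bundling the PySpark Delta bindings into the image would make unnecessary (assuming Delta 3.1.0, matching the image tag used here):

deps:
  requirements:
  - importlib-metadata
  - delta-spark==3.1.0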

Thanks again!

apiVersion: spark.stackable.tech/v1alpha1
kind: SparkApplication
metadata:
  name: spark-delta-test-3
  namespace: infrastructure
spec:
  version: "1.0"
  sparkImage:
    custom: docker.stackable.tech/sandbox/spark-k8s:3.5.0-stackable0.0.0-delta3.1.0
    productVersion: "3.5.0"
  mode: cluster
  mainApplicationFile: local:///stackable/spark/jobs/spark_test.py
  s3connection:
    reference: spark-s3-bucket
  deps:
    requirements:
    - importlib-metadata
    - delta-spark==3.1.0
  sparkConf:
    spark.hadoop.fs.s3a.aws.credentials.provider: org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
  logFileDirectory:
    s3:
      prefix: sparklogs/
      bucket:
        reference: spark-s3-bucket
  driver:
    config:
      volumeMounts:
      - mountPath: /stackable/spark/jobs
        name: script
  executor:
    replicas: 1
    config:
      volumeMounts:
      - mountPath: /stackable/spark/jobs
        name: script
  volumes:
  - name: script
    configMap:
      name: spark-delta-test-script
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: spark-delta-test-script
  namespace: infrastructure
data:
  spark_test.py: |
    from datetime import datetime, date
    from pyspark.sql import Row, SparkSession
    from delta import configure_spark_with_delta_pip

    def main():
        builder = SparkSession.builder.appName("MyApp") \
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
            .config("spark.delta.logStore.class", "org.apache.spark.sql.delta.storage.S3SingleDriverLogStore") \
            .config("spark.hadoop.delta.enableFastS3AListFrom", "true")

        spark = configure_spark_with_delta_pip(builder).getOrCreate()

        df = spark.createDataFrame([
            Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
            Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
            Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
        ])

        location = "s3a://lab-dataplatform-heidelberg-production/lab/spark-delta-test"

        df.write.format("delta").mode("overwrite").save(location)

    if __name__ == "__main__":
        main()

@razvan (Member, Author) commented Feb 12, 2024

For a deeper analysis of the root cause see the followup issue: stackabletech/spark-k8s-operator#354

@razvan changed the title from "experimental: spark with delta" to "Reorganize logging jars [was: experimental: spark with delta]" on Feb 12, 2024
@razvan (Member, Author) commented Feb 13, 2024

Closing, as an operator bugfix makes this unnecessary.

@razvan closed this on Feb 13, 2024
@razvan deleted the feat/spark-delta branch on July 5, 2024 at 10:03