
Spring Boot Starter Spark


Introduction

Managing dependencies is a crucial part of any complex project. Handling this manually can be tedious and time-consuming,
leaving less room to focus on other essential aspects of development. This is where Spring Boot starters come into play: pre-configured dependency descriptors designed to simplify dependency management.
By including a starter POM in your project, you can bring in all the necessary Spring and related technologies without searching through documentation or copying multiple dependency definitions. Spring Boot offers starters for many popular technologies, but there is no starter for Spark yet, because Spark dependencies are usually kept in provided scope:
they are expected to be supplied by the environment where the jobs are deployed, such as a Spark cluster or AWS EMR. However, as long as the Spark dependency versions in your application match those in the runtime environment, it does not matter if you keep them in compile scope.

The Spring Boot Starter for Spark is a convenient dependency descriptor that you can include in your Spring Boot application to get all required Spark dependencies, along with auto-configured SparkSession and Iceberg Catalog beans, and support for Spark configurations in Spring Boot yml or properties files with auto-completion in your favourite IDE.

Dependency versions

It specifies the following versions:

  • Java 17
  • Spring Boot 3.4.5
  • Spark 3.5.5
  • Scala 2.13.16
<java.version>17</java.version>
<spring-boot.version>3.4.5</spring-boot.version>

<!-- Spark dependencies versions-->
<scala.version>2.13.16</scala.version>
<scala.compact.version>2.13</scala.compact.version>
<spark.version>3.5.5</spark.version>
<spark.compact.version>3.5</spark.compact.version>

Features

  • Bundles Spark, Iceberg and Hadoop dependencies compatible with Spring Boot 3+.
  • Provides an auto-configured SparkSession bean which can be customized in any manner.
  • Provides an auto-configured Iceberg Catalog supporting Hadoop, Hive, Nessie and other catalog types.
  • Exposes all Spark configurations, including catalog configurations, as Spring Boot environment properties.
  • Enables auto-completion assistance for Spark configuration properties in Spring Boot yml and properties files in IDEs such as IntelliJ, Eclipse etc. For details, see spring-configuration-metadata.json and IntelliJ Auto Completion.

Installation

Current version: 1.3. Refer to the Release notes while upgrading.

Define the following properties in pom.xml:

<properties>
    <java.version>17</java.version>
    <spring-boot.version>3.4.5</spring-boot.version>

    <spring-boot-starter-spark.version>1.3</spring-boot-starter-spark.version>
    <!-- The following two versions must be specified, otherwise you will get java.lang.ClassNotFoundException: javax.servlet.http.HttpServlet -->
    <jakarta-servlet.version>4.0.3</jakarta-servlet.version>
    <jersey.version>2.36</jersey.version>
</properties>

Important

The Spring Boot parent pom provides the jakarta-servlet.version and jersey.version properties. These must be overridden in your pom as shown above, otherwise you will get java.lang.ClassNotFoundException: javax.servlet.http.HttpServlet.

Add the following dependency to your pom.xml:

<dependency>
    <groupId>io.github.officiallysingh</groupId>
    <artifactId>spring-boot-starter-spark</artifactId>
    <version>${spring-boot-starter-spark.version}</version>
</dependency>

Note

The spring-boot-starter-spark jar brings in Spark Core, Spark SQL, Spark MLlib, Iceberg and Hadoop dependencies. You can exclude any dependency you don't need, as shown below.
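
For example, a minimal sketch of excluding Spark MLlib if you don't need it, assuming the bundled transitive artifact is spark-mllib_2.13 (the Scala 2.13 build); verify the exact artifactId against your dependency tree:

<dependency>
    <groupId>io.github.officiallysingh</groupId>
    <artifactId>spring-boot-starter-spark</artifactId>
    <version>${spring-boot-starter-spark.version}</version>
    <exclusions>
        <!-- Assumed artifactId; confirm with mvn dependency:tree -->
        <exclusion>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.13</artifactId>
        </exclusion>
    </exclusions>
</dependency>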

See example usage in a Spark Spring Cloud Task.

Spark Configurations

Any Spark properties can be configured in application.yml as follows:

spark:
  master: local[*]
  executor:
    instances: 2
    memory: 2g
    cores: 1
  driver:
    memory: 1g
    cores: 1

or in application.properties as follows:

spark.master=local[*]
spark.executor.instances=2
spark.executor.memory=2g
spark.executor.cores=1
spark.driver.memory=1g
spark.driver.cores=1

Auto-configuration

Spark Session

The following Spring beans are auto-configured, but they are conditional and can be customized as elaborated in the next section.
For details, refer to SparkAutoConfiguration.

  • SparkSession bean is auto-configured, but if you want to override it you can define your own SparkSession bean in your application (a usage sketch follows this list).
  • SparkConf bean is auto-configured with Spark configuration properties using the standard Spring Boot mechanism,
    i.e. you can use a variety of external configuration sources including Java properties files, YAML files, environment variables and command-line arguments. But if you want to override it, you can define your own SparkConf bean in your application.
  • sparkProperties bean exposes all Spark configurations as Spring Boot environment properties. All properties in this bean are set on the SparkConf bean.
  • SparkSession.Builder bean provides an extension mechanism to customise SparkSession bean creation.
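
For instance, a minimal sketch of injecting the auto-configured SparkSession into your own component; the class, method and CSV path below are hypothetical:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.springframework.stereotype.Service;

// Hypothetical service; the SparkSession it uses is the auto-configured bean from the starter.
@Service
public class SalesReportService {

    private final SparkSession sparkSession;

    public SalesReportService(final SparkSession sparkSession) {
        this.sparkSession = sparkSession;
    }

    public long countSalesRecords(final String csvPath) {
        // Read a CSV file with the injected SparkSession and count its rows
        final Dataset<Row> sales = this.sparkSession.read().option("header", "true").csv(csvPath);
        return sales.count();
    }
}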

Iceberg Catalog

The following Spring beans are auto-configured but they are conditional and can be overridden by defining these beans in your application code.
For details, refer to SparkCatalogAutoConfiguration. It has been tested with the following catalog types, but other catalog types should also work as long as you have the required dependencies on your classpath and the catalog configurations in place.

  • Iceberg Catalog bean is auto-configured based on the catalog type specified in the application.yml or application.properties file (a usage sketch follows this list).
  • CatalogProperties is auto-configured with the properties specified in the application.yml or application.properties file.
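
For instance, a minimal sketch of injecting the auto-configured Iceberg Catalog bean into your own component; the class and namespace below are hypothetical:

import java.util.List;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.springframework.stereotype.Component;

// Hypothetical component; the Catalog it uses is the auto-configured Iceberg catalog bean.
@Component
public class CatalogInspector {

    private final Catalog catalog;

    public CatalogInspector(final Catalog catalog) {
        this.catalog = catalog;
    }

    public List<TableIdentifier> tablesIn(final String namespace) {
        // List all Iceberg tables in the given namespace using the injected catalog
        return this.catalog.listTables(Namespace.of(namespace));
    }
}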

Note

SparkCatalogAutoConfiguration is enabled only when some catalog configuration is present in application.yml or application.properties.

Refer to Apache Hadoop and Hive installation guide for details on how to install Hadoop and Hive.

Customizations

You can customize SparkSession.Builder by defining any number of beans of type SparkSessionBuilderCustomizer in your application.

@Bean
public SparkSessionBuilderCustomizer enableHiveSupportCustomizer() {
    return SparkSession.Builder::enableHiveSupport;
}
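
Similarly, a minimal sketch of a customizer that sets the Spark application name through the builder; the bean method name and application name are hypothetical:

@Bean
public SparkSessionBuilderCustomizer appNameCustomizer() {
    // Hypothetical customizer: sets the Spark application name via SparkSession.Builder
    return builder -> builder.appName("my-spark-job");
}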

You can customize SparkSession by defining any number of beans of type SparkSessionCustomizer in your application. The following is an example that registers a user-defined function (UDF) in SparkSession:

Defining UDF

import static org.apache.spark.sql.functions.callUDF;

import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.stream.Stream;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.functions;

public class UserDefinedFunctions {

    public static final String EXPLODE_DATE_SEQ = "explodeDateSeq";

    static UDF2<LocalDate, LocalDate, List<LocalDate>> explodeDateSeq =
            (start, end) -> {
                long numOfDaysBetween = ChronoUnit.DAYS.between(start, end) + 1;
                return Stream.iterate(start, date -> date.plusDays(1)).limit(numOfDaysBetween).toList();
            };

    public static Column explodeDateSeq(final Column startCol, final Column endCol) {
        return functions.explode(callUDF(UserDefinedFunctions.EXPLODE_DATE_SEQ, startCol, endCol));
    }
}

Registering UDF

import com.ksoot.spark.springframework.boot.autoconfigure.SparkSessionCustomizer;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;

@Component
class SparkSessionUDFCustomizer implements SparkSessionCustomizer, Ordered {

    @Override
    public void customize(final SparkSession sparkSession) {
        sparkSession
                .udf()
                .register(
                        UserDefinedFunctions.EXPLODE_DATE_SEQ,
                        UserDefinedFunctions.explodeDateSeq,
                        DataTypes.createArrayType(DataTypes.DateType));
    }

    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }
}

Calling UDF

Dataset<Row> originalDf =
    this.sparkSession.createDataFrame(
            Arrays.asList(
                    Dataframe.of("c1", LocalDate.of(2024, 6, 5), "f105"),
                    Dataframe.of("c1", LocalDate.of(2024, 6, 6), "f106"),
                    Dataframe.of("c1", LocalDate.of(2024, 6, 7), "f107"),
                    Dataframe.of("c1", LocalDate.of(2024, 6, 10), "f110"),
                    Dataframe.of("c2", LocalDate.of(2024, 6, 12), "f212"),
                    Dataframe.of("c2", LocalDate.of(2024, 6, 13), "f213"),
                    Dataframe.of("c2", LocalDate.of(2024, 6, 15), "f215")),
            Dataframe.class);

Dataset<Row> customerMinMaxDateDf =
        originalDf
                .groupBy("customer_id")
                .agg(min("date").as("min_date"), max("date").as("max_date"));

// Generate the expanded dataset
Dataset<Row> customerIdDatesDf =
        customerMinMaxDateDf
                .withColumn(
                        "date",
                        UserDefinedFunctions.explodeDateSeq(
                                customerMinMaxDateDf.col("min_date"), customerMinMaxDateDf.col("max_date")))
                .select("customer_id", "date");

customerIdDatesDf.show();   

Note

To support Java 8 date-time types in Spark, set the property spark.sql.datetime.java8API.enabled to true in application.yml or application.properties, as shown below.
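
For example, in application.properties:

spark.sql.datetime.java8API.enabled=true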

Iceberg Catalog Configuration with Local Hadoop as Data storage

For Spark Iceberg integration demo refer to spring-boot-spark-iceberg.

The following are Iceberg Catalog configurations using local Hadoop as data storage.

Hadoop Catalog

Configure the Hadoop Catalog as follows. The catalog name is set to hadoop here, but it can be any name you want.

spark:
  sql:
    catalog:
      hadoop: org.apache.iceberg.spark.SparkCatalog
      hadoop.type: hadoop
      hadoop.warehouse: ${CATALOG_WAREHOUSE:hdfs://localhost:9000/warehouse}
      hadoop.uri: ${CATALOG_URI:hdfs://localhost:9000}
      hadoop.default-namespace: ${CATALOG_NAMESPACE:ksoot}
      hadoop.io-impl: org.apache.iceberg.hadoop.HadoopFileIO
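
Once configured, the catalog can be referenced by its name in Spark SQL. A minimal sketch, assuming the hadoop catalog above, the ksoot namespace and a hypothetical orders table:

// Create, populate and read an Iceberg table through the configured "hadoop" catalog
sparkSession.sql("CREATE TABLE IF NOT EXISTS hadoop.ksoot.orders (id BIGINT, amount DOUBLE) USING iceberg");
sparkSession.sql("INSERT INTO hadoop.ksoot.orders VALUES (1, 120.5), (2, 75.0)");
sparkSession.table("hadoop.ksoot.orders").show();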

Hive Catalog

Configure the Hive Catalog as follows. The catalog name is set to hive here, but it can be any name you want.

spark:
  sql:
    catalog:
      hive: org.apache.iceberg.spark.SparkCatalog
      hive.type: hive
      hive.warehouse: ${CATALOG_WAREHOUSE:hdfs://localhost:9000/warehouse}
      hive.uri: ${CATALOG_URI:thrift://localhost:9083}
      hive.default-namespace: ${CATALOG_NAMESPACE:ksoot}
      hive.io-impl: org.apache.iceberg.hadoop.HadoopFileIO

Nessie Catalog

Configure the Nessie Catalog as follows. The catalog name is set to nessie here, but it can be any name you want.

spark:
  sql:
    catalog:
      nessie: org.apache.iceberg.spark.SparkCatalog
      nessie.type: nessie
      nessie.warehouse: ${CATALOG_WAREHOUSE:hdfs://localhost:9000/warehouse}
      nessie.uri: ${CATALOG_URI:http://localhost:19120/api/v2}
      nessie.default-namespace: ${CATALOG_NAMESPACE:ksoot}
      nessie.io-impl: org.apache.iceberg.hadoop.HadoopFileIO

Important

AWS S3, Azure Blob Storage and Google Cloud Storage (GCS) are also supported as data storage for Iceberg tables.
You need to have the required dependencies on your classpath and configure the catalog properties accordingly in the application.yml or application.properties file.

Iceberg Catalog Configuration with AWS S3 as Data storage

Along with the catalog configurations, you also need to do the following.

  • Configure the AWS CLI with your AWS credentials and region.
  • Add the following dependencies to your pom.xml:
<properties>
    <spring-cloud-aws.version>3.2.1</spring-cloud-aws.version>
    <awssdk-bundle.version>2.25.70</awssdk-bundle.version>
</properties>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>io.awspring.cloud</groupId>
            <artifactId>spring-cloud-aws-dependencies</artifactId>
            <version>${spring-cloud-aws.version}</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<dependencies>
    <dependency>
        <groupId>io.awspring.cloud</groupId>
        <artifactId>spring-cloud-aws-starter-s3</artifactId>
    </dependency>
    <dependency>
        <groupId>software.amazon.awssdk</groupId>
        <artifactId>bundle</artifactId>
        <version>${awssdk-bundle.version}</version>
    </dependency>
</dependencies>
  • Update $HIVE_HOME/conf/hive-site.xml with the following properties. It will take AWS credentials from the AWS CLI configuration. Replace {Your AWS Region} with your actual AWS region, e.g. ap-south-1.
<property>
    <name>fs.s3a.aws.credentials.provider</name>
    <value>com.amazonaws.auth.DefaultAWSCredentialsProviderChain</value>
</property>
<property>
    <name>fs.s3a.endpoint</name>
    <value>s3.{Your AWS Region}.amazonaws.com</value>
</property>
<property>
    <name>fs.s3a.impl</name>
    <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
  • Add aws-java-sdk-bundle-1.12.262.jar, hadoop-aws-3.3.4.jar and postgresql-42.7.4.jar to the folder $HIVE_HOME/lib. Versions may vary, so make sure to use versions compatible with your setup.
  • Add the following properties to your application.yml or application.properties file:
spring:
  cloud:
    aws:
      credentials:
        access-key: ${AWS_ACCESS_KEY:<Your AWS Access Key>}
        secret-key: ${AWS_SECRET_KEY:<Your AWS Secret Key>}
      region:
        static: ${AWS_REGION:<Your AWS Region>}
      s3:
        endpoint: ${AWS_S3_ENDPOINT:https://s3.<Your AWS Region>.amazonaws.com}
  • Spark Hadoop Configurations. Each catalog stores its metadata in its own storage, such as Postgres (or any other relational database) for Hive, MongoDB for Nessie etc., but the table data is stored in a distributed file system such as HDFS, S3, Azure Blob Storage or Google Cloud Storage (GCS).
    So you need to set the Spark Hadoop configurations. You can either configure them globally as follows, in which case they are used by all catalogs configured in your application:
spark:
  hadoop:
    fs:
      s3a:
        aws.credentials.provider: "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
        access.key: ${AWS_ACCESS_KEY:<Your AWS Access Key>}
        secret.key: ${AWS_SECRET_KEY:<Your AWS Secret Key>}
        endpoint: ${AWS_S3_ENDPOINT:s3.<Your AWS Region>.amazonaws.com}
        impl: org.apache.hadoop.fs.s3a.S3AFileSystem
        path.style.access: true  # For path-style access, useful in some S3-compatible services
        connection.ssl.enabled: false  # Disable SSL (set to true to enable)
        fast.upload: true  # Enable faster uploads

Or you can configure them per catalog, as shown in the following sections, if they differ from the global configuration.

The following are Iceberg Catalog configurations using AWS S3 as data storage.

Important

Per-catalog Hadoop fs configurations are only required if they differ from the global Spark Hadoop fs configurations. Otherwise you can skip them.

Hadoop Catalog with AWS S3

Configure the Hadoop Catalog as follows. The catalog name is set to hadoop here, but it can be any name you want.

spark:
  sql:
    catalog:
      hadoop: org.apache.iceberg.spark.SparkCatalog
      hadoop.type: hadoop
      hadoop.warehouse: ${CATALOG_WAREHOUSE:s3a://<Your S3 Bucket Name>/warehouse}
      hadoop.uri: ${CATALOG_URI:hdfs://localhost:9000}
      hadoop.default-namespace: ${CATALOG_NAMESPACE:ksoot}
      hadoop.io-impl: org.apache.iceberg.aws.s3.S3FileIO
      hadoop.hadoop.fs.s3a.access.key: ${AWS_ACCESS_KEY:<Your AWS Access Key>}
      hadoop.hadoop.fs.s3a.secret.key: ${AWS_SECRET_KEY:<Your AWS Secret Key>}
      hadoop.hadoop.fs.s3a.endpoint: ${AWS_S3_ENDPOINT:s3.<Your AWS Region>.amazonaws.com}
      hadoop.hadoop.fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
      hadoop.hadoop.fs.s3a.path.style.access: true  # For path-style access, useful in some S3-compatible services
      hadoop.hadoop.fs.s3a.connection.ssl.enabled: false  # Disable SSL (set to true to enable)
      hadoop.hadoop.fs.s3a.fast.upload: true  # Enable faster uploads

Hive Catalog with AWS S3

Configure the Hive Catalog as follows. The catalog name is set to hive here, but it can be any name you want.

spark:
  sql:
    catalog:
      hive: org.apache.iceberg.spark.SparkCatalog
      hive.type: hive
      hive.warehouse: ${CATALOG_WAREHOUSE:s3a://<Your S3 Bucket Name>/warehouse}
      hive.uri: ${CATALOG_URI:thrift://localhost:9083}
      hive.default-namespace: ${CATALOG_NAMESPACE:ksoot}
      hive.io-impl: org.apache.iceberg.aws.s3.S3FileIO
      hive.hadoop.fs.s3a.access.key: ${AWS_ACCESS_KEY:<Your AWS Access Key>}
      hive.hadoop.fs.s3a.secret.key: ${AWS_SECRET_KEY:<Your AWS Secret Key>}
      hive.hadoop.fs.s3a.endpoint: ${AWS_S3_ENDPOINT:s3.<Your AWS Region>.amazonaws.com}
      hive.hadoop.fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
      hive.hadoop.fs.s3a.path.style.access: true  # For path-style access, useful in some S3-compatible services
      hive.hadoop.fs.s3a.connection.ssl.enabled: false  # Disable SSL (set to true to enable)
      hive.hadoop.fs.s3a.fast.upload: true  # Enable faster uploads

Nessie Catalog with AWS S3

Configure the Nessie Catalog as follows. The catalog name is set to nessie here, but it can be any name you want.

spark:
  sql:
    catalog:
      nessie: org.apache.iceberg.spark.SparkCatalog
      nessie.type: nessie
      nessie.warehouse: ${CATALOG_WAREHOUSE:s3a://<Your S3 Bucket Name>/warehouse}
      nessie.uri: ${CATALOG_URI:http://localhost:19120/api/v2}
      nessie.default-namespace: ${CATALOG_NAMESPACE:ksoot}
      nessie.io-impl: org.apache.iceberg.aws.s3.S3FileIO
      nessie.hadoop.fs.s3a.access.key: ${AWS_ACCESS_KEY:<Your AWS Access Key>}
      nessie.hadoop.fs.s3a.secret.key: ${AWS_SECRET_KEY:<Your AWS Secret Key>}
      nessie.hadoop.fs.s3a.endpoint: ${AWS_S3_ENDPOINT:s3.<Your AWS Region>.amazonaws.com}
      nessie.hadoop.fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
      nessie.hadoop.fs.s3a.path.style.access: true  # For path-style access, useful in some S3-compatible services
      nessie.hadoop.fs.s3a.connection.ssl.enabled: false  # Disable SSL (set to true to enable)
      nessie.hadoop.fs.s3a.fast.upload: true  # Enable faster uploads

Important

You can also configure multiple catalogs in your Spark application. For example, you can have both Hadoop and Hive catalogs configured and choose the one you want to use in your Spark pipelines at runtime.
AWS S3 and Hadoop S3A configurations need to be set in multiple places for the following reasons:

  • The Spring Boot application does not pick up the region from the application.yml configuration, so it must be set through the AWS CLI settings.
  • Apache Hive accesses AWS S3 through Hadoop S3A configurations, so they must be set in hive-site.xml.
  • AWS_ACCESS_KEY, AWS_SECRET_KEY and AWS_S3_ENDPOINT need to be set in application.yml to make them available to Spark at runtime.

Override default beans

It isn't recommended to override default beans as you can always extend them in your application. But if you really need to do that, you can do it as follows:

Override the default sparkProperties bean with your custom implementation as follows.

Make sure either the bean definition method name or the explicitly specified bean name is sparkProperties, otherwise it will not override the default bean.

@Bean
Properties sparkProperties() {
    // Your custom logic. The following is just for demonstration
    Properties sparkProperties = new Properties();
    sparkProperties.put("spark.master", "local[*]");
    return sparkProperties;
}

Override the default SparkConf bean with your custom implementation as follows.

@Bean
SparkConf sparkConf() {
    // Your custom logic. The following is just for demonstration
    final SparkConf sparkConf = new SparkConf();
    sparkConf.set("spark.master", "local[*]");
    return sparkConf;
}

Override the default SparkSession.Builder bean with your custom implementation as follows.

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
SparkSession.Builder sparkSessionBuilder() {
    // Your custom logic. The following is just for demonstration
    SparkConf sparkConf = new SparkConf();
    sparkConf.set("spark.master", "local[*]");
    return SparkSession.builder().config(sparkConf);
}

Override the default SparkSession bean with your custom implementation as follows.

@Bean(destroyMethod = "stop")
SparkSession sparkSession() {
    // Your custom logic. The following is just for demonstration
    SparkConf sparkConf = new SparkConf();
    sparkConf.set("spark.master", "local[*]");
    final SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
    return sparkSession;
}

Override the default Catalog bean with your custom implementation as follows.

@Bean
Catalog catalog(final SparkSession sparkSession, final CatalogProperties catalogProperties) {
    // Your custom logic
}
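
For instance, a minimal sketch returning an Iceberg HadoopCatalog built from the Spark session's Hadoop configuration; the warehouse path is an assumption, adapt it to your setup:

@Bean
Catalog catalog(final SparkSession sparkSession) {
    // Assumed setup: a HadoopCatalog pointing at a local HDFS warehouse path
    return new org.apache.iceberg.hadoop.HadoopCatalog(
        sparkSession.sparkContext().hadoopConfiguration(), "hdfs://localhost:9000/warehouse");
}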

Licence

Open source under The MIT License.

Authors and acknowledgment

Rajveer Singh. In case you find any issues or need any support, please email me at raj14.1984@gmail.com. Please give it a ⭐ and a 👏 on medium.com if you find it helpful.
