Managing dependencies is a crucial part of any complex project. Handling this manually can be tedious and time-consuming,
leaving less room to focus on other essential aspects of development.
This is where Spring Boot starters come into play.
These are pre-configured dependency descriptors designed to simplify dependency management.
By including a starter POM in your project, you can effortlessly bring in all the necessary Spring and related technologies,
saving you from the hassle of searching through documentation or copying multiple dependency definitions.
Spring Boot offers starters for popular technologies to streamline your development process.
However, no official starter for Spark is available yet, because it is recommended to keep Spark dependencies in provided
scope in your applications,
as they are supposed to be supplied by the container where the jobs are deployed, such as a Spark cluster or AWS EMR.
But as long as the Spark dependency versions in your application match those in the container, it does not matter if you have them in compile
scope.
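If you do deploy to a cluster that already provides Spark, you can still mark individual Spark dependencies as provided yourself; a minimal sketch (the artifact shown is just an example, matching the Scala 2.13 / Spark 3.5.5 versions used by this starter):
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.13</artifactId>
    <version>3.5.5</version>
    <!-- Supplied by the Spark cluster or AWS EMR at runtime -->
    <scope>provided</scope>
</dependency>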
The Spring Boot Starter for Spark is a convenient dependency descriptor that you can include in your Spring Boot application
to pull in all required Spark dependencies and get SparkSession
and Iceberg Catalog
beans auto-configured, with Spark configuration support in Spring Boot yml
or properties
files, including auto-completion in your favourite IDE.
It specifies the following versions:
- Java 17
- Spring Boot 3.4.5
- Spark 3.5.5
- Scala 2.13.16
<java.version>17</java.version>
<spring-boot.version>3.4.5</spring-boot.version>
<!-- Spark dependencies versions-->
<scala.version>2.13.16</scala.version>
<scala.compact.version>2.13</scala.compact.version>
<spark.version>3.5.5</spark.version>
<spark.compact.version>3.5</spark.compact.version>
- Bundles Spark, Iceberg and Hadoop dependencies compatible with Spring Boot 3+.
- Provides an auto-configured SparkSession bean which can be customized in any manner.
- Provides an auto-configured Iceberg Catalog supporting Hadoop, Hive, Nessie and other Catalog types.
- Exposes all Spark configurations, including Catalog configurations, as Spring Boot environment properties.
- Enables auto-completion assistance for Spark configuration properties in Spring Boot yml and properties files in IDEs such as IntelliJ, Eclipse etc. Find details at spring-configuration-metadata.json
Current version: 1.3 Refer to Release notes while upgrading.
Define the following properties in pom.xml
:
<properties>
<java.version>17</java.version>
<spring-boot.version>3.4.5</spring-boot.version>
<spring-boot-starter-spark.version>1.3</spring-boot-starter-spark.version>
<!-- The following two versions must be specified, otherwise you will get the exception java.lang.ClassNotFoundException: javax.servlet.http.HttpServlet -->
<jakarta-servlet.version>4.0.3</jakarta-servlet.version>
<jersey.version>2.36</jersey.version>
</properties>
Important
The Spring Boot parent pom provides the jakarta-servlet.version
and jersey.version
properties.
These must be overridden in your pom as mentioned above, otherwise you will get the exception java.lang.ClassNotFoundException: javax.servlet.http.HttpServlet.
Add the following dependency to your pom.xml
:
<dependency>
<groupId>io.github.officiallysingh</groupId>
<artifactId>spring-boot-starter-spark</artifactId>
<version>${spring-boot-starter-spark.version}</version>
</dependency>
Note
spring-boot-starter-spark
jar contains Spark core, Spark SQL, Spark MLlib, Iceberg and Hadoop dependencies.
You can exclude any dependency if you don't need it.
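For example, if your jobs do not need MLlib, you could exclude it from the starter as follows (a sketch; the Scala-suffixed artifactId is an assumption based on the Scala 2.13 build used by this starter):
<dependency>
    <groupId>io.github.officiallysingh</groupId>
    <artifactId>spring-boot-starter-spark</artifactId>
    <version>${spring-boot-starter-spark.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.13</artifactId>
        </exclusion>
    </exclusions>
</dependency>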
See example usage in a Spark Spring Cloud Task.
Any spark properties can be configured in application.yml
as follows:
spark:
master: local[*]
executor:
instances: 2
memory: 2g
cores: 1
driver:
memory: 1g
cores: 1
or in application.properties
as follows:
spark.master=local[*]
spark.executor.instances=2
spark.executor.memory=2g
spark.executor.cores=1
spark.driver.memory=1g
spark.driver.cores=1
The following Spring beans are auto-configured but they are conditional and can be customized as elaborated in the next section.
For details refer to SparkAutoConfiguration
- SparkSession bean is auto-configured, but if you want to override it, you can define your own SparkSession class bean in your application (see the usage sketch after this list).
- SparkConf bean is auto-configured with Spark configuration properties using the standard Spring Boot mechanism, i.e. you can use a variety of external configuration sources including Java properties files, YAML files, environment variables and command-line arguments. But if you want to override it, you can define your own SparkConf class bean in your application.
- sparkProperties bean exposes all Spark configurations as Spring Boot environment properties. All properties in this bean are set in the SparkConf bean.
- SparkSession.Builder provides an extension mechanism to customise SparkSession bean creation.
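As a usage sketch, the auto-configured SparkSession can simply be injected into your own components; the class name and query below are hypothetical:
import org.apache.spark.sql.SparkSession;
import org.springframework.stereotype.Service;

@Service
class SalesReportJob {

  private final SparkSession sparkSession;

  SalesReportJob(final SparkSession sparkSession) {
    this.sparkSession = sparkSession;
  }

  void run() {
    // Uses the auto-configured SparkSession; no manual SparkSession.builder() needed
    this.sparkSession.sql("SELECT 1 AS health_check").show();
  }
}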
The following Spring beans are auto-configured, but they are conditional and can be overridden by defining these beans in your application code.
For details refer to SparkCatalogAutoConfiguration.
It has been tested with the following Catalog types, but other Catalog types
would also work as long as you have the required dependencies in your classpath and the catalog configurations in place.
- Iceberg Catalog bean is auto-configured based on the catalog type specified in application.yml or application.properties file.
  - If catalog type is hadoop, it auto-configures HadoopCatalog.
  - If catalog type is hive, it auto-configures HiveCatalog.
  - If catalog type is nessie, it auto-configures NessieCatalog.
- CatalogProperties is auto-configured with properties specified in application.yml or application.properties file.
Note
SparkCatalogAutoConfiguration
is enabled only when some catalog configuration is present in application.yml
or application.properties
.
Refer to Apache Hadoop and Hive installation guide for details on how to install Hadoop and Hive.
You can customize SparkSession.Builder
by defining any number of beans of type SparkSessionBuilderCustomizer
in your application. For example, to enable Hive support:
@Bean
public SparkSessionBuilderCustomizer enableHiveSupportCustomizer() {
return SparkSession.Builder::enableHiveSupport;
}
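Similarly, a customizer could set other builder options; a minimal sketch (the bean name and application name below are illustrative):
@Bean
public SparkSessionBuilderCustomizer appNameCustomizer() {
  // Sets the Spark application name on the builder before the SparkSession is created
  return builder -> builder.appName("sales-report-job");
}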
You can customize the SparkSession
bean itself by defining any number of beans of type SparkSessionCustomizer
in your application.
Following is an example that registers a User Defined Function in the SparkSession:
import static org.apache.spark.sql.functions.callUDF;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.List;
import java.util.stream.Stream;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.api.java.UDF2;
import org.apache.spark.sql.functions;
public class UserDefinedFunctions {
public static final String EXPLODE_DATE_SEQ = "explodeDateSeq";
static UDF2<LocalDate, LocalDate, List<LocalDate>> explodeDateSeq =
(start, end) -> {
long numOfDaysBetween = ChronoUnit.DAYS.between(start, end) + 1;
return Stream.iterate(start, date -> date.plusDays(1)).limit(numOfDaysBetween).toList();
};
public static Column explodeDateSeq(final Column startCol, final Column endCol) {
return functions.explode(callUDF(UserDefinedFunctions.EXPLODE_DATE_SEQ, startCol, endCol));
}
}
import com.ksoot.spark.springframework.boot.autoconfigure.SparkSessionCustomizer;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
@Component
class SparkSessionUDFCustomizer implements SparkSessionCustomizer, Ordered {
@Override
public void customize(final SparkSession sparkSession) {
sparkSession
.udf()
.register(
UserDefinedFunctions.EXPLODE_DATE_SEQ,
UserDefinedFunctions.explodeDateSeq,
DataTypes.createArrayType(DataTypes.DateType));
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
}
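The UDF can then be used in a Spark pipeline as in the snippet below, which assumes static imports of min and max from org.apache.spark.sql.functions and a simple Dataframe POJO with customer_id, date and a value column: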
Dataset<Row> originalDf =
this.sparkSession.createDataFrame(
Arrays.asList(
Dataframe.of("c1", LocalDate.of(2024, 6, 5), "f105"),
Dataframe.of("c1", LocalDate.of(2024, 6, 6), "f106"),
Dataframe.of("c1", LocalDate.of(2024, 6, 7), "f107"),
Dataframe.of("c1", LocalDate.of(2024, 6, 10), "f110"),
Dataframe.of("c2", LocalDate.of(2024, 6, 12), "f212"),
Dataframe.of("c2", LocalDate.of(2024, 6, 13), "f213"),
Dataframe.of("c2", LocalDate.of(2024, 6, 15), "f215")),
Dataframe.class);
Dataset<Row> customerMinMaxDateDf =
originalDf
.groupBy("customer_id")
.agg(min("date").as("min_date"), max("date").as("max_date"));
// Generate the expanded dataset
Dataset<Row> customerIdDatesDf =
customerMinMaxDateDf
.withColumn(
"date",
UserDefinedFunctions.explodeDateSeq(
customerMinMaxDateDf.col("min_date"), customerMinMaxDateDf.col("max_date")))
.select("customer_id", "date");
customerIdDatesDf.show();
Note
To support Java 8 date-time types in Spark, set the property spark.sql.datetime.java8API.enabled
to true
in application.yml
or application.properties.
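For example, in application.properties (the equivalent nested form can be used in application.yml, following the same pattern as the other spark properties above):
spark.sql.datetime.java8API.enabled=true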
For a Spark Iceberg integration demo, refer to spring-boot-spark-iceberg.
Following are Iceberg Catalog configurations using Local Hadoop as Data storage.
Configure Hadoop Catalog as follows. Catalog name is also set to hadoop
but it can be any name you want.
spark:
sql:
catalog:
hadoop: org.apache.iceberg.spark.SparkCatalog
hadoop.type: hadoop
hadoop.warehouse: ${CATALOG_WAREHOUSE:hdfs://localhost:9000/warehouse}
hadoop.uri: ${CATALOG_URI:hdfs://localhost:9000}
hadoop.default-namespace: ${CATALOG_NAMESPACE:ksoot}
hadoop.io-impl: org.apache.iceberg.hadoop.HadoopFileIO
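Once configured, the catalog can be addressed by its name in Spark SQL; a minimal sketch (the sales table below is hypothetical):
// Create and query an Iceberg table in the "hadoop" catalog under the "ksoot" namespace
sparkSession.sql("CREATE TABLE IF NOT EXISTS hadoop.ksoot.sales (id BIGINT, amount DOUBLE) USING iceberg");
sparkSession.sql("SELECT count(*) FROM hadoop.ksoot.sales").show();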
Configure Hive Catalog as follows. Catalog name is also set to hive
but it can be any name you want.
spark:
  sql:
    catalog:
      hive: org.apache.iceberg.spark.SparkCatalog
      hive.type: hive
      hive.warehouse: ${CATALOG_WAREHOUSE:hdfs://localhost:9000/warehouse}
      hive.uri: ${CATALOG_URI:thrift://localhost:9083}
      hive.default-namespace: ${CATALOG_NAMESPACE:ksoot}
      hive.io-impl: org.apache.iceberg.hadoop.HadoopFileIO
Configure Nessie Catalog as follows. Catalog name is also set to nessie
but it can be any name you want.
spark:
sql:
catalog:
nessie: org.apache.iceberg.spark.SparkCatalog
nessie.type: nessie
nessie.warehouse: ${CATALOG_WAREHOUSE:hdfs://localhost:9000/warehouse}
nessie.uri: ${CATALOG_URI:http://localhost:19120/api/v2}
nessie.default-namespace: ${CATALOG_NAMESPACE:ksoot}
nessie.io-impl: org.apache.iceberg.hadoop.HadoopFileIO
Important
AWS S3, Azure Blob Storage and Google Cloud Storage (GCS) are also supported as data storage for Iceberg tables.
You need to have the required dependencies in your classpath and configure the catalog properties accordingly in the application.yml
or application.properties
file.
Along with the catalog configurations, you also need to do the following.
- Configure the AWS CLI with your AWS credentials and region.
- Add the following dependencies to your pom.xml:
<properties>
<spring-cloud-aws.version>3.2.1</spring-cloud-aws.version>
<awssdk-bundle.version>2.25.70</awssdk-bundle.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-dependencies</artifactId>
<version>${spring-cloud-aws.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-s3</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bundle</artifactId>
<version>${awssdk-bundle.version}</version>
</dependency>
</dependencies>
- Update $HIVE_HOME/conf/hive-site.xml with the following properties. It will take AWS credentials from the AWS CLI configuration.
Replace {Your AWS Region}
with your actual AWS region, e.g. ap-south-1.
<property>
<name>fs.s3a.aws.credentials.provider</name>
<value>com.amazonaws.auth.DefaultAWSCredentialsProviderChain</value>
</property>
<property>
<name>fs.s3a.endpoint</name>
<value>s3.{Your AWS Region}.amazonaws.com</value>
</property>
<property>
<name>fs.s3a.impl</name>
<value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>
</property>
- Add aws-java-sdk-bundle-1.12.262.jar, hadoop-aws-3.3.4.jar and postgresql-42.7.4.jar to the folder $HIVE_HOME/lib. Versions may vary, so make sure to use versions compatible with your setup.
- Add the following properties to your application.yml
or application.properties
file:
spring:
cloud:
aws:
credentials:
access-key: ${AWS_ACCESS_KEY:<Your AWS Access Key>}
secret-key: ${AWS_SECRET_KEY:<Your AWS Secret Key>}
region:
static: ${AWS_REGION:<Your AWS Region>}
s3:
endpoint: ${AWS_S3_ENDPOINT:https://s3.<Your AWS Region>.amazonaws.com}
- Spark Hadoop Configurations
Each catalog stores its metadata in its own storage, such as Postgres (or any other relational database) for Hive, or MongoDB for Nessie.
But the table data itself is stored in a distributed file system such as HDFS, S3, Azure Blob Storage or Google Cloud Storage (GCS).
So you need to set the Spark Hadoop configurations. You can either configure them globally as follows, in which case they apply to all catalogs configured in your application:
spark:
  hadoop:
    fs:
      s3a:
        aws.credentials.provider: "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
        access.key: ${AWS_ACCESS_KEY:<Your AWS Access Key>}
        secret.key: ${AWS_SECRET_KEY:<Your AWS Secret Key>}
        endpoint: ${AWS_S3_ENDPOINT:s3.<Your AWS Region>.amazonaws.com}
        impl: org.apache.hadoop.fs.s3a.S3AFileSystem
        path.style.access: true # For path-style access, useful in some S3-compatible services
        connection.ssl.enabled: false # Disable SSL; set to true to enable it
        fast.upload: true # Enable faster uploads
Or you can configure them per catalog, as shown in the following sections, if they differ from the global configuration.
Following are Iceberg Catalog configurations using AWS S3 as Data storage.
Important
Catalog Hadoop fs configurations are only required if they are different from the spark global Hadoop fs configurations. Otherwise you can skip them.
Configure Hadoop Catalog as follows. Catalog name is also set to hadoop
but it can be any name you want.
spark:
  sql:
    catalog:
      hadoop: org.apache.iceberg.spark.SparkCatalog
      hadoop.type: hadoop
      hadoop.warehouse: ${CATALOG_WAREHOUSE:s3a://<Your S3 Bucket Name>/warehouse}
      hadoop.uri: ${CATALOG_URI:hdfs://localhost:9000}
      hadoop.default-namespace: ${CATALOG_NAMESPACE:ksoot}
      hadoop.io-impl: org.apache.iceberg.aws.s3.S3FileIO
      hadoop.hadoop.fs.s3a.access.key: ${AWS_ACCESS_KEY:<Your AWS Access Key>}
      hadoop.hadoop.fs.s3a.secret.key: ${AWS_SECRET_KEY:<Your AWS Secret Key>}
      hadoop.hadoop.fs.s3a.endpoint: ${AWS_S3_ENDPOINT:s3.<Your AWS Region>.amazonaws.com}
      hadoop.hadoop.fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
      hadoop.hadoop.fs.s3a.path.style.access: true # For path-style access, useful in some S3-compatible services
      hadoop.hadoop.fs.s3a.connection.ssl.enabled: false # Disable SSL; set to true to enable it
      hadoop.hadoop.fs.s3a.fast.upload: true # Enable faster uploads
Configure Hive Catalog as follows. Catalog name is also set to hive
but it can be any name you want.
spark:
  sql:
    catalog:
      hive: org.apache.iceberg.spark.SparkCatalog
      hive.type: hive
      hive.warehouse: ${CATALOG_WAREHOUSE:s3a://<Your S3 Bucket Name>/warehouse}
      hive.uri: ${CATALOG_URI:thrift://localhost:9083}
      hive.default-namespace: ${CATALOG_NAMESPACE:ksoot}
      hive.io-impl: org.apache.iceberg.aws.s3.S3FileIO
      hive.hadoop.fs.s3a.access.key: ${AWS_ACCESS_KEY:<Your AWS Access Key>}
      hive.hadoop.fs.s3a.secret.key: ${AWS_SECRET_KEY:<Your AWS Secret Key>}
      hive.hadoop.fs.s3a.endpoint: ${AWS_S3_ENDPOINT:s3.<Your AWS Region>.amazonaws.com}
      hive.hadoop.fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
      hive.hadoop.fs.s3a.path.style.access: true # For path-style access, useful in some S3-compatible services
      hive.hadoop.fs.s3a.connection.ssl.enabled: false # Disable SSL; set to true to enable it
      hive.hadoop.fs.s3a.fast.upload: true # Enable faster uploads
Configure Nessie Catalog as follows. Catalog name is also set to nessie
but it can be any name you want.
spark:
  sql:
    catalog:
      nessie: org.apache.iceberg.spark.SparkCatalog
      nessie.type: nessie
      nessie.warehouse: ${CATALOG_WAREHOUSE:s3a://<Your S3 Bucket Name>/warehouse}
      nessie.uri: ${CATALOG_URI:http://localhost:19120/api/v2}
      nessie.default-namespace: ${CATALOG_NAMESPACE:ksoot}
      nessie.io-impl: org.apache.iceberg.aws.s3.S3FileIO
      nessie.hadoop.fs.s3a.access.key: ${AWS_ACCESS_KEY:<Your AWS Access Key>}
      nessie.hadoop.fs.s3a.secret.key: ${AWS_SECRET_KEY:<Your AWS Secret Key>}
      nessie.hadoop.fs.s3a.endpoint: ${AWS_S3_ENDPOINT:s3.<Your AWS Region>.amazonaws.com}
      nessie.hadoop.fs.s3a.impl: org.apache.hadoop.fs.s3a.S3AFileSystem
      nessie.hadoop.fs.s3a.path.style.access: true # For path-style access, useful in some S3-compatible services
      nessie.hadoop.fs.s3a.connection.ssl.enabled: false # Disable SSL; set to true to enable it
      nessie.hadoop.fs.s3a.fast.upload: true # Enable faster uploads
Important
You can also configure multiple catalogs in your Spark application;
for example, you can have both the Hadoop and Hive catalogs configured, and choose the one to use in your Spark pipelines at runtime, as sketched below.
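A minimal sketch of choosing between configured catalogs at runtime (the namespace and table names are illustrative):
// Fully qualified identifiers select the catalog per statement
sparkSession.sql("SELECT * FROM hadoop.ksoot.sales").show();
sparkSession.sql("SELECT * FROM hive.ksoot.sales").show();

// Or switch the current catalog and namespace for subsequent statements
sparkSession.sql("USE hive.ksoot");
sparkSession.sql("SELECT * FROM sales").show();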
You can see we need to set the AWS S3 and Hadoop S3A configurations in multiple places, for the following reasons:
- The Spring Boot application does not pick up the region from the application.yml configuration, so it needs to be set through the AWS CLI settings.
- Apache Hive accesses AWS S3 through Hadoop S3A configurations, so they need to be set in hive-site.xml.
- AWS_ACCESS_KEY, AWS_SECRET_KEY and AWS_S3_ENDPOINT need to be set in application.yml
to make them available to Spark at runtime.
It isn't recommended to override the default beans, as you can always extend them in your application. But if you really need to, you can do so as follows.
Make sure either the bean definition method name or the explicitly specified bean name is sparkProperties,
otherwise it would not override the default bean.
@Bean
Properties sparkProperties() {
// Your custom logic. The Following is just for demonstration
Properties sparkProperties = new Properties();
sparkProperties.put("spark.master", "local[*]");
return sparkProperties;
}
Override default SparkConf
bean as follows with your custom implementation.
@Bean
SparkConf sparkConf() {
// Your custom logic. The Following is just for demonstration
final SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.master", "local[*]");
return sparkConf;
}
Override default SparkSession.Builder
bean as follows with your custom implementation.
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
SparkSession.Builder sparkSessionBuilder() {
// Your custom logic. The Following is just for demonstration
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.master", "local[*]");
return SparkSession.builder().config(sparkConf);
}
Override default SparkSession
bean as follows with your custom implementation.
@Bean(destroyMethod = "stop")
SparkSession sparkSession() {
// Your custom logic. The Following is just for demonstration
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.master", "local[*]");
final SparkSession sparkSession = SparkSession.builder().config(sparkConf).getOrCreate();
return sparkSession;
}
Override default Catalog
bean as follows with your custom implementation.
@Bean
Catalog catalog(final SparkSession sparkSession, final CatalogProperties catalogProperties) {
// Your custom logic
}
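As a minimal sketch, a custom Catalog bean could build an Iceberg HadoopCatalog directly; the warehouse location below is an assumed example value, and in practice you would derive it from your own configuration (for example the auto-configured CatalogProperties):
@Bean
Catalog catalog(final SparkSession sparkSession) {
  // Builds an Iceberg HadoopCatalog against the Hadoop configuration of the running SparkSession.
  // The warehouse path is illustrative; point it at your actual warehouse location.
  return new org.apache.iceberg.hadoop.HadoopCatalog(
      sparkSession.sparkContext().hadoopConfiguration(), "hdfs://localhost:9000/warehouse");
}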
Open source The MIT License
Rajveer Singh. In case you find any issues or need any support, please email me at raj14.1984@gmail.com. Please give me a ⭐ and a 👏 on medium.com if you find it helpful.
- Apache Hadoop and Hive installation guide for details on how to install Hadoop and Hive.
- Demo Spark Spring Cloud Task and
spring-boot-spark-iceberg
- To learn about Spark, refer to the Spark Documentation.
- Find details of all Spark configurations in the Spark Configuration Documentation
- How to create new Spring boot starter
- Apache Iceberg
- Apache Iceberg Spark Quickstart
- Apache Hadoop
- Apache Hive
- Nessie