Flesh out Apache Spark Examples documentation #5160

Open · wants to merge 4 commits into main
2 changes: 1 addition & 1 deletion example/package.mill
@@ -269,7 +269,7 @@ $txt
if (seenCode) ""
else {
val exampleDashed = examplePath.segments.mkString("-")
val download =
val download =
s"{mill-download-url}/mill-dist-${build.millVersion()}-$exampleDashed.zip[download]"
val browse = s"{mill-example-url}/$examplePath[browse]"
s".build.mill ($download, $browse)"
129 changes: 129 additions & 0 deletions example/scalalib/spark/3-spark-streaming/build.mill
@@ -0,0 +1,129 @@
package build
import mill._, scalalib._

object foo extends ScalaModule {
def scalaVersion = "2.12.15"
def mvnDeps = Seq(
mvn"org.apache.spark::spark-core:3.5.4",
mvn"org.apache.spark::spark-sql:3.5.4",
mvn"org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.4"
)

def forkArgs = Seq("--add-opens", "java.base/sun.nio.ch=ALL-UNNAMED")

def forkEnv: T[Map[String, String]] = Map("KAFKA_SERVER" -> "localhost:9092")

def createTopic(args: mill.define.Args) = Task.Command {
val topic = args.value.mkString(" ")
val server = forkEnv()("KAFKA_SERVER")
os.call(
(
"docker-compose",
"exec",
"kafka",
"kafka-topics",
"--create",
"--topic",
topic,
"--bootstrap-server",
server
),
stdout = os.Inherit
)
}

def listTopics: T[Unit] = Task {
val server = forkEnv()("KAFKA_SERVER")
os.call(
Seq(
"docker-compose",
"exec",
"kafka",
"kafka-topics",
"--list",
"--bootstrap-server",
server
),
stdout = os.Inherit
)
}

def describeTopic(args: mill.define.Args) = Task.Command {
val topic = args.value.mkString(" ")
val server = forkEnv()("KAFKA_SERVER")
os.call(
Seq(
"docker-compose",
"exec",
"kafka",
"kafka-topics",
"--describe",
"--topic",
topic,
"--bootstrap-server",
server
),
stdout = os.Inherit
)
}

def readAllMessages(args: mill.define.Args) = Task.Command {
val topic = args.value.mkString(" ")
val server = forkEnv()("KAFKA_SERVER")
os.call(
Seq(
"docker-compose",
"exec",
"kafka",
"kafka-console-consumer",
"--bootstrap-server",
server,
"--topic",
topic,
"--from-beginning",
"--timeout-ms",
"1000"
),
stdout = os.Inherit
)
}

}

object producer extends ScalaModule {
def scalaVersion = "2.12.15"
def mvnDeps = Seq(
mvn"org.apache.kafka:kafka-clients:3.7.0"
)
}

// This example demonstrates using Mill to manage a streaming provider like Kafka.

/** Usage

> docker-compose up -d

> ./mill foo.createTopic "test-topic" # Custom Mill task to create a topic.
...
Created topic test-topic.

> ./mill foo.listTopics # Custom Mill task to list all topics.
...
test-topic

> ./mill foo.describeTopic "test-topic" # Custom Mill task to describe a topic.
...
Topic: test-topic...

> ./mill producer.run # Send a message.

> ./mill foo.run # Consume the message with Spark.
...
+-------------+
| message|
+-------------+
|Hello, World!|
+-------------+

> docker-compose down -v --remove-orphans
*/
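
A natural extension of the pattern above, not part of this diff, is a command that produces messages from the command line the same way the existing tasks create and inspect topics. The sketch below assumes it is added inside `object foo` in the build.mill above; `sendMessage` is a hypothetical name, and it relies on os-lib accepting a String as the process `stdin` and on docker-compose's `-T` flag so the piped input reaches `kafka-console-producer`.

  // Hypothetical extra command for the foo module above (not part of this diff):
  // pipe a message into the broker using kafka-console-producer.
  def sendMessage(args: mill.define.Args) = Task.Command {
    val topic = args.value.head
    val message = args.value.tail.mkString(" ")
    val server = forkEnv()("KAFKA_SERVER")
    os.call(
      Seq(
        "docker-compose", "exec", "-T", "kafka", // -T: no TTY, so stdin can be piped through
        "kafka-console-producer",
        "--bootstrap-server", server,
        "--topic", topic
      ),
      stdin = message, // assumes os-lib's stdin parameter takes a String source
      stdout = os.Inherit
    )
  }

Invoked as `./mill foo.sendMessage test-topic "Hello again"`, mirroring the createTopic and describeTopic commands already defined in this module.
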
22 changes: 22 additions & 0 deletions example/scalalib/spark/3-spark-streaming/docker-compose.yml
@@ -0,0 +1,22 @@
version: '3.8'

services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
ports:
- "2181:2181"
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000

kafka:
image: confluentinc/cp-kafka:7.5.0
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
depends_on:
- zookeeper
33 changes: 33 additions & 0 deletions example/scalalib/spark/3-spark-streaming/foo/src/foo/Foo.scala
@@ -0,0 +1,33 @@
package foo

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object Foo {

def streamFromKafka(spark: SparkSession): DataFrame = {
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", sys.env("KAFKA_SERVER"))
.option("subscribe", "test-topic")
.option("startingOffsets", "earliest")
.load()
.selectExpr("CAST(value AS STRING)").toDF("message")
}

def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("HelloWorldKafka")
.master("local[*]")
.getOrCreate()

val df = streamFromKafka(spark)

val query = df.writeStream
.format("console")
.outputMode("append")
.start()

query.awaitTermination(9000)
}
}
@@ -0,0 +1,13 @@
import org.apache.kafka.clients.producer._

object Producer {
def main(args: Array[String]): Unit = {
val props = new java.util.Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("test-topic", "Hello, World!"))
producer.close()
}
}
37 changes: 37 additions & 0 deletions example/scalalib/spark/4-hello-delta/build.mill
@@ -0,0 +1,37 @@
package build
import mill._, scalalib._

object foo extends ScalaModule {
def scalaVersion = "2.12.15"
def mvnDeps = Seq(
mvn"org.apache.spark::spark-core:3.4.0",
mvn"org.apache.spark::spark-sql:3.4.0",
mvn"io.delta:delta-core_2.12:2.4.0"
)

def forkArgs = Seq("--add-opens", "java.base/sun.nio.ch=ALL-UNNAMED")

object test extends ScalaTests {
def mvnDeps = Seq(mvn"com.lihaoyi::utest:0.8.5")
def testFramework = "utest.runner.Framework"

def forkArgs = Seq("--add-opens", "java.base/sun.nio.ch=ALL-UNNAMED")
}

}
// This example demonstrates running Spark with Delta Lake using Mill.

/** Usage

> ./mill foo.run
+-------------+
| message|
+-------------+
|Hello, Delta!|
+-------------+

> ./mill foo.test
...
+ foo.FooTests.Delta table should contain one row with 'Hello, Delta!'...
...
*/
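
One thing the usage block above does not show is that Delta keeps every committed table version. The following sketch, not part of this diff, assumes `foo.run` has already written version 0 of `tmp/delta-table`; the object name `TimeTravelDemo` and the second message are illustrative, while `versionAsOf` is the standard Delta Lake read option for time travel.

package foo

import org.apache.spark.sql.SparkSession

// Hypothetical sketch (not part of this diff): overwrite the table to create
// version 1, then read both the latest version and the original version 0 back.
object TimeTravelDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("DeltaTimeTravel")
      .master("local[*]")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()
    import spark.implicits._

    val path = "tmp/delta-table"
    // Overwriting creates a new table version on top of the existing one.
    Seq("Hello again, Delta!").toDF("message").write.format("delta").mode("overwrite").save(path)

    spark.read.format("delta").load(path).show()                          // latest version
    spark.read.format("delta").option("versionAsOf", 0).load(path).show() // original "Hello, Delta!" row

    spark.stop()
  }
}
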
33 changes: 33 additions & 0 deletions example/scalalib/spark/4-hello-delta/foo/src/foo/Foo.scala
@@ -0,0 +1,33 @@
package foo

import org.apache.spark.sql.{DataFrame, SparkSession}
import io.delta.tables._

object Foo {

def writeDeltaTable(spark: SparkSession, path: String): Unit = {
import spark.implicits._
val data = Seq("Hello, Delta!").toDF("message")
data.write.format("delta").mode("overwrite").save(path)
}

def readDeltaTable(spark: SparkSession, path: String): DataFrame = {
spark.read.format("delta").load(path)
}

def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("HelloDelta")
.master("local[*]")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()

val deltaPath = "tmp/delta-table"

writeDeltaTable(spark, deltaPath)
readDeltaTable(spark, deltaPath).show()

spark.stop()
}
}
29 changes: 29 additions & 0 deletions example/scalalib/spark/4-hello-delta/foo/test/src/FooTests.scala
@@ -0,0 +1,29 @@
package foo

import org.apache.spark.sql.SparkSession
import utest._

object FooTests extends TestSuite {
def tests = Tests {
test("Delta table should contain one row with 'Hello, Delta!'") {
val spark = SparkSession.builder()
.appName("FooDeltaTest")
.master("local[*]")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config(
"spark.sql.catalog.spark_catalog",
"org.apache.spark.sql.delta.catalog.DeltaCatalog"
)
.getOrCreate()

val path = "tmp/test-delta-table"
Foo.writeDeltaTable(spark, path)
val df = Foo.readDeltaTable(spark, path)

val messages = df.collect().map(_.getString(0)).toList
assert(messages == List("Hello, Delta!"))

spark.stop()
}
}
}
43 changes: 43 additions & 0 deletions example/scalalib/spark/5-hello-iceberg/build.mill
@@ -0,0 +1,43 @@
package build
import mill._, scalalib._

object foo extends ScalaModule {
def scalaVersion = "2.12.15"
def mvnDeps = Seq(
mvn"org.apache.spark::spark-core:3.5.4",
mvn"org.apache.spark::spark-sql:3.5.4",
mvn"org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.3"
)

def forkArgs = Seq(
"--add-opens=java.base/java.nio=ALL-UNNAMED",
"--add-exports=java.base/sun.nio.ch=ALL-UNNAMED"
)

object test extends ScalaTests {
def mvnDeps = Seq(mvn"com.lihaoyi::utest:0.8.5")
def testFramework = "utest.runner.Framework"

def forkArgs = Seq(
"--add-opens=java.base/java.nio=ALL-UNNAMED",
"--add-exports=java.base/sun.nio.ch=ALL-UNNAMED"
)
}

}
// This example demonstrates running Spark with Apache Iceberg using Mill.

/** Usage

> ./mill foo.run
+---------------+
| message|
+---------------+
|Hello, Iceberg!|
+---------------+

> ./mill foo.test
...
+ foo.FooTests.Iceberg table should contain one row with 'Hello, Iceberg!'...
...
*/
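
Because the table is registered in the `local` Hadoop catalog configured in Foo.scala below, it can also be queried with plain Spark SQL, and Iceberg exposes metadata tables such as `<table>.snapshots` for inspecting table history. The sketch below is not part of this diff; the object name `IcebergSqlDemo` is illustrative, and it assumes `foo.run` has already created `local.db.hello_iceberg` in `tmp/iceberg-warehouse`.

package foo

import org.apache.spark.sql.SparkSession

// Hypothetical sketch (not part of this diff): query the Iceberg table and its
// snapshot metadata through Spark SQL instead of the DataFrame API.
object IcebergSqlDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("HelloIcebergSql")
      .master("local[*]")
      .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.local.type", "hadoop")
      .config("spark.sql.catalog.local.warehouse", "tmp/iceberg-warehouse")
      .getOrCreate()

    // The table written by Foo.main, queried via SQL.
    spark.sql("SELECT message FROM local.db.hello_iceberg").show()

    // Iceberg metadata table listing the snapshots created so far.
    spark.sql("SELECT snapshot_id, operation FROM local.db.hello_iceberg.snapshots").show()

    spark.stop()
  }
}
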
34 changes: 34 additions & 0 deletions example/scalalib/spark/5-hello-iceberg/foo/src/foo/Foo.scala
@@ -0,0 +1,34 @@
package foo

import org.apache.spark.sql.{DataFrame, SparkSession}
import java.nio.file.{Files, Paths}

object Foo {

def writeIcebergTable(spark: SparkSession, tableName: String): Unit = {
import spark.implicits._
val data = Seq("Hello, Iceberg!").toDF("message")
data.writeTo(tableName).using("iceberg").createOrReplace()
}

def readIcebergTable(spark: SparkSession, tableName: String): DataFrame = {
spark.read.table(tableName)
}

def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("HelloIceberg")
.master("local[*]")
.config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.local.type", "hadoop")
.config("spark.sql.catalog.local.warehouse", "tmp/iceberg-warehouse")
.getOrCreate()

val table = "local.db.hello_iceberg"

writeIcebergTable(spark, table)
readIcebergTable(spark, table).show()

spark.stop()
}
}