Skip to content

Executor metadata collection plugin (cloud + K8s)#62

Open
minskya wants to merge 10 commits into
mainfrom
executor_plugin
Open

Executor metadata collection plugin (cloud + K8s)#62
minskya wants to merge 10 commits into
mainfrom
executor_plugin

Conversation

@minskya
Copy link
Copy Markdown
Contributor

@minskya minskya commented Apr 19, 2026

Summary

Adds an ExecutorPlugin that collects machine metadata (instance type, spot/on-demand, system info) from each executor and reports it to the driver via Spark's plugin RPC. Supports cloud VMs (AWS/GCP/Azure) and Kubernetes.

  • ExecutorPlugin runs on each executor at startup, collects metadata, and sends it to the driver via PluginContext.send() / DriverPlugin.receive()
  • Cloud VM detection reads /sys/class/dmi/id/sys_vendor locally to identify AWS/GCP/Azure, then runs a single provider-specific bash one-liner against the cloud metadata API
  • Kubernetes support detects K8s via KUBERNETES_SERVICE_HOST, skips the metadata service, and reads from user-configured env vars (exposed from node labels via the downward API)
  • System basics always collected via JVM APIs: OS name/arch, JVM version, CPU cores, total memory
  • Event pipeline follows the existing DataFlint pattern: event → listener → KVStore → export
  • Opt-in via spark.dataflint.experimental.executor.metadata.enabled=true (default: false)

Changed files

File Change
executor/CloudMetadataDetector.scala New — Reads /sys/class/dmi/id/sys_vendor to identify cloud, runs single bash one-liner for instance type + lifecycle
executor/K8sMetadataDetector.scala New — Reads metadata from configured env vars (no HTTP calls)
executor/DataflintExecutorPlugin.scala New — ExecutorPlugin collecting system basics + branching cloud/K8s detection
executor/ExecutorMetadataMessage.scala New — Serializable RPC message for executor→driver
executor/DriverMetadataHelper.scala New — Reads spark.dataflint.experimental.executor.metadata.* configs
listener/model.scala Added DataflintExecutorMetadataInfo, DataflintExecutorMetadataEvent, DataflintExecutorMetadataWrapper
listener/DataflintListener.scala Added case for new event in onOtherEvent()
listener/DataflintStore.scala Added executorMetadata() query method
saas/SparkRunStore.scala Added dataflintExecutorMetadata field to export data model
saas/StoreDataExtractor.scala Added readAll[DataflintExecutorMetadataWrapper] to extraction
SparkDataflintPlugin.scala (spark3 + spark4) Returns DataflintExecutorPlugin, passes config via extraConf, adds receive()
plugin/src/test/.../executor/CloudMetadataDetectorSpec.scala New — Unit tests for JSON parsing
plugin/src/test/.../executor/K8sMetadataDetectorSpec.scala New — Unit tests for env-var lookup

Architecture

sequenceDiagram
    participant Driver as Driver (SparkDataflintDriverPlugin)
    participant Executor as Executor (DataflintExecutorPlugin)
    participant Cloud as Cloud Metadata API

    Driver->>Executor: init(extraConf: {enabled: true, k8s.*.envVar: ...})
    Executor->>Executor: Collect system basics (OS, JVM, cores, memory)
    alt KUBERNETES_SERVICE_HOST set
        Executor->>Executor: Read configured env vars (no HTTP)
    else Cloud VM
        Executor->>Executor: Read /sys/class/dmi/id/sys_vendor
        alt AWS / GCP / Azure detected
            Executor->>Cloud: bash + curl to provider metadata API
        else No cloud detected
            Executor->>Executor: Skip cloud metadata
        end
    end
    Executor->>Driver: PluginContext.send(ExecutorMetadataMessage)
    Driver->>Driver: receive() → post DataflintExecutorMetadataEvent
    Driver->>Driver: DataflintListener → KVStore
    Driver->>Driver: StoreDataExtractor → SparkRunStore → S3 export
Loading

Configs

Property Default Description
spark.dataflint.experimental.executor.metadata.enabled false Enable executor metadata collection
spark.dataflint.experimental.executor.metadata.k8s.instanceType.envVar (unset) Env var name holding the instance type when running in K8s
spark.dataflint.experimental.executor.metadata.k8s.lifecycleType.envVar (unset) Env var name holding the lifecycle (spot/on-demand) when running in K8s
spark.dataflint.experimental.executor.metadata.k8s.cloudProvider.envVar (unset) Env var name holding the cloud provider when running in K8s

K8s pod spec example (downward API)

env:
  - name: NODE_INSTANCE_TYPE
    valueFrom:
      fieldRef:
        fieldPath: metadata.labels['node.kubernetes.io/instance-type']
  - name: NODE_LIFECYCLE
    valueFrom:
      fieldRef:
        fieldPath: metadata.labels['karpenter.sh/capacity-type']

Then set:

spark.dataflint.experimental.executor.metadata.k8s.instanceType.envVar=NODE_INSTANCE_TYPE
spark.dataflint.experimental.executor.metadata.k8s.lifecycleType.envVar=NODE_LIFECYCLE

Collected Metadata

Field Source Always collected?
executorId Spark PluginContext Yes
executorHost InetAddress.getLocalHost Yes
osName System.getProperty("os.name") Yes
osArch System.getProperty("os.arch") Yes
jvmVersion System.getProperty("java.version") Yes
availableProcessors Runtime.availableProcessors() Yes
totalMemoryBytes Runtime.maxMemory() Yes
cloudProvider /sys/class/dmi/id/sys_vendor or configured K8s env var When detected
instanceType Cloud metadata API or configured K8s env var When detected
lifecycleType Cloud metadata API or configured K8s env var When detected
collectionError Error message if detection fails Only on failure

Test plan

  • sbt pluginspark3/compile passes
  • sbt pluginspark4/compile passes
  • sbt plugin/test — all unit tests (including new ones) pass
  • sbt pluginspark3/test — all existing + new tests pass
  • Manual test on Databricks (EC2 spot) — both executors reported aws / m5.large / spot with full system info
  • Manual test on EKS/GKE/AKS with K8s env vars configured

🤖 Generated with Claude Code

minskya and others added 2 commits April 19, 2026 14:09
Implement an ExecutorPlugin that collects machine metadata from each executor
and reports it to the driver. When enabled via spark.dataflint.executor.metadata.enabled,
each executor detects its cloud provider by reading /sys/class/dmi/id/sys_vendor,
then fetches instance type and spot/on-demand status from the cloud metadata API.
System basics (OS, JVM, CPU cores, memory) are always collected.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@minskya minskya changed the title Add executor metadata collection plugin [DATAFLINT-4414] Add executor metadata collection plugin Apr 20, 2026
@notion-workspace
Copy link
Copy Markdown

@notion-workspace
Copy link
Copy Markdown

@minskya minskya changed the title [DATAFLINT-4414] Add executor metadata collection plugin [DATAFLINT-4359] Add executor metadata collection plugin Apr 20, 2026
minskya and others added 8 commits April 20, 2026 16:02
Return null from DriverPlugin.receive() since PluginContext.send() is
fire-and-forget — returning a string caused a spurious warning.

Relax broadcast join test assertions to duration >= 0 because the
codegen sleep is only in doProduce path, not doExecute which broadcast
joins use.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
`spark.dataflint.experimental.executor.metadata.enabled`
When running in K8s (detected via KUBERNETES_SERVICE_HOST env var), skip
the cloud metadata service and read instance type, lifecycle, and cloud
provider from user-configured env vars instead. The user is responsible
for exposing node labels as env vars via the downward API in the pod spec.

New configs:
- spark.dataflint.experimental.executor.metadata.k8s.instanceType.envVar
- spark.dataflint.experimental.executor.metadata.k8s.lifecycleType.envVar
- spark.dataflint.experimental.executor.metadata.k8s.cloudProvider.envVar

Add unit tests for CloudMetadataDetector JSON parsing and K8sMetadataDetector
env-var lookup logic.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@minskya minskya changed the title [DATAFLINT-4359] Add executor metadata collection plugin Executor metadata collection plugin (cloud + K8s) May 18, 2026
@minskya minskya requested review from Copilot and menishmueli May 18, 2026 17:16
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds an experimental Spark executor-side plugin that collects executor machine/runtime metadata (cloud VM via metadata service or Kubernetes via env vars) and reports it to the driver via Spark plugin RPC, then persists/exports it through the existing DataFlint listener → KVStore → extraction pipeline.

Changes:

  • Introduces executor-side metadata collection (system + cloud/K8s detection) and a serializable executor→driver message.
  • Adds driver-side RPC handling that posts a new DataflintExecutorMetadataEvent, plus listener/store/export model plumbing.
  • Adds unit tests for cloud JSON parsing and Kubernetes env-var based detection; adjusts SQL node timing tests for joins.

Reviewed changes

Copilot reviewed 15 out of 15 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
spark-plugin/pluginspark4/src/main/scala/io/dataflint/spark/SparkDataflintPlugin.scala Enables executor plugin + passes extraConf; driver receive() converts RPC message into listener event
spark-plugin/pluginspark3/src/main/scala/io/dataflint/spark/SparkDataflintPlugin.scala Same as Spark 4 variant for Spark 3
spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/CloudMetadataDetector.scala Detects cloud provider via DMI vendor and runs provider-specific metadata curl via bash
spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/K8sMetadataDetector.scala Detects K8s and reads configured env vars for instance/lifecycle/provider
spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/DataflintExecutorPlugin.scala ExecutorPlugin that collects metadata and sends it to driver
spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/ExecutorMetadataMessage.scala Serializable RPC payload for executor→driver
spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/executor/DriverMetadataHelper.scala Reads Spark configs and posts executor metadata events
spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/model.scala Adds executor metadata info/event/wrapper model types
spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintListener.scala Persists executor metadata events into KVStore
spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/listener/DataflintStore.scala Adds query method to read executor metadata from KVStore
spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/saas/SparkRunStore.scala Extends export model with executor metadata collection
spark-plugin/plugin/src/main/scala/org/apache/spark/dataflint/saas/StoreDataExtractor.scala Extracts executor metadata wrappers into SparkRunStore
spark-plugin/plugin/src/test/scala/org/apache/spark/dataflint/executor/CloudMetadataDetectorSpec.scala Unit tests for cloud JSON parsing/filtering
spark-plugin/plugin/src/test/scala/org/apache/spark/dataflint/executor/K8sMetadataDetectorSpec.scala Unit tests for env-var based K8s metadata lookup
spark-plugin/pluginspark3/src/test/scala/org/apache/spark/dataflint/DataFlintSqlNodesSpec.scala Refactors duration assertions to accommodate join exec paths without codegen sleep

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +37 to +41
} catch {
case e: Throwable =>
logWarning("Failed to detect cloud metadata", e)
CloudMetadataDetector.CloudMetadata(None, None, None)
}
Comment on lines +26 to +29
val jvmVersion = System.getProperty("java.version", "unknown")
val availableProcessors = Runtime.getRuntime.availableProcessors()
val totalMemoryBytes = Runtime.getRuntime.maxMemory()

Comment on lines +19 to +24
private val COMMAND_TIMEOUT_MS = 5000L

private val SYS_VENDOR_PATH = "/sys/class/dmi/id/sys_vendor"

private val AWS_COMMAND =
"""TOKEN=$(curl -sf -X PUT "http://169.254.169.254/latest/api/token" -H "X-aws-ec2-metadata-token-ttl-seconds: 21600" --connect-timeout 1 --max-time 2 2>/dev/null)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants