Spark-DNS is an Apache Spark data source for retrieving DNS A records from a DNS server via zone transfers.
The data source can operate on multiple DNS zones in a single DataFrame. Due to the nature of DNS zone transfers, data retrieval for a single zone cannot be parallelized; data from multiple zones, however, is retrieved in parallel (each DNS zone is handled in a separate Spark RDD partition).
- Learning Spark internals
- Just for fun
- Demo of how to implement a custom Spark data source that does not support polling data from backing storage and/or whose data size cannot be known without fetching the entire dataset from backing storage
JDK 11 is required for building
./gradlew clean jar
spark-submit --jars spark-dns-1.0.0.jar
pyspark --jars spark-dns-1.0.0.jar
Option name | Description | default value | Required |
---|---|---|---|
server | DNS server address (IP or fqdn) | N/A | Y |
port | DNS server TCP port for zone transfers | 53 | N |
organization | Name of organization the DNS server relates to (free text) | N/A | Y |
serial | Initial DNS zone serial to start zone transfer with | 0 | N |
zones | Comma separated list of DNS forward zones | N/A | Y |
timeout | zone transfer timeout (in seconds) | 10 | N |
xfr | zone transfer type (case-insensitive): AXFR or IXFR | IXFR | N |
ignore-failures | if set to true, XFR errors will be ignored and no records will be returned. Values: true or false | false | N |
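The defaulting and validation rules in the table above can be sketched in plain Python. This is an illustrative helper, not part of the library; the function name `resolve_options` and the validation details are assumptions.

```python
# Illustrative sketch: how the data source options above could be
# defaulted and validated before a read. Not the library's actual code.
REQUIRED = {"server", "organization", "zones"}
DEFAULTS = {"port": "53", "serial": "0", "timeout": "10",
            "xfr": "IXFR", "ignore-failures": "false"}

def resolve_options(user_options):
    """Merge user options with the documented defaults and check required keys."""
    missing = REQUIRED - user_options.keys()
    if missing:
        raise ValueError(f"missing required options: {sorted(missing)}")
    resolved = {**DEFAULTS, **user_options}
    resolved["xfr"] = resolved["xfr"].upper()  # xfr is case-insensitive
    if resolved["xfr"] not in ("AXFR", "IXFR"):
        raise ValueError(f"unsupported xfr type: {resolved['xfr']}")
    return resolved
```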
root
|-- action: string (nullable = true)
|-- fqdn: string (nullable = true)
|-- ip: string (nullable = true)
|-- timestamp: timestamp (nullable = false)
|-- organization: string (nullable = false)
|-- zone: string (nullable = false)
- action: one of the following:
  - AXFR: the DNS record was received from an AXFR DNS zone transfer
  - IXFR_ADD: the DNS record was received from an IXFR DNS zone transfer and is a new record added to the DNS zone via DDNS
  - IXFR_DELETE: the DNS record was received from an IXFR DNS zone transfer and the record was removed from the DNS zone via DDNS
- fqdn: FQDN of the DNS record
- ip: IP address of the DNS record
- timestamp: timestamp of the DNS zone transfer
- organization: organization name provided via data source options
- zone: DNS zone name the DNS record relates to
>>> options = dict(server="10.0.0.1",
port="53",
zones="example.acme.,another.zone",
organization="Acme Inc.",
xfr="AXFR",
timeout="60",
serial=1234567890)
>>> spark.read.format("dns").options(**options).load().show(truncate=False)
>>> spark.sql("CREATE TABLE my_table USING dns OPTIONS (server='10.0.0.1', port=53, zones='example.acme,another.zone', serial=1234567890, organization='Acme Inc.', xfr='AXFR', timeout='60')")
>>> spark.sql("SELECT * FROM my_table").show(truncate=False)
+------+--------------------------+-------------+-----------------------+------------+-------------+
|action|fqdn |ip |timestamp |organization|zone |
+------+--------------------------+-------------+-----------------------+------------+-------------+
|AXFR |dns-server.example.acme. |192.168.3.3 |2021-03-01 23:28:17.499|Acme Inc. |example.acme.|
|AXFR |foo.example.acme. |192.168.2.10 |2021-03-01 23:28:17.499|Acme Inc. |example.acme.|
|AXFR |newhost1.example.acme. |192.168.2.17 |2021-03-01 23:28:17.499|Acme Inc. |example.acme.|
|AXFR |newhost2.example.acme. |192.168.2.33 |2021-03-01 23:28:17.499|Acme Inc. |example.acme.|
|AXFR |newhost3.example.acme. |192.168.2.34 |2021-03-01 23:28:17.499|Acme Inc. |example.acme.|
|AXFR |newhost4.example.acme. |192.168.2.35 |2021-03-01 23:28:17.499|Acme Inc. |example.acme.|
|AXFR |newhost5.example.acme. |192.168.2.38 |2021-03-01 23:28:17.499|Acme Inc. |example.acme.|
|AXFR |newhost6.example.acme. |192.168.2.178|2021-03-01 23:28:17.499|Acme Inc. |example.acme.|
|AXFR |workstation1.example.acme.|192.168.3.2 |2021-03-01 23:28:17.499|Acme Inc. |example.acme.|
|AXFR |workstation2.example.acme.|192.168.5.2 |2021-03-01 23:28:17.499|Acme Inc. |example.acme.|
|AXFR |dns-server.another.zone. |10.0.0.71 |2021-03-01 23:28:18.093|Acme Inc. |another.zone |
|AXFR |workstation1.another.zone.|10.0.0.12 |2021-03-01 23:28:18.093|Acme Inc. |another.zone |
|AXFR |workstation2.another.zone.|10.0.0.13 |2021-03-01 23:28:18.093|Acme Inc. |another.zone |
|AXFR |workstation3.another.zone.|10.0.0.14 |2021-03-01 23:28:18.093|Acme Inc. |another.zone |
|AXFR |workstation4.another.zone.|10.0.0.15 |2021-03-01 23:28:18.093|Acme Inc. |another.zone |
+------+--------------------------+-------------+-----------------------+------------+-------------+
+------------+--------------------------+-------------+-----------------------+------------+-------------+
|action |fqdn |ip |timestamp |organization|zone |
+------------+--------------------------+-------------+-----------------------+------------+-------------+
|IXFR_ADD    |foo.example.acme.         |192.168.2.10 |2021-03-01 23:28:17.499|Acme Inc.   |example.acme.|
|IXFR_DELETE |newhost1.another.zone. |10.0.2.17 |2021-03-01 23:28:17.499|Acme Inc. |another.zone.|
+------------+--------------------------+-------------+-----------------------+------------+-------------+
root
|-- action: string (nullable = false)
|-- fqdn: string (nullable = false)
|-- ip: string (nullable = false)
|-- timestamp: timestamp (nullable = false)
|-- ttl: integer (nullable = false)
Note:
- The action field should have one of the following values: IXFR_ADD or IXFR_DELETE.
- The timestamp field value indicates the date and time of the DNS record update; if there are multiple updates for the same DNS record (action+ip+fqdn), the update with the most recent timestamp value is taken.
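The "most recent timestamp wins" rule can be sketched in plain Python. This mirrors the documented behavior for illustration only; the actual deduplication happens inside the data source, and the function name here is an assumption.

```python
def latest_updates(rows):
    """Keep only the most recent update per (action, ip, fqdn) key.

    Each row is a dict with 'action', 'ip', 'fqdn' and a comparable
    'timestamp' value, matching the write schema above.
    """
    latest = {}
    for row in rows:
        key = (row["action"], row["ip"], row["fqdn"])
        # Replace the stored row only if this update is newer.
        if key not in latest or row["timestamp"] > latest[key]["timestamp"]:
            latest[key] = row
    return list(latest.values())
```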
Option name | Description | default value | Required |
---|---|---|---|
server | DNS server address (IP or fqdn) | N/A | Y |
port | DNS server TCP port for zone transfers | 53 | N |
timeout | zone transfer timeout (in seconds) | 10 | N |
>>> options = dict(server="10.0.0.1",
port="53",
timeout="60")
>>> data.write.format("dns").options(**options).save()
Note: the data DataFrame/Dataset should match the schema specified above:
root
|-- action: string (nullable = false)
|-- fqdn: string (nullable = false)
|-- ip: string (nullable = true)
|-- timestamp: timestamp (nullable = false)
|-- ttl: integer (nullable = false)
>>> data.createTempView("data")
>>> spark.sql("CREATE TABLE output USING dns_update OPTIONS (server='10.0.0.1', port=53, timeout=10)")
>>> spark.sql("INSERT INTO output TABLE data")
Note: the data DataFrame/Dataset should match the schema specified above:
root
|-- action: string (nullable = false)
|-- fqdn: string (nullable = false)
|-- ip: string (nullable = true)
|-- timestamp: timestamp (nullable = false)
|-- ttl: integer (nullable = false)
When using Structured Streaming write, each update/row should be packed as JSON into a new column named "update", as shown below:
>>> data = spark.readStream...load()
>>> data.printSchema()
root
|-- action: string (nullable = false)
|-- fqdn: string (nullable = false)
|-- ip: string (nullable = true)
|-- timestamp: timestamp (nullable = false)
|-- ttl: integer (nullable = false)
>>> from pyspark.sql import functions as F
>>> updates = data.withColumn("update", F.to_json(F.struct(data.schema.fieldNames())))
>>> options = dict(server="10.0.0.1",
port="53",
timeout="60")
>>> updates.writeStream.format("dns_updates").options(**options).option("checkpointLocation", "...").start()
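What a single packed row in the "update" column roughly contains can be shown with the standard json module. This is a plain-Python sketch of what F.to_json(F.struct(...)) produces for one row; the helper name is an assumption.

```python
import json

def pack_update(row):
    """Pack one update row into a JSON string like the 'update' column holds.

    Plain-Python analogue of F.to_json(F.struct(...)) for a single row;
    field names follow the streaming write schema above.
    """
    fields = ("action", "fqdn", "ip", "timestamp", "ttl")
    return json.dumps({name: row[name] for name in fields})
```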
- Providing multiple DNS servers in options for the same dataset/table is currently not supported
- Continuous Structured Streaming writing is not supported yet
- On Spark 2.4 (incl. CDH 6.3.x), only batch reading is supported
- Spark batch read
- Retrieving DNS A records from multiple DNS zones (though from a single DNS server)
- New DNS SOA serial of each DNS zone is available in an Accumulator via the Spark UI (refer to the relevant stage)
- Spark Structured Streaming read support (only the Once and ProcessingTime triggers are supported)
- Zone transfer timeout
- Specifying an explicit zone transfer type (AXFR/IXFR) to use when retrieving data from the DNS server:
  - When using xfr=ixfr, only DNS zone updates since the initial serial will be returned. On Structured Streaming this may produce empty DataFrames when there are no updates.
  - When using xfr=axfr, the entire set of DNS zone A records will be returned.
- Handling temporary failures during zone transfer (similar to failOnDataLoss in Spark+Kafka)
- Batch write support for sending updates to the DNS server: the zone is derived from the record/row FQDN
- Structured Streaming write support (Dataset::writeStream)
- Transaction signature (TSIG) support for DNS zone transfers
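The "zone is derived from the record/row FQDN" behavior on the write path can be illustrated with a longest-suffix match against the known zones. This is an assumption about how the derivation could work, not the library's actual implementation.

```python
def derive_zone(fqdn, known_zones):
    """Pick the zone a record belongs to by longest matching zone suffix.

    Both fqdn and zones are absolute names ending with a dot,
    e.g. 'foo.example.acme.' and 'example.acme.'.
    """
    candidates = [z for z in known_zones
                  if fqdn == z or fqdn.endswith("." + z)]
    if not candidates:
        raise ValueError(f"no known zone for {fqdn!r}")
    # The longest matching suffix is the most specific zone.
    return max(candidates, key=len)
```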
Spark version | JDK | DNS servers |
---|---|---|
Official 3.0.1 (2.12) | AdoptOpenJDK 11 | Bind9, Windows DNS |
Official 3.1.1 (2.12) | AdoptOpenJDK 11 | Bind9, Windows DNS |
- DNS server setup: https://www.debian.org/doc/manuals/network-administrator/ch-bind.html
- Setting up a DNS server with dynamic updates from DHCP: https://wiki.debian.org/DDNS
Special thanks to Jacek Laskowski (https://github.com/jaceklaskowski) for teaching me Spark internals