
Add support for Insert Into
weiqingy committed Feb 8, 2018
1 parent 44211bb commit e96d7ff
Showing 2 changed files with 96 additions and 64 deletions.
CommandsHarvester.scala
@@ -17,15 +17,17 @@

package com.hortonworks.spark.atlas.sql

+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+
import scala.util.Try

import org.apache.atlas.model.instance.AtlasEntity
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.{FileRelation, FileSourceScanExec}
-import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, LoadDataCommand}
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.command.{CreateViewCommand, CreateDataSourceTableAsSelectCommand, LoadDataCommand}
+import org.apache.spark.sql.execution.datasources.{InsertIntoHadoopFsRelationCommand, LogicalRelation}
import org.apache.spark.sql.hive.execution._

import com.hortonworks.spark.atlas.AtlasClientConf
@@ -37,61 +39,52 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {

   object InsertIntoHiveTableHarvester extends Harvester[InsertIntoHiveTable] {
     override def harvest(node: InsertIntoHiveTable, qd: QueryDetail): Seq[AtlasEntity] = {
-      val child = node.query.asInstanceOf[Project].child
-      child match {
-        // case 3. INSERT INTO VALUES
-        case _: LocalRelation =>
+      // source tables entities
+      val tChildren = node.query.collectLeaves()
+      val inputsEntities = tChildren.map {
+        case r: HiveTableRelation => tableToEntities(r.tableMeta)
+        case v: View => tableToEntities(v.desc)
+        case e =>
+          logWarn(s"Missing unknown leaf node: $e")
+          Seq.empty
+      }

-        // case 4. INSERT INTO SELECT
-        case s: SubqueryAlias =>
-          // Prepare input entities
-          val fromTableIdentifier: Option[TableIdentifier] = s.child match {
-            case r: View => Some(r.desc.identifier)
-            case r: HiveTableRelation => Some(r.tableMeta.identifier)
-            case _ => None
-          }
-          require(fromTableIdentifier.isDefined, s"Fail to get input table from node $node")
-          val inputEntities = prepareEntities(fromTableIdentifier.get)
-
-          // Prepare output entities
-          val outTableIdentifier = node.table.identifier
-          val outputEntities = prepareEntities(outTableIdentifier)
-
-          // Create process entity
-          val inputTableEntity = List(inputEntities.head)
-          val outputTableEntity = List(outputEntities.head)
-          val pEntity = processToEntity(
-            qd.qe, qd.executionId, qd.executionTime, inputTableEntity, outputTableEntity)
-
-          Seq(pEntity) ++ inputEntities ++ outputEntities
-
-        // case 8. Multiple fromTables
-        case c: Filter =>
-          // Prepare input entities
-          val lChild = c.child.asInstanceOf[Join].left.asInstanceOf[SubqueryAlias]
-            .child.asInstanceOf[HiveTableRelation].tableMeta.identifier
-          val lInputs = prepareEntities(lChild)
-          val rChild = c.child.asInstanceOf[Join].right.asInstanceOf[SubqueryAlias]
-            .child.asInstanceOf[HiveTableRelation].tableMeta.identifier
-          val rInputs = prepareEntities(rChild)
-          val inputsEntities = lInputs ++ rInputs
-
-          // Prepare output entities
-          val outTableIdentifier = node.table.identifier
-          val outputsEntities = prepareEntities(outTableIdentifier)
-
-          // Create process entity
-          val inputTableEntities = List(lInputs.head, rInputs.head)
-          val outputTableEntities = List(outputsEntities.head)
-          val pEntity = processToEntity(
-            qd.qe, qd.executionId, qd.executionTime, inputTableEntities, outputTableEntities)
-
-          Seq(pEntity) ++ inputsEntities ++ outputsEntities
-
-        case _ =>
+
+      // new table entity
+      val outputEntities = tableToEntities(node.table)
+
+      // create process entity
+      val inputTablesEntities = inputsEntities.flatMap(_.headOption).toList
+      val outputTableEntities = List(outputEntities.head)
+      val pEntity = processToEntity(
+        qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities)
+      Seq(pEntity) ++ inputsEntities.flatten ++ outputEntities
     }
   }
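Illustrative sketch (not part of this commit; the session settings and the table names src/dest are assumptions): the kind of statement that should reach InsertIntoHiveTableHarvester, since an INSERT into a Hive serde table plans to InsertIntoHiveTable.

import org.apache.spark.sql.SparkSession

object InsertIntoHiveExample {
  def main(args: Array[String]): Unit = {
    // Hive support is required so the INSERT plans to InsertIntoHiveTable.
    val spark = SparkSession.builder()
      .appName("insert-into-hive-example")
      .enableHiveSupport()
      .getOrCreate()

    // Hypothetical Hive serde tables.
    spark.sql("CREATE TABLE IF NOT EXISTS src (id INT, name STRING)")
    spark.sql("CREATE TABLE IF NOT EXISTS dest (id INT, name STRING)")

    // node.query.collectLeaves() in the harvester sees the HiveTableRelation
    // for `src`; node.table is the catalog entry for `dest`.
    spark.sql("INSERT INTO dest SELECT id, name FROM src")

    spark.stop()
  }
}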

+  object InsertIntoHadoopFsRelationHarvester extends Harvester[InsertIntoHadoopFsRelationCommand] {
+    override def harvest(node: InsertIntoHadoopFsRelationCommand, qd: QueryDetail): Seq[AtlasEntity] = {
+      // source tables entities
+      val tChildren = node.query.collectLeaves()
+      val inputsEntities = tChildren.map {
+        case r: HiveTableRelation => tableToEntities(r.tableMeta)
+        case v: View => tableToEntities(v.desc)
+        case l: LogicalRelation if l.relation.isInstanceOf[FileRelation] =>
+          l.catalogTable.map(tableToEntities(_)).getOrElse(
+            l.relation.asInstanceOf[FileRelation].inputFiles.map(external.pathToEntity).toSeq)
+        case e =>
+          logWarn(s"Missing unknown leaf node: $e")
+          Seq.empty
+      }
+
+      // new table entity
+      val outputEntities = tableToEntities(node.catalogTable.get)
+
+      // create process entity
+      val inputTablesEntities = inputsEntities.flatMap(_.headOption).toList
+      val outputTableEntities = List(outputEntities.head)
+      val pEntity = processToEntity(
+        qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities)
+      Seq(pEntity) ++ inputsEntities.flatten ++ outputEntities
+    }
+  }
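Likewise, a minimal sketch (illustrative names, not project code) of an insert into a data source table: writes that go through the FileFormat path plan to InsertIntoHadoopFsRelationCommand and should be picked up by this new harvester.

import org.apache.spark.sql.SparkSession

object InsertIntoDataSourceTableExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("insert-into-datasource-example")
      .getOrCreate()

    // Hypothetical Parquet-backed data source tables.
    spark.sql("CREATE TABLE IF NOT EXISTS src_ds (id INT, name STRING) USING parquet")
    spark.sql("CREATE TABLE IF NOT EXISTS dest_ds (id INT, name STRING) USING parquet")

    // Plans to InsertIntoHadoopFsRelationCommand; node.catalogTable is `dest_ds`
    // and the collected leaves include the LogicalRelation over `src_ds`.
    spark.sql("INSERT INTO dest_ds SELECT id, name FROM src_ds")

    spark.stop()
  }
}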

@@ -195,6 +188,28 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
}
}

+  object CreateViewHarvester extends Harvester[CreateViewCommand] {
+    override def harvest(node: CreateViewCommand, qd: QueryDetail): Seq[AtlasEntity] = {
+      // from table entities
+      val child = node.child.asInstanceOf[Project].child
+      val fromTableIdentifier = child.asInstanceOf[UnresolvedRelation].tableIdentifier
+      val inputEntities = prepareEntities(fromTableIdentifier)
+
+      // new view entities
+      val viewIdentifier = node.name
+      val outputEntities = prepareEntities(viewIdentifier)
+
+      // create process entity
+      val inputTableEntity = List(inputEntities.head)
+      val outputTableEntity = List(outputEntities.head)
+      val pEntity = processToEntity(
+        qd.qe, qd.executionId, qd.executionTime, inputTableEntity, outputTableEntity)
+
+      Seq(pEntity) ++ inputEntities ++ outputEntities
+    }
+  }
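A sketch of the statement shape this harvester expects (hypothetical names, not project code). The cast chain assumes the view body is a plain projection over a single table, i.e. node.child is a Project whose child is an UnresolvedRelation.

import org.apache.spark.sql.SparkSession

object CreateViewExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("create-view-example")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("CREATE TABLE IF NOT EXISTS src (id INT, name STRING)")

    // CreateViewCommand: node.name is `v_src`, node.child is
    // Project([id, name], UnresolvedRelation(`src`)).
    spark.sql("CREATE VIEW v_src AS SELECT id, name FROM src")

    spark.stop()
  }
}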


private def prepareEntities(tableIdentifier: TableIdentifier): Seq[AtlasEntity] = {
val tableName = tableIdentifier.table
val dbName = tableIdentifier.database.getOrElse("default")
SparkExecutionPlanTracker.scala
@@ -24,6 +24,7 @@ import scala.util.control.NonFatal

import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.util.QueryExecutionListener

@@ -73,29 +74,45 @@ class SparkExecutionPlanTracker(
       while (!stopped) {
         try {
           Option(qeQueue.poll(3000, TimeUnit.MILLISECONDS)).foreach { qd =>
-            val entities = qd.qe.sparkPlan.collect { case p: LeafExecNode => p }
-              .flatMap {
+            val entities = qd.qe.sparkPlan.collect {
+              case p: DataWritingCommandExec => p
+              case p: LeafExecNode => p
+            }.flatMap {
               case r: ExecutedCommandExec =>
                 r.cmd match {
-                  case c: InsertIntoHiveTable =>
-                    logDebug(s"INSERT query ${qd.qe}")
-                    CommandsHarvester.InsertIntoHiveTableHarvester.harvest(c, qd)
-
-                  // Case 6. CREATE TABLE AS SELECT
-                  case c: CreateHiveTableAsSelectCommand =>
-                    logDebug(s"CREATE TABLE AS SELECT query: ${qd.qe}")
-                    CommandsHarvester.CreateHiveTableAsSelectHarvester.harvest(c, qd)

                   case c: LoadDataCommand =>
                     // Case 1. LOAD DATA LOCAL INPATH (from local)
                     // Case 2. LOAD DATA INPATH (from HDFS)
                     logDebug(s"LOAD DATA [LOCAL] INPATH (${c.path}) ${c.table}")
                     CommandsHarvester.LoadDataHarvester.harvest(c, qd)

+                  // Case 6. CREATE TABLE AS SELECT
+                  case c: CreateHiveTableAsSelectCommand =>
+                    logDebug(s"CREATE TABLE AS SELECT query: ${qd.qe}")
+                    CommandsHarvester.CreateHiveTableAsSelectHarvester.harvest(c, qd)
+
                   case c: CreateDataSourceTableAsSelectCommand =>
                     logDebug(s"CREATE TABLE USING xx AS SELECT query: ${qd.qe}")
                     CommandsHarvester.CreateDataSourceTableAsSelectHarvester.harvest(c, qd)

+                  case c: CreateViewCommand =>
+                    logDebug(s"CREATE VIEW AS SELECT query: ${qd.qe}")
+                    CommandsHarvester.CreateViewHarvester.harvest(c, qd)
+
                   case _ =>
                     Seq.empty
                 }

+              case r: DataWritingCommandExec =>
+                r.cmd match {
+                  case c: InsertIntoHiveTable =>
+                    logDebug(s"INSERT INTO HIVE TABLE query ${qd.qe}")
+                    CommandsHarvester.InsertIntoHiveTableHarvester.harvest(c, qd)
+
+                  case c: InsertIntoHadoopFsRelationCommand =>
+                    logDebug(s"INSERT INTO SPARK TABLE query ${qd.qe}")
+                    CommandsHarvester.InsertIntoHadoopFsRelationHarvester.harvest(c, qd)
+
+                  case _ =>
+                    Seq.empty
+                }
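For reference, a standalone sketch (assuming Spark 2.3.x class locations; table names are illustrative, not project code) of the same collect-and-dispatch shape the tracker now uses: write commands are wrapped in DataWritingCommandExec rather than ExecutedCommandExec, so they have to be collected separately from the other leaf nodes.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.command.DataWritingCommandExec
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
import org.apache.spark.sql.hive.execution.InsertIntoHiveTable

object PlanScanExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("plan-scan-example")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("CREATE TABLE IF NOT EXISTS src (id INT, name STRING)")
    spark.sql("CREATE TABLE IF NOT EXISTS dest (id INT, name STRING)")

    val qe = spark.sql("INSERT INTO dest SELECT id, name FROM src").queryExecution

    // Collect write commands from the physical plan and dispatch on the wrapped command.
    qe.sparkPlan.collect { case p: DataWritingCommandExec => p.cmd }.foreach {
      case c: InsertIntoHiveTable =>
        println(s"INSERT INTO HIVE TABLE: ${c.table.identifier}")
      case c: InsertIntoHadoopFsRelationCommand =>
        println(s"INSERT INTO SPARK TABLE: ${c.outputPath}")
      case other =>
        println(s"other write command: ${other.getClass.getSimpleName}")
    }

    spark.stop()
  }
}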
