Add support for Insert Into #9

Merged — 7 commits merged on Feb 10, 2018
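
This change adds lineage harvesting for INSERT INTO statements (into Hive tables and data source tables) and for CREATE VIEW ... AS SELECT. As a rough illustration of the statements now covered (assuming a Hive-enabled SparkSession named spark; table and view names are placeholders — the concrete cases are in the harvesters and test suites below):

spark.sql("INSERT INTO TABLE target_tbl VALUES ('lucy'), ('tom')")   // INSERT INTO ... VALUES
spark.sql("INSERT INTO TABLE target_tbl SELECT * FROM source_tbl")   // INSERT INTO ... SELECT
spark.sql("CREATE VIEW target_view AS SELECT * FROM source_tbl")     // CREATE VIEW ... AS SELECT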
CommandsHarvester.scala
@@ -17,15 +17,17 @@

package com.hortonworks.spark.atlas.sql

import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

import scala.util.Try

import org.apache.atlas.model.instance.AtlasEntity
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.{FileRelation, FileSourceScanExec}
import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, LoadDataCommand}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.command.{CreateViewCommand, CreateDataSourceTableAsSelectCommand, LoadDataCommand}
import org.apache.spark.sql.execution.datasources.{LogicalRelation, InsertIntoHadoopFsRelationCommand}
import org.apache.spark.sql.hive.execution._

import com.hortonworks.spark.atlas.AtlasClientConf
@@ -37,61 +39,51 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {

object InsertIntoHiveTableHarvester extends Harvester[InsertIntoHiveTable] {
override def harvest(node: InsertIntoHiveTable, qd: QueryDetail): Seq[AtlasEntity] = {
val child = node.query.asInstanceOf[Project].child
child match {
// case 3. INSERT INTO VALUES
case _: LocalRelation =>
// source tables entities
val tChildren = node.query.collectLeaves()
val inputsEntities = tChildren.map {
case r: HiveTableRelation => tableToEntities(r.tableMeta)
case v: View => tableToEntities(v.desc)
case l: LogicalRelation => tableToEntities(l.catalogTable.get)
case e =>
logWarn(s"Missing unknown leaf node: $e")
Seq.empty
}

// case 4. INSERT INTO SELECT
case s: SubqueryAlias =>
// Prepare input entities
val fromTableIdentifier: Option[TableIdentifier] = s.child match {
case r: View => Some(r.desc.identifier)
case r: HiveTableRelation => Some(r.tableMeta.identifier)
case _ => None
}
require(fromTableIdentifier.isDefined, s"Fail to get input table from node $node")
val inputEntities = prepareEntities(fromTableIdentifier.get)

// Prepare output entities
val outTableIdentifier = node.table.identifier
val outputEntities = prepareEntities(outTableIdentifier)

// Create process entity
val inputTableEntity = List(inputEntities.head)
val outputTableEntity = List(outputEntities.head)
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, inputTableEntity, outputTableEntity)

Seq(pEntity) ++ inputEntities ++ outputEntities

// case 8. Multiple fromTables
case c: Filter =>
// Prepare input entities
val lChild = c.child.asInstanceOf[Join].left.asInstanceOf[SubqueryAlias]
.child.asInstanceOf[HiveTableRelation].tableMeta.identifier
val lInputs = prepareEntities(lChild)
val rChild = c.child.asInstanceOf[Join].right.asInstanceOf[SubqueryAlias]
.child.asInstanceOf[HiveTableRelation].tableMeta.identifier
val rInputs = prepareEntities(rChild)
val inputsEntities = lInputs ++ rInputs

// Prepare output entities
val outTableIdentifier = node.table.identifier
val outputsEntities = prepareEntities(outTableIdentifier)

// Create process entity
val inputTableEntities = List(lInputs.head, rInputs.head)
val outputTableEntities = List(outputsEntities.head)
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, inputTableEntities, outputTableEntities)

Seq(pEntity) ++ inputsEntities ++ outputsEntities

case _ =>
// new table entity
val outputEntities = tableToEntities(node.table)

// create process entity
val inputTablesEntities = inputsEntities.flatMap(_.headOption).toList
val outputTableEntities = List(outputEntities.head)
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities)
Seq(pEntity) ++ inputsEntities.flatten ++ outputEntities
}
}

object InsertIntoHadoopFsRelationHarvester extends Harvester[InsertIntoHadoopFsRelationCommand] {
override def harvest(node: InsertIntoHadoopFsRelationCommand, qd: QueryDetail): Seq[AtlasEntity] = {
// source tables entities
val tChildren = node.query.collectLeaves()
val inputsEntities = tChildren.map {
case r: HiveTableRelation => tableToEntities(r.tableMeta)
case v: View => tableToEntities(v.desc)
case l: LogicalRelation => tableToEntities(l.catalogTable.get)
case e =>
logWarn(s"Missing unknown leaf node: $e")
Seq.empty
}

// new table entity
val outputEntities = tableToEntities(node.catalogTable.get)

// create process entity
val inputTablesEntities = inputsEntities.flatMap(_.headOption).toList
val outputTableEntities = List(outputEntities.head)
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities)
Seq(pEntity) ++ inputsEntities.flatten ++ outputEntities
}
}
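
The two insert harvesters above correspond to different physical plans. As a rough sketch (an assumption about standard Spark 2.x planning, with placeholder table names and a Hive-enabled SparkSession named spark): inserting into a Hive-serde table is planned as InsertIntoHiveTable, while inserting into a table created with USING (a data source table) is planned as InsertIntoHadoopFsRelationCommand.

spark.sql("CREATE TABLE hive_tbl (name STRING)")                 // Hive serde table
spark.sql("INSERT INTO TABLE hive_tbl VALUES ('x')")             // planned as InsertIntoHiveTable

spark.sql("CREATE TABLE ds_tbl (name STRING) USING parquet")     // data source table
spark.sql("INSERT INTO TABLE ds_tbl SELECT name FROM hive_tbl")  // planned as InsertIntoHadoopFsRelationCommand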

@@ -195,6 +187,28 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
}
}

object CreateViewHarvester extends Harvester[CreateViewCommand] {
override def harvest(node: CreateViewCommand, qd: QueryDetail): Seq[AtlasEntity] = {
// from table entities
val child = node.child.asInstanceOf[Project].child
val fromTableIdentifier = child.asInstanceOf[UnresolvedRelation].tableIdentifier
val inputEntities = prepareEntities(fromTableIdentifier)

// new view entities
val viewIdentifier = node.name
val outputEntities = prepareEntities(viewIdentifier)

// create process entity
val inputTableEntity = List(inputEntities.head)
val outputTableEntity = List(outputEntities.head)
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, inputTableEntity, outputTableEntity)

Seq(pEntity) ++ inputEntities ++ outputEntities
}
}


private def prepareEntities(tableIdentifier: TableIdentifier): Seq[AtlasEntity] = {
val tableName = tableIdentifier.table
val dbName = tableIdentifier.database.getOrElse("default")
SparkExecutionPlanTracker.scala
@@ -24,6 +24,7 @@ import scala.util.control.NonFatal

import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.util.QueryExecutionListener

@@ -73,29 +74,45 @@ class SparkExecutionPlanTracker(
while (!stopped) {
try {
Option(qeQueue.poll(3000, TimeUnit.MILLISECONDS)).foreach { qd =>
val entities = qd.qe.sparkPlan.collect { case p: LeafExecNode => p }
.flatMap {
val entities = qd.qe.sparkPlan.collect {
case p: DataWritingCommandExec => p
case p: LeafExecNode => p
}.flatMap {
case r: ExecutedCommandExec =>
r.cmd match {
case c: InsertIntoHiveTable =>
logDebug(s"INSERT query ${qd.qe}")
CommandsHarvester.InsertIntoHiveTableHarvester.harvest(c, qd)

// Case 6. CREATE TABLE AS SELECT
case c: CreateHiveTableAsSelectCommand =>
logDebug(s"CREATE TABLE AS SELECT query: ${qd.qe}")
CommandsHarvester.CreateHiveTableAsSelectHarvester.harvest(c, qd)

case c: LoadDataCommand =>
// Case 1. LOAD DATA LOCAL INPATH (from local)
// Case 2. LOAD DATA INPATH (from HDFS)
logDebug(s"LOAD DATA [LOCAL] INPATH (${c.path}) ${c.table}")
CommandsHarvester.LoadDataHarvester.harvest(c, qd)

// Case 6. CREATE TABLE AS SELECT
case c: CreateHiveTableAsSelectCommand =>
logDebug(s"CREATE TABLE AS SELECT query: ${qd.qe}")
CommandsHarvester.CreateHiveTableAsSelectHarvester.harvest(c, qd)

case c: CreateDataSourceTableAsSelectCommand =>
logDebug(s"CREATE TABLE USING xx AS SELECT query: ${qd.qe}")
CommandsHarvester.CreateDataSourceTableAsSelectHarvester.harvest(c, qd)

case c: CreateViewCommand =>
@dongjoon-hyun (Contributor) commented on Feb 10, 2018:
Do we have a test case for this? Otherwise, can we add this in another PR?

Contributor Author replied:
Ohh, not yet. I'll add unit tests for CreateViewCommand.

logDebug(s"CREATE VIEW AS SELECT query: ${qd.qe}")
CommandsHarvester.CreateViewHarvester.harvest(c, qd)

case _ =>
Seq.empty
}

case r: DataWritingCommandExec =>
r.cmd match {
case c: InsertIntoHiveTable =>
logDebug(s"INSERT INTO HIVE TABLE query ${qd.qe}")
CommandsHarvester.InsertIntoHiveTableHarvester.harvest(c, qd)

case c: InsertIntoHadoopFsRelationCommand =>
logDebug(s"INSERT INTO SPARK TABLE query ${qd.qe}")
CommandsHarvester.InsertIntoHadoopFsRelationHarvester.harvest(c, qd)

case _ =>
Seq.empty
}
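
The tracker now also collects DataWritingCommandExec nodes: on the Spark version this PR targets, InsertIntoHiveTable is a DataWritingCommand and therefore surfaces wrapped in DataWritingCommandExec rather than ExecutedCommandExec (an assumption inferred from the change itself). A minimal sketch for checking which node a given statement produces, reusing the session setup style from the test suite below (table name is a placeholder):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.command._

val spark = SparkSession.builder()
  .master("local")
  .config("spark.sql.catalogImplementation", "hive")
  .getOrCreate()

spark.sql("CREATE TABLE some_hive_tbl (name STRING)")   // placeholder Hive-serde table
val qe = spark.sql("INSERT INTO TABLE some_hive_tbl VALUES ('x')").queryExecution
val nodes = qe.sparkPlan.collect {
  case p: DataWritingCommandExec => p.cmd.getClass.getSimpleName
  case p: ExecutedCommandExec    => p.cmd.getClass.getSimpleName
}
println(nodes)   // expected to contain InsertIntoHiveTable for a Hive-serde table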
CreateViewHarvesterSuite.scala (new file)
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hortonworks.spark.atlas.sql

import java.io.File
import java.util

import scala.collection.JavaConverters._
import scala.util.Random

import org.apache.atlas.AtlasClient
import org.apache.atlas.model.instance.AtlasEntity
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.command.{ExecutedCommandExec, CreateViewCommand}
import org.scalatest.{BeforeAndAfterAll, Matchers, FunSuite}

import com.hortonworks.spark.atlas.types.external

class CreateViewHarvesterSuite extends FunSuite with Matchers with BeforeAndAfterAll {
private var sparkSession: SparkSession = _
private val sourceTblName = "source_" + Random.nextInt(100000)
private val destinationViewName = "destination_" + Random.nextInt(100000)

override def beforeAll(): Unit = {
super.beforeAll()
sparkSession = SparkSession.builder()
.master("local")
.config("spark.sql.catalogImplementation", "hive")
.getOrCreate()

sparkSession.sql(s"CREATE TABLE $sourceTblName (name string)")
sparkSession.sql(s"INSERT INTO TABLE $sourceTblName VALUES ('lucy'), ('tom')")
}

override def afterAll(): Unit = {
sparkSession.stop()
SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
sparkSession = null

FileUtils.deleteDirectory(new File("metastore_db"))
FileUtils.deleteDirectory(new File("spark-warehouse"))

super.afterAll()
}

test("CREATE VIEW FROM TABLE") {
val qe = sparkSession.sql(s"CREATE VIEW $destinationViewName " +
s"AS SELECT * FROM $sourceTblName").queryExecution
val qd = QueryDetail(qe, 0L, 0L)

assert(qe.sparkPlan.isInstanceOf[ExecutedCommandExec])
val node = qe.sparkPlan.asInstanceOf[ExecutedCommandExec]
assert(node.cmd.isInstanceOf[CreateViewCommand])
val cmd = node.cmd.asInstanceOf[CreateViewCommand]

val entities = CommandsHarvester.CreateViewHarvester.harvest(cmd, qd)
val pEntity = entities.head

assert(pEntity.getAttribute("inputs").isInstanceOf[util.Collection[_]])
val inputs = pEntity.getAttribute("inputs").asInstanceOf[util.Collection[AtlasEntity]]
inputs.size() should be (1)

val inputTbl = inputs.asScala.head
inputTbl.getTypeName should be (external.HIVE_TABLE_TYPE_STRING)
inputTbl.getAttribute("name") should be (sourceTblName)
inputTbl.getAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME).toString should be (
s"default.$sourceTblName@primary")

assert(pEntity.getAttribute("outputs").isInstanceOf[util.Collection[_]])
val outputs = pEntity.getAttribute("outputs").asInstanceOf[util.Collection[AtlasEntity]]
outputs.size() should be (1)
val outputView = outputs.asScala.head
outputView.getAttribute("name") should be (destinationViewName)
outputView.getAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME).toString should endWith (
s"default.$destinationViewName")
}
}