Add support for Insert Into (#9)
Changes:
Merge the code supporting "Insert Into" and "Create View xx As Select * From xx" into the master branch.

Testing:
New unit tests were added.
I also manually tested this PR in an HDP cluster:
Case 1:
    sql("create table table_parquet_200 (a int) using parquet")
    sql("insert into table table_parquet_200 values (200)")
    sql("create table table_parquet_300 (a int) using parquet")
    sql("INSERT INTO table_parquet_300 SELECT * FROM table_parquet_200")

Case 2:
    sql("CREATE TABLE t00000(a int)")
    sql("INSERT INTO t00000 VALUES (1)")
    sql("CREATE TABLE t10000(a int)")
    sql("INSERT INTO t10000 SELECT * FROM t00000")

Case 3:
    sql("CREATE TABLE sourceTable(a int)")
    sql("INSERT INTO sourceTable VALUES (1)")
    sql("CREATE view s_view1 as select * from sourceTable")

* Add supports to Insert Into

* Add unit tests

* reorder imports

* Fix failure

* Fix review comments

* Add unit tests for CreateViewHarvester

* Fix a small issue
weiqingy authored Feb 10, 2018
1 parent f0f8cc8 commit ca6cda9
Showing 4 changed files with 394 additions and 64 deletions.
@@ -17,15 +17,17 @@

package com.hortonworks.spark.atlas.sql

import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation

import scala.util.Try

import org.apache.atlas.model.instance.AtlasEntity
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.{FileRelation, FileSourceScanExec}
import org.apache.spark.sql.execution.command.{CreateDataSourceTableAsSelectCommand, LoadDataCommand}
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.command.{CreateViewCommand, CreateDataSourceTableAsSelectCommand, LoadDataCommand}
import org.apache.spark.sql.execution.datasources.{LogicalRelation, InsertIntoHadoopFsRelationCommand}
import org.apache.spark.sql.hive.execution._

import com.hortonworks.spark.atlas.AtlasClientConf
@@ -37,61 +39,51 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {

object InsertIntoHiveTableHarvester extends Harvester[InsertIntoHiveTable] {
override def harvest(node: InsertIntoHiveTable, qd: QueryDetail): Seq[AtlasEntity] = {
val child = node.query.asInstanceOf[Project].child
child match {
// case 3. INSERT INTO VALUES
case _: LocalRelation =>
// source tables entities
val tChildren = node.query.collectLeaves()
val inputsEntities = tChildren.map {
case r: HiveTableRelation => tableToEntities(r.tableMeta)
case v: View => tableToEntities(v.desc)
case l: LogicalRelation => tableToEntities(l.catalogTable.get)
case e =>
logWarn(s"Missing unknown leaf node: $e")
Seq.empty
}

// case 4. INSERT INTO SELECT
case s: SubqueryAlias =>
// Prepare input entities
val fromTableIdentifier: Option[TableIdentifier] = s.child match {
case r: View => Some(r.desc.identifier)
case r: HiveTableRelation => Some(r.tableMeta.identifier)
case _ => None
}
require(fromTableIdentifier.isDefined, s"Fail to get input table from node $node")
val inputEntities = prepareEntities(fromTableIdentifier.get)

// Prepare output entities
val outTableIdentifier = node.table.identifier
val outputEntities = prepareEntities(outTableIdentifier)

// Create process entity
val inputTableEntity = List(inputEntities.head)
val outputTableEntity = List(outputEntities.head)
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, inputTableEntity, outputTableEntity)

Seq(pEntity) ++ inputEntities ++ outputEntities

// case 8. Multiple fromTables
case c: Filter =>
// Prepare input entities
val lChild = c.child.asInstanceOf[Join].left.asInstanceOf[SubqueryAlias]
.child.asInstanceOf[HiveTableRelation].tableMeta.identifier
val lInputs = prepareEntities(lChild)
val rChild = c.child.asInstanceOf[Join].right.asInstanceOf[SubqueryAlias]
.child.asInstanceOf[HiveTableRelation].tableMeta.identifier
val rInputs = prepareEntities(rChild)
val inputsEntities = lInputs ++ rInputs

// Prepare output entities
val outTableIdentifier = node.table.identifier
val outputsEntities = prepareEntities(outTableIdentifier)

// Create process entity
val inputTableEntities = List(lInputs.head, rInputs.head)
val outputTableEntities = List(outputsEntities.head)
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, inputTableEntities, outputTableEntities)

Seq(pEntity) ++ inputsEntities ++ outputsEntities

case _ =>
// new table entity
val outputEntities = tableToEntities(node.table)

// create process entity
val inputTablesEntities = inputsEntities.flatMap(_.headOption).toList
val outputTableEntities = List(outputEntities.head)
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities)
Seq(pEntity) ++ inputsEntities.flatten ++ outputEntities
}
}

object InsertIntoHadoopFsRelationHarvester extends Harvester[InsertIntoHadoopFsRelationCommand] {
override def harvest(node: InsertIntoHadoopFsRelationCommand, qd: QueryDetail): Seq[AtlasEntity] = {
// source tables entities
val tChildren = node.query.collectLeaves()
val inputsEntities = tChildren.map {
case r: HiveTableRelation => tableToEntities(r.tableMeta)
case v: View => tableToEntities(v.desc)
case l: LogicalRelation => tableToEntities(l.catalogTable.get)
case e =>
logWarn(s"Missing unknown leaf node: $e")
Seq.empty
}

// new table entity
val outputEntities = tableToEntities(node.catalogTable.get)

// create process entity
val inputTablesEntities = inputsEntities.flatMap(_.headOption).toList
val outputTableEntities = List(outputEntities.head)
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities)
Seq(pEntity) ++ inputsEntities.flatten ++ outputEntities
}
}

@@ -195,6 +187,28 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
}
}

  object CreateViewHarvester extends Harvester[CreateViewCommand] {
    override def harvest(node: CreateViewCommand, qd: QueryDetail): Seq[AtlasEntity] = {
      // from table entities
      val child = node.child.asInstanceOf[Project].child
      val fromTableIdentifier = child.asInstanceOf[UnresolvedRelation].tableIdentifier
      val inputEntities = prepareEntities(fromTableIdentifier)

      // new view entities
      val viewIdentifier = node.name
      val outputEntities = prepareEntities(viewIdentifier)

      // create process entity
      val inputTableEntity = List(inputEntities.head)
      val outputTableEntity = List(outputEntities.head)
      val pEntity = processToEntity(
        qd.qe, qd.executionId, qd.executionTime, inputTableEntity, outputTableEntity)

      Seq(pEntity) ++ inputEntities ++ outputEntities
    }
  }


  private def prepareEntities(tableIdentifier: TableIdentifier): Seq[AtlasEntity] = {
    val tableName = tableIdentifier.table
    val dbName = tableIdentifier.database.getOrElse("default")
@@ -24,6 +24,7 @@ import scala.util.control.NonFatal

import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.util.QueryExecutionListener

@@ -73,29 +74,45 @@ class SparkExecutionPlanTracker(
while (!stopped) {
try {
Option(qeQueue.poll(3000, TimeUnit.MILLISECONDS)).foreach { qd =>
val entities = qd.qe.sparkPlan.collect { case p: LeafExecNode => p }
.flatMap {
val entities = qd.qe.sparkPlan.collect {
case p: DataWritingCommandExec => p
case p: LeafExecNode => p
}.flatMap {
case r: ExecutedCommandExec =>
r.cmd match {
case c: InsertIntoHiveTable =>
logDebug(s"INSERT query ${qd.qe}")
CommandsHarvester.InsertIntoHiveTableHarvester.harvest(c, qd)

// Case 6. CREATE TABLE AS SELECT
case c: CreateHiveTableAsSelectCommand =>
logDebug(s"CREATE TABLE AS SELECT query: ${qd.qe}")
CommandsHarvester.CreateHiveTableAsSelectHarvester.harvest(c, qd)

case c: LoadDataCommand =>
// Case 1. LOAD DATA LOCAL INPATH (from local)
// Case 2. LOAD DATA INPATH (from HDFS)
logDebug(s"LOAD DATA [LOCAL] INPATH (${c.path}) ${c.table}")
CommandsHarvester.LoadDataHarvester.harvest(c, qd)

// Case 6. CREATE TABLE AS SELECT
case c: CreateHiveTableAsSelectCommand =>
logDebug(s"CREATE TABLE AS SELECT query: ${qd.qe}")
CommandsHarvester.CreateHiveTableAsSelectHarvester.harvest(c, qd)

case c: CreateDataSourceTableAsSelectCommand =>
logDebug(s"CREATE TABLE USING xx AS SELECT query: ${qd.qe}")
CommandsHarvester.CreateDataSourceTableAsSelectHarvester.harvest(c, qd)

case c: CreateViewCommand =>
logDebug(s"CREATE VIEW AS SELECT query: ${qd.qe}")
CommandsHarvester.CreateViewHarvester.harvest(c, qd)

case _ =>
Seq.empty
}

case r: DataWritingCommandExec =>
r.cmd match {
case c: InsertIntoHiveTable =>
logDebug(s"INSERT INTO HIVE TABLE query ${qd.qe}")
CommandsHarvester.InsertIntoHiveTableHarvester.harvest(c, qd)

case c: InsertIntoHadoopFsRelationCommand =>
logDebug(s"INSERT INTO SPARK TABLE query ${qd.qe}")
CommandsHarvester.InsertIntoHadoopFsRelationHarvester.harvest(c, qd)

case _ =>
Seq.empty
}
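
The new DataWritingCommandExec branch above is what picks up the datasource INSERT path from Case 1. As a rough sketch (assuming a Spark version that plans these writes as DataWritingCommandExec, which is the shape this branch relies on), the final statement of Case 1 surfaces the command that gets routed to the new harvester:

    // Sketch: the parquet-backed INSERT from Case 1 shows up as an InsertIntoHadoopFsRelationCommand,
    // which the tracker above dispatches to CommandsHarvester.InsertIntoHadoopFsRelationHarvester.
    val qe = sql("INSERT INTO table_parquet_300 SELECT * FROM table_parquet_200").queryExecution
    val writeCommands = qe.sparkPlan.collect { case e: DataWritingCommandExec => e.cmd }
    // Expected: a single InsertIntoHadoopFsRelationCommand for the parquet target table.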
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hortonworks.spark.atlas.sql

import java.io.File
import java.util

import scala.collection.JavaConverters._
import scala.util.Random

import org.apache.atlas.AtlasClient
import org.apache.atlas.model.instance.AtlasEntity
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.command.{ExecutedCommandExec, CreateViewCommand}
import org.scalatest.{BeforeAndAfterAll, Matchers, FunSuite}

import com.hortonworks.spark.atlas.types.external

class CreateViewHarvesterSuite extends FunSuite with Matchers with BeforeAndAfterAll {
  private var sparkSession: SparkSession = _
  private val sourceTblName = "source_" + Random.nextInt(100000)
  private val destinationViewName = "destination_" + Random.nextInt(100000)

  override def beforeAll(): Unit = {
    super.beforeAll()
    sparkSession = SparkSession.builder()
      .master("local")
      .config("spark.sql.catalogImplementation", "hive")
      .getOrCreate()

    sparkSession.sql(s"CREATE TABLE $sourceTblName (name string)")
    sparkSession.sql(s"INSERT INTO TABLE $sourceTblName VALUES ('lucy'), ('tom')")
  }

  override def afterAll(): Unit = {
    sparkSession.stop()
    SparkSession.clearActiveSession()
    SparkSession.clearDefaultSession()
    sparkSession = null

    FileUtils.deleteDirectory(new File("metastore_db"))
    FileUtils.deleteDirectory(new File("spark-warehouse"))

    super.afterAll()
  }

  test("CREATE VIEW FROM TABLE") {
    val qe = sparkSession.sql(s"CREATE VIEW $destinationViewName " +
      s"AS SELECT * FROM $sourceTblName").queryExecution
    val qd = QueryDetail(qe, 0L, 0L)

    assert(qe.sparkPlan.isInstanceOf[ExecutedCommandExec])
    val node = qe.sparkPlan.asInstanceOf[ExecutedCommandExec]
    assert(node.cmd.isInstanceOf[CreateViewCommand])
    val cmd = node.cmd.asInstanceOf[CreateViewCommand]

    val entities = CommandsHarvester.CreateViewHarvester.harvest(cmd, qd)
    val pEntity = entities.head

    assert(pEntity.getAttribute("inputs").isInstanceOf[util.Collection[_]])
    val inputs = pEntity.getAttribute("inputs").asInstanceOf[util.Collection[AtlasEntity]]
    inputs.size() should be (1)

    val inputTbl = inputs.asScala.head
    inputTbl.getTypeName should be (external.HIVE_TABLE_TYPE_STRING)
    inputTbl.getAttribute("name") should be (sourceTblName)
    inputTbl.getAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME).toString should be (
      s"default.$sourceTblName@primary")

    assert(pEntity.getAttribute("outputs").isInstanceOf[util.Collection[_]])
    val outputs = pEntity.getAttribute("outputs").asInstanceOf[util.Collection[AtlasEntity]]
    outputs.size() should be (1)
    val outputView = outputs.asScala.head
    outputView.getAttribute("name") should be (destinationViewName)
    outputView.getAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME).toString should endWith (
      s"default.$destinationViewName")
  }
}
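
The suite above covers only the CREATE VIEW path. The Hive-table INSERT path touched by the same commit can be exercised in the same way from a Hive-enabled session; this is a sketch rather than code from the diff, and the DataWritingCommandExec plan shape and the InsertIntoHiveTable cast are assumptions based on the tracker changes above.

    // Sketch: harvest lineage for Case 2's INSERT INTO ... SELECT between two Hive tables.
    val qe = sql("INSERT INTO t10000 SELECT * FROM t00000").queryExecution
    val cmd = qe.sparkPlan.collect { case e: DataWritingCommandExec => e.cmd }
      .head.asInstanceOf[InsertIntoHiveTable]
    val entities = CommandsHarvester.InsertIntoHiveTableHarvester.harvest(cmd, QueryDetail(qe, 0L, 0L))
    // As in the view test, entities.head is the process entity linking t00000 (input) to t10000 (output).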