Skip to content

Commit

Permalink
Show user SQL queries on process entities
Browse files Browse the repository at this point in the history
  • Loading branch information
weiqingy committed Feb 8, 2018
1 parent e96d7ff commit 83c6cbd
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
val inputTablesEntities = inputsEntities.flatMap(_.headOption).toList
val outputTableEntities = List(outputEntities.head)
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities)
qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities, Some(SQLQuery.sqlQuery))
Seq(pEntity) ++ inputsEntities.flatten ++ outputEntities
}
}
Expand All @@ -83,7 +83,7 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
val inputTablesEntities = inputsEntities.flatMap(_.headOption).toList
val outputTableEntities = List(outputEntities.head)
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities)
qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities, Some(SQLQuery.sqlQuery))
Seq(pEntity) ++ inputsEntities.flatten ++ outputEntities
}
}
Expand Down Expand Up @@ -113,7 +113,7 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
val inputTablesEntities = inputsEntities.flatMap(_.headOption).toList
val outputTableEntities = List(outputEntities.head)
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities)
qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities, Some(SQLQuery.sqlQuery))
Seq(pEntity) ++ inputsEntities.flatten ++ outputEntities
}
}
Expand Down Expand Up @@ -142,7 +142,7 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
val inputTablesEntities = inputsEntities.flatMap(_.headOption).toList
val outputTableEntities = List(outputEntities.head)
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities)
qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities, Some(SQLQuery.sqlQuery))
Seq(pEntity) ++ inputsEntities.flatten ++ outputEntities
}
}
Expand All @@ -152,7 +152,7 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
val pathEntity = external.pathToEntity(node.path)
val outputEntities = prepareEntities(node.table)
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, List(pathEntity), List(outputEntities.head))
qd.qe, qd.executionId, qd.executionTime, List(pathEntity), List(outputEntities.head), Some(SQLQuery.sqlQuery))
Seq(pEntity, pathEntity) ++ outputEntities
}
}
Expand Down Expand Up @@ -183,7 +183,7 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {

val inputs = inputsEntities.flatMap(_.headOption).toList
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, inputs, List(destEntity))
qd.qe, qd.executionId, qd.executionTime, inputs, List(destEntity), Some(SQLQuery.sqlQuery))
Seq(pEntity, destEntity) ++ inputsEntities.flatten
}
}
Expand All @@ -203,7 +203,7 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
val inputTableEntity = List(inputEntities.head)
val outputTableEntity = List(outputEntities.head)
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, inputTableEntity, outputTableEntity)
qd.qe, qd.executionId, qd.executionTime, inputTableEntity, outputTableEntity, Some(SQLQuery.sqlQuery))

Seq(pEntity) ++ inputEntities ++ outputEntities
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hortonworks.spark.atlas.sql

import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.{DataType, StructType}
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}


/**
 * Entry point for Spark's session-extension mechanism (registered via
 * `spark.sql.extensions`). Installs a custom parser so that the raw SQL text of
 * each submitted query is captured before Spark parses it.
 */
class SparkExtension extends (SparkSessionExtensions => Unit) {
  def apply(extensions: SparkSessionExtensions): Unit =
    // Explicit builder lambda: wrap Spark's default parser with the capturing one.
    extensions.injectParser { (session, delegate) =>
      SparkAtlasConnectorParser(session, delegate)
    }
}

/**
 * A delegating [[ParserInterface]] that records the raw SQL text of every
 * plan-level parse request into the shared [[SQLQuery]] holder, then forwards
 * the request to Spark's default parser. All other parse operations are pure
 * pass-throughs with no capture.
 *
 * @param spark    the session this parser was injected into
 * @param delegate Spark's stock parser, which performs the actual parsing
 */
case class SparkAtlasConnectorParser(spark: SparkSession, delegate: ParserInterface)
    extends ParserInterface {

  // Capture the query text first so downstream consumers of SQLQuery see the
  // statement that produced the plan, then parse as usual.
  override def parsePlan(sqlText: String): LogicalPlan = {
    SQLQuery.sqlQuery = sqlText
    delegate.parsePlan(sqlText)
  }

  // The remaining operations never correspond to a full user query; delegate directly.
  override def parseDataType(sqlText: String): DataType = delegate.parseDataType(sqlText)

  override def parseExpression(sqlText: String): Expression = delegate.parseExpression(sqlText)

  override def parseFunctionIdentifier(sqlText: String): FunctionIdentifier =
    delegate.parseFunctionIdentifier(sqlText)

  override def parseTableIdentifier(sqlText: String): TableIdentifier =
    delegate.parseTableIdentifier(sqlText)

  override def parseTableSchema(sqlText: String): StructType = delegate.parseTableSchema(sqlText)
}

/**
 * JVM-global holder for the most recently parsed SQL text. Written by
 * [[SparkAtlasConnectorParser.parsePlan]] and read when building process
 * entities (see the `Some(SQLQuery.sqlQuery)` call sites in CommandsHarvester).
 *
 * `@volatile` guarantees that a write performed on the parsing thread is
 * visible to whichever thread later reads the value; the generated
 * getter/setter pair is unchanged, so all existing call sites keep working.
 *
 * NOTE(review): this is a single process-wide slot — two sessions parsing
 * concurrently will overwrite each other's text, so the query attached to a
 * process entity may belong to another session. Confirm whether per-session
 * (e.g. session-keyed) tracking is required.
 */
object SQLQuery {
  @volatile var sqlQuery: String = ""
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ trait AtlasEntityUtils {
executionId: Long,
executionTime: Long,
inputs: List[AtlasEntity],
outputs: List[AtlasEntity]): AtlasEntity =
internal.sparkProcessToEntity(qe, executionId, executionTime, inputs, outputs)
outputs: List[AtlasEntity],
query: Option[String] = None): AtlasEntity =
internal.sparkProcessToEntity(qe, executionId, executionTime, inputs, outputs, query)

def processUniqueAttribute(executionId: Long): String =
internal.sparkProcessUniqueAttribute(executionId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,14 @@ object internal extends Logging {
executionId: Long,
executionTime: Long,
inputs: List[AtlasEntity],
outputs: List[AtlasEntity]): AtlasEntity = {
outputs: List[AtlasEntity],
query: Option[String] = None): AtlasEntity = {
val entity = new AtlasEntity(metadata.PROCESS_TYPE_STRING)
val name = query.getOrElse(sparkProcessUniqueAttribute(executionId))

entity.setAttribute(
AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, sparkProcessUniqueAttribute(executionId))
entity.setAttribute(AtlasClient.NAME, sparkProcessUniqueAttribute(executionId))
entity.setAttribute(AtlasClient.NAME, name)
entity.setAttribute("executionId", executionId)
entity.setAttribute("currUser", SparkUtils.currUser())
entity.setAttribute("remoteUser", SparkUtils.currSessionUser(qe))
Expand Down

0 comments on commit 83c6cbd

Please sign in to comment.