[SAC-21] Add a scala style and fix style errors #36

Merged: 1 commit, Mar 8, 2018
2 changes: 1 addition & 1 deletion .travis.yml
@@ -28,4 +28,4 @@ notifications:
email: false

install:
-  - mvn -q clean package
+  - mvn -q clean scalastyle:check package
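This wires the new style check into CI: Travis now runs scalastyle:check ahead of packaging in the same Maven invocation, so any style violation fails the build before the package step runs.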
24 changes: 24 additions & 0 deletions pom.xml
@@ -453,6 +453,30 @@
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>

+      <plugin>
+        <groupId>org.scalastyle</groupId>
+        <artifactId>scalastyle-maven-plugin</artifactId>
+        <version>1.0.0</version>
+        <configuration>
+          <verbose>false</verbose>
+          <failOnViolation>true</failOnViolation>
+          <includeTestSourceDirectory>false</includeTestSourceDirectory>
+          <failOnWarning>false</failOnWarning>
+          <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+          <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+          <configLocation>scalastyle-config.xml</configLocation>
+          <outputFile>${basedir}/target/scalastyle-output.xml</outputFile>
+          <inputEncoding>${project.build.sourceEncoding}</inputEncoding>
+          <outputEncoding>${project.reporting.outputEncoding}</outputEncoding>
+        </configuration>
+        <executions>
+          <execution>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
</plugins>
</build>
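With failOnViolation set to true, the check goal aborts the build on any rule violation; warnings alone do not fail it (failOnWarning is false), test sources are skipped (includeTestSourceDirectory is false), and results are written to target/scalastyle-output.xml. The same check can be run locally with mvn scalastyle:check.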

350 changes: 350 additions & 0 deletions scalastyle-config.xml

Large diffs are not rendered by default.
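The 350-line rule set itself is not shown, but the fixes in the rest of this PR suggest what it enforces: a maximum line length (the wrapping points are consistent with the usual 100-character limit), explicit result types on public members, and braces around multi-line if/else expressions. A minimal sketch of compliant code, with invented names, under those assumed rules:

object StyleExample {
  // Long literals are wrapped rather than exceeding the line-length limit:
  private val RELATION_CLASS_NAME =
    "org.apache.spark.sql.execution.datasources.hbase.HBaseRelation"

  // Public members declare explicit result types:
  def describe(name: String): String = s"relation: $name"

  // Multi-line if/else expressions take braces on both branches:
  def pick(useRelation: Boolean): Seq[String] = if (useRelation) {
    Seq(RELATION_CLASS_NAME)
  } else {
    Seq.empty
  }
}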

CommandsHarvester.scala
@@ -41,7 +41,8 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
override val conf: AtlasClientConf = new AtlasClientConf

// Spark HBase Connector
-  private val HBASE_RELATION_CLASS_NAME = "org.apache.spark.sql.execution.datasources.hbase.HBaseRelation"
+  private val HBASE_RELATION_CLASS_NAME =
+    "org.apache.spark.sql.execution.datasources.hbase.HBaseRelation"

// Load HBaseRelation class
lazy val maybeClazz: Option[Class[_]] = {
@@ -71,14 +72,15 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
// create process entity
val inputTablesEntities = inputsEntities.flatMap(_.headOption).toList
val outputTableEntities = List(outputEntities.head)
-    val pEntity = processToEntity(
-      qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities, Option(SQLQuery.get()))
+    val pEntity = processToEntity(qd.qe, qd.executionId, qd.executionTime, inputTablesEntities,
+      outputTableEntities, Option(SQLQuery.get()))
Seq(pEntity) ++ inputsEntities.flatten ++ outputEntities
}
}
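The rewrapped processToEntity calls throughout this file all pass the same six arguments. A hedged sketch of the helper's shape as implied by these call sites; the real definition lives in AtlasEntityUtils and its parameter types may differ:

import org.apache.atlas.model.instance.AtlasEntity
import org.apache.spark.sql.execution.QueryExecution

trait ProcessEntitySupport {
  // Inferred from the call sites above; the id/time types are assumptions.
  def processToEntity(
      qe: QueryExecution,
      executionId: Long,
      executionTime: Long,
      inputs: List[AtlasEntity],
      outputs: List[AtlasEntity],
      query: Option[String]): AtlasEntity
}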

object InsertIntoHadoopFsRelationHarvester extends Harvester[InsertIntoHadoopFsRelationCommand] {
-    override def harvest(node: InsertIntoHadoopFsRelationCommand, qd: QueryDetail): Seq[AtlasEntity] = {
+    override def harvest(node: InsertIntoHadoopFsRelationCommand, qd: QueryDetail)
+      : Seq[AtlasEntity] = {
// source tables/files entities
val tChildren = node.query.collectLeaves()
var isFiles = false
@@ -106,8 +108,8 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
val inputTablesEntities = if (isFiles) inputsEntities.flatten.toList
else inputsEntities.flatMap(_.headOption).toList
val outputTableEntities = List(outputEntities.head)
-    val pEntity = processToEntity(
-      qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities, Option(SQLQuery.get()))
+    val pEntity = processToEntity(qd.qe, qd.executionId, qd.executionTime, inputTablesEntities,
+      outputTableEntities, Option(SQLQuery.get()))
Seq(pEntity) ++ inputsEntities.flatten ++ outputEntities
}
}
@@ -140,8 +142,8 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
// create process entity
val inputTablesEntities = inputsEntities.flatMap(_.headOption).toList
val outputTableEntities = List(outputEntities.head)
-    val pEntity = processToEntity(
-      qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities, Option(SQLQuery.get()))
+    val pEntity = processToEntity(qd.qe, qd.executionId, qd.executionTime, inputTablesEntities,
+      outputTableEntities, Option(SQLQuery.get()))
Seq(pEntity) ++ inputsEntities.flatten ++ outputEntities
}
}
@@ -166,8 +168,8 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
val outputEntities = tableToEntities(node.table)
val inputTablesEntities = inputsEntities.flatMap(_.headOption).toList
val outputTableEntities = List(outputEntities.head)
-    val pEntity = processToEntity(
-      qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities, Option(SQLQuery.get()))
+    val pEntity = processToEntity(qd.qe, qd.executionId, qd.executionTime, inputTablesEntities,
+      outputTableEntities, Option(SQLQuery.get()))

Seq(pEntity) ++ inputsEntities.flatten ++ outputEntities
}
@@ -177,8 +179,8 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
override def harvest(node: LoadDataCommand, qd: QueryDetail): Seq[AtlasEntity] = {
val pathEntity = external.pathToEntity(node.path)
val outputEntities = prepareEntities(node.table)
-      val pEntity = processToEntity(
-        qd.qe, qd.executionId, qd.executionTime, List(pathEntity), List(outputEntities.head), Option(SQLQuery.get()))
+      val pEntity = processToEntity(qd.qe, qd.executionId, qd.executionTime, List(pathEntity),
+        List(outputEntities.head), Option(SQLQuery.get()))
Seq(pEntity, pathEntity) ++ outputEntities
}
}
@@ -228,8 +230,8 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
// create process entity
val inputTableEntity = List(inputEntities.head)
val outputTableEntity = List(outputEntities.head)
-    val pEntity = processToEntity(
-      qd.qe, qd.executionId, qd.executionTime, inputTableEntity, outputTableEntity, Option(SQLQuery.get()))
+    val pEntity = processToEntity(qd.qe, qd.executionId, qd.executionTime, inputTableEntity,
+      outputTableEntity, Option(SQLQuery.get()))

Seq(pEntity) ++ inputEntities ++ outputEntities
}
@@ -277,8 +279,8 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
// create process entity
val inputTablesEntities = inputsEntities.flatMap(_.headOption).toList
val outputTableEntities = outputEntities.toList
-    val pEntity = processToEntity(
-      qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities, Option(SQLQuery.get()))
+    val pEntity = processToEntity(qd.qe, qd.executionId, qd.executionTime, inputTablesEntities,
+      outputTableEntities, Option(SQLQuery.get()))
Seq(pEntity) ++ inputsEntities.flatten ++ outputEntities
}
}
SparkExecutionPlanTracker.scala
@@ -134,10 +134,11 @@ class SparkExecutionPlanTracker(
case sw: KafkaStreamWriter => KafkaHarvester.harvest(sw, r, qd)
case _ => Seq.empty
}
-  }catch {
-    case e: NoSuchMethodException =>
-      logDebug(s"Can not get KafkaStreamWriter, so can not create Kafka topic entities: ${qd.qe}")
-      Seq.empty
+  } catch {
+    case _: NoSuchMethodException =>
+      logDebug("Can not get KafkaStreamWriter, so can not create Kafka topic " +
+        s"entities: ${qd.qe}")
+      Seq.empty
}
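Three fixes land in this hunk: a space added before catch, the unused exception binding (case e:) replaced by the wildcard (case _:), and the long log message split across two concatenated strings so the interpolated tail fits within the line limit.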

case _ =>
@@ -150,8 +151,8 @@ class SparkExecutionPlanTracker(
// Case 5. FROM ... INSERT (OVERWRITE) INTO t2 INSERT INTO t3
// CASE LLAP:
// case r: RowDataSourceScanExec
//   if (r.relation.getClass.getCanonicalName.endsWith("dd")) =>
//     println("close hive connection via " + r.relation.getClass.getCanonicalName)

} ++ {
qd.qe.sparkPlan match {
SparkExtension.scala
@@ -31,7 +31,8 @@ class SparkExtension extends (SparkSessionExtensions => Unit) {
}
}

-case class SparkAtlasConnectorParser(spark: SparkSession, delegate: ParserInterface) extends ParserInterface {
+case class SparkAtlasConnectorParser(spark: SparkSession, delegate: ParserInterface)
+  extends ParserInterface {
override def parsePlan(sqlText: String): LogicalPlan = {
SQLQuery.set(sqlText)
delegate.parsePlan(sqlText)
@@ -56,5 +57,5 @@ case class SparkAtlasConnectorParser(spark: SparkSession, delegate: ParserInterface)
object SQLQuery {
private[this] val sqlQuery = new ThreadLocal[String]
def get(): String = sqlQuery.get
-  def set(s: String) = sqlQuery.set(s)
+  def set(s: String): Unit = sqlQuery.set(s)
}
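Besides the wrapped class header, set gains the explicit Unit result type the style rules require on public members. For context, a hypothetical usage of this thread-local holder, which parsePlan fills before delegating and the harvesters read back on the same thread via SQLQuery.get():

SQLQuery.set("INSERT INTO target SELECT * FROM source") // hypothetical query text
val captured: String = SQLQuery.get() // returns the SQL recorded on this thread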
external.scala
@@ -86,7 +86,8 @@ object external {
val HBASE_COLUMNFAMILY_STRING = "hbase_column_family"
val HBASE_COLUMN_STRING = "hbase_column"

-  def hbaseTableToEntity(cluster: String, tableName: String, nameSpace: String): Seq[AtlasEntity] = {
+  def hbaseTableToEntity(cluster: String, tableName: String, nameSpace: String)
+    : Seq[AtlasEntity] = {
val hbaseEntity = new AtlasEntity(HBASE_TABLE_STRING)
hbaseEntity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getTableQualifiedName(cluster, nameSpace, tableName))
@@ -116,7 +117,7 @@ object external {
val HIVE_COLUMN_TYPE_STRING = "hive_column"
val HIVE_TABLE_TYPE_STRING = "hive_table"

-  def hiveDbUniqueAttribute(cluster: String, db: String) = s"${db.toLowerCase}@$cluster"
+  def hiveDbUniqueAttribute(cluster: String, db: String): String = s"${db.toLowerCase}@$cluster"

def hiveDbToEntities(dbDefinition: CatalogDatabase, cluster: String): Seq[AtlasEntity] = {
val dbEntity = new AtlasEntity(HIVE_DB_TYPE_STRING)
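The added String result type on hiveDbUniqueAttribute changes nothing at runtime; the one-line body shows the format directly. For example, with hypothetical names:

hiveDbUniqueAttribute("primary", "Sales") // returns "sales@primary"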
@@ -190,7 +191,7 @@ object external {
cluster: String,
db: String,
table: String,
      isTemporary: Boolean = false): String = {
val tableName = if (isTemporary) {
if (SessionState.get() != null && SessionState.get().getSessionId != null) {
s"${table}_temp-${SessionState.get().getSessionId}"
@@ -215,9 +216,9 @@ object external {
val dbEntities = hiveDbToEntities(dbDefinition, cluster)
val sdEntities = hiveStorageDescToEntities(
tableDefinition.storage, cluster, db, table
-      /* isTempTable = false Spark doesn't support temp table*/)
+      /* isTempTable = false Spark doesn't support temp table */)
val schemaEntities = hiveSchemaToEntities(
-      tableDefinition.schema, cluster, db, table /*, isTempTable = false */)
+      tableDefinition.schema, cluster, db, table /* , isTempTable = false */)

val tblEntity = new AtlasEntity(HIVE_TABLE_TYPE_STRING)
tblEntity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
KafkaHarvester.scala
@@ -55,8 +55,11 @@ object KafkaHarvester extends AtlasEntityUtils with Logging {
case e: NoSuchMethodException =>
logDebug(s"Can not get topic, so can not create Kafka topic entities: ${qd.qe}")
}
-    val outputEntities = if (destTopic.isDefined)
-      external.kafkaToEntity(clusterName, destTopic.get) else Seq.empty
+    val outputEntities = if (destTopic.isDefined) {
+      external.kafkaToEntity(clusterName, destTopic.get)
+    } else {
+      Seq.empty
+    }
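The braces are what the new rules require for a multi-line if/else. A more idiomatic alternative would be destTopic.map(t => external.kafkaToEntity(clusterName, t)).getOrElse(Seq.empty), which also avoids the destTopic.get call, but the PR keeps the original control flow and only adds the braces.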

// create process entity
val pDescription = StringBuilder.newBuilder.append("Topics subscribed( ")