Skip to content

Commit

Permalink
[SAC-21] Add a scala style and fix style errors (#36)
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

This PR enables scala style check in Travis.CI.

## How was this patch tested?

Pass Travis CI.
  • Loading branch information
dongjoon-hyun authored and weiqingy committed Mar 8, 2018
1 parent 03d130e commit 9bdfd6d
Show file tree
Hide file tree
Showing 8 changed files with 414 additions and 32 deletions.
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,4 @@ notifications:
email: false

install:
- mvn -q clean package
- mvn -q clean scalastyle:check package
24 changes: 24 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,30 @@
<artifactId>scalatest-maven-plugin</artifactId>
</plugin>

<plugin>
<groupId>org.scalastyle</groupId>
<artifactId>scalastyle-maven-plugin</artifactId>
<version>1.0.0</version>
<configuration>
<verbose>false</verbose>
<failOnViolation>true</failOnViolation>
<includeTestSourceDirectory>false</includeTestSourceDirectory>
<failOnWarning>false</failOnWarning>
<sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
<testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
<configLocation>scalastyle-config.xml</configLocation>
<outputFile>${basedir}/target/scalastyle-output.xml</outputFile>
<inputEncoding>${project.build.sourceEncoding}</inputEncoding>
<outputEncoding>${project.reporting.outputEncoding}</outputEncoding>
</configuration>
<executions>
<execution>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

Expand Down
350 changes: 350 additions & 0 deletions scalastyle-config.xml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
override val conf: AtlasClientConf = new AtlasClientConf

// Spark HBase Connector
private val HBASE_RELATION_CLASS_NAME = "org.apache.spark.sql.execution.datasources.hbase.HBaseRelation"
private val HBASE_RELATION_CLASS_NAME =
"org.apache.spark.sql.execution.datasources.hbase.HBaseRelation"

// Load HBaseRelation class
lazy val maybeClazz: Option[Class[_]] = {
Expand Down Expand Up @@ -71,14 +72,15 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
// create process entity
val inputTablesEntities = inputsEntities.flatMap(_.headOption).toList
val outputTableEntities = List(outputEntities.head)
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities, Option(SQLQuery.get()))
val pEntity = processToEntity(qd.qe, qd.executionId, qd.executionTime, inputTablesEntities,
outputTableEntities, Option(SQLQuery.get()))
Seq(pEntity) ++ inputsEntities.flatten ++ outputEntities
}
}

object InsertIntoHadoopFsRelationHarvester extends Harvester[InsertIntoHadoopFsRelationCommand] {
override def harvest(node: InsertIntoHadoopFsRelationCommand, qd: QueryDetail): Seq[AtlasEntity] = {
override def harvest(node: InsertIntoHadoopFsRelationCommand, qd: QueryDetail)
: Seq[AtlasEntity] = {
// source tables/files entities
val tChildren = node.query.collectLeaves()
var isFiles = false
Expand Down Expand Up @@ -106,8 +108,8 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
val inputTablesEntities = if (isFiles) inputsEntities.flatten.toList
else inputsEntities.flatMap(_.headOption).toList
val outputTableEntities = List(outputEntities.head)
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities, Option(SQLQuery.get()))
val pEntity = processToEntity(qd.qe, qd.executionId, qd.executionTime, inputTablesEntities,
outputTableEntities, Option(SQLQuery.get()))
Seq(pEntity) ++ inputsEntities.flatten ++ outputEntities
}
}
Expand Down Expand Up @@ -140,8 +142,8 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
// create process entity
val inputTablesEntities = inputsEntities.flatMap(_.headOption).toList
val outputTableEntities = List(outputEntities.head)
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities, Option(SQLQuery.get()))
val pEntity = processToEntity(qd.qe, qd.executionId, qd.executionTime, inputTablesEntities,
outputTableEntities, Option(SQLQuery.get()))
Seq(pEntity) ++ inputsEntities.flatten ++ outputEntities
}
}
Expand All @@ -166,8 +168,8 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
val outputEntities = tableToEntities(node.table)
val inputTablesEntities = inputsEntities.flatMap(_.headOption).toList
val outputTableEntities = List(outputEntities.head)
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities, Option(SQLQuery.get()))
val pEntity = processToEntity(qd.qe, qd.executionId, qd.executionTime, inputTablesEntities,
outputTableEntities, Option(SQLQuery.get()))

Seq(pEntity) ++ inputsEntities.flatten ++ outputEntities
}
Expand All @@ -177,8 +179,8 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
override def harvest(node: LoadDataCommand, qd: QueryDetail): Seq[AtlasEntity] = {
val pathEntity = external.pathToEntity(node.path)
val outputEntities = prepareEntities(node.table)
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, List(pathEntity), List(outputEntities.head), Option(SQLQuery.get()))
val pEntity = processToEntity(qd.qe, qd.executionId, qd.executionTime, List(pathEntity),
List(outputEntities.head), Option(SQLQuery.get()))
Seq(pEntity, pathEntity) ++ outputEntities
}
}
Expand Down Expand Up @@ -228,8 +230,8 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
// create process entity
val inputTableEntity = List(inputEntities.head)
val outputTableEntity = List(outputEntities.head)
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, inputTableEntity, outputTableEntity, Option(SQLQuery.get()))
val pEntity = processToEntity(qd.qe, qd.executionId, qd.executionTime, inputTableEntity,
outputTableEntity, Option(SQLQuery.get()))

Seq(pEntity) ++ inputEntities ++ outputEntities
}
Expand Down Expand Up @@ -277,8 +279,8 @@ object CommandsHarvester extends AtlasEntityUtils with Logging {
// create process entity
val inputTablesEntities = inputsEntities.flatMap(_.headOption).toList
val outputTableEntities = outputEntities.toList
val pEntity = processToEntity(
qd.qe, qd.executionId, qd.executionTime, inputTablesEntities, outputTableEntities, Option(SQLQuery.get()))
val pEntity = processToEntity(qd.qe, qd.executionId, qd.executionTime, inputTablesEntities,
outputTableEntities, Option(SQLQuery.get()))
Seq(pEntity) ++ inputsEntities.flatten ++ outputEntities
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,11 @@ class SparkExecutionPlanTracker(
case sw: KafkaStreamWriter => KafkaHarvester.harvest(sw, r, qd)
case _ => Seq.empty
}
}catch {
case e: NoSuchMethodException =>
logDebug(s"Can not get KafkaStreamWriter, so can not create Kafka topic entities: ${qd.qe}")
Seq.empty
} catch {
case _: NoSuchMethodException =>
logDebug("Can not get KafkaStreamWriter, so can not create Kafka topic " +
s"entities: ${qd.qe}")
Seq.empty
}

case _ =>
Expand All @@ -150,8 +151,8 @@ class SparkExecutionPlanTracker(
// Case 5. FROM ... INSERT (OVERWRITE) INTO t2 INSERT INTO t3
// CASE LLAP:
// case r: RowDataSourceScanExec
// if (r.relation.getClass.getCanonicalName.endsWith("dd")) =>
// println("close hive connection via " + r.relation.getClass.getCanonicalName)
// if (r.relation.getClass.getCanonicalName.endsWith("dd")) =>
// println("close hive connection via " + r.relation.getClass.getCanonicalName)

} ++ {
qd.qe.sparkPlan match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ class SparkExtension extends (SparkSessionExtensions => Unit) {
}
}

case class SparkAtlasConnectorParser(spark: SparkSession, delegate: ParserInterface) extends ParserInterface {
case class SparkAtlasConnectorParser(spark: SparkSession, delegate: ParserInterface)
extends ParserInterface {
override def parsePlan(sqlText: String): LogicalPlan = {
SQLQuery.set(sqlText)
delegate.parsePlan(sqlText)
Expand All @@ -56,5 +57,5 @@ case class SparkAtlasConnectorParser(spark: SparkSession, delegate: ParserInterf
object SQLQuery {
private[this] val sqlQuery = new ThreadLocal[String]
def get(): String = sqlQuery.get
def set(s: String) = sqlQuery.set(s)
def set(s: String): Unit = sqlQuery.set(s)
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ object external {
val HBASE_COLUMNFAMILY_STRING = "hbase_column_family"
val HBASE_COLUMN_STRING = "hbase_column"

def hbaseTableToEntity(cluster: String, tableName: String, nameSpace: String): Seq[AtlasEntity] = {
def hbaseTableToEntity(cluster: String, tableName: String, nameSpace: String)
: Seq[AtlasEntity] = {
val hbaseEntity = new AtlasEntity(HBASE_TABLE_STRING)
hbaseEntity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
getTableQualifiedName(cluster, nameSpace, tableName))
Expand Down Expand Up @@ -116,7 +117,7 @@ object external {
val HIVE_COLUMN_TYPE_STRING = "hive_column"
val HIVE_TABLE_TYPE_STRING = "hive_table"

def hiveDbUniqueAttribute(cluster: String, db: String) = s"${db.toLowerCase}@$cluster"
def hiveDbUniqueAttribute(cluster: String, db: String): String = s"${db.toLowerCase}@$cluster"

def hiveDbToEntities(dbDefinition: CatalogDatabase, cluster: String): Seq[AtlasEntity] = {
val dbEntity = new AtlasEntity(HIVE_DB_TYPE_STRING)
Expand Down Expand Up @@ -190,7 +191,7 @@ object external {
cluster: String,
db: String,
table: String,
isTemporary: Boolean = false): String = {
isTemporary: Boolean = false): String = {
val tableName = if (isTemporary) {
if (SessionState.get() != null && SessionState.get().getSessionId != null) {
s"${table}_temp-${SessionState.get().getSessionId}"
Expand All @@ -215,9 +216,9 @@ object external {
val dbEntities = hiveDbToEntities(dbDefinition, cluster)
val sdEntities = hiveStorageDescToEntities(
tableDefinition.storage, cluster, db, table
/* isTempTable = false Spark doesn't support temp table*/)
/* isTempTable = false Spark doesn't support temp table */)
val schemaEntities = hiveSchemaToEntities(
tableDefinition.schema, cluster, db, table /*, isTempTable = false */)
tableDefinition.schema, cluster, db, table /* , isTempTable = false */)

val tblEntity = new AtlasEntity(HIVE_TABLE_TYPE_STRING)
tblEntity.setAttribute(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,11 @@ object KafkaHarvester extends AtlasEntityUtils with Logging {
case e: NoSuchMethodException =>
logDebug(s"Can not get topic, so can not create Kafka topic entities: ${qd.qe}")
}
val outputEntities = if (destTopic.isDefined)
external.kafkaToEntity(clusterName, destTopic.get) else Seq.empty
val outputEntities = if (destTopic.isDefined) {
external.kafkaToEntity(clusterName, destTopic.get)
} else {
Seq.empty
}

// create process entity
val pDescription = StringBuilder.newBuilder.append("Topics subscribed( ")
Expand Down

0 comments on commit 9bdfd6d

Please sign in to comment.