
Commit

Formatted Scala code.
ravipesala committed May 24, 2016
1 parent cfed00c commit 6840c31
Showing 36 changed files with 1,180 additions and 1,131 deletions.
29 changes: 28 additions & 1 deletion dev/java-code-format-template.xml
@@ -32,8 +32,26 @@
<option name="RIGHT_MARGIN" value="72" />
</MultiMarkdownCodeStyleSettings>
<ScalaCodeStyleSettings>
<option name="classCountToUseImportOnDemand" value="6" />
<option name="importLayout">
<array>
<option value="java" />
<option value="javax" />
<option value="_______ blank line _______" />
<option value="scala" />
<option value="_______ blank line _______" />
<option value="all other imports" />
<option value="_______ blank line _______" />
<option value="org.carbondata" />
</array>
</option>
<option name="METHOD_BRACE_FORCE" value="1" />
<option name="NOT_CONTINUATION_INDENT_FOR_PARAMS" value="true" />
<option name="USE_ALTERNATE_CONTINUATION_INDENT_FOR_PARAMS" value="true" />
<option name="INDENT_BRACED_FUNCTION_ARGS" value="false" />
<option name="USE_SCALADOC2_FORMATTING" value="false" />
<option name="SPACES_IN_ONE_LINE_BLOCKS" value="true" />
<option name="KEEP_XML_FORMATTING" value="true" />
</ScalaCodeStyleSettings>
<XML>
<option name="XML_LEGACY_SETTINGS_IMPORTED" value="true" />
@@ -80,14 +98,23 @@
</codeStyleSettings>
<codeStyleSettings language="Scala">
<option name="ALIGN_MULTILINE_PARAMETERS" value="false" />
<option name="ALIGN_MULTILINE_BINARY_OPERATION" value="true" />
<option name="CALL_PARAMETERS_WRAP" value="5" />
<option name="CALL_PARAMETERS_RPAREN_ON_NEXT_LINE" value="true" />
<option name="METHOD_PARAMETERS_WRAP" value="5" />
<option name="EXTENDS_LIST_WRAP" value="5" />
<option name="EXTENDS_KEYWORD_WRAP" value="1" />
<option name="METHOD_CALL_CHAIN_WRAP" value="5" />
<option name="BINARY_OPERATION_WRAP" value="1" />
<option name="KEEP_SIMPLE_BLOCKS_IN_ONE_LINE" value="true" />
<option name="KEEP_SIMPLE_METHODS_IN_ONE_LINE" value="true" />
<option name="FOR_STATEMENT_WRAP" value="1" />
<option name="IF_BRACE_FORCE" value="3" />
<option name="DOWHILE_BRACE_FORCE" value="3" />
<option name="WHILE_BRACE_FORCE" value="3" />
<option name="FOR_BRACE_FORCE" value="3" />
<option name="WRAP_LONG_LINES" value="true" />
<option name="PARAMETER_ANNOTATION_WRAP" value="1" />
<option name="VARIABLE_ANNOTATION_WRAP" value="1" />
</codeStyleSettings>
<codeStyleSettings language="XML">
<indentOptions>
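As a rough sketch of what the wrap and brace settings above might produce (METHOD_PARAMETERS_WRAP, IF_BRACE_FORCE, and the alternate continuation indent for parameters), the method below is hypothetical, not from this commit:

  object FormattingExample {
    // Parameters wrap onto their own lines with a double (continuation)
    // indent instead of aligning under the first parameter.
    def mergeBlocks(
        hostName: String,
        blockSize: Long,
        retryCount: Int): Long = {
      // Braces are kept on if/else even for single-statement branches,
      // per the brace-force settings.
      if (retryCount > 0) {
        blockSize * retryCount
      } else {
        blockSize
      }
    }
  }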
@@ -26,6 +26,7 @@ import org.carbondata.core.load.BlockDetails
/**
 * This RDD is used to combine blocks at node level;
 * it returns (host, Array[BlockDetails]) pairs.
 *
 * @param prev the upstream NewHadoopRDD of raw (LongWritable, Text) records
 */
class DummyLoadRDD(prev: NewHadoopRDD[LongWritable, Text])
@@ -34,13 +35,14 @@ class DummyLoadRDD(prev: NewHadoopRDD[LongWritable, Text])
  override def getPartitions: Array[Partition] = firstParent[(LongWritable, Text)].partitions

  override def compute(theSplit: Partition,
      context: TaskContext): Iterator[(String, BlockDetails)] = {
    new Iterator[(String, BlockDetails)] {
      val split = theSplit.asInstanceOf[NewHadoopPartition]
      var finished = false
      // sleep so that Spark does not schedule all tasks onto a single node,
      // giving the scheduler time to spread them out
      Thread.sleep(5000)

      override def hasNext: Boolean = {
        if (!finished) {
          finished = true
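The compute body above wraps its one (host, block) result in a hand-rolled iterator whose finished flag makes each partition yield exactly one element. A minimal, self-contained sketch of that pattern (illustrative names, not CarbonData API; here the flag flips in next() rather than hasNext, which keeps hasNext idempotent):

  // Yields exactly one element, then reports exhaustion.
  class SingleShotIterator[T](value: => T) extends Iterator[T] {
    private var finished = false

    override def hasNext: Boolean = !finished

    override def next(): T = {
      if (finished) {
        throw new NoSuchElementException("already consumed")
      }
      finished = true
      value
    }
  }

  object SingleShotDemo extends App {
    val it = new SingleShotIterator(("host-1", "block-0"))
    println(it.toList) // List((host-1,block-0))
  }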
@@ -43,13 +43,13 @@ import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
*/
@DeveloperApi
case class CarbonAggregate(
    partial: Boolean,
    groupingExpressions: Seq[Expression],
    aggregateExpressions: Seq[NamedExpression],
    child: SparkPlan)(@transient sqlContext: SQLContext)
  extends UnaryNode {

  override def requiredChildDistribution: Seq[Distribution] = {
    if (partial) {
      UnspecifiedDistribution :: Nil
    } else {
@@ -59,6 +59,7 @@ case class CarbonAggregate(
        ClusteredDistribution(groupingExpressions) :: Nil
      }
    }
  }

  override def otherCopyArgs: Seq[AnyRef] = sqlContext :: Nil
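The constructor above takes sqlContext in a second parameter list, and otherCopyArgs feeds it back when Catalyst copies the node: a case class's generated equals/hashCode/copy only cover the first parameter list. A small sketch of that language behaviour (names are made up):

  case class Node(value: Int)(ctx: String) {
    // The second-list argument must be supplied explicitly when copying.
    def describe: String = s"Node($value) with $ctx"
  }

  object SecondParamListDemo extends App {
    val a = Node(1)("ctx-A")
    val b = Node(1)("ctx-B")
    println(a == b) // true: generated equals ignores the second parameter list
  }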

@@ -77,8 +78,8 @@ case class CarbonAggregate(
* output.
*/
case class ComputedAggregate(unbound: AggregateExpression1,
    aggregate: AggregateExpression1,
    resultAttribute: AttributeReference)

  /** A list of aggregates that need to be computed for each group. */
  private[this] val computedAggregates = aggregateExpressions.flatMap { agg =>
@@ -128,75 +129,78 @@
}
}

  override def doExecute(): RDD[InternalRow] = {
    attachTree(this, "execute") {
      if (groupingExpressions.isEmpty) {
        child.execute().mapPartitions { iter =>
          val buffer = newAggregateBuffer()
          var currentRow: InternalRow = null
          while (iter.hasNext) {
            currentRow = iter.next()
            var i = 0
            while (i < buffer.length) {
              buffer(i).update(currentRow)
              i += 1
            }
          }
          val resultProjection = new InterpretedProjection(resultExpressions, computedSchema)
          val aggregateResults = new GenericMutableRow(computedAggregates.length)

          var i = 0
          while (i < buffer.length) {
            aggregateResults(i) = buffer(i).eval(EmptyRow)
            i += 1
          }

          Iterator(resultProjection(aggregateResults))
        }
      } else {
        child.execute().mapPartitions { iter =>
          val hashTable = new HashMap[InternalRow, Array[AggregateFunction1]]
          val groupingProjection = new InterpretedMutableProjection(groupingExpressions,
            childOutput)

          var currentRow: InternalRow = null
          while (iter.hasNext) {
            currentRow = iter.next()
            val currentGroup = groupingProjection(currentRow)
            var currentBuffer = hashTable.get(currentGroup)
            if (currentBuffer == null) {
              currentBuffer = newAggregateBuffer()
              hashTable.put(currentGroup.copy(), currentBuffer)
            }

            var i = 0
            while (i < currentBuffer.length) {
              currentBuffer(i).update(currentRow)
              i += 1
            }
          }

          new Iterator[InternalRow] {
            private[this] val hashTableIter = hashTable.entrySet().iterator()
            private[this] val aggregateResults = new GenericMutableRow(computedAggregates.length)
            private[this] val resultProjection =
              new InterpretedMutableProjection(resultExpressions,
                computedSchema ++ namedGroups.map(_._2))
            private[this] val joinedRow = new JoinedRow

            override final def hasNext: Boolean = hashTableIter.hasNext

            override final def next(): InternalRow = {
              val currentEntry = hashTableIter.next()
              val currentGroup = currentEntry.getKey
              val currentBuffer = currentEntry.getValue

              var i = 0
              while (i < currentBuffer.length) {
                // Evaluating an aggregate buffer returns the result. No row is required since we
                // already added all rows in the group using update.
                aggregateResults(i) = currentBuffer(i).eval(EmptyRow)
                i += 1
              }
              resultProjection(joinedRow(aggregateResults, currentGroup))
            }
          }
        }
      }
    }
  }
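The grouped branch above is a classic two-phase hash aggregation: one pass builds a hash table from group key to an array of aggregate buffers, then an iterator evaluates each buffer once and streams out the result rows. A minimal sketch of the same shape in plain Scala (SumAgg and the row type are illustrative, not Spark's AggregateFunction1):

  import scala.collection.mutable

  trait Agg {
    def update(value: Int): Unit // called once per row of the group
    def eval(): Long             // called once, after all rows are seen
  }

  class SumAgg extends Agg {
    private var sum = 0L
    override def update(value: Int): Unit = sum += value
    override def eval(): Long = sum
  }

  object HashAggSketch extends App {
    val rows = Seq(("a", 1), ("b", 2), ("a", 3))

    // Phase 1: route every row to its group's buffers, creating them on demand.
    val hashTable = mutable.LinkedHashMap.empty[String, Array[Agg]]
    for ((key, value) <- rows) {
      val buffers = hashTable.getOrElseUpdate(key, Array[Agg](new SumAgg))
      buffers.foreach(_.update(value))
    }

    // Phase 2: evaluate each buffer exactly once per group.
    for ((key, buffers) <- hashTable) {
      println((key, buffers.head.eval())) // (a,4) then (b,2)
    }
  }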
@@ -30,11 +30,11 @@ case class CarbonBoundReference(colExp: ColumnExpression, dataType: DataType, nu

  type EvaluatedType = Any

  override def toString: String = s"input[" + colExp.getColIndex + "]"

  override def eval(input: InternalRow): Any = input.get(colExp.getColIndex, dataType)

  override def name: String = colExp.getColumnName

  override def toAttribute: Attribute = throw new UnsupportedOperationException

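The only change in this file drops the empty parentheses from getColIndex and getColumnName, following the Scala convention that pure, parameterless accessors are called without (), while side-effecting methods keep them. A small illustrative sketch (this Column class is hypothetical, not CarbonData's ColumnExpression):

  class Column(name: String, index: Int) {
    // Pure accessors: declared and invoked without parentheses.
    def getColumnName: String = name
    def getColIndex: Int = index

    // A side-effecting method keeps its parentheses by convention.
    def reset(): Unit = println(s"resetting column $name")
  }

  object AccessorStyleDemo extends App {
    val col = new Column("price", 2)
    println(col.getColumnName) // price
    col.reset()
  }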
