change ScanRdd to use RecordReader
fix style
jackylk authored and ravipesala committed Nov 25, 2016
1 parent 5c697e9 commit 5f6a56c
Showing 41 changed files with 790 additions and 1,031 deletions.
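
Note: the commit title says ScanRdd now drives a Hadoop RecordReader instead of its previous row-fetching path. The CarbonScanRDD change itself is in the files not shown below, so the following is only a minimal, hypothetical sketch of the standard Hadoop mapreduce RecordReader iteration contract that such a change builds on; the class and method names here are illustrative and not taken from this commit.

import java.io.IOException;

import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

// Hypothetical driver class, for illustration only.
public final class RecordReaderDriver {

  // Pulls every record of one split through a RecordReader and returns the count.
  public static <K, V> long consume(RecordReader<K, V> reader, InputSplit split,
      TaskAttemptContext context) throws IOException, InterruptedException {
    long count = 0;
    reader.initialize(split, context);
    try {
      // standard RecordReader iteration contract
      while (reader.nextKeyValue()) {
        K key = reader.getCurrentKey();
        V value = reader.getCurrentValue();
        count++;  // a real scan would hand (key, value) to the downstream consumer
      }
    } finally {
      reader.close();
    }
    return count;
  }
}

The visible hunks below cover the Distributable hierarchy, the filter block-pruning and resolver code, and DataFrameAPIExample.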
@@ -16,10 +16,12 @@
  */
 package org.apache.carbondata.core.carbon.datastore.block;
 
+import java.io.IOException;
+
 /**
- * Abstract class which is maintains the locations of node.
+ * interface to get the locations of node. Used for making task distribution based on locality
  */
-public abstract class Distributable implements Comparable<Distributable> {
+public interface Distributable extends Comparable<Distributable> {
 
-  public abstract String[] getLocations();
+  String[] getLocations() throws IOException;
 }
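
Note: since Distributable is now an interface whose getLocations() may throw IOException, implementing classes (such as TableBlockInfo and TableTaskInfo in the next two hunks) declare implements Distributable rather than extending a base class. A minimal hypothetical implementer could look like this; DummyDistributable and its fields are illustrative and not part of this commit.

import java.io.IOException;

import org.apache.carbondata.core.carbon.datastore.block.Distributable;

// Hypothetical implementer, for illustration only.
public class DummyDistributable implements Distributable {

  private final String id;
  private final String[] hostNames;

  public DummyDistributable(String id, String[] hostNames) {
    this.id = id;
    this.hostNames = hostNames;
  }

  // Real implementations may look block locations up from the file system,
  // which is why the new signature lets IOException propagate.
  @Override public String[] getLocations() throws IOException {
    return hostNames;
  }

  @Override public int compareTo(Distributable other) {
    // ordering is only meaningful against the same concrete type here
    return (other instanceof DummyDistributable)
        ? id.compareTo(((DummyDistributable) other).id) : 0;
  }
}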
@@ -28,8 +28,7 @@
  * class will be used to pass the block detail detail will be passed form driver
  * to all the executor to load the b+ tree
  */
-public class TableBlockInfo extends Distributable
-    implements Serializable, Comparable<Distributable> {
+public class TableBlockInfo implements Distributable, Serializable {
 
   /**
    * serialization id
@@ -27,7 +27,7 @@
 /**
  * This class is responsible for maintaining the mapping of tasks of a node.
  */
-public class TableTaskInfo extends Distributable {
+public class TableTaskInfo implements Distributable {
 
   private final List<TableBlockInfo> tableBlockInfoList;
   private final String taskId;
@@ -29,7 +29,6 @@
 import org.apache.carbondata.core.carbon.datastore.DataRefNodeFinder;
 import org.apache.carbondata.core.carbon.datastore.IndexKey;
 import org.apache.carbondata.core.carbon.datastore.block.AbstractIndex;
-import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.carbon.datastore.impl.btree.BTreeDataRefNodeFinder;
 import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
@@ -127,12 +126,10 @@ public List<DataRefNode> getFilterredBlocks(DataRefNode btreeNode,
     FilterExecuter filterExecuter =
         FilterUtil.getFilterExecuterTree(filterResolver, tableSegment.getSegmentProperties(),null);
     while (startBlock != endBlock) {
-      addBlockBasedOnMinMaxValue(filterExecuter, listOfDataBlocksToScan, startBlock,
-          tableSegment.getSegmentProperties());
+      addBlockBasedOnMinMaxValue(filterExecuter, listOfDataBlocksToScan, startBlock);
       startBlock = startBlock.getNextDataRefNode();
     }
-    addBlockBasedOnMinMaxValue(filterExecuter, listOfDataBlocksToScan, endBlock,
-        tableSegment.getSegmentProperties());
+    addBlockBasedOnMinMaxValue(filterExecuter, listOfDataBlocksToScan, endBlock);
     LOGGER.info("Total Time in retrieving the data reference node" + "after scanning the btree " + (
         System.currentTimeMillis() - startTimeInMillis)
         + " Total number of data reference node for executing filter(s) " + listOfDataBlocksToScan
@@ -147,11 +144,9 @@ public List<DataRefNode> getFilterredBlocks(DataRefNode btreeNode,
    * @param filterResolver
    * @param listOfDataBlocksToScan
    * @param dataRefNode
-   * @param segmentProperties
    */
   private void addBlockBasedOnMinMaxValue(FilterExecuter filterExecuter,
-      List<DataRefNode> listOfDataBlocksToScan, DataRefNode dataRefNode,
-      SegmentProperties segmentProperties) {
+      List<DataRefNode> listOfDataBlocksToScan, DataRefNode dataRefNode) {
 
     BitSet bitSet = filterExecuter
         .isScanRequired(dataRefNode.getColumnsMaxValue(), dataRefNode.getColumnsMinValue());
@@ -174,7 +169,7 @@ private void addBlockBasedOnMinMaxValue(FilterExecuter filterExecuter,
   private FilterResolverIntf getFilterResolvertree(Expression expressionTree,
       AbsoluteTableIdentifier tableIdentifier) throws FilterUnsupportedException {
     FilterResolverIntf filterEvaluatorTree =
-        createFilterResolverTree(expressionTree, tableIdentifier, null);
+        createFilterResolverTree(expressionTree, tableIdentifier);
     traverseAndResolveTree(filterEvaluatorTree, tableIdentifier);
     return filterEvaluatorTree;
   }
@@ -212,24 +207,22 @@ private void traverseAndResolveTree(FilterResolverIntf filterResolverTree,
    * @return
    */
   private FilterResolverIntf createFilterResolverTree(Expression expressionTree,
-      AbsoluteTableIdentifier tableIdentifier, Expression intermediateExpression) {
+      AbsoluteTableIdentifier tableIdentifier) {
     ExpressionType filterExpressionType = expressionTree.getFilterExpressionType();
     BinaryExpression currentExpression = null;
     switch (filterExpressionType) {
       case OR:
         currentExpression = (BinaryExpression) expressionTree;
         return new LogicalFilterResolverImpl(
-            createFilterResolverTree(currentExpression.getLeft(), tableIdentifier,
-                currentExpression),
-            createFilterResolverTree(currentExpression.getRight(), tableIdentifier,
-                currentExpression),currentExpression);
+            createFilterResolverTree(currentExpression.getLeft(), tableIdentifier),
+            createFilterResolverTree(currentExpression.getRight(), tableIdentifier),
+            currentExpression);
       case AND:
         currentExpression = (BinaryExpression) expressionTree;
         return new LogicalFilterResolverImpl(
-            createFilterResolverTree(currentExpression.getLeft(), tableIdentifier,
-                currentExpression),
-            createFilterResolverTree(currentExpression.getRight(), tableIdentifier,
-                currentExpression), currentExpression);
+            createFilterResolverTree(currentExpression.getLeft(), tableIdentifier),
+            createFilterResolverTree(currentExpression.getRight(), tableIdentifier),
+            currentExpression);
       case EQUALS:
       case IN:
         return getFilterResolverBasedOnExpressionType(ExpressionType.EQUALS,
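
Note: the SegmentProperties parameter is dropped from addBlockBasedOnMinMaxValue because the FilterExecuter is already built with the segment's properties when getFilterExecuterTree is called above, so per-block pruning only needs each block's min/max values. A rough sketch of the resulting flow, sitting inside the same filter-processor class as the methods above; the step that keeps a block when the bit set is non-empty is an assumption, since that part of the method body is not visible in the hunk.

// Sketch only: mirrors addBlockBasedOnMinMaxValue after this change, with the
// elided "keep the block when isScanRequired reports a hit" step assumed.
private void pruneByMinMax(FilterExecuter filterExecuter,
    List<DataRefNode> listOfDataBlocksToScan, DataRefNode dataRefNode) {
  BitSet bitSet = filterExecuter
      .isScanRequired(dataRefNode.getColumnsMaxValue(), dataRefNode.getColumnsMinValue());
  if (!bitSet.isEmpty()) {
    listOfDataBlocksToScan.add(dataRefNode);
  }
}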
@@ -24,7 +24,7 @@ object DataFrameAPIExample {
 
   def main(args: Array[String]) {
     val cc = ExampleUtils.createCarbonContext("DataFrameAPIExample")
-    ExampleUtils.writeSampleCarbonFile(cc, "carbon1")
+    ExampleUtils.writeSampleCarbonFile(cc, "carbon1", 1000)
 
     // use datasource api to read
     val in = cc.read
@@ -42,7 +42,8 @@ object DataFrameAPIExample {
     println(s"count after 2 load: $count")
 
     // use SQL to read
-    cc.sql("SELECT count(*) FROM carbon1 WHERE c3 > 500").show
+    cc.sql("SELECT c1, count(c3) FROM carbon1 where c3 > 500 group by c1 limit 10").show
+
     cc.sql("DROP TABLE IF EXISTS carbon1")
   }
 }
(Diffs for the remaining changed files are not shown here.)
