Skip to content

Commit

Permalink
Modify the coprocessor
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyunh committed Jun 4, 2015
1 parent 6e46385 commit 0c6fef4
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -825,20 +825,18 @@ private[hbase] case class HBaseRelation(
def buildRowAfterCoprocessor(projections: Seq[(Attribute, Int)],
result: Result,
row: MutableRow): Row = {
assert(projections.size == row.length, "Projection size and row size mismatched")
projections.zipWithIndex.foreach { case (p, i) =>
for (i <- projections.indices) {
val kv: Cell = result.rawCells()(i)
val colValue: HBaseRawType = CellUtil.cloneValue(kv)
DataTypeUtils.setRowColumnFromHBaseRawType(
row, p._2, colValue, 0, colValue.length, p._1.dataType)
row, projections(i)._2, colValue, 0, colValue.length, projections(i)._1.dataType)
}
row
}

def buildRow(projections: Seq[(Attribute, Int)],
result: Result,
row: MutableRow): Row = {
assert(projections.size == row.length, "Projection size and row size mismatched")
val rowKeys = HBaseKVHelper.decodingRawKeyColumns(result.getRow, keyColumns)
projections.foreach {
p =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ abstract class BaseRegionScanner extends RegionScanner {

class SparkSqlRegionObserver extends BaseRegionObserver {
lazy val logger = Logger.getLogger(getClass.getName)
lazy val EmptyArray = Array[Byte]()

override def postScannerOpen(e: ObserverContext[RegionCoprocessorEnvironment],
scan: Scan,
Expand Down Expand Up @@ -172,16 +173,16 @@ class SparkSqlRegionObserver extends BaseRegionObserver {
override def next(results: java.util.List[Cell]): Boolean = {
val hasMore: Boolean = result.hasNext
if (hasMore) {
val nextRow: Seq[Any] = result.next().toSeq
val nextRowWithDateType = nextRow zip outputDataType
val cellsInRow: Seq[Cell] = nextRowWithDateType.map {
case (data, dataType) =>
val dataOfBytes: HBaseRawType = {
if (data == null) null else DataTypeUtils.dataToBytes(data, dataType)
}
new KeyValue(Array[Byte](), Array[Byte](), Array[Byte](), dataOfBytes)
val nextRow = result.next()
val numOfCells = outputDataType.length
for (i <- 0 until numOfCells) {
val data = nextRow(i)
val dataType = outputDataType(i)
val dataOfBytes: HBaseRawType = {
if (data == null) null else DataTypeUtils.dataToBytes(data, dataType)
}
results.add(new KeyValue(EmptyArray, EmptyArray, EmptyArray, dataOfBytes))
}
results.addAll(cellsInRow)
}
hasMore
}
Expand Down

0 comments on commit 0c6fef4

Please sign in to comment.