Skip to content

Commit

Permalink
fix outputoffset
Browse files Browse the repository at this point in the history
  • Loading branch information
zhexuany committed Mar 28, 2019
1 parent ce6e612 commit 8a07409
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import org.apache.spark.sql.BaseInitialOnceSuite

class LogicalAndOr0Suite extends BaseInitialOnceSuite {
private val allCases = Seq[String](
"select tp_char, tp_smallint, tp_char, tp_smallint, tp_char from full_data_type_table where tp_char = tp_smallint and tp_char > 0",
"select tp_char,tp_smallint from full_data_type_table where tp_char = tp_smallint and tp_char > 0",
"select tp_char,id_dt from full_data_type_table where tp_char = id_dt and tp_char > 0",
"select tp_nvarchar,tp_tinyint from full_data_type_table where tp_nvarchar = tp_tinyint and tp_nvarchar > 0",
Expand Down
247 changes: 145 additions & 102 deletions tikv-client/src/main/java/com/pingcap/tikv/meta/TiDAGRequest.java
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,141 @@ public void resolve() {
typeMap = inferrer.getTypeMap();
}

// IndexScan
private DAGRequest.Builder buildIndexScan(
long id,
DAGRequest.Builder dagRequestBuilder,
Executor.Builder executorBuilder,
IndexScan.Builder indexScanBuilder,
Map<ColumnRef, Integer> colOffsetInFieldMap,
Map<TiColumnInfo, Integer> colPosInIndexMap) {
if (indexInfo == null) {
throw new TiClientInternalException("Index is empty for index scan");
}
List<TiColumnInfo> columnInfoList = tableInfo.getColumns();
boolean hasPk = false;
// We extract index column info
List<Integer> indexColOffsets =
indexInfo
.getIndexColumns()
.stream()
.map(TiIndexColumn::getOffset)
.collect(Collectors.toList());

int idxPos = 0;
// for index scan builder, columns are added by its order in index
for (Integer idx : indexColOffsets) {
TiColumnInfo tiColumnInfo = columnInfoList.get(idx);
ColumnInfo columnInfo = tiColumnInfo.toProto(tableInfo);
colPosInIndexMap.put(tiColumnInfo, idxPos++);

ColumnInfo.Builder colBuilder = ColumnInfo.newBuilder(columnInfo);
if (columnInfo.getColumnId() == -1) {
hasPk = true;
colBuilder.setPkHandle(true);
}
indexScanBuilder.addColumns(colBuilder);
}

if (isDoubleRead()) {
// double read case
if (!hasPk) {
indexScanBuilder.addColumns(handleColumn);
}

int colCount = indexScanBuilder.getColumnsCount();
// double read case: need to retrieve handle
dagRequestBuilder.addOutputOffsets(colCount != 0 ? colCount - 1 : 0);
} else {
int colCount = indexScanBuilder.getColumnsCount();
boolean pkIsNeeded = false;
// =================== IMPORTANT ======================
// offset for dagRequest should be in accordance with fields
for (ColumnRef col : getFields()) {
Integer pos = colPosInIndexMap.get(col.getColumnInfo());
if (pos != null) {
TiColumnInfo columnInfo = columnInfoList.get(indexColOffsets.get(pos));
if (col.getColumnInfo().equals(columnInfo)) {
dagRequestBuilder.addOutputOffsets(pos);
colOffsetInFieldMap.put(col, pos);
}
}
// if a column of field is not contained in index selected,
// logically it must be the pk column and
// the pkIsHandle must be true. Extra check here.
else if (col.getColumnInfo().isPrimaryKey() && tableInfo.isPkHandle()) {
pkIsNeeded = true;
// offset should be processed for each primary key encountered
dagRequestBuilder.addOutputOffsets(colCount);
// for index scan, column offset must be in the order of index->handle
colOffsetInFieldMap.put(col, indexColOffsets.size());
} else {
throw new DAGRequestException(
"columns other than primary key and index key exist in fields while index single read: "
+ col.getName());
}
}
// pk is not included in index but still needed
if (pkIsNeeded) {
indexScanBuilder.addColumns(handleColumn);
}
}
executorBuilder.setTp(ExecType.TypeIndexScan);

indexScanBuilder.setTableId(id).setIndexId(indexInfo.getId());
dagRequestBuilder.addExecutors(executorBuilder.setIdxScan(indexScanBuilder).build());
return dagRequestBuilder;
}

// TableScan
private DAGRequest.Builder buildTblScan(
long id,
DAGRequest.Builder dagRequestBuilder,
Executor.Builder executorBuilder,
TableScan.Builder tblScanBuilder,
Map<ColumnRef, Integer> colOffsetInFieldMap) {
executorBuilder.setTp(ExecType.TypeTableScan);
tblScanBuilder.setTableId(id);
// Step1. Add columns to first executor
Map<ColumnRef, Integer> seenColsOffset = new HashMap<>();
int realOffset = 0;
for (int i = 0; i < getFields().size(); i++) {
ColumnRef col = getFields().get(i);
// can't allow duplicated col added into executor.
if (!colOffsetInFieldMap.containsKey(col)) {
tblScanBuilder.addColumns(col.getColumnInfo().toProto(tableInfo));
colOffsetInFieldMap.put(col, realOffset);
}
if (!seenColsOffset.containsKey(col)) {
seenColsOffset.put(col, realOffset);
realOffset++;
}
}

// column offset should be in accordance with fields
for (int i = 0; i < getFields().size(); i++) {
ColumnRef col = getFields().get(i);
if (seenColsOffset.containsKey(col)) {
dagRequestBuilder.addOutputOffsets(seenColsOffset.get(col));
}
}

// Currently, according to TiKV's implementation, if handle
// is needed, we should add an extra column with an ID of -1
// to the TableScan executor
if (isHandleNeeded()) {
tblScanBuilder.addColumns(handleColumn);
// if handle is needed, we should append one output offset
// duplicated col may exists, we need append the size of current
// output offset's size.
dagRequestBuilder.addOutputOffsets(dagRequestBuilder.getOutputOffsetsCount());
}

dagRequestBuilder.addExecutors(executorBuilder.setTblScan(tblScanBuilder));

return dagRequestBuilder;
}

/**
* Unify indexScan and tableScan building logic since they are very much alike. DAGRequest for
* IndexScan should also contain filters and aggregation, so we can reuse this part of logic.
Expand All @@ -275,109 +410,17 @@ public DAGRequest buildScan(boolean isIndexScan) {
Map<TiColumnInfo, Integer> colPosInIndexMap = new HashMap<>();

if (isIndexScan) {
// IndexScan
if (indexInfo == null) {
throw new TiClientInternalException("Index is empty for index scan");
}
List<TiColumnInfo> columnInfoList = tableInfo.getColumns();
boolean hasPk = false;
// We extract index column info
List<Integer> indexColOffsets =
indexInfo
.getIndexColumns()
.stream()
.map(TiIndexColumn::getOffset)
.collect(Collectors.toList());

int idxPos = 0;
// for index scan builder, columns are added by its order in index
for (Integer idx : indexColOffsets) {
TiColumnInfo tiColumnInfo = columnInfoList.get(idx);
ColumnInfo columnInfo = tiColumnInfo.toProto(tableInfo);
colPosInIndexMap.put(tiColumnInfo, idxPos++);

ColumnInfo.Builder colBuilder = ColumnInfo.newBuilder(columnInfo);
if (columnInfo.getColumnId() == -1) {
hasPk = true;
colBuilder.setPkHandle(true);
}
indexScanBuilder.addColumns(colBuilder);
}

if (isDoubleRead()) {
// double read case
if (!hasPk) {
indexScanBuilder.addColumns(handleColumn);
}

int colCount = indexScanBuilder.getColumnsCount();
// double read case: need to retrieve handle
dagRequestBuilder.addOutputOffsets(colCount != 0 ? colCount - 1 : 0);
} else {
int colCount = indexScanBuilder.getColumnsCount();
boolean pkIsNeeded = false;
// =================== IMPORTANT ======================
// offset for dagRequest should be in accordance with fields
for (ColumnRef col : getFields()) {
Integer pos = colPosInIndexMap.get(col.getColumnInfo());
if (pos != null) {
TiColumnInfo columnInfo = columnInfoList.get(indexColOffsets.get(pos));
if (col.getColumnInfo().equals(columnInfo)) {
dagRequestBuilder.addOutputOffsets(pos);
colOffsetInFieldMap.put(col, pos);
}
}
// if a column of field is not contained in index selected,
// logically it must be the pk column and
// the pkIsHandle must be true. Extra check here.
else if (col.getColumnInfo().isPrimaryKey() && tableInfo.isPkHandle()) {
pkIsNeeded = true;
// offset should be processed for each primary key encountered
dagRequestBuilder.addOutputOffsets(colCount);
// for index scan, column offset must be in the order of index->handle
colOffsetInFieldMap.put(col, indexColOffsets.size());
} else {
throw new DAGRequestException(
"columns other than primary key and index key exist in fields while index single read: "
+ col.getName());
}
}
// pk is not included in index but still needed
if (pkIsNeeded) {
indexScanBuilder.addColumns(handleColumn);
}
}
executorBuilder.setTp(ExecType.TypeIndexScan);

indexScanBuilder.setTableId(id).setIndexId(indexInfo.getId());
dagRequestBuilder.addExecutors(executorBuilder.setIdxScan(indexScanBuilder).build());
dagRequestBuilder =
buildIndexScan(
id,
dagRequestBuilder,
executorBuilder,
indexScanBuilder,
colOffsetInFieldMap,
colPosInIndexMap);
} else {
// TableScan
executorBuilder.setTp(ExecType.TypeTableScan);
tblScanBuilder.setTableId(id);
// Step1. Add columns to first executor
for (int i = 0; i < getFields().size(); i++) {
ColumnRef col = getFields().get(i);
tblScanBuilder.addColumns(col.getColumnInfo().toProto(tableInfo));
colOffsetInFieldMap.put(col, i);
}
// Currently, according to TiKV's implementation, if handle
// is needed, we should add an extra column with an ID of -1
// to the TableScan executor
if (isHandleNeeded()) {
tblScanBuilder.addColumns(handleColumn);
}
dagRequestBuilder.addExecutors(executorBuilder.setTblScan(tblScanBuilder));

// column offset should be in accordance with fields
for (int i = 0; i < getFields().size(); i++) {
dagRequestBuilder.addOutputOffsets(i);
}

// if handle is needed, we should append one output offset
if (isHandleNeeded()) {
dagRequestBuilder.addOutputOffsets(tableInfo.getColumns().size());
}
dagRequestBuilder =
buildTblScan(id, dagRequestBuilder, executorBuilder, tblScanBuilder, colOffsetInFieldMap);
}

if (!isIndexScan || (isIndexScan() && !isDoubleRead())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ public abstract class CoprocessIterator<T> implements Iterator<T> {
public static CoprocessIterator<Row> getRowIterator(
TiDAGRequest req, List<RegionTask> regionTasks, TiSession session) {
return new DAGIterator<Row>(
// If index scan is a covering index, the logic is table scan
// so we need set isIndexScan to false.
req.buildScan(req.isIndexScan() && !req.isDoubleRead()),
regionTasks,
session,
Expand Down

0 comments on commit 8a07409

Please sign in to comment.