[fix](multi-catalog)support the max compute partition prune #27902

Merged (1 commit, Dec 3, 2023)
1 change: 0 additions & 1 deletion be/src/runtime/descriptors.cpp
@@ -192,7 +192,6 @@ MaxComputeTableDescriptor::MaxComputeTableDescriptor(const TTableDescriptor& tde
  _table(tdesc.mcTable.table),
  _access_key(tdesc.mcTable.access_key),
  _secret_key(tdesc.mcTable.secret_key),
- _partition_spec(tdesc.mcTable.partition_spec),
  _public_access(tdesc.mcTable.public_access) {}

  MaxComputeTableDescriptor::~MaxComputeTableDescriptor() = default;
2 changes: 0 additions & 2 deletions be/src/runtime/descriptors.h
@@ -237,7 +237,6 @@ class MaxComputeTableDescriptor : public TableDescriptor {
  const std::string table() const { return _table; }
  const std::string access_key() const { return _access_key; }
  const std::string secret_key() const { return _secret_key; }
- const std::string partition_spec() const { return _partition_spec; }
  const std::string public_access() const { return _public_access; }

  private:
@@ -246,7 +245,6 @@ class MaxComputeTableDescriptor : public TableDescriptor {
  std::string _table;
  std::string _access_key;
  std::string _secret_key;
- std::string _partition_spec;
  std::string _public_access;
  };

9 changes: 7 additions & 2 deletions be/src/vec/exec/format/table/max_compute_jni_reader.cpp
@@ -38,10 +38,15 @@ class Block;
  namespace doris::vectorized {

  MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_desc,
+ const TMaxComputeFileDesc& max_compute_params,
  const std::vector<SlotDescriptor*>& file_slot_descs,
  const TFileRangeDesc& range, RuntimeState* state,
  RuntimeProfile* profile)
- : _file_slot_descs(file_slot_descs), _range(range), _state(state), _profile(profile) {
+ : _max_compute_params(max_compute_params),
+ _file_slot_descs(file_slot_descs),
+ _range(range),
+ _state(state),
+ _profile(profile) {
  _table_desc = mc_desc;
  std::ostringstream required_fields;
  std::ostringstream columns_types;
@@ -64,7 +69,7 @@ MaxComputeJniReader::MaxComputeJniReader(const MaxComputeTableDescriptor* mc_des
  {"access_key", _table_desc->access_key()},
  {"secret_key", _table_desc->secret_key()},
  {"project", _table_desc->project()},
- {"partition_spec", _table_desc->partition_spec()},
+ {"partition_spec", _max_compute_params.partition_spec},
  {"table", _table_desc->table()},
  {"public_access", _table_desc->public_access()},
  {"start_offset", std::to_string(_range.start_offset)},
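After this change, the "partition_spec" entry in the params map above carries the per-split value from TMaxComputeFileDesc rather than a table-wide value from the descriptor. The snippet below is a hypothetical sketch of how the Java side of the JNI boundary could turn that entry back into an ODPS PartitionSpec; the class and helper are illustrative only and are not code from this PR (the actual consumer, MaxComputeJniScanner, appears further down).

import com.aliyun.odps.PartitionSpec;
import java.util.HashMap;
import java.util.Map;

public class PartitionSpecParamSketch {
    // Hypothetical helper: rebuild an ODPS PartitionSpec from the "partition_spec"
    // string written by max_compute_jni_reader.cpp above.
    static PartitionSpec parse(Map<String, String> params) {
        String spec = params.get("partition_spec");
        return (spec == null || spec.isEmpty()) ? null : new PartitionSpec(spec);
    }

    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        params.put("partition_spec", "p_date=20231201");
        PartitionSpec spec = parse(params);
        System.out.println(spec == null ? "unpartitioned split" : spec.toString());
    }
}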
12 changes: 7 additions & 5 deletions be/src/vec/exec/format/table/max_compute_jni_reader.h
@@ -54,6 +54,7 @@ class MaxComputeJniReader : public GenericReader {

  public:
  MaxComputeJniReader(const MaxComputeTableDescriptor* mc_desc,
+ const TMaxComputeFileDesc& max_compute_params,
  const std::vector<SlotDescriptor*>& file_slot_descs,
  const TFileRangeDesc& range, RuntimeState* state, RuntimeProfile* profile);

@@ -68,13 +69,14 @@ class MaxComputeJniReader : public GenericReader {
  std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);

  private:
- const MaxComputeTableDescriptor* _table_desc;
+ const MaxComputeTableDescriptor* _table_desc = nullptr;
+ const TMaxComputeFileDesc& _max_compute_params;
  const std::vector<SlotDescriptor*>& _file_slot_descs;
  const TFileRangeDesc& _range;
- RuntimeState* _state;
- RuntimeProfile* _profile;
- std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
- std::unique_ptr<JniConnector> _jni_connector;
+ RuntimeState* _state = nullptr;
+ RuntimeProfile* _profile = nullptr;
+ std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
+ std::unique_ptr<JniConnector> _jni_connector = nullptr;
  };

  } // namespace doris::vectorized
12 changes: 6 additions & 6 deletions be/src/vec/exec/scan/vfile_scanner.cpp
@@ -686,13 +686,13 @@ Status VFileScanner::_get_next_reader() {
  bool need_to_get_parsed_schema = false;
  switch (format_type) {
  case TFileFormatType::FORMAT_JNI: {
- if (_real_tuple_desc->table_desc()->table_type() ==
- ::doris::TTableType::type::MAX_COMPUTE_TABLE) {
- const MaxComputeTableDescriptor* mc_desc =
- static_cast<const MaxComputeTableDescriptor*>(
- _real_tuple_desc->table_desc());
+ if (range.__isset.table_format_params &&
+ range.table_format_params.table_format_type == "max_compute") {
+ const auto* mc_desc = static_cast<const MaxComputeTableDescriptor*>(
+ _real_tuple_desc->table_desc());
  std::unique_ptr<MaxComputeJniReader> mc_reader = MaxComputeJniReader::create_unique(
- mc_desc, _file_slot_descs, range, _state, _profile);
+ mc_desc, range.table_format_params.max_compute_params, _file_slot_descs,
+ range, _state, _profile);
  init_status = mc_reader->init_reader(_colname_to_value_range);
  _cur_reader = std::move(mc_reader);
  } else if (range.__isset.table_format_params &&
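The scanner now recognizes MaxCompute splits by range.table_format_params.table_format_type instead of the table type, and takes the partition spec from the per-range max_compute_params. Below is a rough sketch of the FE-side wiring this implies, i.e. giving every file split its own partition spec. TFileRangeDesc and TMaxComputeFileDesc appear in this diff, while TTableFormatFileDesc, the package name, and the surrounding method are assumptions for illustration; direct field assignment is used so as not to guess generator-specific setter names.

import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TMaxComputeFileDesc;
import org.apache.doris.thrift.TTableFormatFileDesc;

public class MaxComputeSplitSketch {
    // Hypothetical FE-side helper: attach a per-split partition spec to a file range.
    static TFileRangeDesc buildRange(String partitionSpec) {
        TMaxComputeFileDesc mcParams = new TMaxComputeFileDesc();
        mcParams.partition_spec = partitionSpec;          // e.g. "p_date=20231201"

        TTableFormatFileDesc formatParams = new TTableFormatFileDesc();
        formatParams.table_format_type = "max_compute";   // matches the BE check above
        formatParams.max_compute_params = mcParams;

        TFileRangeDesc range = new TFileRangeDesc();
        range.table_format_params = formatParams;
        return range;
    }
}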
MaxComputeJniScanner.java
Expand Up @@ -61,11 +61,11 @@ public class MaxComputeJniScanner extends JniScanner {
private static final String START_OFFSET = "start_offset";
private static final String SPLIT_SIZE = "split_size";
private static final String PUBLIC_ACCESS = "public_access";
private final RootAllocator arrowAllocator = new RootAllocator(Integer.MAX_VALUE);
private final Map<String, MaxComputeTableScan> tableScans = new ConcurrentHashMap<>();
private final String region;
private final String project;
private final String table;
private RootAllocator arrowAllocator;
private PartitionSpec partitionSpec;
private Set<String> partitionColumns;
private MaxComputeTableScan curTableScan;
@@ -171,9 +171,14 @@ public void open() throws IOException {
  partitionColumns = session.getSchema().getPartitionColumns().stream()
  .map(Column::getName)
  .collect(Collectors.toSet());
- List<Column> maxComputeColumns = new ArrayList<>(readColumns);
- maxComputeColumns.removeIf(e -> partitionColumns.contains(e.getName()));
- curReader = session.openArrowRecordReader(start, totalRows, maxComputeColumns, arrowAllocator);
+ List<Column> pushDownColumns = new ArrayList<>(readColumns);
+ pushDownColumns.removeIf(e -> partitionColumns.contains(e.getName()));
+ if (pushDownColumns.isEmpty() && !partitionColumns.isEmpty()) {
+ // query columns required non-null, when query partition table
+ pushDownColumns.add(session.getSchema().getColumn(0));
+ }
+ arrowAllocator = new RootAllocator(Integer.MAX_VALUE);
+ curReader = session.openArrowRecordReader(start, totalRows, pushDownColumns, arrowAllocator);
  remainBatchRows = totalRows;
  } catch (TunnelException e) {
  if (retryCount > 0 && e.getErrorMsg().contains("TableModified")) {
@@ -254,7 +259,8 @@ public void close() throws IOException {
  startOffset = -1;
  splitSize = -1;
  if (curReader != null) {
- arrowAllocator.releaseBytes(arrowAllocator.getAllocatedMemory());
+ arrowAllocator.close();
+ arrowAllocator = null;
  curReader.close();
  curReader = null;
  }
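close() now closes the allocator outright, and open() creates a fresh RootAllocator per scan, instead of calling releaseBytes() on a long-lived allocator. A minimal sketch of that lifecycle, assuming one allocator per open()/close() pair:

import org.apache.arrow.memory.RootAllocator;

public class AllocatorLifecycleSketch {
    public static void main(String[] args) {
        RootAllocator allocator = new RootAllocator(Integer.MAX_VALUE); // created in open()
        try {
            // ... open the Arrow record reader with this allocator and read batches ...
        } finally {
            // Closing releases the allocator; it throws IllegalStateException if any
            // child buffers are still outstanding, so leaks surface instead of staying hidden.
            allocator.close();
        }
    }
}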
@@ -279,15 +285,25 @@ protected int getNext() throws IOException {
  private int readVectors(int expectedRows) throws IOException {
  VectorSchemaRoot batch;
  int curReadRows = 0;
- while (curReadRows < expectedRows && (batch = curReader.read()) != null) {
+ while (curReadRows < expectedRows) {
+ batch = curReader.read();
+ if (batch == null) {
+ break;
+ }
  try {
  List<FieldVector> fieldVectors = batch.getFieldVectors();
  int batchRows = 0;
  for (FieldVector column : fieldVectors) {
+ Integer readColumnId = readColumnsToId.get(column.getName());
+ if (readColumnId == null) {
+ // use for partition if no column need to read.
+ batchRows = column.getValueCount();
+ continue;
+ }
  columnValue.reset(column);
  batchRows = column.getValueCount();
  for (int j = 0; j < batchRows; j++) {
- appendData(readColumnsToId.get(column.getName()), columnValue);
+ appendData(readColumnId, columnValue);
  }
  }
  if (partitionSpec != null) {
@@ -303,6 +319,8 @@ private int readVectors(int expectedRows) throws IOException {
  }
  }
  curReadRows += batchRows;
+ } catch (Exception e) {
+ throw new RuntimeException("Fail to read arrow data, reason: " + e.getMessage(), e);
  } finally {
  batch.close();
  }
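When a query selects only partition columns, open() above pads the push-down list with one physical column, and readVectors() uses that column solely to learn the batch row count; the partition values themselves are filled in from partitionSpec in the branch that is collapsed in this view. The self-contained sketch below illustrates that general idea with hypothetical names; it is not the Doris implementation.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionFillSketch {
    // Parse a MaxCompute-style spec such as "p_date=20231201,p_hour=08" into column -> value.
    static Map<String, String> parseSpec(String spec) {
        Map<String, String> values = new LinkedHashMap<>();
        for (String part : spec.split(",")) {
            String[] kv = part.split("=", 2);
            values.put(kv[0].trim(), kv[1].trim().replace("'", ""));
        }
        return values;
    }

    // Repeat the spec value of one partition column for every row in the batch.
    static List<String> fillColumn(Map<String, String> specValues, String column, int batchRows) {
        List<String> out = new ArrayList<>();
        String value = specValues.get(column);
        for (int i = 0; i < batchRows; i++) {
            out.add(value);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, String> spec = parseSpec("p_date=20231201,p_hour=08");
        System.out.println(fillColumn(spec, "p_date", 3)); // [20231201, 20231201, 20231201]
    }
}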
ShowPartitionsStmt.java
@@ -26,6 +26,7 @@
  import org.apache.doris.catalog.TableIf.TableType;
  import org.apache.doris.catalog.Type;
  import org.apache.doris.catalog.external.HMSExternalTable;
+ import org.apache.doris.catalog.external.MaxComputeExternalTable;
  import org.apache.doris.common.AnalysisException;
  import org.apache.doris.common.ErrorCode;
  import org.apache.doris.common.ErrorReport;
@@ -37,6 +38,7 @@
  import org.apache.doris.common.util.OrderByPair;
  import org.apache.doris.datasource.CatalogIf;
  import org.apache.doris.datasource.HMSExternalCatalog;
+ import org.apache.doris.datasource.MaxComputeExternalCatalog;
  import org.apache.doris.mysql.privilege.PrivPredicate;
  import org.apache.doris.qe.ConnectContext;
  import org.apache.doris.qe.ShowResultSetMetaData;
@@ -126,7 +128,7 @@ public void analyze(Analyzer analyzer) throws UserException {

  DatabaseIf db = catalog.getDbOrAnalysisException(dbName);
  TableIf table = db.getTableOrMetaException(tblName, Table.TableType.OLAP, TableType.MATERIALIZED_VIEW,
- TableType.HMS_EXTERNAL_TABLE);
+ TableType.HMS_EXTERNAL_TABLE, TableType.MAX_COMPUTE_EXTERNAL_TABLE);

  if (table instanceof HMSExternalTable) {
  if (((HMSExternalTable) table).isView()) {
@@ -138,6 +140,13 @@ public void analyze(Analyzer analyzer) throws UserException {
  return;
  }

+ if (table instanceof MaxComputeExternalTable) {
+ if (((MaxComputeExternalTable) table).getOdpsTable().getPartitions().isEmpty()) {
+ throw new AnalysisException("Table " + tblName + " is not a partitioned table");
+ }
+ return;
+ }
+
  table.readLock();
  try {
  // build proc path
@@ -170,7 +179,8 @@ public void analyzeImpl(Analyzer analyzer) throws UserException {
  }

  // disallow unsupported catalog
- if (!(catalog.isInternalCatalog() || catalog instanceof HMSExternalCatalog)) {
+ if (!(catalog.isInternalCatalog() || catalog instanceof HMSExternalCatalog
+ || catalog instanceof MaxComputeExternalCatalog)) {
  throw new AnalysisException(String.format("Catalog of type '%s' is not allowed in ShowPartitionsStmt",
  catalog.getType()));
  }