@@ -247,7 +247,9 @@ public void createScanRangeLocationsUnsplittable(FileLoadScanNode.ParamCreateCon
context.params.setCompressType(compressType);
List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
context.fileGroup.getColumnNamesFromPath());
- TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size, columnsFromPath);
+ List<String> columnsFromPathKeys = context.fileGroup.getColumnNamesFromPath();
+ TFileRangeDesc rangeDesc = createFileRangeDesc(0, fileStatus, fileStatus.size,
+         columnsFromPath, columnsFromPathKeys);
locations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
}
scanRangeLocations.add(locations);
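(Aside, for context on the new parameter: BrokerUtil.parseColumnsFromPath returns the values of the partition-style key=value segments found in the file path, while the added columnsFromPathKeys list carries the requested key names themselves, taken from the same getColumnNamesFromPath() list, so each TFileRangeDesc now ships the keys alongside the values. Below is a minimal standalone sketch of that key/value split, assuming key=value path segments; the class and method names are hypothetical and this is just the idea, not the real BrokerUtil logic.)

import java.util.ArrayList;
import java.util.List;

// Illustration only: split partition-style "key=value" path segments into keys and values.
// This is NOT the Doris BrokerUtil implementation; class and method names are hypothetical.
public class PathColumnsSketch {

    // For "s3://bucket/load/product=p1/code=107020/dt=20250202/data.csv" this fills
    // keys = [product, code, dt] and values = [p1, 107020, 20250202].
    static void splitPathColumns(String path, List<String> keys, List<String> values) {
        for (String segment : path.split("/")) {
            int eq = segment.indexOf('=');
            if (eq > 0) {
                keys.add(segment.substring(0, eq));
                values.add(segment.substring(eq + 1));
            }
        }
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        List<String> values = new ArrayList<>();
        splitPathColumns("s3://bucket/load/product=p1/code=107020/dt=20250202/data.csv", keys, values);
        System.out.println(keys);   // [product, code, dt]
        System.out.println(values); // [p1, 107020, 20250202]
    }
}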
@@ -289,12 +291,13 @@ public void createScanRangeLocationsSplittable(FileLoadScanNode.ParamCreateConte
context.params.setCompressType(compressType);
List<String> columnsFromPath = BrokerUtil.parseColumnsFromPath(fileStatus.path,
context.fileGroup.getColumnNamesFromPath());
+ List<String> columnsFromPathKeys = context.fileGroup.getColumnNamesFromPath();
// Assign scan range locations only for broker load.
// stream load has only one file, and no need to set multi scan ranges.
if (tmpBytes > bytesPerInstance && jobType != JobType.STREAM_LOAD) {
long rangeBytes = bytesPerInstance - curInstanceBytes;
TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, rangeBytes,
-         columnsFromPath);
+         columnsFromPath, columnsFromPathKeys);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
curFileOffset += rangeBytes;

@@ -303,7 +306,8 @@ public void createScanRangeLocationsSplittable(FileLoadScanNode.ParamCreateConte
curLocations = newLocations(context.params, brokerDesc, backendPolicy);
curInstanceBytes = 0;
} else {
- TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath);
+ TFileRangeDesc rangeDesc = createFileRangeDesc(curFileOffset, fileStatus, leftBytes, columnsFromPath,
+         columnsFromPathKeys);
curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);
curFileOffset = 0;
curInstanceBytes += leftBytes;
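(Aside on the surrounding splittable-range logic, which is only context in this diff: for broker load a large file is cut into several ranges so that each instance reads at most roughly bytesPerInstance bytes, while stream load has a single file and is never split. A rough standalone sketch of that carving for one file follows, with hypothetical names and without the cross-file curInstanceBytes bookkeeping the real code keeps.)

import java.util.ArrayList;
import java.util.List;

// Illustration only: split one file of fileSize bytes into (offset, size) ranges of at
// most bytesPerInstance bytes each. Not the Doris implementation; names are hypothetical.
public class RangeSplitSketch {

    static List<long[]> splitFile(long fileSize, long bytesPerInstance) {
        List<long[]> ranges = new ArrayList<>();
        long offset = 0;
        while (offset < fileSize) {
            long size = Math.min(bytesPerInstance, fileSize - offset);
            ranges.add(new long[] {offset, size});
            offset += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // A 2.5 GiB file with 1 GiB per instance -> ranges of 1 GiB, 1 GiB and 0.5 GiB.
        for (long[] r : splitFile(2_684_354_560L, 1_073_741_824L)) {
            System.out.println("offset=" + r[0] + " size=" + r[1]);
        }
    }
}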
@@ -374,14 +378,15 @@ private TFileFormatType formatType(String fileFormat, String path) throws UserEx
}

private TFileRangeDesc createFileRangeDesc(long curFileOffset, TBrokerFileStatus fileStatus, long rangeBytes,
-         List<String> columnsFromPath) {
+         List<String> columnsFromPath, List<String> columnsFromPathKeys) {
TFileRangeDesc rangeDesc = new TFileRangeDesc();
if (jobType == JobType.BULK_LOAD) {
rangeDesc.setPath(fileStatus.path);
rangeDesc.setStartOffset(curFileOffset);
rangeDesc.setSize(rangeBytes);
rangeDesc.setFileSize(fileStatus.size);
rangeDesc.setColumnsFromPath(columnsFromPath);
+ rangeDesc.setColumnsFromPathKeys(columnsFromPathKeys);
if (getFileType() == TFileType.FILE_HDFS) {
URI fileUri = new Path(fileStatus.path).toUri();
rangeDesc.setFsName(fileUri.getScheme() + "://" + fileUri.getAuthority());
@@ -0,0 +1,205 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_load_columns_from_path", "load_p0") {
def s3BucketName = getS3BucketName()
def s3Endpoint = getS3Endpoint()
def s3Region = getS3Region()
def ak = getS3AK()
def sk = getS3SK()
def tableName = "test_columns_from_path"
def label = UUID.randomUUID().toString().replace("-", "0")
def path = "s3://${s3BucketName}/load/product=p1/code=107020/dt=20250202/data.csv"

sql """ DROP TABLE IF EXISTS ${tableName} """

sql """
CREATE TABLE ${tableName} (
k1 INT,
k2 INT,
pd VARCHAR(20) NULL,
code INT NULL,
dt DATE
)
DUPLICATE KEY(`k1`)
DISTRIBUTED BY HASH(`k1`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
"""
    // test all three columns from path, with SET mapping all three of them
try {
sql """
LOAD LABEL ${label}
(
DATA INFILE("${path}")
INTO TABLE ${tableName}
COLUMNS TERMINATED BY ","
FORMAT AS "CSV"
(k1, k2)
COLUMNS FROM PATH AS (product, code, dt)
SET
(
pd = product,
code = code,
dt = dt
)
)
WITH S3
(
"s3.access_key" = "${ak}",
"s3.secret_key" = "${sk}",
"s3.endpoint" = "${s3Endpoint}",
"s3.region" = "${s3Region}"
)
"""

// Wait for load job to finish
def maxRetry = 60
def result = ""
for (int i = 0; i < maxRetry; i++) {
result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
break
}
sleep(1000)
}

// Check load job state
assertEquals("FINISHED", result[0].State)

// Verify the loaded data
def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
assertTrue(rowCount[0][0] > 0, "No data was loaded")

// Verify columns from path are extracted correctly
def pathData = sql "SELECT pd, code, dt FROM ${tableName} LIMIT 1"
assertEquals("p1", pathData[0][0])
assertEquals(107020, pathData[0][1])
assertEquals("2025-02-02", pathData[0][2].toString())

} finally {
sql """ TRUNCATE TABLE ${tableName} """
}

    // test all three columns from path, with SET mapping only the column whose name differs from the table column
label = UUID.randomUUID().toString().replace("-", "1")
try {
sql """
LOAD LABEL ${label}
(
DATA INFILE("${path}")
INTO TABLE ${tableName}
COLUMNS TERMINATED BY ","
FORMAT AS "CSV"
(k1, k2)
COLUMNS FROM PATH AS (product, code, dt)
SET (
pd = product
)
)
WITH S3
(
"s3.access_key" = "${ak}",
"s3.secret_key" = "${sk}",
"s3.endpoint" = "${s3Endpoint}",
"s3.region" = "${s3Region}"
)
"""

// Wait for load job to finish
def maxRetry = 60
def result = ""
for (int i = 0; i < maxRetry; i++) {
result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
break
}
sleep(1000)
}

// Check load job state
assertEquals("FINISHED", result[0].State)

// Verify the loaded data
def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
assertTrue(rowCount[0][0] > 0, "No data was loaded")

// Verify columns from path are extracted correctly
def pathData = sql "SELECT pd, code, dt FROM ${tableName} LIMIT 1"
assertEquals("p1", pathData[0][0])
assertEquals(107020, pathData[0][1])
assertEquals("2025-02-02", pathData[0][2].toString())

} finally {
sql """ TRUNCATE TABLE ${tableName} """
}

// test extracting only one column from path (only product)
label = UUID.randomUUID().toString().replace("-", "2")
try {
sql """
LOAD LABEL ${label}
(
DATA INFILE("${path}")
INTO TABLE ${tableName}
COLUMNS TERMINATED BY ","
FORMAT AS "CSV"
(k1, k2)
COLUMNS FROM PATH AS (product)
SET
(
pd = product
)
)
WITH S3
(
"s3.access_key" = "${ak}",
"s3.secret_key" = "${sk}",
"s3.endpoint" = "${s3Endpoint}",
"s3.region" = "${s3Region}"
)
"""

// Wait for load job to finish
def maxRetry = 60
def result = ""
for (int i = 0; i < maxRetry; i++) {
result = sql_return_maparray "SHOW LOAD WHERE LABEL = '${label}'"
            if (result[0].State == "FINISHED" || result[0].State == "CANCELLED") {
break
}
sleep(1000)
}

// Check load job state
assertEquals("FINISHED", result[0].State)

// Verify the loaded data
def rowCount = sql "SELECT COUNT(*) FROM ${tableName}"
assertTrue(rowCount[0][0] > 0, "No data was loaded")

// Verify only pd column is extracted from path, code and dt are loaded from CSV file
def pathData = sql "SELECT pd FROM ${tableName} LIMIT 1"
assertEquals("p1", pathData[0][0])
// code and dt should be loaded from CSV file data, not from path
// The actual values depend on the CSV file content

} finally {
sql """ DROP TABLE ${tableName} """
}
}