Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] fix several bugs of pipe #34871

Merged
merged 8 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class DateUtils {
.addSerializationExclusionStrategy(new GsonUtils.HiddenAnnotationExclusionStrategy())
.addDeserializationExclusionStrategy(new GsonUtils.HiddenAnnotationExclusionStrategy())
.enableComplexMapKeySerialization()
.disableHtmlEscaping()
.registerTypeAdapter(LocalDateTime.class, LOCAL_DATETIME_PRINTER)
.create();
/*
Expand Down Expand Up @@ -172,7 +173,11 @@ public static String convertDateTimeFormaterToSecondFormater(String datetime) {
return convertedDatetime;
}

public static String formatTimeStampInSeconds(long timestampInSeconds, ZoneId timeZoneId) {
public static String formatTimestampInSeconds(long timestampInSeconds) {
return formatTimestampInSeconds(timestampInSeconds, TimeUtils.getSystemTimeZone().toZoneId());
}

public static String formatTimestampInSeconds(long timestampInSeconds, ZoneId timeZoneId) {
return formatTimeStampInMill(timestampInSeconds * 1000, timeZoneId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.text.ParseException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.time.Clock;
import java.time.DateTimeException;
import java.time.ZoneId;
import java.time.format.DateTimeParseException;
Expand Down Expand Up @@ -161,6 +162,13 @@ public static TimeZone getOrSystemTimeZone(String timeZone) {
return TimeZone.getTimeZone(ZoneId.of(timeZone, TIME_ZONE_ALIAS_MAP));
}

/**
* Get UNIX timestamp/Epoch second at system timezone
*/
public static long getEpochSeconds() {
return Clock.systemDefaultZone().instant().getEpochSecond();
}

public static String longToTimeString(long timeStamp, SimpleDateFormat dateFormat) {
if (timeStamp <= 0L) {
return FeConstants.NULL_STRING;
Expand Down
23 changes: 19 additions & 4 deletions fe/fe-core/src/main/java/com/starrocks/load/pipe/Pipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;
import com.starrocks.analysis.TableName;
Expand All @@ -28,6 +29,7 @@
import com.starrocks.common.UserException;
import com.starrocks.common.util.DateUtils;
import com.starrocks.common.util.ParseUtil;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.load.pipe.filelist.FileListRepo;
import com.starrocks.persist.gson.GsonPostProcessable;
import com.starrocks.persist.gson.GsonUtils;
Expand Down Expand Up @@ -98,7 +100,7 @@ public class Pipe implements GsonPostProcessable {
@SerializedName(value = "properties")
private Map<String, String> properties;
@SerializedName(value = "createdTime")
private long createdTime = -1;
private long createdTime;
@SerializedName(value = "load_status")
private LoadStatus loadStatus = new LoadStatus();
@SerializedName(value = "task_execution_variables")
Expand All @@ -120,7 +122,7 @@ protected Pipe(PipeId id, String name, TableName targetTable, FilePipeSource sou
this.state = State.RUNNING;
this.pipeSource = sourceTable;
this.originSql = originSql;
this.createdTime = System.currentTimeMillis();
this.createdTime = TimeUtils.getEpochSeconds();
this.properties = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
this.taskExecutionVariables = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
this.taskExecutionVariables.putAll(DEFAULT_TASK_EXECUTION_VARIABLES);
Expand Down Expand Up @@ -496,16 +498,26 @@ public void suspend() {
if (this.state == State.RUNNING) {
this.state = State.SUSPEND;

List<PipeFileRecord> loadingFiles = Lists.newArrayList();
for (PipeTaskDesc task : runningTasks.values()) {
task.interrupt();
if (task.isTaskRunning()) {
task.interrupt();
loadingFiles.addAll(task.getPiece().getFiles());
}
}
LOG.info("suspend pipe " + this);

if (!runningTasks.isEmpty()) {
runningTasks.clear();
LOG.info("suspend pipe {} and clear running tasks {}", this, runningTasks);
runningTasks.clear();
}
loadStatus.loadingFiles = 0;

// Change LOADING files to UNLOADED
if (CollectionUtils.isNotEmpty(loadingFiles)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does this mean?

FileListRepo repo = getPipeSource().getFileListRepo();
repo.updateFileState(loadingFiles, FileListRepo.PipeFileState.UNLOADED, null);
}
}
} finally {
lock.writeLock().unlock();
Expand Down Expand Up @@ -612,6 +624,9 @@ public ErrorInfo getLastErrorInfo() {
return lastErrorInfo;
}

/**
* Unix timestamp in seconds
*/
public long getCreatedTime() {
return createdTime;
}
Expand Down
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/load/pipe/PipeTaskDesc.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,16 @@ public void interrupt() {
taskManager.killTask(uniqueTaskName, true);
}

/**
* Is the task still running ?
*/
public boolean isTaskRunning() {
if (isFinished()) {
return false;
}
return future != null && !future.isDone();
}

public long getId() {
return id;
}
Expand Down
4 changes: 2 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/qe/ShowExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1007,9 +1007,9 @@ private void handleShowTableStatus() {
// Auto_increment
row.add(null);
// Create_time
row.add(DateUtils.formatTimeStampInSeconds(table.getCreateTime(), currentTimeZoneId));
row.add(DateUtils.formatTimestampInSeconds(table.getCreateTime(), currentTimeZoneId));
// Update_time
row.add(DateUtils.formatTimeStampInSeconds(info.getUpdate_time(), currentTimeZoneId));
row.add(DateUtils.formatTimestampInSeconds(info.getUpdate_time(), currentTimeZoneId));
// Check_time
row.add(null);
// Collation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2416,7 +2416,7 @@ public Void visitCreatePipeStatement(CreatePipeStmt statement, ConnectContext co
Authorizer.checkDbAction(context.getCurrentUserIdentity(), context.getCurrentRoleIds(),
InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME,
statement.getPipeName().getDbName(), PrivilegeType.CREATE_PIPE);
visitQueryStatement(statement.getInsertStmt().getQueryStatement(), context);
visitInsertStatement(statement.getInsertStmt(), context);
} catch (AccessDeniedException e) {
AccessDeniedException.reportAccessDenied(
InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private static void analyzeProperties(Map<String, String> properties) {
}
if (value < 0) {
ErrorReport.reportSemanticException(ErrorCode.ERR_INVALID_PARAMETER,
PROPERTY_BATCH_SIZE + " should in [0, +oo)");
PROPERTY_BATCH_SIZE + " should be greater than 0");
}
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import com.starrocks.catalog.Column;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.ScalarType;
import com.starrocks.common.util.DateUtils;
import com.starrocks.common.util.OrderByPair;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.load.pipe.Pipe;
import com.starrocks.qe.ShowResultSetMetaData;
import com.starrocks.server.GlobalStateMgr;
Expand Down Expand Up @@ -77,11 +77,7 @@ public static void handleShow(List<Comparable> row, Pipe pipe) {
row.add(Optional.ofNullable(pipe.getTargetTable()).map(TableName::toString).orElse(""));
row.add(pipe.getLoadStatus().toJson());
row.add(pipe.getLastErrorInfo().toJson());
if (pipe.getCreatedTime() == -1) {
row.add(null);
} else {
row.add(TimeUtils.longToTimeString(pipe.getCreatedTime()));
}
row.add(DateUtils.formatTimestampInSeconds(pipe.getCreatedTime()));
}

public static int findSlotIndex(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,37 @@ public void testExecuteFailed() throws Exception {
}
}

@Test
public void testTaskExecution() {
PipeTaskDesc task = new PipeTaskDesc(1, "task", "test", "sql", null);

// normal success
{
CompletableFuture<Constants.TaskRunState> future = new CompletableFuture<>();
future.complete(Constants.TaskRunState.SUCCESS);
task.setFuture(future);
Assert.assertFalse(task.isFinished());
Assert.assertFalse(task.isTaskRunning());
}

// exceptional
{
CompletableFuture<Constants.TaskRunState> future = new CompletableFuture<>();
future.completeExceptionally(new RuntimeException("task failure"));
task.setFuture(future);
Assert.assertFalse(task.isFinished());
Assert.assertFalse(task.isTaskRunning());
}

// running
{
CompletableFuture<Constants.TaskRunState> future = new CompletableFuture<>();
task.setFuture(future);
Assert.assertFalse(task.isFinished());
Assert.assertTrue(task.isTaskRunning());
}
}

@Test
public void resumeAfterError() throws Exception {
final String pipeName = "p3";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3067,6 +3067,8 @@ public void testPipe() throws Exception {
"as insert into tbl_pipe select * from files('path'='fake://dir/', 'format'='parquet', 'auto_ingest'='false') ";
String createSql2 = "create pipe p2 " +
"as insert into tbl_pipe select * from files('path'='fake://dir/', 'format'='parquet', 'auto_ingest'='false') ";
String dropSql = "drop pipe p1";
String dropSql2 = "drop pipe p2";
ConnectContext ctx = starRocksAssert.getCtx();

ctxToTestUser();
Expand Down Expand Up @@ -3111,21 +3113,40 @@ public void testPipe() throws Exception {
DDLStmtExecutor.execute(UtFrameUtils.parseStmtWithNewParser("drop pipe p1", ctx), ctx);
}

// create pipe
String dbName = connectContext.getDatabase();
verifyGrantRevoke(
createSql,
String.format("grant CREATE PIPE on DATABASE %s to test", dbName),
String.format("revoke CREATE PIPE on DATABASE %s from test", dbName),
"Access denied; you need (at least one of) the CREATE PIPE privilege(s) on DATABASE db1 " +
"for this operation.");
verifyGrantRevoke(
createSql,
"grant CREATE PIPE on ALL DATABASES to test",
"revoke CREATE PIPE on ALL DATABASES from test",
"Access denied; you need (at least one of) the CREATE PIPE privilege(s) on DATABASE db1 " +
"for this operation.");

// test create pipe without insert privilege
{
grantRevokeSqlAsRoot(String.format("grant CREATE PIPE on DATABASE %s to test", dbName));
ctxToTestUser();
verifyGrantRevoke(createSql,
"grant INSERT on tbl_pipe to test",
"revoke INSERT on tbl_pipe from test",
"Access denied; you need (at least one of) the INSERT privilege(s) on TABLE tbl_pipe " +
"for this operation");
grantRevokeSqlAsRoot(String.format("revoke CREATE PIPE on DATABASE %s from test", dbName));

}
// grant insert privilege to user
grantRevokeSqlAsRoot("grant INSERT on tbl_pipe to test");

// create pipe
{
verifyGrantRevoke(
createSql,
String.format("grant CREATE PIPE on DATABASE %s to test", dbName),
String.format("revoke CREATE PIPE on DATABASE %s from test", dbName),
"Access denied; you need (at least one of) the CREATE PIPE privilege(s) on DATABASE db1 " +
"for this operation.");
verifyGrantRevoke(
createSql,
"grant CREATE PIPE on ALL DATABASES to test",
"revoke CREATE PIPE on ALL DATABASES from test",
"Access denied; you need (at least one of) the CREATE PIPE privilege(s) on DATABASE db1 " +
"for this operation.");
}

// ctxToRoot();
DDLStmtExecutor.execute(UtFrameUtils.parseStmtWithNewParser(createSql, ctx), ctx);
DDLStmtExecutor.execute(UtFrameUtils.parseStmtWithNewParser(createSql2, ctx), ctx);

Expand All @@ -3141,6 +3162,8 @@ public void testPipe() throws Exception {
{
// show grants
Assert.assertEquals(ImmutableList.of(
ImmutableList.of("'test'@'%'", "default_catalog",
"GRANT INSERT ON TABLE db1.tbl_pipe TO USER 'test'@'%'"),
ImmutableList.of("'test'@'%'", "default_catalog", "GRANT USAGE ON PIPE db1.p1 TO USER 'test'@'%'")
), starRocksAssert.show("show grants for test"));

Expand All @@ -3149,7 +3172,7 @@ public void testPipe() throws Exception {
req.setType(TGrantsToType.USER);
TGetGrantsToRolesOrUserResponse response = GrantsTo.getGrantsTo(req);
List<TGetGrantsToRolesOrUserItem> items = response.getGrants_to();
String grant = items.get(0).toString();
String grant = items.get(1).toString();
Assert.assertEquals("TGetGrantsToRolesOrUserItem(grantee:'test'@'%', " +
"object_catalog:default_catalog, object_database:db1, object_name:p1, object_type:PIPE, " +
"privilege_type:USAGE, is_grantable:false)", grant);
Expand All @@ -3158,6 +3181,8 @@ public void testPipe() throws Exception {
starRocksAssert.ddl("grant CREATE PIPE ON ALL DATABASES to test");
// show grants
Assert.assertEquals(ImmutableList.of(
ImmutableList.of("'test'@'%'", "default_catalog",
"GRANT INSERT ON TABLE db1.tbl_pipe TO USER 'test'@'%'"),
ImmutableList.of("'test'@'%'", "default_catalog",
"GRANT CREATE PIPE ON ALL DATABASES TO USER 'test'@'%'"),
ImmutableList.of("'test'@'%'", "default_catalog", "GRANT USAGE ON PIPE db1.p1 TO USER 'test'@'%'")
Expand All @@ -3168,22 +3193,30 @@ public void testPipe() throws Exception {
req.setType(TGrantsToType.USER);
response = GrantsTo.getGrantsTo(req);
grant = response.toString();
Assert.assertEquals("TGetGrantsToRolesOrUserResponse(grants_to:" +
"[TGetGrantsToRolesOrUserItem(grantee:'test'@'%', object_catalog:default_catalog, " +
"object_database:db1, object_type:DATABASE, privilege_type:CREATE PIPE, is_grantable:false), " +
"TGetGrantsToRolesOrUserItem(grantee:'test'@'%', object_catalog:default_catalog, " +
"object_database:db2, object_type:DATABASE, privilege_type:CREATE PIPE, is_grantable:false), " +
"TGetGrantsToRolesOrUserItem(grantee:'test'@'%', object_catalog:default_catalog, " +
"object_database:db3, object_type:DATABASE, privilege_type:CREATE PIPE, is_grantable:false), " +
"TGetGrantsToRolesOrUserItem(grantee:'test'@'%', object_catalog:default_catalog, " +
"object_database:db1, object_name:p1, object_type:PIPE, privilege_type:USAGE, is_grantable:false)])",
grant);
Assert.assertEquals("TGetGrantsToRolesOrUserResponse(grants_to:[" +
"TGetGrantsToRolesOrUserItem(grantee:'test'@'%', " +
"object_catalog:default_catalog, object_database:db1, object_type:DATABASE, " +
"privilege_type:CREATE PIPE, is_grantable:false), " +
"TGetGrantsToRolesOrUserItem(grantee:'test'@'%', object_catalog:default_catalog, " +
"object_database:db2, object_type:DATABASE, privilege_type:CREATE PIPE, is_grantable:false), " +
"TGetGrantsToRolesOrUserItem(grantee:'test'@'%', object_catalog:default_catalog, " +
"object_database:db3, object_type:DATABASE, privilege_type:CREATE PIPE, is_grantable:false), " +
"TGetGrantsToRolesOrUserItem(grantee:'test'@'%', object_catalog:default_catalog, " +
"object_database:db1, object_name:tbl_pipe, object_type:TABLE, privilege_type:INSERT, " +
"is_grantable:false), " +
"TGetGrantsToRolesOrUserItem(grantee:'test'@'%', object_catalog:default_catalog, " +
"object_database:db1, object_name:p1, object_type:PIPE, privilege_type:USAGE, " +
"is_grantable:false)])", grant);
starRocksAssert.ddl("revoke CREATE PIPE on ALL DATABASES from test");
}
DDLStmtExecutor.execute(UtFrameUtils.parseStmtWithNewParser("revoke USAGE on PIPE db1.p1 from test", ctx), ctx);
res = new ShowExecutor(ctx, (ShowStmt) UtFrameUtils.parseStmtWithNewParser("show pipes", ctx)).execute();
Assert.assertEquals(0, res.getResultRows().size());
Assert.assertEquals(ImmutableList.of(), starRocksAssert.show("show grants for test"));
Assert.assertEquals(ImmutableList.of(
ImmutableList.of("'test'@'%'", "default_catalog",
"GRANT INSERT ON TABLE db1.tbl_pipe TO USER 'test'@'%'")),
starRocksAssert.show("show grants for test"));


// test desc pipe
verifyGrantRevoke(
Expand Down Expand Up @@ -3253,6 +3286,7 @@ public void testPipe() throws Exception {
"Access denied; you need (at least one of) the DROP privilege(s) on PIPE db1.p1 " +
"for this operation");

grantRevokeSqlAsRoot("revoke INSERT on tbl_pipe from test");
starRocksAssert.dropTable("tbl_pipe");
starRocksAssert.getCtx().setDatabase(null);
}
Expand Down