Skip to content

Commit

Permalink
[BugFix] fix several bugs of pipe (#34871)
Browse files Browse the repository at this point in the history
Signed-off-by: Murphy <mofei@starrocks.com>
  • Loading branch information
murphyatwork authored Nov 14, 2023
1 parent a49d114 commit 31b9a33
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public class DateUtils {
.addSerializationExclusionStrategy(new GsonUtils.HiddenAnnotationExclusionStrategy())
.addDeserializationExclusionStrategy(new GsonUtils.HiddenAnnotationExclusionStrategy())
.enableComplexMapKeySerialization()
.disableHtmlEscaping()
.registerTypeAdapter(LocalDateTime.class, LOCAL_DATETIME_PRINTER)
.create();
/*
Expand Down Expand Up @@ -172,7 +173,11 @@ public static String convertDateTimeFormaterToSecondFormater(String datetime) {
return convertedDatetime;
}

public static String formatTimeStampInSeconds(long timestampInSeconds, ZoneId timeZoneId) {
public static String formatTimestampInSeconds(long timestampInSeconds) {
return formatTimestampInSeconds(timestampInSeconds, TimeUtils.getSystemTimeZone().toZoneId());
}

public static String formatTimestampInSeconds(long timestampInSeconds, ZoneId timeZoneId) {
return formatTimeStampInMill(timestampInSeconds * 1000, timeZoneId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.text.ParseException;
import java.text.ParsePosition;
import java.text.SimpleDateFormat;
import java.time.Clock;
import java.time.DateTimeException;
import java.time.ZoneId;
import java.time.format.DateTimeParseException;
Expand Down Expand Up @@ -161,6 +162,13 @@ public static TimeZone getOrSystemTimeZone(String timeZone) {
return TimeZone.getTimeZone(ZoneId.of(timeZone, TIME_ZONE_ALIAS_MAP));
}

/**
* Get UNIX timestamp/Epoch second at system timezone
*/
public static long getEpochSeconds() {
return Clock.systemDefaultZone().instant().getEpochSecond();
}

public static String longToTimeString(long timeStamp, SimpleDateFormat dateFormat) {
if (timeStamp <= 0L) {
return FeConstants.NULL_STRING;
Expand Down
23 changes: 19 additions & 4 deletions fe/fe-core/src/main/java/com/starrocks/load/pipe/Pipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;
import com.starrocks.analysis.TableName;
Expand All @@ -28,6 +29,7 @@
import com.starrocks.common.UserException;
import com.starrocks.common.util.DateUtils;
import com.starrocks.common.util.ParseUtil;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.load.pipe.filelist.FileListRepo;
import com.starrocks.persist.gson.GsonPostProcessable;
import com.starrocks.persist.gson.GsonUtils;
Expand Down Expand Up @@ -98,7 +100,7 @@ public class Pipe implements GsonPostProcessable {
@SerializedName(value = "properties")
private Map<String, String> properties;
@SerializedName(value = "createdTime")
private long createdTime = -1;
private long createdTime;
@SerializedName(value = "load_status")
private LoadStatus loadStatus = new LoadStatus();
@SerializedName(value = "task_execution_variables")
Expand All @@ -120,7 +122,7 @@ protected Pipe(PipeId id, String name, TableName targetTable, FilePipeSource sou
this.state = State.RUNNING;
this.pipeSource = sourceTable;
this.originSql = originSql;
this.createdTime = System.currentTimeMillis();
this.createdTime = TimeUtils.getEpochSeconds();
this.properties = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
this.taskExecutionVariables = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
this.taskExecutionVariables.putAll(DEFAULT_TASK_EXECUTION_VARIABLES);
Expand Down Expand Up @@ -496,16 +498,26 @@ public void suspend() {
if (this.state == State.RUNNING) {
this.state = State.SUSPEND;

List<PipeFileRecord> loadingFiles = Lists.newArrayList();
for (PipeTaskDesc task : runningTasks.values()) {
task.interrupt();
if (task.isTaskRunning()) {
task.interrupt();
loadingFiles.addAll(task.getPiece().getFiles());
}
}
LOG.info("suspend pipe " + this);

if (!runningTasks.isEmpty()) {
runningTasks.clear();
LOG.info("suspend pipe {} and clear running tasks {}", this, runningTasks);
runningTasks.clear();
}
loadStatus.loadingFiles = 0;

// Change LOADING files to UNLOADED
if (CollectionUtils.isNotEmpty(loadingFiles)) {
FileListRepo repo = getPipeSource().getFileListRepo();
repo.updateFileState(loadingFiles, FileListRepo.PipeFileState.UNLOADED, null);
}
}
} finally {
lock.writeLock().unlock();
Expand Down Expand Up @@ -612,6 +624,9 @@ public ErrorInfo getLastErrorInfo() {
return lastErrorInfo;
}

/**
* Unix timestamp in seconds
*/
public long getCreatedTime() {
return createdTime;
}
Expand Down
10 changes: 10 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/load/pipe/PipeTaskDesc.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,16 @@ public void interrupt() {
taskManager.killTask(uniqueTaskName, true);
}

/**
* Is the task still running ?
*/
public boolean isTaskRunning() {
if (isFinished()) {
return false;
}
return future != null && !future.isDone();
}

public long getId() {
return id;
}
Expand Down
4 changes: 2 additions & 2 deletions fe/fe-core/src/main/java/com/starrocks/qe/ShowExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1007,9 +1007,9 @@ private void handleShowTableStatus() {
// Auto_increment
row.add(null);
// Create_time
row.add(DateUtils.formatTimeStampInSeconds(table.getCreateTime(), currentTimeZoneId));
row.add(DateUtils.formatTimestampInSeconds(table.getCreateTime(), currentTimeZoneId));
// Update_time
row.add(DateUtils.formatTimeStampInSeconds(info.getUpdate_time(), currentTimeZoneId));
row.add(DateUtils.formatTimestampInSeconds(info.getUpdate_time(), currentTimeZoneId));
// Check_time
row.add(null);
// Collation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2416,7 +2416,7 @@ public Void visitCreatePipeStatement(CreatePipeStmt statement, ConnectContext co
Authorizer.checkDbAction(context.getCurrentUserIdentity(), context.getCurrentRoleIds(),
InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME,
statement.getPipeName().getDbName(), PrivilegeType.CREATE_PIPE);
visitQueryStatement(statement.getInsertStmt().getQueryStatement(), context);
visitInsertStatement(statement.getInsertStmt(), context);
} catch (AccessDeniedException e) {
AccessDeniedException.reportAccessDenied(
InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ private static void analyzeProperties(Map<String, String> properties) {
}
if (value < 0) {
ErrorReport.reportSemanticException(ErrorCode.ERR_INVALID_PARAMETER,
PROPERTY_BATCH_SIZE + " should in [0, +oo)");
PROPERTY_BATCH_SIZE + " should be greater than 0");
}
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import com.starrocks.catalog.Column;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.ScalarType;
import com.starrocks.common.util.DateUtils;
import com.starrocks.common.util.OrderByPair;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.load.pipe.Pipe;
import com.starrocks.qe.ShowResultSetMetaData;
import com.starrocks.server.GlobalStateMgr;
Expand Down Expand Up @@ -77,11 +77,7 @@ public static void handleShow(List<Comparable> row, Pipe pipe) {
row.add(Optional.ofNullable(pipe.getTargetTable()).map(TableName::toString).orElse(""));
row.add(pipe.getLoadStatus().toJson());
row.add(pipe.getLastErrorInfo().toJson());
if (pipe.getCreatedTime() == -1) {
row.add(null);
} else {
row.add(TimeUtils.longToTimeString(pipe.getCreatedTime()));
}
row.add(DateUtils.formatTimestampInSeconds(pipe.getCreatedTime()));
}

public static int findSlotIndex(String name) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,37 @@ public void testExecuteFailed() throws Exception {
}
}

@Test
public void testTaskExecution() {
PipeTaskDesc task = new PipeTaskDesc(1, "task", "test", "sql", null);

// normal success
{
CompletableFuture<Constants.TaskRunState> future = new CompletableFuture<>();
future.complete(Constants.TaskRunState.SUCCESS);
task.setFuture(future);
Assert.assertFalse(task.isFinished());
Assert.assertFalse(task.isTaskRunning());
}

// exceptional
{
CompletableFuture<Constants.TaskRunState> future = new CompletableFuture<>();
future.completeExceptionally(new RuntimeException("task failure"));
task.setFuture(future);
Assert.assertFalse(task.isFinished());
Assert.assertFalse(task.isTaskRunning());
}

// running
{
CompletableFuture<Constants.TaskRunState> future = new CompletableFuture<>();
task.setFuture(future);
Assert.assertFalse(task.isFinished());
Assert.assertTrue(task.isTaskRunning());
}
}

@Test
public void resumeAfterError() throws Exception {
final String pipeName = "p3";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3067,6 +3067,8 @@ public void testPipe() throws Exception {
"as insert into tbl_pipe select * from files('path'='fake://dir/', 'format'='parquet', 'auto_ingest'='false') ";
String createSql2 = "create pipe p2 " +
"as insert into tbl_pipe select * from files('path'='fake://dir/', 'format'='parquet', 'auto_ingest'='false') ";
String dropSql = "drop pipe p1";
String dropSql2 = "drop pipe p2";
ConnectContext ctx = starRocksAssert.getCtx();

ctxToTestUser();
Expand Down Expand Up @@ -3111,21 +3113,40 @@ public void testPipe() throws Exception {
DDLStmtExecutor.execute(UtFrameUtils.parseStmtWithNewParser("drop pipe p1", ctx), ctx);
}

// create pipe
String dbName = connectContext.getDatabase();
verifyGrantRevoke(
createSql,
String.format("grant CREATE PIPE on DATABASE %s to test", dbName),
String.format("revoke CREATE PIPE on DATABASE %s from test", dbName),
"Access denied; you need (at least one of) the CREATE PIPE privilege(s) on DATABASE db1 " +
"for this operation.");
verifyGrantRevoke(
createSql,
"grant CREATE PIPE on ALL DATABASES to test",
"revoke CREATE PIPE on ALL DATABASES from test",
"Access denied; you need (at least one of) the CREATE PIPE privilege(s) on DATABASE db1 " +
"for this operation.");

// test create pipe without insert privilege
{
grantRevokeSqlAsRoot(String.format("grant CREATE PIPE on DATABASE %s to test", dbName));
ctxToTestUser();
verifyGrantRevoke(createSql,
"grant INSERT on tbl_pipe to test",
"revoke INSERT on tbl_pipe from test",
"Access denied; you need (at least one of) the INSERT privilege(s) on TABLE tbl_pipe " +
"for this operation");
grantRevokeSqlAsRoot(String.format("revoke CREATE PIPE on DATABASE %s from test", dbName));

}
// grant insert privilege to user
grantRevokeSqlAsRoot("grant INSERT on tbl_pipe to test");

// create pipe
{
verifyGrantRevoke(
createSql,
String.format("grant CREATE PIPE on DATABASE %s to test", dbName),
String.format("revoke CREATE PIPE on DATABASE %s from test", dbName),
"Access denied; you need (at least one of) the CREATE PIPE privilege(s) on DATABASE db1 " +
"for this operation.");
verifyGrantRevoke(
createSql,
"grant CREATE PIPE on ALL DATABASES to test",
"revoke CREATE PIPE on ALL DATABASES from test",
"Access denied; you need (at least one of) the CREATE PIPE privilege(s) on DATABASE db1 " +
"for this operation.");
}

// ctxToRoot();
DDLStmtExecutor.execute(UtFrameUtils.parseStmtWithNewParser(createSql, ctx), ctx);
DDLStmtExecutor.execute(UtFrameUtils.parseStmtWithNewParser(createSql2, ctx), ctx);

Expand All @@ -3141,6 +3162,8 @@ public void testPipe() throws Exception {
{
// show grants
Assert.assertEquals(ImmutableList.of(
ImmutableList.of("'test'@'%'", "default_catalog",
"GRANT INSERT ON TABLE db1.tbl_pipe TO USER 'test'@'%'"),
ImmutableList.of("'test'@'%'", "default_catalog", "GRANT USAGE ON PIPE db1.p1 TO USER 'test'@'%'")
), starRocksAssert.show("show grants for test"));

Expand All @@ -3149,7 +3172,7 @@ public void testPipe() throws Exception {
req.setType(TGrantsToType.USER);
TGetGrantsToRolesOrUserResponse response = GrantsTo.getGrantsTo(req);
List<TGetGrantsToRolesOrUserItem> items = response.getGrants_to();
String grant = items.get(0).toString();
String grant = items.get(1).toString();
Assert.assertEquals("TGetGrantsToRolesOrUserItem(grantee:'test'@'%', " +
"object_catalog:default_catalog, object_database:db1, object_name:p1, object_type:PIPE, " +
"privilege_type:USAGE, is_grantable:false)", grant);
Expand All @@ -3158,6 +3181,8 @@ public void testPipe() throws Exception {
starRocksAssert.ddl("grant CREATE PIPE ON ALL DATABASES to test");
// show grants
Assert.assertEquals(ImmutableList.of(
ImmutableList.of("'test'@'%'", "default_catalog",
"GRANT INSERT ON TABLE db1.tbl_pipe TO USER 'test'@'%'"),
ImmutableList.of("'test'@'%'", "default_catalog",
"GRANT CREATE PIPE ON ALL DATABASES TO USER 'test'@'%'"),
ImmutableList.of("'test'@'%'", "default_catalog", "GRANT USAGE ON PIPE db1.p1 TO USER 'test'@'%'")
Expand All @@ -3168,22 +3193,30 @@ public void testPipe() throws Exception {
req.setType(TGrantsToType.USER);
response = GrantsTo.getGrantsTo(req);
grant = response.toString();
Assert.assertEquals("TGetGrantsToRolesOrUserResponse(grants_to:" +
"[TGetGrantsToRolesOrUserItem(grantee:'test'@'%', object_catalog:default_catalog, " +
"object_database:db1, object_type:DATABASE, privilege_type:CREATE PIPE, is_grantable:false), " +
"TGetGrantsToRolesOrUserItem(grantee:'test'@'%', object_catalog:default_catalog, " +
"object_database:db2, object_type:DATABASE, privilege_type:CREATE PIPE, is_grantable:false), " +
"TGetGrantsToRolesOrUserItem(grantee:'test'@'%', object_catalog:default_catalog, " +
"object_database:db3, object_type:DATABASE, privilege_type:CREATE PIPE, is_grantable:false), " +
"TGetGrantsToRolesOrUserItem(grantee:'test'@'%', object_catalog:default_catalog, " +
"object_database:db1, object_name:p1, object_type:PIPE, privilege_type:USAGE, is_grantable:false)])",
grant);
Assert.assertEquals("TGetGrantsToRolesOrUserResponse(grants_to:[" +
"TGetGrantsToRolesOrUserItem(grantee:'test'@'%', " +
"object_catalog:default_catalog, object_database:db1, object_type:DATABASE, " +
"privilege_type:CREATE PIPE, is_grantable:false), " +
"TGetGrantsToRolesOrUserItem(grantee:'test'@'%', object_catalog:default_catalog, " +
"object_database:db2, object_type:DATABASE, privilege_type:CREATE PIPE, is_grantable:false), " +
"TGetGrantsToRolesOrUserItem(grantee:'test'@'%', object_catalog:default_catalog, " +
"object_database:db3, object_type:DATABASE, privilege_type:CREATE PIPE, is_grantable:false), " +
"TGetGrantsToRolesOrUserItem(grantee:'test'@'%', object_catalog:default_catalog, " +
"object_database:db1, object_name:tbl_pipe, object_type:TABLE, privilege_type:INSERT, " +
"is_grantable:false), " +
"TGetGrantsToRolesOrUserItem(grantee:'test'@'%', object_catalog:default_catalog, " +
"object_database:db1, object_name:p1, object_type:PIPE, privilege_type:USAGE, " +
"is_grantable:false)])", grant);
starRocksAssert.ddl("revoke CREATE PIPE on ALL DATABASES from test");
}
DDLStmtExecutor.execute(UtFrameUtils.parseStmtWithNewParser("revoke USAGE on PIPE db1.p1 from test", ctx), ctx);
res = new ShowExecutor(ctx, (ShowStmt) UtFrameUtils.parseStmtWithNewParser("show pipes", ctx)).execute();
Assert.assertEquals(0, res.getResultRows().size());
Assert.assertEquals(ImmutableList.of(), starRocksAssert.show("show grants for test"));
Assert.assertEquals(ImmutableList.of(
ImmutableList.of("'test'@'%'", "default_catalog",
"GRANT INSERT ON TABLE db1.tbl_pipe TO USER 'test'@'%'")),
starRocksAssert.show("show grants for test"));


// test desc pipe
verifyGrantRevoke(
Expand Down Expand Up @@ -3253,6 +3286,7 @@ public void testPipe() throws Exception {
"Access denied; you need (at least one of) the DROP privilege(s) on PIPE db1.p1 " +
"for this operation");

grantRevokeSqlAsRoot("revoke INSERT on tbl_pipe from test");
starRocksAssert.dropTable("tbl_pipe");
starRocksAssert.getCtx().setDatabase(null);
}
Expand Down

0 comments on commit 31b9a33

Please sign in to comment.