Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor] Refactor GlobalStateMgr #6268

Merged
merged 6 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import com.starrocks.common.DdlException;
import com.starrocks.common.ErrorCode;
import com.starrocks.common.ErrorReport;
import com.starrocks.common.FeMetaVersion;
import com.starrocks.common.Pair;
import com.starrocks.common.io.Writable;
import com.starrocks.common.util.MasterDaemon;
Expand All @@ -63,7 +64,9 @@
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
Expand Down Expand Up @@ -601,6 +604,20 @@ public void readFields(DataInput in) throws IOException {
dbIdToBackupOrRestoreJob.put(job.getDbId(), job);
}
}

public long saveBackupHandler(DataOutputStream dos, long checksum) throws IOException {
write(dos);
return checksum;
}

public long loadBackupHandler(DataInputStream dis, long checksum, GlobalStateMgr globalStateMgr) throws IOException {
if (GlobalStateMgr.getCurrentStateJournalVersion() >= FeMetaVersion.VERSION_42) {
readFields(dis);
}
setCatalog(globalStateMgr);
LOG.info("finished replay backupHandler from image");
return checksum;
}
}
stephen-shelby marked this conversation as resolved.
Show resolved Hide resolved


Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
Expand All @@ -57,6 +59,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import static com.starrocks.server.GlobalStateMgr.isCheckpointThread;

public class CatalogRecycleBin extends MasterDaemon implements Writable {
private static final Logger LOG = LogManager.getLogger(CatalogRecycleBin.class);
// erase meta at least after MIN_ERASE_LATENCY milliseconds
Expand Down Expand Up @@ -325,7 +329,7 @@ public synchronized void replayEraseTable(long tableId) {
if (tableInfo != null) {
Table table = tableInfo.getTable();
nameToTableInfo.remove(dbId, table.getName());
if (table.getType() == TableType.OLAP && !GlobalStateMgr.isCheckpointThread()) {
if (table.getType() == TableType.OLAP && !isCheckpointThread()) {
GlobalStateMgr.getCurrentState().onEraseOlapTable((OlapTable) table, true);
}
}
Expand Down Expand Up @@ -386,7 +390,7 @@ public synchronized void replayErasePartition(long partitionId) {
idToRecycleTime.remove(partitionId);

Partition partition = partitionInfo.getPartition();
if (!GlobalStateMgr.isCheckpointThread()) {
if (!isCheckpointThread()) {
GlobalStateMgr.getCurrentState().onErasePartition(partition);
}

Expand Down Expand Up @@ -926,4 +930,26 @@ public void readFields(DataInput in) throws IOException {
public synchronized List<Long> getAllDbIds() {
return Lists.newArrayList(idToDatabase.keySet());
}

public long loadRecycleBin(DataInputStream dis, long checksum) throws IOException {
if (GlobalStateMgr.getCurrentStateJournalVersion() >= FeMetaVersion.VERSION_10) {
readFields(dis);
if (!isCheckpointThread()) {
// add tablet in Recycle bin to TabletInvertedIndex
addTabletToInvertedIndex();
}
// create DatabaseTransactionMgr for db in recycle bin.
// these dbs do not exist in `idToDb` of the globalStateMgr.
for (Long dbId : getAllDbIds()) {
GlobalStateMgr.getCurrentGlobalTransactionMgr().addDatabaseTransactionMgr(dbId);
}
}
LOG.info("finished replay recycleBin from image");
return checksum;
}

public long saveRecycleBin(DataOutputStream dos, long checksum) throws IOException {
write(dos);
return checksum;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,21 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.starrocks.common.DdlException;
import com.starrocks.common.FeMetaVersion;
import com.starrocks.common.io.Text;
import com.starrocks.common.io.Writable;
import com.starrocks.common.util.PropertyAnalyzer;
import com.starrocks.persist.ColocatePersistInfo;
import com.starrocks.persist.TablePropertyInfo;
import com.starrocks.server.GlobalStateMgr;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -704,4 +709,116 @@ public void setBackendsSetByIdxForGroup(GroupId groupId, int tabletOrderIdx, Set
writeUnlock();
}
}

public long loadColocateTableIndex(DataInputStream dis, long checksum) throws IOException {
if (GlobalStateMgr.getCurrentStateJournalVersion() >= FeMetaVersion.VERSION_46) {
GlobalStateMgr.getCurrentColocateIndex().readFields(dis);
}
LOG.info("finished replay colocateTableIndex from image");
return checksum;
}

public long saveColocateTableIndex(DataOutputStream dos, long checksum) throws IOException {
write(dos);
return checksum;
}

// the invoker should keep db write lock
public void modifyTableColocate(Database db, OlapTable table, String colocateGroup, boolean isReplay,
GroupId assignedGroupId)
throws DdlException {

String oldGroup = table.getColocateGroup();
GroupId groupId = null;
if (!Strings.isNullOrEmpty(colocateGroup)) {
String fullGroupName = db.getId() + "_" + colocateGroup;
ColocateGroupSchema groupSchema = getGroupSchema(fullGroupName);
if (groupSchema == null) {
// user set a new colocate group,
// check if all partitions all this table has same buckets num and same replication number
PartitionInfo partitionInfo = table.getPartitionInfo();
if (partitionInfo.getType() == PartitionType.RANGE) {
int bucketsNum = -1;
short replicationNum = -1;
for (Partition partition : table.getPartitions()) {
if (bucketsNum == -1) {
bucketsNum = partition.getDistributionInfo().getBucketNum();
} else if (bucketsNum != partition.getDistributionInfo().getBucketNum()) {
throw new DdlException(
"Partitions in table " + table.getName() + " have different buckets number");
}

if (replicationNum == -1) {
replicationNum = partitionInfo.getReplicationNum(partition.getId());
} else if (replicationNum != partitionInfo.getReplicationNum(partition.getId())) {
throw new DdlException(
"Partitions in table " + table.getName() + " have different replication number");
}
}
}
} else {
// set to an already exist colocate group, check if this table can be added to this group.
groupSchema.checkColocateSchema(table);
}

List<List<Long>> backendsPerBucketSeq = null;
if (groupSchema == null) {
// assign to a newly created group, set backends sequence.
// we arbitrarily choose a tablet backends sequence from this table,
// let the colocation balancer do the work.
backendsPerBucketSeq = table.getArbitraryTabletBucketsSeq();
}
// change group after getting backends sequence(if has), in case 'getArbitraryTabletBucketsSeq' failed
groupId = changeGroup(db.getId(), table, oldGroup, colocateGroup, assignedGroupId);

if (groupSchema == null) {
Preconditions.checkNotNull(backendsPerBucketSeq);
addBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
}

// set this group as unstable
markGroupUnstable(groupId, false /* edit log is along with modify table log */);
table.setColocateGroup(colocateGroup);
} else {
// unset colocation group
if (Strings.isNullOrEmpty(oldGroup)) {
// this table is not a colocate table, do nothing
return;
}

// when replayModifyTableColocate, we need the groupId info
String fullGroupName = db.getId() + "_" + oldGroup;
groupId = getGroupSchema(fullGroupName).getGroupId();

removeTable(table.getId());
table.setColocateGroup(null);
}

if (!isReplay) {
Map<String, String> properties = Maps.newHashMapWithExpectedSize(1);
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, colocateGroup);
TablePropertyInfo info = new TablePropertyInfo(table.getId(), groupId, properties);
GlobalStateMgr.getCurrentState().getEditLog().logModifyTableColocate(info);
}
LOG.info("finished modify table's colocation property. table: {}, is replay: {}",
table.getName(), isReplay);
}

public void replayModifyTableColocate(TablePropertyInfo info) {
long tableId = info.getTableId();
Map<String, String> properties = info.getPropertyMap();

Database db = GlobalStateMgr.getCurrentState().getDb(info.getGroupId().dbId);
db.writeLock();
try {
OlapTable table = (OlapTable) db.getTable(tableId);
modifyTableColocate(db, table, properties.get(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH), true,
info.getGroupId());
} catch (DdlException e) {
// should not happen
LOG.warn("failed to replay modify table colocate", e);
} finally {
db.writeUnlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@

import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -239,4 +240,9 @@ public ProcResult fetchResult() {
return result;
}
}

public long saveResources(DataOutputStream out, long checksum) throws IOException {
write(out);
return checksum;
}
}
Astralidea marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
import com.starrocks.thrift.TWorkGroupOp;
import com.starrocks.thrift.TWorkGroupOpType;
import com.starrocks.thrift.TWorkGroupType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -37,6 +41,8 @@

// WorkGroupMgr is employed by GlobalStateMgr to manage WorkGroup in FE.
public class WorkGroupMgr implements Writable {
private static final Logger LOG = LogManager.getLogger(WorkGroupMgr.class);

private GlobalStateMgr globalStateMgr;
private Map<String, WorkGroup> workGroupMap = new HashMap<>();
private Map<Long, WorkGroup> id2WorkGroupMap = new HashMap<>();
Expand Down Expand Up @@ -183,6 +189,21 @@ public void readFields(DataInputStream dis) throws IOException {
}
}

public long loadWorkGroups(DataInputStream dis, long checksum) throws IOException {
try {
readFields(dis);
LOG.info("finished replaying WorkGroups from image");
} catch (EOFException e) {
LOG.info("no WorkGroups to replay.");
}
return checksum;
}

public long saveWorkGroups(DataOutputStream dos, long checksum) throws IOException {
write(dos);
return checksum;
}

private void replayAddWorkGroup(WorkGroup workgroup) {
addWorkGroupInternal(workgroup);
WorkGroupOpEntry op = new WorkGroupOpEntry(TWorkGroupOpType.WORKGROUP_OP_CREATE, workgroup);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.starrocks.catalog.Database;
import com.starrocks.common.Config;
import com.starrocks.common.DdlException;
import com.starrocks.common.FeMetaVersion;
import com.starrocks.common.io.Text;
import com.starrocks.common.io.Writable;
import com.starrocks.server.GlobalStateMgr;
Expand All @@ -41,7 +42,9 @@

import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -523,4 +526,17 @@ public void readFields(DataInput in) throws IOException {
}
}
}

public long loadSmallFiles(DataInputStream in, long checksum) throws IOException {
if (GlobalStateMgr.getCurrentStateJournalVersion() >= FeMetaVersion.VERSION_52) {
readFields(in);
}
LOG.info("finished replay smallFiles from image");
return checksum;
}

public long saveSmallFiles(DataOutputStream out, long checksum) throws IOException {
write(out);
return checksum;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.starrocks.common.DdlException;
import com.starrocks.common.MetaNotFoundException;
import com.starrocks.common.UserException;
import com.starrocks.sql.ast.CreateMaterializedViewStatement;

import java.util.List;

Expand Down Expand Up @@ -96,6 +97,8 @@ default void renamePartition(Database db, Table table, PartitionRenameClause ren
default void createMaterializedView(CreateMaterializedViewStmt stmt)
throws AnalysisException, DdlException {}

default void createMaterializedView(CreateMaterializedViewStatement statement) throws DdlException {}

gengjun-git marked this conversation as resolved.
Show resolved Hide resolved
default void dropMaterializedView(DropMaterializedViewStmt stmt) throws DdlException, MetaNotFoundException {}

default void createView(CreateViewStmt stmt) throws DdlException {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@

import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -968,6 +969,11 @@ public static DeleteHandler read(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(json, DeleteHandler.class);
}

public long saveDeleteHandler(DataOutputStream dos, long checksum) throws IOException {
write(dos);
return checksum;
}

public void removeOldDeleteInfo() {
Astralidea marked this conversation as resolved.
Show resolved Hide resolved
long currentTimeMs = System.currentTimeMillis();
Iterator<Entry<Long, List<MultiDeleteInfo>>> logIterator = dbToDeleteInfos.entrySet().iterator();
Expand Down
Loading