[Refactor] Refactor GlobalStateMgr (StarRocks#6268)
Goals:
- Refactor GlobalStateMgr (the original FE Catalog) to prepare for building the connector framework.
- Reduce redundant code in GlobalStateMgr: the file shrinks from roughly 8,000 lines to 3,000 lines.

Most of the code in GlobalStateMgr moves into three parts (see the sketch after this list):
- NodeMgr:
   Manages FE/BE/Broker nodes, heartbeats, and the BDB nodes started by the FE.
- LocalMetastore:
   Manages metadata for the internal catalog, covering Cluster/Database/Table/Partition/Tablet/Replica, and implements the `ConnectorMetadata` API.
- XXXMgr in GlobalStateMgr:
   Methods such as loadXxx and saveXxx move to the corresponding manager instance. For example, `loadExportJob()` and `saveExportJob()` move from `GlobalStateMgr` to `ExportMgr`.
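A minimal, hedged sketch of that load/save migration pattern. Only the method shapes (a stream plus a running checksum) match the diff below; the class body is illustrative, not the PR's actual `ExportMgr`:

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a manager that owns its own section of the FE image.
// After this refactor, GlobalStateMgr delegates to methods like these
// instead of inlining the serialization itself.
class ExportMgrSketch {
    private final List<String> jobs = new ArrayList<>();

    public long saveExportJob(DataOutputStream dos, long checksum) throws IOException {
        dos.writeInt(jobs.size());   // serialize this manager's image section
        for (String job : jobs) {
            dos.writeUTF(job);
        }
        return checksum;             // thread the running checksum back to the caller
    }

    public long loadExportJob(DataInputStream dis, long checksum) throws IOException {
        int n = dis.readInt();       // deserialize the same section on startup/replay
        for (int i = 0; i < n; i++) {
            jobs.add(dis.readUTF());
        }
        return checksum;
    }
}
```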
stephen-shelby authored May 19, 2022
1 parent ea5f4cc commit a502fb5
Showing 22 changed files with 856 additions and 4,948 deletions.
17 changes: 17 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/backup/BackupHandler.java
@@ -49,6 +49,7 @@
import com.starrocks.common.DdlException;
import com.starrocks.common.ErrorCode;
import com.starrocks.common.ErrorReport;
import com.starrocks.common.FeMetaVersion;
import com.starrocks.common.Pair;
import com.starrocks.common.io.Writable;
import com.starrocks.common.util.MasterDaemon;
@@ -63,7 +64,9 @@
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
@@ -601,6 +604,20 @@ public void readFields(DataInput in) throws IOException {
dbIdToBackupOrRestoreJob.put(job.getDbId(), job);
}
}

public long saveBackupHandler(DataOutputStream dos, long checksum) throws IOException {
write(dos);
return checksum;
}

public long loadBackupHandler(DataInputStream dis, long checksum, GlobalStateMgr globalStateMgr) throws IOException {
if (GlobalStateMgr.getCurrentStateJournalVersion() >= FeMetaVersion.VERSION_42) {
readFields(dis);
}
setCatalog(globalStateMgr);
LOG.info("finished replay backupHandler from image");
return checksum;
}
}
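For context, a hedged sketch of how an image writer can chain per-module save methods like the ones above. The loop is illustrative, assuming `backupHandler`, `recycleBin`, and `colocateTableIndex` are the manager instances touched in this commit; it is not the PR's actual `GlobalStateMgr` code:

```java
// Illustrative chaining only: each saveXxx receives the running checksum
// and returns it (possibly updated) for the next module's section.
long checksum = 0;
try (DataOutputStream dos = new DataOutputStream(
        new BufferedOutputStream(new FileOutputStream("image.tmp")))) {
    checksum = backupHandler.saveBackupHandler(dos, checksum);
    checksum = recycleBin.saveRecycleBin(dos, checksum);
    checksum = colocateTableIndex.saveColocateTableIndex(dos, checksum);
    // ... other modules' sections ...
    dos.writeLong(checksum); // assumption: the checksum trails the image payload
}
```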


@@ -47,7 +47,9 @@
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
@@ -57,6 +59,8 @@
import java.util.Set;
import java.util.stream.Collectors;

import static com.starrocks.server.GlobalStateMgr.isCheckpointThread;

public class CatalogRecycleBin extends MasterDaemon implements Writable {
private static final Logger LOG = LogManager.getLogger(CatalogRecycleBin.class);
// erase meta at least after MIN_ERASE_LATENCY milliseconds
@@ -325,7 +329,7 @@ public synchronized void replayEraseTable(long tableId) {
if (tableInfo != null) {
Table table = tableInfo.getTable();
nameToTableInfo.remove(dbId, table.getName());
if (table.getType() == TableType.OLAP && !GlobalStateMgr.isCheckpointThread()) {
if (table.getType() == TableType.OLAP && !isCheckpointThread()) {
GlobalStateMgr.getCurrentState().onEraseOlapTable((OlapTable) table, true);
}
}
@@ -386,7 +390,7 @@ public synchronized void replayErasePartition(long partitionId) {
idToRecycleTime.remove(partitionId);

Partition partition = partitionInfo.getPartition();
if (!GlobalStateMgr.isCheckpointThread()) {
if (!isCheckpointThread()) {
GlobalStateMgr.getCurrentState().onErasePartition(partition);
}

@@ -926,4 +930,26 @@ public void readFields(DataInput in) throws IOException {
public synchronized List<Long> getAllDbIds() {
return Lists.newArrayList(idToDatabase.keySet());
}

public long loadRecycleBin(DataInputStream dis, long checksum) throws IOException {
if (GlobalStateMgr.getCurrentStateJournalVersion() >= FeMetaVersion.VERSION_10) {
readFields(dis);
if (!isCheckpointThread()) {
// add tablet in Recycle bin to TabletInvertedIndex
addTabletToInvertedIndex();
}
// create DatabaseTransactionMgr for db in recycle bin.
// these dbs do not exist in `idToDb` of the globalStateMgr.
for (Long dbId : getAllDbIds()) {
GlobalStateMgr.getCurrentGlobalTransactionMgr().addDatabaseTransactionMgr(dbId);
}
}
LOG.info("finished replay recycleBin from image");
return checksum;
}

public long saveRecycleBin(DataOutputStream dos, long checksum) throws IOException {
write(dos);
return checksum;
}
}
117 changes: 117 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/ColocateTableIndex.java
@@ -29,16 +29,21 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.starrocks.common.DdlException;
import com.starrocks.common.FeMetaVersion;
import com.starrocks.common.io.Text;
import com.starrocks.common.io.Writable;
import com.starrocks.common.util.PropertyAnalyzer;
import com.starrocks.persist.ColocatePersistInfo;
import com.starrocks.persist.TablePropertyInfo;
import com.starrocks.server.GlobalStateMgr;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -704,4 +709,116 @@ public void setBackendsSetByIdxForGroup(GroupId groupId, int tabletOrderIdx, Set
writeUnlock();
}
}

public long loadColocateTableIndex(DataInputStream dis, long checksum) throws IOException {
if (GlobalStateMgr.getCurrentStateJournalVersion() >= FeMetaVersion.VERSION_46) {
GlobalStateMgr.getCurrentColocateIndex().readFields(dis);
}
LOG.info("finished replay colocateTableIndex from image");
return checksum;
}

public long saveColocateTableIndex(DataOutputStream dos, long checksum) throws IOException {
write(dos);
return checksum;
}

// the invoker should keep db write lock
public void modifyTableColocate(Database db, OlapTable table, String colocateGroup, boolean isReplay,
GroupId assignedGroupId)
throws DdlException {

String oldGroup = table.getColocateGroup();
GroupId groupId = null;
if (!Strings.isNullOrEmpty(colocateGroup)) {
String fullGroupName = db.getId() + "_" + colocateGroup;
ColocateGroupSchema groupSchema = getGroupSchema(fullGroupName);
if (groupSchema == null) {
// user set a new colocate group,
// check that all partitions of this table have the same bucket number and replication number
PartitionInfo partitionInfo = table.getPartitionInfo();
if (partitionInfo.getType() == PartitionType.RANGE) {
int bucketsNum = -1;
short replicationNum = -1;
for (Partition partition : table.getPartitions()) {
if (bucketsNum == -1) {
bucketsNum = partition.getDistributionInfo().getBucketNum();
} else if (bucketsNum != partition.getDistributionInfo().getBucketNum()) {
throw new DdlException(
"Partitions in table " + table.getName() + " have different buckets number");
}

if (replicationNum == -1) {
replicationNum = partitionInfo.getReplicationNum(partition.getId());
} else if (replicationNum != partitionInfo.getReplicationNum(partition.getId())) {
throw new DdlException(
"Partitions in table " + table.getName() + " have different replication number");
}
}
}
} else {
// set to an already existing colocate group; check whether this table can be added to it.
groupSchema.checkColocateSchema(table);
}

List<List<Long>> backendsPerBucketSeq = null;
if (groupSchema == null) {
// assign to a newly created group, set backends sequence.
// we arbitrarily choose a tablet backends sequence from this table,
// let the colocation balancer do the work.
backendsPerBucketSeq = table.getArbitraryTabletBucketsSeq();
}
// change the group after getting the backends sequence (if any), in case 'getArbitraryTabletBucketsSeq' failed
groupId = changeGroup(db.getId(), table, oldGroup, colocateGroup, assignedGroupId);

if (groupSchema == null) {
Preconditions.checkNotNull(backendsPerBucketSeq);
addBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
}

// set this group as unstable
markGroupUnstable(groupId, false /* edit log is along with modify table log */);
table.setColocateGroup(colocateGroup);
} else {
// unset colocation group
if (Strings.isNullOrEmpty(oldGroup)) {
// this table is not a colocate table, do nothing
return;
}

// when replayModifyTableColocate, we need the groupId info
String fullGroupName = db.getId() + "_" + oldGroup;
groupId = getGroupSchema(fullGroupName).getGroupId();

removeTable(table.getId());
table.setColocateGroup(null);
}

if (!isReplay) {
Map<String, String> properties = Maps.newHashMapWithExpectedSize(1);
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, colocateGroup);
TablePropertyInfo info = new TablePropertyInfo(table.getId(), groupId, properties);
GlobalStateMgr.getCurrentState().getEditLog().logModifyTableColocate(info);
}
LOG.info("finished modify table's colocation property. table: {}, is replay: {}",
table.getName(), isReplay);
}

public void replayModifyTableColocate(TablePropertyInfo info) {
long tableId = info.getTableId();
Map<String, String> properties = info.getPropertyMap();

Database db = GlobalStateMgr.getCurrentState().getDb(info.getGroupId().dbId);
db.writeLock();
try {
OlapTable table = (OlapTable) db.getTable(tableId);
modifyTableColocate(db, table, properties.get(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH), true,
info.getGroupId());
} catch (DdlException e) {
// should not happen
LOG.warn("failed to replay modify table colocate", e);
} finally {
db.writeUnlock();
}
}
}
@@ -42,6 +42,7 @@

import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
@@ -239,4 +240,9 @@ public ProcResult fetchResult() {
return result;
}
}

public long saveResources(DataOutputStream out, long checksum) throws IOException {
write(out);
return checksum;
}
}
21 changes: 21 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/WorkGroupMgr.java
@@ -19,9 +19,13 @@
import com.starrocks.thrift.TWorkGroupOp;
import com.starrocks.thrift.TWorkGroupOpType;
import com.starrocks.thrift.TWorkGroupType;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -37,6 +41,8 @@

// WorkGroupMgr is employed by GlobalStateMgr to manage WorkGroup in FE.
public class WorkGroupMgr implements Writable {
private static final Logger LOG = LogManager.getLogger(WorkGroupMgr.class);

private GlobalStateMgr globalStateMgr;
private Map<String, WorkGroup> workGroupMap = new HashMap<>();
private Map<Long, WorkGroup> id2WorkGroupMap = new HashMap<>();
@@ -183,6 +189,21 @@ public void readFields(DataInputStream dis) throws IOException {
}
}

public long loadWorkGroups(DataInputStream dis, long checksum) throws IOException {
try {
readFields(dis);
LOG.info("finished replaying WorkGroups from image");
} catch (EOFException e) {
LOG.info("no WorkGroups to replay.");
}
return checksum;
}

public long saveWorkGroups(DataOutputStream dos, long checksum) throws IOException {
write(dos);
return checksum;
}

private void replayAddWorkGroup(WorkGroup workgroup) {
addWorkGroupInternal(workgroup);
WorkGroupOpEntry op = new WorkGroupOpEntry(TWorkGroupOpType.WORKGROUP_OP_CREATE, workgroup);
@@ -31,6 +31,7 @@
import com.starrocks.catalog.Database;
import com.starrocks.common.Config;
import com.starrocks.common.DdlException;
import com.starrocks.common.FeMetaVersion;
import com.starrocks.common.io.Text;
import com.starrocks.common.io.Writable;
import com.starrocks.server.GlobalStateMgr;
@@ -41,7 +42,9 @@

import java.io.BufferedInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
@@ -523,4 +526,17 @@ public void readFields(DataInput in) throws IOException {
}
}
}

public long loadSmallFiles(DataInputStream in, long checksum) throws IOException {
if (GlobalStateMgr.getCurrentStateJournalVersion() >= FeMetaVersion.VERSION_52) {
readFields(in);
}
LOG.info("finished replay smallFiles from image");
return checksum;
}

public long saveSmallFiles(DataOutputStream out, long checksum) throws IOException {
write(out);
return checksum;
}
}
@@ -24,6 +24,7 @@
import com.starrocks.common.DdlException;
import com.starrocks.common.MetaNotFoundException;
import com.starrocks.common.UserException;
import com.starrocks.sql.ast.CreateMaterializedViewStatement;

import java.util.List;

@@ -96,6 +97,8 @@ default void renamePartition(Database db, Table table, PartitionRenameClause ren
default void createMaterializedView(CreateMaterializedViewStmt stmt)
throws AnalysisException, DdlException {}

default void createMaterializedView(CreateMaterializedViewStatement statement) throws DdlException {}

default void dropMaterializedView(DropMaterializedViewStmt stmt) throws DdlException, MetaNotFoundException {}

default void createView(CreateViewStmt stmt) throws DdlException {}
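Per the commit goals, `LocalMetastore` is the internal-catalog implementation of `ConnectorMetadata`, so the new default method above is the hook it overrides. A hedged sketch of that relationship (the override body is a placeholder, not the PR's implementation):

```java
// Illustrative only: LocalMetastore overriding the ConnectorMetadata hook
// added in this diff; the interface's other methods keep their default
// no-op bodies unless the internal catalog overrides them too.
public class LocalMetastore implements ConnectorMetadata {
    @Override
    public void createMaterializedView(CreateMaterializedViewStatement statement) throws DdlException {
        // analyze the statement, create the MV's physical objects,
        // and write the corresponding edit log entry
    }
}
```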
@@ -98,6 +98,7 @@

import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -968,6 +969,11 @@ public static DeleteHandler read(DataInput in) throws IOException {
return GsonUtils.GSON.fromJson(json, DeleteHandler.class);
}

public long saveDeleteHandler(DataOutputStream dos, long checksum) throws IOException {
write(dos);
return checksum;
}

public void removeOldDeleteInfo() {
long currentTimeMs = System.currentTimeMillis();
Iterator<Entry<Long, List<MultiDeleteInfo>>> logIterator = dbToDeleteInfos.entrySet().iterator();
