Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor] Remove all methods of Cluster #25185

Merged
merged 1 commit into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
[Refactor] Remove all methods of Cluster
Signed-off-by: imay <buaa.zhaoc@gmail.com>
  • Loading branch information
imay committed Jun 13, 2023
commit c3940d09e9bd0d59bcd1ed2e5d3bf0f1cc6dc794
172 changes: 24 additions & 148 deletions fe/fe-core/src/main/java/com/starrocks/cluster/Cluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,49 +34,21 @@

package com.starrocks.cluster;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.starrocks.catalog.system.SystemId;
import com.starrocks.catalog.system.information.InfoSchemaDb;
import com.starrocks.catalog.system.starrocks.StarRocksDb;
import com.starrocks.common.io.Text;
import com.starrocks.common.io.Writable;
import com.starrocks.persist.LinkDbInfo;
import com.starrocks.system.SystemInfoService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Now Cluster don't have read interface, in order to be back compatible.
// We will remove the persistent format later.
@Deprecated
public class Cluster implements Writable {
private static final Logger LOG = LogManager.getLogger(Cluster.class);

private Long id;
private String name;
// backend which cluster own
private Set<Long> backendIdSet = ConcurrentHashMap.newKeySet();
// compute node which cluster own
private Set<Long> computeNodeIdSet = ConcurrentHashMap.newKeySet();

private ConcurrentHashMap<String, LinkDbInfo> linkDbNames = new ConcurrentHashMap<>();
private ConcurrentHashMap<Long, LinkDbInfo> linkDbIds = new ConcurrentHashMap<>();

private Set<Long> dbIds = ConcurrentHashMap.newKeySet();
private Set<String> dbNames = ConcurrentHashMap.newKeySet();
private ConcurrentHashMap<String, Long> dbNameToIDs = new ConcurrentHashMap<>();

// lock to perform atomic operations
private ReentrantLock lock = new ReentrantLock(true);

private Cluster() {
// for persist
Expand All @@ -87,79 +59,14 @@ public Cluster(String name, long id) {
this.id = id;
}

private void lock() {
this.lock.lock();
}

private void unlock() {
this.lock.unlock();
}

public Long getId() {
return id;
}

public void addDb(String name, long id) {
if (Strings.isNullOrEmpty(name)) {
return;
}
lock();
try {
dbNames.add(name);
dbIds.add(id);
dbNameToIDs.put(name, id);
} finally {
unlock();
}
}

public void removeDb(String name, long id) {
lock();
try {
dbNames.remove(name);
dbIds.remove(id);
} finally {
unlock();
}
}

// Just for check
public boolean isEmpty() {
return backendIdSet == null || backendIdSet.isEmpty();
}

public boolean isDefaultCluster() {
return SystemInfoService.DEFAULT_CLUSTER.equalsIgnoreCase(name);
}

public List<Long> getComputeNodeIdList() {
return Lists.newArrayList(computeNodeIdSet);
}

public void setBackendIdList(List<Long> backendIdList) {
if (backendIdList == null) {
return;
}
backendIdSet = ConcurrentHashMap.newKeySet();
backendIdSet.addAll(backendIdList);
}

public void addComputeNode(long computeNodeId) {
computeNodeIdSet.add(computeNodeId);
}

public void addBackend(long backendId) {
backendIdSet.add(backendId);
}

public void removeComputeNode(long removedComputeNodeId) {
computeNodeIdSet.remove((Long) removedComputeNodeId);
}

public void removeBackend(long removedBackendId) {
backendIdSet.remove((Long) removedBackendId);
}

public static Cluster read(DataInput in) throws IOException {
Cluster cluster = new Cluster();
cluster.readFields(in);
Expand All @@ -171,89 +78,58 @@ public void write(DataOutput out) throws IOException {
out.writeLong(id);
Text.writeString(out, name);

out.writeLong(backendIdSet.size());
for (Long id : backendIdSet) {
out.writeLong(id);
}
// backendIdSet: Set<Long>
out.writeLong(0);

int dbCount = dbIds.size();
if (dbNames.contains(InfoSchemaDb.DATABASE_NAME)) {
dbCount--;
}
// dbNames: Set<String>
out.writeInt(0);

if (dbNames.contains(StarRocksDb.DATABASE_NAME) &&
dbNameToIDs.get(StarRocksDb.DATABASE_NAME).equals(SystemId.STARROCKS_DB_ID)) {
dbCount--;
}
// dbIds: Set<Long>
out.writeInt(0);

out.writeInt(dbCount);
// don't persist InfoSchemaDb meta
for (String name : dbNames) {
if (name.equals(InfoSchemaDb.DATABASE_NAME)) {
dbIds.remove(dbNameToIDs.get(name));
} else if (name.equals(StarRocksDb.DATABASE_NAME) &&
dbNameToIDs.get(StarRocksDb.DATABASE_NAME).equals(SystemId.STARROCKS_DB_ID)) {
dbIds.remove(dbNameToIDs.get(name));
} else {
Text.writeString(out, ClusterNamespace.getFullName(name));
}
}

String errMsg = String.format("%d vs %d, fatal error, Write cluster meta failed!", dbCount, dbIds.size());

// ensure we have removed InfoSchemaDb id
Preconditions.checkState(dbCount == dbIds.size(), errMsg);

out.writeInt(dbCount);
for (long id : dbIds) {
out.writeLong(id);
}
// linkDbNames: ConcurrentHashMap<String, LinkDbInfo>
out.writeInt(0);

out.writeInt(linkDbNames.size());
for (Map.Entry<String, LinkDbInfo> infoMap : linkDbNames.entrySet()) {
Text.writeString(out, infoMap.getKey());
infoMap.getValue().write(out);
}

out.writeInt(linkDbIds.size());
for (Map.Entry<Long, LinkDbInfo> infoMap : linkDbIds.entrySet()) {
out.writeLong(infoMap.getKey());
infoMap.getValue().write(out);
}
// linkDbIds: ConcurrentHashMap<Long, LinkDbInfo>
out.writeInt(0);
}

public void readFields(DataInput in) throws IOException {
id = in.readLong();
name = Text.readString(in);
Long len = in.readLong();

// backendIdSet: Set<Long>
long len = in.readLong();
while (len-- > 0) {
Long id = in.readLong();
backendIdSet.add(id);
in.readLong();
}

// dbNames: Set<String>
int count = in.readInt();
while (count-- > 0) {
dbNames.add(ClusterNamespace.getNameFromFullName(Text.readString(in)));
Text.readString(in);
}

// dbIds: Set<Long>
count = in.readInt();
while (count-- > 0) {
dbIds.add(in.readLong());
in.readLong();
}

// linkDbNames: ConcurrentHashMap<String, LinkDbInfo>
count = in.readInt();
while (count-- > 0) {
final String key = Text.readString(in);
Text.readString(in);
final LinkDbInfo value = new LinkDbInfo();
value.readFields(in);
linkDbNames.put(key, value);
}

// linkDbIds: ConcurrentHashMap<Long, LinkDbInfo>
count = in.readInt();
while (count-- > 0) {
final long key = in.readLong();
in.readLong();
final LinkDbInfo value = new LinkDbInfo();
value.readFields(in);
linkDbIds.put(key, value);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.DataOutput;
import java.io.IOException;

@Deprecated
public class LinkDbInfo implements Writable {
private String cluster;
private String name;
Expand All @@ -34,12 +35,6 @@ public LinkDbInfo() {
this.name = "";
}

public LinkDbInfo(String name, long id) {
this.name = name;
this.id = id;
this.cluster = "";
}

public String getName() {
return name;
}
Expand Down
33 changes: 0 additions & 33 deletions fe/fe-core/src/main/java/com/starrocks/server/LocalMetastore.java
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,6 @@ public void unprotectCreateDb(Database db) {
db.writeLock();
db.setExist(true);
db.writeUnlock();
final Cluster cluster = defaultCluster;
cluster.addDb(db.getFullName(), db.getId());
stateMgr.getGlobalTransactionMgr().addDatabaseTransactionMgr(db.getId());
}

Expand Down Expand Up @@ -473,8 +471,6 @@ public void dropDb(String dbName, boolean isForceDrop) throws DdlException, Meta
// 3. remove db from globalStateMgr
idToDb.remove(db.getId());
fullNameToDb.remove(db.getFullName());
final Cluster cluster = defaultCluster;
cluster.removeDb(dbName, db.getId());

// 4. drop mv task
TaskManager taskManager = GlobalStateMgr.getCurrentState().getTaskManager();
Expand Down Expand Up @@ -528,8 +524,6 @@ public void replayDropDb(String dbName, boolean isForceDrop) throws DdlException

fullNameToDb.remove(dbName);
idToDb.remove(db.getId());
final Cluster cluster = defaultCluster;
cluster.removeDb(dbName, db.getId());

LOG.info("finish replay drop db, name: {}, id: {}", dbName, db.getId());
} finally {
Expand Down Expand Up @@ -565,8 +559,6 @@ public void recoverDatabase(RecoverDbStmt recoverStmt) throws DdlException {
db.writeLock();
db.setExist(true);
db.writeUnlock();
final Cluster cluster = defaultCluster;
cluster.addDb(db.getFullName(), db.getId());

List<MaterializedView> materializedViews = db.getMaterializedViews();
TaskManager taskManager = GlobalStateMgr.getCurrentState().getTaskManager();
Expand Down Expand Up @@ -696,15 +688,10 @@ public void renameDatabase(AlterDatabaseRenameStatement stmt) throws DdlExceptio
}

Database db;
Cluster cluster;
if (!tryLock(false)) {
throw new DdlException("Failed to acquire globalStateMgr lock. Try again");
}
try {
cluster = defaultCluster;
if (cluster == null) {
ErrorReport.reportDdlException(ErrorCode.ERR_CLUSTER_NO_EXISTS, SystemInfoService.DEFAULT_CLUSTER);
}
// check if db exists
db = fullNameToDb.get(fullDbName);
if (db == null) {
Expand All @@ -715,8 +702,6 @@ public void renameDatabase(AlterDatabaseRenameStatement stmt) throws DdlExceptio
if (fullNameToDb.get(newFullDbName) != null) {
throw new DdlException("Database name[" + newFullDbName + "] is already used");
}
cluster.removeDb(db.getFullName(), db.getId());
cluster.addDb(newFullDbName, db.getId());
// 1. rename db
db.setNameWithLock(newFullDbName);

Expand All @@ -738,10 +723,7 @@ public void replayRenameDatabase(String dbName, String newDbName) {
tryLock(true);
try {
Database db = fullNameToDb.get(dbName);
Cluster cluster = defaultCluster;
cluster.removeDb(db.getFullName(), db.getId());
db.setName(newDbName);
cluster.addDb(newDbName, db.getId());
fullNameToDb.remove(dbName);
fullNameToDb.put(newDbName, db);

Expand Down Expand Up @@ -3858,7 +3840,6 @@ private void unprotectCreateCluster(Cluster cluster) {
// This is only used for initDefaultCluster and in that case the backendIdList is empty.
// So ASSERT the cluster's backend list size.
Preconditions.checkState(cluster.isDefaultCluster(), "Cluster must be default cluster");
Preconditions.checkState(cluster.isEmpty(), "Cluster backendIdList must be 0");

defaultCluster = cluster;

Expand All @@ -3884,12 +3865,6 @@ public long loadCluster(DataInputStream dis, long checksum) throws IOException {
checksum ^= cluster.getId();

Preconditions.checkState(cluster.isDefaultCluster(), "Cluster must be default_cluster");
List<Long> latestBackendIds = stateMgr.getClusterInfo().getBackendIds();

// The number of BE in cluster is not same as in SystemInfoService, when perform 'ALTER
// SYSTEM ADD BACKEND TO ...' or 'ALTER SYSTEM ADD BACKEND ...', because both of them are
// for adding BE to some Cluster, but loadCluster is after loadBackend.
cluster.setBackendIdList(latestBackendIds);

String dbName = InfoSchemaDb.DATABASE_NAME;
InfoSchemaDb db;
Expand All @@ -3908,7 +3883,6 @@ public long loadCluster(DataInputStream dis, long checksum) throws IOException {
Preconditions.checkState(db.getId() < NEXT_ID_INIT_VALUE, errMsg);
idToDb.put(db.getId(), db);
fullNameToDb.put(db.getFullName(), db);
cluster.addDb(dbName, db.getId());

if (getFullNameToDb().containsKey(StarRocksDb.DATABASE_NAME)) {
LOG.warn("Since the the database of mysql already exists, " +
Expand All @@ -3918,7 +3892,6 @@ public long loadCluster(DataInputStream dis, long checksum) throws IOException {
Preconditions.checkState(starRocksDb.getId() < NEXT_ID_INIT_VALUE, errMsg);
idToDb.put(starRocksDb.getId(), starRocksDb);
fullNameToDb.put(starRocksDb.getFullName(), starRocksDb);
cluster.addDb(StarRocksDb.DATABASE_NAME, starRocksDb.getId());
}
defaultCluster = cluster;
}
Expand Down Expand Up @@ -3951,11 +3924,7 @@ public void initDefaultCluster() {

// we create default_cluster to meet the need for ease of use, because
// most users have no multi-tenant needs.
cluster.setBackendIdList(backendList);
unprotectCreateCluster(cluster);
for (Database db : idToDb.values()) {
cluster.addDb(db.getFullName(), db.getId());
}

// no matter default_cluster is created or not,
// mark isDefaultClusterCreated as true
Expand All @@ -3980,8 +3949,6 @@ public long saveCluster(DataOutputStream dos, long checksum) throws IOException
public void replayUpdateClusterAndBackends(BackendIdsUpdateInfo info) {
for (long id : info.getBackendList()) {
final Backend backend = stateMgr.getClusterInfo().getBackend(id);
final Cluster cluster = defaultCluster;
cluster.removeBackend(id);
backend.setDecommissioned(false);
backend.clearClusterName();
backend.setBackendState(Backend.BackendState.free);
Expand Down
Loading