Skip to content

Commit

Permalink
[ehmancement](binlog) Add show proc '/binlog' impl (#30770)
Browse files Browse the repository at this point in the history
Signed-off-by: Jack Drogon <jack.xsuperman@gmail.com>
  • Loading branch information
JackDrogon authored Feb 2, 2024
1 parent dd02ea0 commit 0f7b735
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public BinlogConfig getTableBinlogConfig(long dbId, long tableId) {
return null;
}

Table table = db.getTableOrMetaException(tableId);
Table table = db.getTableNullable(tableId);
if (table == null) {
LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId);
return null;
Expand All @@ -109,6 +109,8 @@ public BinlogConfig getTableBinlogConfig(long dbId, long tableId) {

OlapTable olapTable = (OlapTable) table;
tableBinlogConfig = olapTable.getBinlogConfig();
// get table binlog config, when table modify binlogConfig
// it create a new binlog, not update inplace, so we don't need to clone binlogConfig
dbTableBinlogEnableMap.put(tableId, tableBinlogConfig);
return tableBinlogConfig;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.common.proc.ProcResult;
import org.apache.doris.persist.AlterDatabasePropertyInfo;
import org.apache.doris.persist.BarrierLog;
import org.apache.doris.persist.BatchModifyPartitionsInfo;
Expand All @@ -36,6 +38,7 @@
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
Expand All @@ -54,6 +57,11 @@

public class BinlogManager {
private static final int BUFFER_SIZE = 16 * 1024;
private static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>().add("Name")
.add("Type").add("Id").add("Dropped").add("BinlogLength").add("FirstBinlogCommittedTime")
.add("ReadableFirstBinlogCommittedTime").add("LastBinlogCommittedTime")
.add("ReadableLastBinlogCommittedTime").add("BinlogTtlSeconds")
.build();

private static final Logger LOG = LogManager.getLogger(BinlogManager.class);

Expand Down Expand Up @@ -545,6 +553,22 @@ public long read(DataInputStream dis, long checksum) throws IOException {
return checksum;
}

public ProcResult getBinlogInfo() {
BaseProcResult result = new BaseProcResult();
result.setNames(TITLE_NAMES);

lock.readLock().lock();
try {
for (DBBinlog dbBinlog : dbBinlogMap.values()) {
dbBinlog.getBinlogInfo(result);
}
} finally {
lock.readLock().unlock();
}

return result;
}

// remove DB
// remove Table
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,8 @@ public static long getExpiredMs(long ttlSeconds) {
long expireSeconds = currentSeconds - ttlSeconds;
return expireSeconds * 1000;
}

public static String convertTimeToReadable(long time) {
return new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new java.util.Date(time));
}
}
64 changes: 64 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@
package org.apache.doris.binlog;

import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Pair;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
import org.apache.doris.thrift.TStatus;
Expand All @@ -30,6 +33,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -448,4 +452,64 @@ public void removeTable(long tableId) {
lock.writeLock().unlock();
}
}

public void getBinlogInfo(BaseProcResult result) {
BinlogConfig binlogConfig = binlogConfigCache.getDBBinlogConfig(dbId);

String dbName = "(dropped)";
String dropped = "true";
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db != null) {
dbName = db.getFullName();
dropped = "false";
}

lock.readLock().lock();
try {
boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId);
if (dbBinlogEnable) {
List<String> info = new ArrayList<>();

info.add(dbName);
String type = "db";
info.add(type);
String id = String.valueOf(dbId);
info.add(id);
info.add(dropped);
String binlogLength = String.valueOf(allBinlogs.size());
info.add(binlogLength);
String firstBinlogCommittedTime = null;
String readableFirstBinlogCommittedTime = null;
if (!timestamps.isEmpty()) {
long timestamp = timestamps.get(0).second;
firstBinlogCommittedTime = String.valueOf(timestamp);
readableFirstBinlogCommittedTime = BinlogUtils.convertTimeToReadable(timestamp);
}
info.add(firstBinlogCommittedTime);
info.add(readableFirstBinlogCommittedTime);
String lastBinlogCommittedTime = null;
String readableLastBinlogCommittedTime = null;
if (!timestamps.isEmpty()) {
long timestamp = timestamps.get(timestamps.size() - 1).second;
lastBinlogCommittedTime = String.valueOf(timestamp);
readableLastBinlogCommittedTime = BinlogUtils.convertTimeToReadable(timestamp);
}
info.add(lastBinlogCommittedTime);
info.add(readableLastBinlogCommittedTime);
String binlogTtlSeconds = null;
if (binlogConfig != null) {
binlogTtlSeconds = String.valueOf(binlogConfig.getTtlSeconds());
}
info.add(binlogTtlSeconds);

result.addRow(info);
} else {
for (TableBinlog tableBinlog : tableBinlogMap.values()) {
tableBinlog.getBinlogInfo(db, result);
}
}
} finally {
lock.readLock().unlock();
}
}
}
82 changes: 82 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
package org.apache.doris.binlog;

import org.apache.doris.catalog.BinlogConfig;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.Pair;
import org.apache.doris.common.proc.BaseProcResult;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TBinlogType;
import org.apache.doris.thrift.TStatus;
Expand All @@ -27,8 +31,10 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand Down Expand Up @@ -233,4 +239,80 @@ public void replayGc(long largestExpiredCommitSeq) {
lock.writeLock().unlock();
}
}

public void getBinlogInfo(Database db, BaseProcResult result) {
BinlogConfig binlogConfig = binlogConfigCache.getTableBinlogConfig(dbId, tableId);

String tableName = null;
String dropped = null;
if (db == null) {
tableName = "(dropped).(unknown)";
dropped = "true";
} else {
String dbName = db.getFullName();
Table table = db.getTableNullable(tableId);
if (table == null) {
dropped = "true";
tableName = dbName + ".(dropped)";
}

dropped = "false";
if (table instanceof OlapTable) {
OlapTable olapTable = (OlapTable) table;
tableName = dbName + "." + olapTable.getName();
} else {
tableName = dbName + ".(not_olaptable)";
}
}

lock.readLock().lock();
try {
List<String> info = new ArrayList<>();

info.add(tableName);
String type = "table";
info.add(type);

String id = String.valueOf(tableId);
info.add(id);
info.add(dropped);
String binlogLength = String.valueOf(binlogs.size());
info.add(binlogLength);
String firstBinlogCommittedTime = null;
String readableFirstBinlogCommittedTime = null;
for (TBinlog binlog : binlogs) {
long timestamp = binlog.getTimestamp();
if (timestamp != -1) {
firstBinlogCommittedTime = String.valueOf(timestamp);
readableFirstBinlogCommittedTime = BinlogUtils.convertTimeToReadable(timestamp);
break;
}
}
info.add(firstBinlogCommittedTime);
info.add(readableFirstBinlogCommittedTime);
String lastBinlogCommittedTime = null;
String readableLastBinlogCommittedTime = null;
Iterator<TBinlog> iterator = binlogs.descendingIterator();
while (iterator.hasNext()) {
TBinlog binlog = iterator.next();
long timestamp = binlog.getTimestamp();
if (timestamp != -1) {
lastBinlogCommittedTime = String.valueOf(timestamp);
readableLastBinlogCommittedTime = BinlogUtils.convertTimeToReadable(timestamp);
break;
}
}
info.add(lastBinlogCommittedTime);
info.add(readableLastBinlogCommittedTime);
String binlogTtlSeconds = null;
if (binlogConfig != null) {
binlogTtlSeconds = String.valueOf(binlogConfig.getTtlSeconds());
}
info.add(binlogTtlSeconds);

result.addRow(info);
} finally {
lock.readLock().unlock();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.proc;

import org.apache.doris.binlog.BinlogManager;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;


public class BinlogProcDir implements ProcDirInterface {
@Override
public boolean register(String name, ProcNodeInterface node) {
return false;
}

@Override
public ProcNodeInterface lookup(String name) throws AnalysisException {
throw new AnalysisException("not implemented");
}

@Override
public ProcResult fetchResult() throws AnalysisException {
BinlogManager binlogManager = Env.getCurrentEnv().getBinlogManager();
if (binlogManager == null) {
throw new AnalysisException("binlog manager is not initialized");
}

return binlogManager.getBinlogInfo();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ private ProcService() {
root.register("colocation_group", new ColocationGroupProcDir());
root.register("bdbje", new BDBJEProcDir());
root.register("diagnose", new DiagnoseProcDir());
root.register("binlog", new BinlogProcDir());
}

// 通过指定的路径获得对应的PROC Node
Expand Down

0 comments on commit 0f7b735

Please sign in to comment.