From 0f7b73509cc75e7a1277d006ff628a95ea4c01e1 Mon Sep 17 00:00:00 2001 From: Jack Drogon Date: Fri, 2 Feb 2024 21:37:44 +0800 Subject: [PATCH] [ehmancement](binlog) Add show proc '/binlog' impl (#30770) Signed-off-by: Jack Drogon --- .../doris/binlog/BinlogConfigCache.java | 4 +- .../apache/doris/binlog/BinlogManager.java | 24 ++++++ .../org/apache/doris/binlog/BinlogUtils.java | 4 + .../org/apache/doris/binlog/DBBinlog.java | 64 +++++++++++++++ .../org/apache/doris/binlog/TableBinlog.java | 82 +++++++++++++++++++ .../doris/common/proc/BinlogProcDir.java | 45 ++++++++++ .../apache/doris/common/proc/ProcService.java | 1 + 7 files changed, 223 insertions(+), 1 deletion(-) create mode 100644 fe/fe-core/src/main/java/org/apache/doris/common/proc/BinlogProcDir.java diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java index c414b853078012..30641bae8c6f27 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogConfigCache.java @@ -97,7 +97,7 @@ public BinlogConfig getTableBinlogConfig(long dbId, long tableId) { return null; } - Table table = db.getTableOrMetaException(tableId); + Table table = db.getTableNullable(tableId); if (table == null) { LOG.warn("fail to get table. db: {}, table id: {}", db.getFullName(), tableId); return null; @@ -109,6 +109,8 @@ public BinlogConfig getTableBinlogConfig(long dbId, long tableId) { OlapTable olapTable = (OlapTable) table; tableBinlogConfig = olapTable.getBinlogConfig(); + // get table binlog config, when table modify binlogConfig + // it create a new binlog, not update inplace, so we don't need to clone binlogConfig dbTableBinlogEnableMap.put(tableId, tableBinlogConfig); return tableBinlogConfig; } catch (Exception e) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java index 8c68f908ce59be..8187e966561dcc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogManager.java @@ -22,6 +22,8 @@ import org.apache.doris.catalog.Env; import org.apache.doris.common.Config; import org.apache.doris.common.Pair; +import org.apache.doris.common.proc.BaseProcResult; +import org.apache.doris.common.proc.ProcResult; import org.apache.doris.persist.AlterDatabasePropertyInfo; import org.apache.doris.persist.BarrierLog; import org.apache.doris.persist.BatchModifyPartitionsInfo; @@ -36,6 +38,7 @@ import org.apache.doris.thrift.TStatus; import org.apache.doris.thrift.TStatusCode; +import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.logging.log4j.LogManager; @@ -54,6 +57,11 @@ public class BinlogManager { private static final int BUFFER_SIZE = 16 * 1024; + private static final ImmutableList TITLE_NAMES = new ImmutableList.Builder().add("Name") + .add("Type").add("Id").add("Dropped").add("BinlogLength").add("FirstBinlogCommittedTime") + .add("ReadableFirstBinlogCommittedTime").add("LastBinlogCommittedTime") + .add("ReadableLastBinlogCommittedTime").add("BinlogTtlSeconds") + .build(); private static final Logger LOG = LogManager.getLogger(BinlogManager.class); @@ -545,6 +553,22 @@ public long read(DataInputStream dis, long checksum) throws IOException { return checksum; } + public ProcResult getBinlogInfo() { + BaseProcResult result = new BaseProcResult(); + result.setNames(TITLE_NAMES); + + lock.readLock().lock(); + try { + for (DBBinlog dbBinlog : dbBinlogMap.values()) { + dbBinlog.getBinlogInfo(result); + } + } finally { + lock.readLock().unlock(); + } + + return result; + } + // remove DB // remove Table } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java index 4e134104b6d49c..0f6c2308248931 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/BinlogUtils.java @@ -90,4 +90,8 @@ public static long getExpiredMs(long ttlSeconds) { long expireSeconds = currentSeconds - ttlSeconds; return expireSeconds * 1000; } + + public static String convertTimeToReadable(long time) { + return new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new java.util.Date(time)); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java index 4ba1416cd5cbb8..a3133bfb5c7828 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/DBBinlog.java @@ -18,7 +18,10 @@ package org.apache.doris.binlog; import org.apache.doris.catalog.BinlogConfig; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; import org.apache.doris.common.Pair; +import org.apache.doris.common.proc.BaseProcResult; import org.apache.doris.thrift.TBinlog; import org.apache.doris.thrift.TBinlogType; import org.apache.doris.thrift.TStatus; @@ -30,6 +33,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; import java.util.List; @@ -448,4 +452,64 @@ public void removeTable(long tableId) { lock.writeLock().unlock(); } } + + public void getBinlogInfo(BaseProcResult result) { + BinlogConfig binlogConfig = binlogConfigCache.getDBBinlogConfig(dbId); + + String dbName = "(dropped)"; + String dropped = "true"; + Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId); + if (db != null) { + dbName = db.getFullName(); + dropped = "false"; + } + + lock.readLock().lock(); + try { + boolean dbBinlogEnable = binlogConfigCache.isEnableDB(dbId); + if (dbBinlogEnable) { + List info = new ArrayList<>(); + + info.add(dbName); + String type = "db"; + info.add(type); + String id = String.valueOf(dbId); + info.add(id); + info.add(dropped); + String binlogLength = String.valueOf(allBinlogs.size()); + info.add(binlogLength); + String firstBinlogCommittedTime = null; + String readableFirstBinlogCommittedTime = null; + if (!timestamps.isEmpty()) { + long timestamp = timestamps.get(0).second; + firstBinlogCommittedTime = String.valueOf(timestamp); + readableFirstBinlogCommittedTime = BinlogUtils.convertTimeToReadable(timestamp); + } + info.add(firstBinlogCommittedTime); + info.add(readableFirstBinlogCommittedTime); + String lastBinlogCommittedTime = null; + String readableLastBinlogCommittedTime = null; + if (!timestamps.isEmpty()) { + long timestamp = timestamps.get(timestamps.size() - 1).second; + lastBinlogCommittedTime = String.valueOf(timestamp); + readableLastBinlogCommittedTime = BinlogUtils.convertTimeToReadable(timestamp); + } + info.add(lastBinlogCommittedTime); + info.add(readableLastBinlogCommittedTime); + String binlogTtlSeconds = null; + if (binlogConfig != null) { + binlogTtlSeconds = String.valueOf(binlogConfig.getTtlSeconds()); + } + info.add(binlogTtlSeconds); + + result.addRow(info); + } else { + for (TableBinlog tableBinlog : tableBinlogMap.values()) { + tableBinlog.getBinlogInfo(db, result); + } + } + } finally { + lock.readLock().unlock(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java index 0857ae7abb1922..3dd290a07f81ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/binlog/TableBinlog.java @@ -18,7 +18,11 @@ package org.apache.doris.binlog; import org.apache.doris.catalog.BinlogConfig; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; import org.apache.doris.common.Pair; +import org.apache.doris.common.proc.BaseProcResult; import org.apache.doris.thrift.TBinlog; import org.apache.doris.thrift.TBinlogType; import org.apache.doris.thrift.TStatus; @@ -27,8 +31,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.ArrayList; import java.util.Comparator; import java.util.Iterator; +import java.util.List; import java.util.TreeSet; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -233,4 +239,80 @@ public void replayGc(long largestExpiredCommitSeq) { lock.writeLock().unlock(); } } + + public void getBinlogInfo(Database db, BaseProcResult result) { + BinlogConfig binlogConfig = binlogConfigCache.getTableBinlogConfig(dbId, tableId); + + String tableName = null; + String dropped = null; + if (db == null) { + tableName = "(dropped).(unknown)"; + dropped = "true"; + } else { + String dbName = db.getFullName(); + Table table = db.getTableNullable(tableId); + if (table == null) { + dropped = "true"; + tableName = dbName + ".(dropped)"; + } + + dropped = "false"; + if (table instanceof OlapTable) { + OlapTable olapTable = (OlapTable) table; + tableName = dbName + "." + olapTable.getName(); + } else { + tableName = dbName + ".(not_olaptable)"; + } + } + + lock.readLock().lock(); + try { + List info = new ArrayList<>(); + + info.add(tableName); + String type = "table"; + info.add(type); + + String id = String.valueOf(tableId); + info.add(id); + info.add(dropped); + String binlogLength = String.valueOf(binlogs.size()); + info.add(binlogLength); + String firstBinlogCommittedTime = null; + String readableFirstBinlogCommittedTime = null; + for (TBinlog binlog : binlogs) { + long timestamp = binlog.getTimestamp(); + if (timestamp != -1) { + firstBinlogCommittedTime = String.valueOf(timestamp); + readableFirstBinlogCommittedTime = BinlogUtils.convertTimeToReadable(timestamp); + break; + } + } + info.add(firstBinlogCommittedTime); + info.add(readableFirstBinlogCommittedTime); + String lastBinlogCommittedTime = null; + String readableLastBinlogCommittedTime = null; + Iterator iterator = binlogs.descendingIterator(); + while (iterator.hasNext()) { + TBinlog binlog = iterator.next(); + long timestamp = binlog.getTimestamp(); + if (timestamp != -1) { + lastBinlogCommittedTime = String.valueOf(timestamp); + readableLastBinlogCommittedTime = BinlogUtils.convertTimeToReadable(timestamp); + break; + } + } + info.add(lastBinlogCommittedTime); + info.add(readableLastBinlogCommittedTime); + String binlogTtlSeconds = null; + if (binlogConfig != null) { + binlogTtlSeconds = String.valueOf(binlogConfig.getTtlSeconds()); + } + info.add(binlogTtlSeconds); + + result.addRow(info); + } finally { + lock.readLock().unlock(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/BinlogProcDir.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BinlogProcDir.java new file mode 100644 index 00000000000000..f72ed30d2b640f --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/BinlogProcDir.java @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.proc; + +import org.apache.doris.binlog.BinlogManager; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.AnalysisException; + + +public class BinlogProcDir implements ProcDirInterface { + @Override + public boolean register(String name, ProcNodeInterface node) { + return false; + } + + @Override + public ProcNodeInterface lookup(String name) throws AnalysisException { + throw new AnalysisException("not implemented"); + } + + @Override + public ProcResult fetchResult() throws AnalysisException { + BinlogManager binlogManager = Env.getCurrentEnv().getBinlogManager(); + if (binlogManager == null) { + throw new AnalysisException("binlog manager is not initialized"); + } + + return binlogManager.getBinlogInfo(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java index e54ee4d5d11440..42010ccbd204ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/proc/ProcService.java @@ -58,6 +58,7 @@ private ProcService() { root.register("colocation_group", new ColocationGroupProcDir()); root.register("bdbje", new BDBJEProcDir()); root.register("diagnose", new DiagnoseProcDir()); + root.register("binlog", new BinlogProcDir()); } // 通过指定的路径获得对应的PROC Node