Skip to content

HBASE-23221 Polish the WAL interface after HBASE-23181 #774

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -756,7 +756,7 @@ public static boolean matchingRow(final Cell left, final Cell right) {

/**
* @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. Instead use
* {@link #matchingRows(Cell, byte[]))}
* {@link #matchingRows(Cell, byte[])}
*/
@Deprecated
public static boolean matchingRow(final Cell left, final byte[] buf) {
Expand Down Expand Up @@ -894,8 +894,15 @@ public static boolean matchingQualifier(final Cell left, final byte[] buf, final
}

public static boolean matchingColumn(final Cell left, final byte[] fam, final byte[] qual) {
if (!matchingFamily(left, fam)) return false;
return matchingQualifier(left, qual);
return matchingFamily(left, fam) && matchingQualifier(left, qual);
}

/**
* @return True if matching column family and the qualifier starts with <code>qual</code>
*/
public static boolean matchingColumnFamilyAndQualifierPrefix(final Cell left, final byte[] fam,
final byte[] qual) {
return matchingFamily(left, fam) && PrivateCellUtil.qualifierStartsWith(left, qual);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,10 @@ public void map(WALKey key, WALEdit value, Context context)
Delete del = null;
Cell lastCell = null;
for (Cell cell : value.getCells()) {
// filtering WAL meta entries
// Filtering WAL meta marker entries.
if (WALEdit.isMetaEditFamily(cell)) {
continue;
}

// Allow a subclass filter out this cell.
if (filter(context, cell)) {
// A WALEdit may contain multiple operations (HBASE-3584) and/or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3485,7 +3485,7 @@ public List<ReplicationPeerDescription> listReplicationPeers(String regex)
if (cpHost != null) {
cpHost.preListReplicationPeers(regex);
}
LOG.info(getClientIdAuditPrefix() + " list replication peers, regex=" + regex);
LOG.debug("{} list replication peers, regex={}", getClientIdAuditPrefix(), regex);
Pattern pattern = regex == null ? null : Pattern.compile(regex);
List<ReplicationPeerDescription> peers =
this.replicationPeerManager.listPeers(pattern);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1725,7 +1725,7 @@ public Pair<byte[], Collection<HStoreFile>> call() throws IOException {

status.setStatus("Writing region close event to WAL");
// Always write close marker to wal even for read only table. This is not a big problem as we
// do not write any data into the region.
// do not write any data into the region; it is just a meta edit in the WAL file.
if (!abort && wal != null && getRegionServerServices() != null &&
RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
writeRegionCloseMarker(wal);
Expand Down Expand Up @@ -2691,7 +2691,8 @@ private void logFatLineOnFlush(Collection<HStore> storesToFlush, long sequenceId
}
}
MemStoreSize mss = this.memStoreSizing.getMemStoreSize();
LOG.info("Flushing " + storesToFlush.size() + "/" + stores.size() + " column families," +
LOG.info("Flushing " + this.getRegionInfo().getEncodedName() + " " +
storesToFlush.size() + "/" + stores.size() + " column families," +
" dataSize=" + StringUtils.byteDesc(mss.getDataSize()) +
" heapSize=" + StringUtils.byteDesc(mss.getHeapSize()) +
((perCfExtras != null && perCfExtras.length() > 0)? perCfExtras.toString(): "") +
Expand Down Expand Up @@ -4818,7 +4819,7 @@ private long replayRecoveredEdits(final Path edits,
for (Cell cell: val.getCells()) {
// Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
if (WALEdit.isMetaEditFamily(cell)) {
// if region names don't match, skipp replaying compaction marker
if (!checkRowWithinBoundary) {
//this is a special edit, we should handle it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1928,7 +1928,7 @@ private void startServices() throws IOException {
healthCheckChore = new HealthCheckChore(sleepTime, this, getConfiguration());
}

this.walRoller = new LogRoller(this, this);
this.walRoller = new LogRoller(this);
this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
this.procedureResultReporter = new RemoteProcedureResultReporter(this);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/**
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
Expand Down Expand Up @@ -28,7 +28,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
Expand Down Expand Up @@ -56,7 +55,6 @@
public class LogRoller extends HasThread implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(LogRoller.class);
private final ConcurrentMap<WAL, Boolean> walNeedsRoll = new ConcurrentHashMap<>();
private final Server server;
protected final RegionServerServices services;
private volatile long lastRollTime = System.currentTimeMillis();
// Period to roll log.
Expand Down Expand Up @@ -99,16 +97,14 @@ public void requestRollAll() {
}
}

/** @param server */
public LogRoller(final Server server, final RegionServerServices services) {
public LogRoller(RegionServerServices services) {
super("LogRoller");
this.server = server;
this.services = services;
this.rollPeriod = this.server.getConfiguration().
this.rollPeriod = this.services.getConfiguration().
getLong("hbase.regionserver.logroll.period", 3600000);
this.threadWakeFrequency = this.server.getConfiguration().
this.threadWakeFrequency = this.services.getConfiguration().
getInt(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
this.checkLowReplicationInterval = this.server.getConfiguration().getLong(
this.checkLowReplicationInterval = this.services.getConfiguration().getLong(
"hbase.regionserver.hlog.check.lowreplication.interval", 30 * 1000);
}

Expand Down Expand Up @@ -144,7 +140,7 @@ private void abort(String reason, Throwable cause) {
LOG.warn("Failed to shutdown wal", e);
}
}
server.abort(reason, cause);
this.services.abort(reason, cause);
}

@Override
Expand All @@ -156,7 +152,7 @@ public void run() {
periodic = (now - this.lastRollTime) > this.rollPeriod;
if (periodic) {
// Time for periodic roll, fall through
LOG.debug("Wal roll period {} ms elapsed", this.rollPeriod);
LOG.debug("WAL roll period {} ms elapsed", this.rollPeriod);
} else {
synchronized (this) {
if (walNeedsRoll.values().stream().anyMatch(Boolean::booleanValue)) {
Expand All @@ -183,9 +179,9 @@ public void run() {
WAL wal = entry.getKey();
// reset the flag in front to avoid missing roll request before we return from rollWriter.
walNeedsRoll.put(wal, Boolean.FALSE);
// Force the roll if the logroll.period is elapsed or if a roll was requested.
// The returned value is an array of actual region names.
byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
// Force the roll if the logroll.period is elapsed or if a roll was requested.
// The returned value is an array of actual region names.
byte[][] regionsToFlush = wal.rollWriter(periodic || entry.getValue().booleanValue());
if (regionsToFlush != null) {
for (byte[] r : regionsToFlush) {
scheduleFlush(Bytes.toString(r));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
private static final class WalProps {

/**
* Map the encoded region name to the highest sequence id. Contain all the regions it has
* entries of
* Map the encoded region name to the highest sequence id.
* <p/>Contains all the regions it has an entry for.
*/
public final Map<byte[], Long> encodedName2HighestSequenceId;

Expand Down Expand Up @@ -585,9 +585,9 @@ public int getNumLogFiles() {
}

/**
* If the number of un-archived WAL files is greater than maximum allowed, check the first
* (oldest) WAL file, and returns those regions which should be flushed so that it can be
* archived.
* If the number of un-archived WAL files ('live' WALs) is greater than maximum allowed,
* check the first (oldest) WAL, and return those regions which should be flushed so that
* it can be let-go/'archived'.
* @return regions (encodedRegionNames) to flush in order to archive oldest WAL file.
*/
byte[][] findRegionsToForceFlush() throws IOException {
Expand Down Expand Up @@ -860,10 +860,6 @@ public void close() throws IOException {
/**
* updates the sequence number of a specific store. depending on the flag: replaces current seq
* number if the given seq id is bigger, or even if it is lower than existing one
* @param encodedRegionName
* @param familyName
* @param sequenceid
* @param onlyIfGreater
*/
@Override
public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequenceid,
Expand Down Expand Up @@ -973,7 +969,7 @@ protected final void postSync(final long timeInNanos, final int handlerSyncs) {
}

protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKeyImpl key,
WALEdit edits, boolean inMemstore, boolean closeRegion, RingBuffer<RingBufferTruck> ringBuffer)
WALEdit edits, boolean inMemstore, RingBuffer<RingBufferTruck> ringBuffer)
throws IOException {
if (this.closed) {
throw new IOException(
Expand All @@ -987,7 +983,7 @@ protected final long stampSequenceIdAndPublishToRingBuffer(RegionInfo hri, WALKe
ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
.filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
try (TraceScope scope = TraceUtil.createTrace(implClassName + ".append")) {
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, closeRegion, rpcCall);
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
entry.stampRegionSequenceId(we);
ringBuffer.get(txid).load(entry);
} finally {
Expand Down Expand Up @@ -1025,13 +1021,13 @@ public OptionalLong getLogFileSizeIfBeingWritten(Path path) {

@Override
public long appendData(RegionInfo info, WALKeyImpl key, WALEdit edits) throws IOException {
return append(info, key, edits, true, false);
return append(info, key, edits, true);
}

@Override
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean closeRegion)
public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits)
throws IOException {
return append(info, key, edits, false, closeRegion);
return append(info, key, edits, false);
}

/**
Expand All @@ -1055,17 +1051,17 @@ public long appendMarker(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean
* @param key Modified by this call; we add to it this edits region edit/sequence id.
* @param edits Edits to append. MAY CONTAIN NO EDITS for case where we want to get an edit
* sequence id that is after all currently appended edits.
* @param inMemstore Always true except for case where we are writing a region event marker, for
* example, a compaction completion record into the WAL; in this case the entry is just
* so we can finish an unfinished compaction -- it is not an edit for memstore.
* @param closeRegion Whether this is a region close marker, i.e, the last wal edit for this
* region on this region server. The WAL implementation should remove all the related
* stuff, for example, the sequence id accounting.
* @param inMemstore Always true except for case where we are writing a region event meta
* marker edit, for example, a compaction completion record into the WAL or noting a
* Region Open event. In these cases the entry is just so we can finish an unfinished
* compaction after a crash when the new Server reads the WAL on recovery, etc. These
* transition event 'Markers' do not go via the memstore. When memstore is false,
* we presume a Marker event edit.
* @return Returns a 'transaction id' and <code>key</code> will have the region edit/sequence id
* in it.
*/
protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore,
boolean closeRegion) throws IOException;
protected abstract long append(RegionInfo info, WALKeyImpl key, WALEdit edits, boolean inMemstore)
throws IOException;

protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,9 +558,9 @@ private boolean shouldScheduleConsumer() {
}

@Override
protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore,
boolean closeRegion) throws IOException {
long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
protected long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
throws IOException {
long txid = stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
waitingConsumePayloads);
if (shouldScheduleConsumer()) {
consumeExecutor.execute(consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,8 @@ protected void doShutdown() throws IOException {

@Override
protected long append(final RegionInfo hri, final WALKeyImpl key, final WALEdit edits,
final boolean inMemstore, boolean closeRegion) throws IOException {
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, closeRegion,
final boolean inMemstore) throws IOException {
return stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore,
disruptor.getRingBuffer());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
/**
* A WAL Entry for {@link AbstractFSWAL} implementation. Immutable.
* A subclass of {@link Entry} that carries extra info across the ring buffer such as
* region sequence id (we want to use this later, just before we write the WAL to ensure region
* region sequenceid (we want to use this later, just before we write the WAL to ensure region
* edits maintain order). The extra info added here is not 'serialized' as part of the WALEdit
* hence marked 'transient' to underline this fact. It also adds mechanism so we can wait on
* the assign of the region sequence id. See #stampRegionSequenceId().
Expand All @@ -50,25 +50,40 @@ class FSWALEntry extends Entry {
// The below data members are denoted 'transient' just to highlight these are not persisted;
// they are only in memory and held here while passing over the ring buffer.
private final transient long txid;

/**
* If false, means this is a meta edit written by the hbase system itself. It was not in
* memstore. HBase uses these edit types to note in the log operational transitions such
* as compactions, flushes, or region open/closes.
*/
private final transient boolean inMemstore;

/**
* Set if this is a meta edit and it is of close region type.
*/
private final transient boolean closeRegion;

private final transient RegionInfo regionInfo;
private final transient Set<byte[]> familyNames;
private final transient ServerCall<?> rpcCall;

/**
* @param inMemstore If true, then this is a data edit, one that came from client. If false, it
* is a meta edit made by the hbase system itself and is for the WAL only.
*/
FSWALEntry(final long txid, final WALKeyImpl key, final WALEdit edit, final RegionInfo regionInfo,
final boolean inMemstore, boolean closeRegion, ServerCall<?> rpcCall) {
final boolean inMemstore, ServerCall<?> rpcCall) {
super(key, edit);
this.inMemstore = inMemstore;
this.closeRegion = closeRegion;
this.closeRegion = !inMemstore && edit.isRegionCloseMarker();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, so we added a flag to WALEdit, then let's also purge the inMemstore flag here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, inMemstore is false for all meta/market types. It is consulted when we go to do update on SequenceIdAccounting. If it is false, we don't bother updating 'lowest' sequenceid.

Unless I'm missing something, we need it still.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean we do not need to pass inMemstore flag in WAL.append, also add an isInMemstore method in WALEdit class, just like what we have done for isRegionCloseMarker here.

this.regionInfo = regionInfo;
this.txid = txid;
if (inMemstore) {
// construct familyNames here to reduce the work of log sinker.
Set<byte[]> families = edit.getFamilies();
this.familyNames = families != null ? families : collectFamilies(edit.getCells());
} else {
this.familyNames = Collections.<byte[]> emptySet();
this.familyNames = Collections.emptySet();
}
this.rpcCall = rpcCall;
if (rpcCall != null) {
Expand All @@ -83,7 +98,7 @@ static Set<byte[]> collectFamilies(List<Cell> cells) {
} else {
Set<byte[]> set = new TreeSet<>(Bytes.BYTES_COMPARATOR);
for (Cell cell: cells) {
if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
if (!WALEdit.isMetaEditFamily(cell)) {
set.add(CellUtil.cloneFamily(cell));
}
}
Expand All @@ -94,7 +109,7 @@ static Set<byte[]> collectFamilies(List<Cell> cells) {
@Override
public String toString() {
return "sequence=" + this.txid + ", " + super.toString();
};
}

boolean isInMemStore() {
return this.inMemstore;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,11 @@
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
* <p>
* Accounting of sequence ids per region and then by column family. So we can our accounting
* Accounting of sequence ids per region and then by column family. So we can keep our accounting
* current, call startCacheFlush and then finishedCacheFlush or abortCacheFlush so this instance can
* keep abreast of the state of sequence id persistence. Also call update per append.
* </p>
* <p>
* For the implementation, we assume that all the {@code encodedRegionName} passed in is gotten by
* For the implementation, we assume that all the {@code encodedRegionName} passed in are gotten by
* {@link org.apache.hadoop.hbase.client.RegionInfo#getEncodedNameAsBytes()}. So it is safe to use
* it as a hash key. And for family name, we use {@link ImmutableByteArray} as key. This is because
* hash based map is much faster than RBTree or CSLM and here we are on the critical write path. See
Expand All @@ -53,8 +51,8 @@
*/
@InterfaceAudience.Private
class SequenceIdAccounting {

private static final Logger LOG = LoggerFactory.getLogger(SequenceIdAccounting.class);

/**
* This lock ties all operations on {@link SequenceIdAccounting#flushingSequenceIds} and
* {@link #lowestUnflushedSequenceIds} Maps. {@link #lowestUnflushedSequenceIds} has the
Expand Down Expand Up @@ -110,7 +108,6 @@ class SequenceIdAccounting {

/**
* Returns the lowest unflushed sequence id for the region.
* @param encodedRegionName
* @return Lowest outstanding unflushed sequenceid for <code>encodedRegionName</code>. Will
* return {@link HConstants#NO_SEQNUM} when none.
*/
Expand All @@ -125,8 +122,6 @@ long getLowestSequenceId(final byte[] encodedRegionName) {
}

/**
* @param encodedRegionName
* @param familyName
* @return Lowest outstanding unflushed sequenceid for <code>encodedRegionname</code> and
* <code>familyName</code>. Returned sequenceid may be for an edit currently being
* flushed.
Expand Down
Loading