Skip to content

HDFS-16690. Automatically format unformatted JNs with JournalNodeSyncer #6925

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1471,6 +1471,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_JOURNALNODE_SYNC_INTERVAL_KEY =
"dfs.journalnode.sync.interval";
public static final long DFS_JOURNALNODE_SYNC_INTERVAL_DEFAULT = 2*60*1000L;
// Controls whether the JournalNodeSyncer may try to format an unformatted
// journal using storage info obtained from the other JournalNodes.
// Disabled by default.
public static final String DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY =
"dfs.journalnode.enable.sync.format";
public static final boolean DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_DEFAULT = false;
public static final String DFS_JOURNALNODE_EDIT_CACHE_SIZE_KEY =
"dfs.journalnode.edit-cache-size.bytes";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.security.KerberosInfo;
Expand Down Expand Up @@ -51,4 +52,13 @@ GetEditLogManifestResponseProto getEditLogManifestFromJournal(
String jid, String nameServiceId, long sinceTxId, boolean inProgressOk)
throws IOException;

/**
* Get the storage info for the specified journal.
* @param jid the journal identifier
* @param nameServiceId the name service id
* @return the storage info object, in its protobuf form
* @throws IOException if the storage info cannot be obtained
*/
StorageInfoProto getStorageInfo(String jid, String nameServiceId)
throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocol;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos.GetStorageInfoRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;

Expand Down Expand Up @@ -60,4 +62,18 @@ public GetEditLogManifestResponseProto getEditLogManifestFromJournal(
throw new ServiceException(e);
}
}

/**
 * Server-side handler for the getStorageInfo RPC: unpacks the protobuf
 * request and delegates to the wrapped protocol implementation.
 *
 * @param controller the RPC controller (not used by this method)
 * @param request carries the journal id and, optionally, the name service id
 * @return the storage info returned by the implementation
 * @throws ServiceException wrapping any IOException thrown by the implementation
 */
@Override
public StorageInfoProto getStorageInfo(
    RpcController controller, GetStorageInfoRequestProto request)
    throws ServiceException {
  final String journalId = request.getJid().getIdentifier();
  // The name service id is optional on the wire; absent maps to null.
  String nsId = null;
  if (request.hasNameServiceId()) {
    nsId = request.getNameServiceId();
  }
  try {
    return impl.getStorageInfo(journalId, nsId);
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.hadoop.hdfs.qjournal.protocolPB;

import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.qjournal.protocol.InterQJournalProtocolProtos;
import org.apache.hadoop.thirdparty.protobuf.RpcController;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
Expand Down Expand Up @@ -75,6 +77,18 @@ public GetEditLogManifestResponseProto getEditLogManifestFromJournal(
req.build()));
}

/**
 * Client-side stub for the getStorageInfo RPC: packs the arguments into a
 * protobuf request and sends it through the RPC proxy.
 *
 * @param jid the journal identifier
 * @param nameServiceId the name service id; left unset in the request when null
 * @return the storage info returned by the remote side
 * @throws IOException if the remote call fails
 */
@Override
public StorageInfoProto getStorageInfo(String jid, String nameServiceId)
    throws IOException {
  final InterQJournalProtocolProtos.GetStorageInfoRequestProto.Builder builder =
      InterQJournalProtocolProtos.GetStorageInfoRequestProto.newBuilder();
  builder.setJid(convertJournalId(jid));
  if (nameServiceId != null) {
    builder.setNameServiceId(nameServiceId);
  }
  // The request is built inside the lambda, as part of the ipc() call.
  return ipc(() -> rpcProxy.getStorageInfo(NULL_CONTROLLER, builder.build()));
}

private QJournalProtocolProtos.JournalIdProto convertJournalId(String jid) {
return QJournalProtocolProtos.JournalIdProto.newBuilder()
.setIdentifier(jid)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.qjournal.server;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.StorageInfoProto;
import org.apache.hadoop.thirdparty.protobuf.BlockingService;
import org.slf4j.Logger;
import org.apache.hadoop.classification.InterfaceAudience;
Expand Down Expand Up @@ -71,14 +72,14 @@ public class JournalNodeRpcServer implements QJournalProtocol,

JournalNodeRpcServer(Configuration conf, JournalNode jn) throws IOException {
this.jn = jn;

Configuration confCopy = new Configuration(conf);

// Ensure that nagling doesn't kick in, which could cause latency issues.
confCopy.setBoolean(
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,
true);

InetSocketAddress addr = getAddress(confCopy);
String bindHost = conf.getTrimmed(DFS_JOURNALNODE_RPC_BIND_HOST_KEY, null);
if (bindHost == null) {
Expand All @@ -104,7 +105,7 @@ public class JournalNodeRpcServer implements QJournalProtocol,
this.handlerCount = confHandlerCount;
LOG.info("The number of JournalNodeRpcServer handlers is {}.",
this.handlerCount);

this.server = new RPC.Builder(confCopy)
.setProtocol(QJournalProtocolPB.class)
.setInstance(service)
Expand Down Expand Up @@ -149,15 +150,15 @@ void start() {
/**
 * Returns the listener address reported by the underlying RPC server.
 *
 * @return the RPC server's listener address
 */
public InetSocketAddress getAddress() {
  return this.server.getListenerAddress();
}

/**
 * Delegates to the underlying RPC server's join().
 * NOTE(review): presumably blocks until the server stops; confirm.
 *
 * @throws InterruptedException if thrown by the server's join()
 */
void join() throws InterruptedException {
  server.join();
}

/**
 * Stops the underlying RPC server.
 */
void stop() {
  server.stop();
}

static InetSocketAddress getAddress(Configuration conf) {
String addr = conf.get(
DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
Expand Down Expand Up @@ -211,7 +212,7 @@ public void journal(RequestInfo reqInfo,
jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
.journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
}

@Override
public void heartbeat(RequestInfo reqInfo) throws IOException {
jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
Expand Down Expand Up @@ -245,17 +246,24 @@ public GetEditLogManifestResponseProto getEditLogManifest(
String jid, String nameServiceId,
long sinceTxId, boolean inProgressOk)
throws IOException {

RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid, nameServiceId)
.getEditLogManifest(sinceTxId, inProgressOk);

return GetEditLogManifestResponseProto.newBuilder()
.setManifest(PBHelper.convert(manifest))
.setHttpPort(jn.getBoundHttpAddress().getPort())
.setFromURL(jn.getHttpServerURI())
.build();
}

@Override
public StorageInfoProto getStorageInfo(String jid,
String nameServiceId) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nameServiceId is not used here anymore, so why should we define it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. This was a miss. getOrCreateJournal should be called with jid and nameServiceId. I've pushed an update. Thanks!

StorageInfo storage = jn.getOrCreateJournal(jid, nameServiceId).getStorage();
return PBHelper.convert(storage);
}

@Override
public GetJournaledEditsResponseProto getJournaledEdits(String jid,
String nameServiceId, long sinceTxId, int maxTxns) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
package org.apache.hadoop.hdfs.qjournal.server;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -79,6 +82,7 @@ public class JournalNodeSyncer {
private int numOtherJNs;
private int journalNodeIndexForSync = 0;
private final long journalSyncInterval;
private final boolean tryFormatting;
private final int logSegmentTransferTimeout;
private final DataTransferThrottler throttler;
private final JournalMetrics metrics;
Expand All @@ -98,6 +102,9 @@ public class JournalNodeSyncer {
logSegmentTransferTimeout = conf.getInt(
DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY,
DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT);
tryFormatting = conf.getBoolean(
DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_KEY,
DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_FORMAT_DEFAULT);
throttler = getThrottler(conf);
metrics = journal.getMetrics();
journalSyncerStarted = false;
Expand Down Expand Up @@ -171,6 +178,8 @@ private void startSyncJournalsDaemon() {
// Wait for journal to be formatted to create edits.sync directory
while(!journal.isFormatted()) {
try {
// Format the journal with namespace info from the other JNs if it is not formatted
formatWithSyncer();
Thread.sleep(journalSyncInterval);
} catch (InterruptedException e) {
LOG.error("JournalNodeSyncer daemon received Runtime exception.", e);
Expand All @@ -187,7 +196,15 @@ private void startSyncJournalsDaemon() {
while(shouldSync) {
try {
if (!journal.isFormatted()) {
LOG.warn("Journal cannot sync. Not formatted.");
LOG.warn("Journal cannot sync. Not formatted. Trying to format with the syncer");
formatWithSyncer();
if (journal.isFormatted() && !createEditsSyncDir()) {
LOG.error("Failed to create directory for downloading log " +
"segments: {}. Stopping Journal Node Sync.",
journal.getStorage().getEditsSyncDir());
return;
}
continue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this drop into an endless loop when tryFormatting is set to false (the default)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The endless loop already exists. If the JN is unformatted, it will throw an exception in a loop every time the syncer tries to sync from other JNs. tryFormatting doesn't add anything extra to the loop. But if it is set, the formatting could happen.

} else {
syncJournals();
}
Expand Down Expand Up @@ -233,6 +250,68 @@ private void syncJournals() {
journalNodeIndexForSync = (journalNodeIndexForSync + 1) % numOtherJNs;
}

/**
 * Tries to format the local journal using StorageInfo fetched from one of
 * the other JournalNodes.
 *
 * <p>Does nothing unless {@code tryFormatting} is enabled. Each other
 * JournalNode is asked in turn; nodes that report no edit logs, that return
 * a StorageInfo with namespace ID 0, or whose RPC fails are skipped. The
 * first valid StorageInfo is used to format the journal. IOExceptions are
 * logged and never propagated to the caller.
 */
private void formatWithSyncer() {
  if (!tryFormatting) {
    return;
  }
  LOG.info("Trying to format the journal with the syncer");
  try {
    StorageInfo storage = null;
    for (JournalNodeProxy jnProxy : otherJNProxies) {
      if (!hasEditLogs(jnProxy)) {
        // This avoids a race condition between `hdfs namenode -format` and
        // JN syncer by checking if the other JN is not newly formatted.
        continue;
      }
      try {
        HdfsServerProtos.StorageInfoProto storageInfoResponse =
            jnProxy.jnProxy.getStorageInfo(jid, nameServiceId);
        StorageInfo candidate = PBHelper.convert(
            storageInfoResponse, HdfsServerConstants.NodeType.JOURNAL_NODE);
        if (candidate.getNamespaceID() == 0) {
          // A zero namespace ID is treated as invalid; try the next JN.
          LOG.error("Got invalid StorageInfo from {}", jnProxy);
          continue;
        }
        LOG.info("Got StorageInfo {} from {}", candidate, jnProxy);
        storage = candidate;
        break;
      } catch (IOException e) {
        // Best effort: a failing JN must not prevent asking the remaining ones.
        LOG.error("Could not get StorageInfo from {}", jnProxy, e);
      }
    }
    if (storage == null) {
      LOG.error("Could not get StorageInfo from any JournalNode. " +
          "JournalNodeSyncer cannot format the journal.");
      return;
    }
    // Re-check right before formatting: the remote calls above take time and
    // the journal may have been formatted in the meantime
    // (NOTE(review): e.g. by a NameNode-initiated format; confirm). Do not
    // run the forced format below over an already formatted journal.
    if (journal.isFormatted()) {
      LOG.info("Journal is already formatted. Skipping format with the syncer");
      return;
    }
    NamespaceInfo nsInfo = new NamespaceInfo(storage);
    journal.format(nsInfo, true);
  } catch (IOException e) {
    LOG.error("Exception in formatting the journal with the syncer", e);
  }
}

/**
 * Checks whether the given remote journal reports at least one edit log.
 *
 * <p>The edit log manifest is requested from transaction id 0 with
 * in-progress segments excluded. A failed request is logged and counted as
 * "no edit logs".
 *
 * @param journalProxy proxy of the JournalNode to query
 * @return true if the manifest lists at least one edit log, false otherwise
 */
private boolean hasEditLogs(JournalNodeProxy journalProxy) {
  final GetEditLogManifestResponseProto manifestResponse;
  try {
    manifestResponse = journalProxy.jnProxy.getEditLogManifestFromJournal(
        jid, nameServiceId, 0, false);
  } catch (IOException ioe) {
    LOG.error("Could not get edit log manifest from " + journalProxy, ioe);
    return false;
  }

  final List<RemoteEditLog> remoteLogs =
      PBHelper.convert(manifestResponse.getManifest()).getLogs();
  final boolean logsPresent = remoteLogs != null && !remoteLogs.isEmpty();
  if (!logsPresent) {
    LOG.warn("Journal at " + journalProxy.jnAddr + " has no edit logs");
  }
  return logsPresent;
}

private void syncWithJournalAtIndex(int index) {
LOG.info("Syncing Journal " + jn.getBoundIpcAddress().getAddress() + ":"
+ jn.getBoundIpcAddress().getPort() + " with "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,15 @@ package hadoop.hdfs.qjournal;
import "HdfsServer.proto";
import "QJournalProtocol.proto";

// Request message of the getStorageInfo RPC; identifies the journal whose
// storage info is requested.
message GetStorageInfoRequestProto {
// Identifier of the journal (required).
required JournalIdProto jid = 1;
// Name service id; optional, may be left unset.
optional string nameServiceId = 2;
}

// RPC service of the InterQJournalProtocol.
// NOTE(review): presumably used for JournalNode-to-JournalNode calls; confirm.
service InterQJournalProtocolService {
// Returns the edit log manifest of a journal.
rpc getEditLogManifestFromJournal(GetEditLogManifestRequestProto)
returns (GetEditLogManifestResponseProto);

// Returns the storage info of the journal named in the request.
rpc getStorageInfo(GetStorageInfoRequestProto)
returns (StorageInfoProto);
}
Original file line number Diff line number Diff line change
Expand Up @@ -5062,6 +5062,16 @@
</description>
</property>

<property>
<name>dfs.journalnode.enable.sync.format</name>
<value>false</value>
<description>
If true, the journal node syncer daemon that syncs edit logs between
journal nodes will try to format its own journal if it is not yet formatted.
It queries the other journal nodes for the storage info required to format.
</description>
</property>

<property>
<name>dfs.journalnode.edit-cache-size.bytes</name>
<value></value>
Expand Down
Loading
Loading