Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
- Addressing typo in documentation.
- Minor tweaks to admin guide.
- Adding support to stand up a ZooKeeperServer when a quorum peer is not distributed (ie supporting both embedded standalone and cluster).

Signed-off-by: Aldrin Piri <aldrin@apache.org>
  • Loading branch information
mcgilman authored and apiri committed Feb 1, 2016
1 parent f471682 commit 72c8467
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 4 deletions.
4 changes: 3 additions & 1 deletion nifi-docs/src/main/asciidoc/administration-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ However, it is up to the administrator to determine the number of nodes most app
If the `nifi.state.management.embedded.zookeeper.start` property is set to `true`, the `nifi.state.management.embedded.zookeeper.properties` property
in _nifi.properties_ also becomes relevant. This specifies the ZooKeeper properties file to use. At a minimum, this properties file needs to be populated
with the list of ZooKeeper servers. Each of these servers is configured as <hostname>:<client port>[:<leader election port>]. For example, `myhost:2888:3888`.
with the list of ZooKeeper servers. Each of these servers is configured as <hostname>:<quorum port>[:<leader election port>]. For example, `myhost:2888:3888`.
This list of nodes should be the same nodes in the NiFi cluster that have the `nifi.state.management.embedded.zookeeper.start`
property set to `true`. Also note that because ZooKeeper will be listening on these ports, the firewall may need to be configured to open these ports
for incoming traffic, at least between nodes in the cluster. Additionally, the port to listen on for client connections must be opened in the firewall.
Expand All @@ -460,6 +460,8 @@ mkdir state/zookeeper
echo 1 > state/zookeeper/myid
For the next NiFi Node that will run ZooKeeper, we can accomplish this by performing the following commands:
[source]
cd $NIFI_HOME
mkdir state
mkdir state/zookeeper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@

import org.apache.nifi.util.NiFiProperties;
import org.apache.zookeeper.server.ServerCnxnFactory;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZKDatabase;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
import org.apache.zookeeper.server.quorum.QuorumPeer;
Expand All @@ -43,6 +45,7 @@ public class ZooKeeperStateServer extends ZooKeeperServerMain {

private ServerCnxnFactory connectionFactory;
private FileTxnSnapLog transactionLog;
private ZooKeeperServer embeddedZkServer;
private QuorumPeer quorumPeer;

private ZooKeeperStateServer(final Properties zkProperties) throws IOException, ConfigException {
Expand All @@ -51,8 +54,45 @@ private ZooKeeperStateServer(final Properties zkProperties) throws IOException,
}

public synchronized void start() throws IOException {
if (quorumPeerConfig.isDistributed()) {
startDistributed();
} else {
startStandalone();
}
}

private void startStandalone() throws IOException {
logger.info("Starting Embedded ZooKeeper Server");

final ServerConfig config = new ServerConfig();
config.readFrom(quorumPeerConfig);
try {
started = true;

transactionLog = new FileTxnSnapLog(new File(config.getDataLogDir()), new File(config.getDataDir()));

embeddedZkServer = new ZooKeeperServer();
embeddedZkServer.setTxnLogFactory(transactionLog);
embeddedZkServer.setTickTime(config.getTickTime());
embeddedZkServer.setMinSessionTimeout(config.getMinSessionTimeout());
embeddedZkServer.setMaxSessionTimeout(config.getMaxSessionTimeout());

connectionFactory = ServerCnxnFactory.createFactory();
connectionFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());
connectionFactory.startup(embeddedZkServer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Embedded ZooKeeper Server interrupted", e);
} catch (final IOException ioe) {
throw new IOException("Failed to start embedded ZooKeeper Server", ioe);
} catch (final Exception e) {
throw new RuntimeException("Failed to start embedded ZooKeeper Server", e);
}
}

private void startDistributed() throws IOException {
logger.info("Starting Embedded ZooKeeper Peer");

try {
started = true;

Expand Down Expand Up @@ -81,9 +121,9 @@ public synchronized void start() throws IOException {

quorumPeer.start();
} catch (final IOException ioe) {
throw new IOException("Failed to start embedded ZooKeeper Server", ioe);
throw new IOException("Failed to start embedded ZooKeeper Peer", ioe);
} catch (final Exception e) {
throw new RuntimeException("Failed to start embedded ZooKeeper Server", e);
throw new RuntimeException("Failed to start embedded ZooKeeper Peer", e);
}
}

Expand All @@ -107,6 +147,10 @@ public synchronized void shutdown() {
if (quorumPeer != null && quorumPeer.isRunning()) {
quorumPeer.shutdown();
}

if (embeddedZkServer != null && embeddedZkServer.isRunning()) {
embeddedZkServer.shutdown();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
Connect String - A comma-separated list of host:port pairs to connect to ZooKeeper. For example, myhost.mydomain:2181,host2.mydomain:5555,host3:6666
Session Timeout - Specifies how long this instance of NiFi is allowed to be disconnected from ZooKeeper before creating a new ZooKeeper Session. Default value is "3 seconds"
Session Timeout - Specifies how long this instance of NiFi is allowed to be disconnected from ZooKeeper before creating a new ZooKeeper Session. Default value is "30 seconds"
Access Control - Specifies which Access Controls will be applied to the ZooKeeper ZNodes that are created by this State Provider. This value must be set to one of:
- Open : ZNodes will be open to any ZooKeeper client.
Expand Down

0 comments on commit 72c8467

Please sign in to comment.