Skip to content

Commit 047f4e2

Browse files
authored
HBASE-27347 Port FileWatcher from ZK to autodetect keystore/truststore changes in TLS connections (#4869)
Signed-off-by: wchevreuil@apache.org
1 parent 085325c commit 047f4e2

File tree

7 files changed

+1046
-89
lines changed

7 files changed

+1046
-89
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.hadoop.hbase.HConstants;
2626
import org.apache.hadoop.hbase.client.MetricsConnection;
2727
import org.apache.hadoop.hbase.exceptions.X509Exception;
28+
import org.apache.hadoop.hbase.io.FileChangeWatcher;
2829
import org.apache.hadoop.hbase.io.crypto.tls.X509Util;
2930
import org.apache.hadoop.hbase.util.Pair;
3031
import org.apache.yetus.audience.InterfaceAudience;
@@ -49,6 +50,8 @@ public class NettyRpcClient extends AbstractRpcClient<NettyRpcConnection> {
4950

5051
private final boolean shutdownGroupWhenClose;
5152
private final AtomicReference<SslContext> sslContextForClient = new AtomicReference<>();
53+
private final AtomicReference<FileChangeWatcher> keyStoreWatcher = new AtomicReference<>();
54+
private final AtomicReference<FileChangeWatcher> trustStoreWatcher = new AtomicReference<>();
5255

5356
public NettyRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
5457
MetricsConnection metrics) {
@@ -85,6 +88,14 @@ protected void closeInternal() {
8588
if (shutdownGroupWhenClose) {
8689
group.shutdownGracefully();
8790
}
91+
FileChangeWatcher ks = keyStoreWatcher.getAndSet(null);
92+
if (ks != null) {
93+
ks.stop();
94+
}
95+
FileChangeWatcher ts = trustStoreWatcher.getAndSet(null);
96+
if (ts != null) {
97+
ts.stop();
98+
}
8899
}
89100

90101
SslContext getSslContext() throws X509Exception, IOException {
@@ -94,6 +105,12 @@ SslContext getSslContext() throws X509Exception, IOException {
94105
if (!sslContextForClient.compareAndSet(null, result)) {
95106
// lost the race, another thread already set the value
96107
result = sslContextForClient.get();
108+
} else if (
109+
keyStoreWatcher.get() == null && trustStoreWatcher.get() == null
110+
&& conf.getBoolean(X509Util.TLS_CERT_RELOAD, false)
111+
) {
112+
X509Util.enableCertFileReloading(conf, keyStoreWatcher, trustStoreWatcher,
113+
() -> sslContextForClient.set(null));
97114
}
98115
}
99116
return result;
Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.io;
19+
20+
import java.io.IOException;
21+
import java.nio.file.ClosedWatchServiceException;
22+
import java.nio.file.FileSystem;
23+
import java.nio.file.Path;
24+
import java.nio.file.StandardWatchEventKinds;
25+
import java.nio.file.WatchEvent;
26+
import java.nio.file.WatchKey;
27+
import java.nio.file.WatchService;
28+
import java.util.function.Consumer;
29+
import org.apache.yetus.audience.InterfaceAudience;
30+
import org.apache.zookeeper.server.ZooKeeperThread;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
/**
35+
* Instances of this class can be used to watch a directory for file changes. When a file is added
36+
* to, deleted from, or is modified in the given directory, the callback provided by the user will
37+
* be called from a background thread. Some things to keep in mind:
38+
* <ul>
39+
* <li>The callback should be thread-safe.</li>
40+
* <li>Changes that happen around the time the thread is started may be missed.</li>
41+
* <li>There is a delay between a file changing and the callback firing.</li>
42+
* <li>The watch is not recursive - changes to subdirectories will not trigger a callback.</li>
43+
* </ul>
44+
* <p/>
45+
* This file has been copied from the Apache ZooKeeper project.
46+
* @see <a href=
47+
* "https://github.com/apache/zookeeper/blob/8148f966947d3ecf3db0b756d93c9ffa88174af9/zookeeper-server/src/main/java/org/apache/zookeeper/common/FileChangeWatcher.java">Base
48+
* revision</a>
49+
*/
50+
@InterfaceAudience.Private
51+
public final class FileChangeWatcher {
52+
53+
private static final Logger LOG = LoggerFactory.getLogger(FileChangeWatcher.class);
54+
55+
public enum State {
56+
NEW, // object created but start() not called yet
57+
STARTING, // start() called but background thread has not entered main loop
58+
RUNNING, // background thread is running
59+
STOPPING, // stop() called but background thread has not exited main loop
60+
STOPPED // stop() called and background thread has exited, or background thread crashed
61+
}
62+
63+
private final WatcherThread watcherThread;
64+
private State state; // protected by synchronized(this)
65+
66+
/**
67+
* Creates a watcher that watches <code>dirPath</code> and invokes <code>callback</code> on
68+
* changes.
69+
* @param dirPath the directory to watch.
70+
* @param callback the callback to invoke with events. <code>event.kind()</code> will return the
71+
* type of event, and <code>event.context()</code> will return the filename
72+
* relative to <code>dirPath</code>.
73+
* @throws IOException if there is an error creating the WatchService.
74+
*/
75+
public FileChangeWatcher(Path dirPath, Consumer<WatchEvent<?>> callback) throws IOException {
76+
FileSystem fs = dirPath.getFileSystem();
77+
WatchService watchService = fs.newWatchService();
78+
79+
LOG.debug("Registering with watch service: {}", dirPath);
80+
81+
dirPath.register(watchService,
82+
new WatchEvent.Kind<?>[] { StandardWatchEventKinds.ENTRY_CREATE,
83+
StandardWatchEventKinds.ENTRY_DELETE, StandardWatchEventKinds.ENTRY_MODIFY,
84+
StandardWatchEventKinds.OVERFLOW });
85+
state = State.NEW;
86+
this.watcherThread = new WatcherThread(watchService, callback);
87+
this.watcherThread.setDaemon(true);
88+
}
89+
90+
/**
91+
* Returns the current {@link FileChangeWatcher.State}.
92+
* @return the current state.
93+
*/
94+
public synchronized State getState() {
95+
return state;
96+
}
97+
98+
/**
99+
* Blocks until the current state becomes <code>desiredState</code>. Currently only used by tests,
100+
* thus package-private.
101+
* @param desiredState the desired state.
102+
* @throws InterruptedException if the current thread gets interrupted.
103+
*/
104+
synchronized void waitForState(State desiredState) throws InterruptedException {
105+
while (this.state != desiredState) {
106+
this.wait();
107+
}
108+
}
109+
110+
/**
111+
* Sets the state to <code>newState</code>.
112+
* @param newState the new state.
113+
*/
114+
private synchronized void setState(State newState) {
115+
state = newState;
116+
this.notifyAll();
117+
}
118+
119+
/**
120+
* Atomically sets the state to <code>update</code> if and only if the state is currently
121+
* <code>expected</code>.
122+
* @param expected the expected state.
123+
* @param update the new state.
124+
* @return true if the update succeeds, or false if the current state does not equal
125+
* <code>expected</code>.
126+
*/
127+
private synchronized boolean compareAndSetState(State expected, State update) {
128+
if (state == expected) {
129+
setState(update);
130+
return true;
131+
} else {
132+
return false;
133+
}
134+
}
135+
136+
/**
137+
* Atomically sets the state to <code>update</code> if and only if the state is currently one of
138+
* <code>expectedStates</code>.
139+
* @param expectedStates the expected states.
140+
* @param update the new state.
141+
* @return true if the update succeeds, or false if the current state does not equal any of the
142+
* <code>expectedStates</code>.
143+
*/
144+
private synchronized boolean compareAndSetState(State[] expectedStates, State update) {
145+
for (State expected : expectedStates) {
146+
if (state == expected) {
147+
setState(update);
148+
return true;
149+
}
150+
}
151+
return false;
152+
}
153+
154+
/**
155+
* Tells the background thread to start. Does not wait for it to be running. Calling this method
156+
* more than once has no effect.
157+
*/
158+
public void start() {
159+
if (!compareAndSetState(State.NEW, State.STARTING)) {
160+
// If previous state was not NEW, start() has already been called.
161+
return;
162+
}
163+
this.watcherThread.start();
164+
}
165+
166+
/**
167+
* Tells the background thread to stop. Does not wait for it to exit.
168+
*/
169+
public void stop() {
170+
if (compareAndSetState(new State[] { State.RUNNING, State.STARTING }, State.STOPPING)) {
171+
watcherThread.interrupt();
172+
}
173+
}
174+
175+
/**
176+
* Inner class that implements the watcher thread logic.
177+
*/
178+
private class WatcherThread extends ZooKeeperThread {
179+
180+
private static final String THREAD_NAME = "FileChangeWatcher";
181+
182+
final WatchService watchService;
183+
final Consumer<WatchEvent<?>> callback;
184+
185+
WatcherThread(WatchService watchService, Consumer<WatchEvent<?>> callback) {
186+
super(THREAD_NAME);
187+
this.watchService = watchService;
188+
this.callback = callback;
189+
}
190+
191+
@Override
192+
public void run() {
193+
try {
194+
LOG.info("{} thread started", getName());
195+
if (
196+
!compareAndSetState(FileChangeWatcher.State.STARTING, FileChangeWatcher.State.RUNNING)
197+
) {
198+
// stop() called shortly after start(), before
199+
// this thread started running.
200+
FileChangeWatcher.State state = FileChangeWatcher.this.getState();
201+
if (state != FileChangeWatcher.State.STOPPING) {
202+
throw new IllegalStateException("Unexpected state: " + state);
203+
}
204+
return;
205+
}
206+
runLoop();
207+
} catch (Exception e) {
208+
LOG.warn("Error in runLoop()", e);
209+
throw e;
210+
} finally {
211+
try {
212+
watchService.close();
213+
} catch (IOException e) {
214+
LOG.warn("Error closing watch service", e);
215+
}
216+
LOG.info("{} thread finished", getName());
217+
FileChangeWatcher.this.setState(FileChangeWatcher.State.STOPPED);
218+
}
219+
}
220+
221+
private void runLoop() {
222+
while (FileChangeWatcher.this.getState() == FileChangeWatcher.State.RUNNING) {
223+
WatchKey key;
224+
try {
225+
key = watchService.take();
226+
} catch (InterruptedException | ClosedWatchServiceException e) {
227+
LOG.debug("{} was interrupted and is shutting down...", getName());
228+
break;
229+
}
230+
for (WatchEvent<?> event : key.pollEvents()) {
231+
LOG.debug("Got file changed event: {} with context: {}", event.kind(), event.context());
232+
try {
233+
callback.accept(event);
234+
} catch (Throwable e) {
235+
LOG.error("Error from callback", e);
236+
}
237+
}
238+
boolean isKeyValid = key.reset();
239+
if (!isKeyValid) {
240+
// This is likely a problem, it means that file reloading is broken, probably because the
241+
// directory we are watching was deleted or otherwise became inaccessible (unmounted,
242+
// permissions
243+
// changed, ???).
244+
// For now, we log an error and exit the watcher thread.
245+
LOG.error("Watch key no longer valid, maybe the directory is inaccessible?");
246+
break;
247+
}
248+
}
249+
}
250+
}
251+
}

0 commit comments

Comments
 (0)