Skip to content

Commit 8b37656

Browse files
GregBestlandtolbertam
authored andcommitted
JAVA-1211: Improve cluster init and shutdown (#1074)
Motivation: To to improve messaging when a user attempts to use a Cluster that had previously been closed or failed to initialize. In particular, this change addresses JAVA-1211, JAVA-1220 and JAVA-1929. Modifications: Adds additional null checks in Cluster.close so if a Cluster failed to completely initialize the resources that were created are closed (JAVA-1211). Keeps reference to an Exception raised during Cluster initialization. This is used to notify the user that the Cluster cannot be used because initialization failed and also prevents successive Cluster.init calls from trying to initialize the Cluster (JAVA-1220). If Session is closed, preempt Session.execute queries with a specific exception indicating that the Session is closed (JAVA-1929). Result: User is now given more clear Exception messages if attempting to use or initialize a Cluster that had previously failed to initialize or was closed. User is also provided more explicit Exception messages when attempting to execute queries against closed Sessions.
1 parent 0c0eb6c commit 8b37656

File tree

5 files changed

+265
-138
lines changed

5 files changed

+265
-138
lines changed

changelog/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
- [improvement] JAVA-1388: Add dynamic port discovery for system.peers\_v2.
2121
- [documentation] JAVA-1810: Note which setters are not propagated to PreparedStatement.
2222
- [bug] JAVA-1944: Surface Read and WriteFailureException to RetryPolicy.
23+
- [bug] JAVA-1211: Fix NPE in cluster close when cluster init fails.
24+
- [bug] JAVA-1220: Fail fast on cluster init if previous init failed.
25+
- [bug] JAVA-1929: Preempt session execute queries if session was closed.
2326

2427
Merged from 3.5.x:
2528

driver-core/src/main/java/com/datastax/driver/core/Cluster.java

Lines changed: 161 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -633,9 +633,14 @@ public boolean isClosed() {
633633
}
634634

635635
private static void checkNotClosed(Manager manager) {
636-
if (manager.isClosed())
636+
if (manager.errorDuringInit()) {
637+
throw new IllegalStateException(
638+
"Can't use this cluster instance because it encountered an error in its initialization",
639+
manager.getInitException());
640+
} else if (manager.isClosed()) {
637641
throw new IllegalStateException(
638642
"Can't use this cluster instance because it was previously closed");
643+
}
639644
}
640645

641646
/**
@@ -1385,9 +1390,9 @@ private static String generateClusterName() {
13851390
class Manager implements Connection.DefaultResponseHandler {
13861391

13871392
final String clusterName;
1388-
private boolean isInit;
1393+
private volatile boolean isInit;
13891394
private volatile boolean isFullyInit;
1390-
1395+
private Exception initException;
13911396
// Initial contacts point
13921397
final List<InetSocketAddress> contactPoints;
13931398
final Set<SessionManager> sessions = new CopyOnWriteArraySet<SessionManager>();
@@ -1448,119 +1453,121 @@ private Manager(
14481453
// on it so synchronized is good enough.
14491454
synchronized void init() {
14501455
checkNotClosed(this);
1451-
if (isInit) return;
1456+
if (isInit) {
1457+
return;
1458+
}
14521459
isInit = true;
1460+
try {
1461+
logger.debug("Starting new cluster with contact points " + contactPoints);
1462+
1463+
this.configuration.register(this);
1464+
1465+
ThreadingOptions threadingOptions = this.configuration.getThreadingOptions();
1466+
1467+
// executor
1468+
ExecutorService tmpExecutor = threadingOptions.createExecutor(clusterName);
1469+
this.executorQueue =
1470+
(tmpExecutor instanceof ThreadPoolExecutor)
1471+
? ((ThreadPoolExecutor) tmpExecutor).getQueue()
1472+
: null;
1473+
this.executor = MoreExecutors.listeningDecorator(tmpExecutor);
1474+
1475+
// blocking executor
1476+
ExecutorService tmpBlockingExecutor = threadingOptions.createBlockingExecutor(clusterName);
1477+
this.blockingExecutorQueue =
1478+
(tmpBlockingExecutor instanceof ThreadPoolExecutor)
1479+
? ((ThreadPoolExecutor) tmpBlockingExecutor).getQueue()
1480+
: null;
1481+
this.blockingExecutor = MoreExecutors.listeningDecorator(tmpBlockingExecutor);
1482+
1483+
// reconnection executor
1484+
this.reconnectionExecutor = threadingOptions.createReconnectionExecutor(clusterName);
1485+
this.reconnectionExecutorQueue =
1486+
(reconnectionExecutor instanceof ThreadPoolExecutor)
1487+
? ((ThreadPoolExecutor) reconnectionExecutor).getQueue()
1488+
: null;
1489+
1490+
// scheduled tasks executor
1491+
this.scheduledTasksExecutor = threadingOptions.createScheduledTasksExecutor(clusterName);
1492+
this.scheduledTasksExecutorQueue =
1493+
(scheduledTasksExecutor instanceof ThreadPoolExecutor)
1494+
? ((ThreadPoolExecutor) scheduledTasksExecutor).getQueue()
1495+
: null;
1496+
1497+
this.reaper = new ConnectionReaper(threadingOptions.createReaperExecutor(clusterName));
1498+
this.metadata = new Metadata(this);
1499+
this.connectionFactory = new Connection.Factory(this, configuration);
1500+
this.controlConnection = new ControlConnection(this);
1501+
this.metrics = configuration.getMetricsOptions().isEnabled() ? new Metrics(this) : null;
1502+
this.preparedQueries = new MapMaker().weakValues().makeMap();
1503+
1504+
// create debouncers - at this stage, they are not running yet
1505+
final QueryOptions queryOptions = configuration.getQueryOptions();
1506+
this.nodeListRefreshRequestDebouncer =
1507+
new EventDebouncer<NodeListRefreshRequest>(
1508+
"Node list refresh",
1509+
scheduledTasksExecutor,
1510+
new NodeListRefreshRequestDeliveryCallback()) {
14531511

1454-
logger.debug("Starting new cluster with contact points " + contactPoints);
1455-
1456-
this.configuration.register(this);
1457-
1458-
ThreadingOptions threadingOptions = this.configuration.getThreadingOptions();
1459-
1460-
// executor
1461-
ExecutorService tmpExecutor = threadingOptions.createExecutor(clusterName);
1462-
this.executorQueue =
1463-
(tmpExecutor instanceof ThreadPoolExecutor)
1464-
? ((ThreadPoolExecutor) tmpExecutor).getQueue()
1465-
: null;
1466-
this.executor = MoreExecutors.listeningDecorator(tmpExecutor);
1467-
1468-
// blocking executor
1469-
ExecutorService tmpBlockingExecutor = threadingOptions.createBlockingExecutor(clusterName);
1470-
this.blockingExecutorQueue =
1471-
(tmpBlockingExecutor instanceof ThreadPoolExecutor)
1472-
? ((ThreadPoolExecutor) tmpBlockingExecutor).getQueue()
1473-
: null;
1474-
this.blockingExecutor = MoreExecutors.listeningDecorator(tmpBlockingExecutor);
1475-
1476-
// reconnection executor
1477-
this.reconnectionExecutor = threadingOptions.createReconnectionExecutor(clusterName);
1478-
this.reconnectionExecutorQueue =
1479-
(reconnectionExecutor instanceof ThreadPoolExecutor)
1480-
? ((ThreadPoolExecutor) reconnectionExecutor).getQueue()
1481-
: null;
1482-
1483-
// scheduled tasks executor
1484-
this.scheduledTasksExecutor = threadingOptions.createScheduledTasksExecutor(clusterName);
1485-
this.scheduledTasksExecutorQueue =
1486-
(scheduledTasksExecutor instanceof ThreadPoolExecutor)
1487-
? ((ThreadPoolExecutor) scheduledTasksExecutor).getQueue()
1488-
: null;
1489-
1490-
this.reaper = new ConnectionReaper(threadingOptions.createReaperExecutor(clusterName));
1491-
this.metadata = new Metadata(this);
1492-
this.connectionFactory = new Connection.Factory(this, configuration);
1493-
this.controlConnection = new ControlConnection(this);
1494-
this.metrics = configuration.getMetricsOptions().isEnabled() ? new Metrics(this) : null;
1495-
this.preparedQueries = new MapMaker().weakValues().makeMap();
1496-
1497-
// create debouncers - at this stage, they are not running yet
1498-
final QueryOptions queryOptions = configuration.getQueryOptions();
1499-
this.nodeListRefreshRequestDebouncer =
1500-
new EventDebouncer<NodeListRefreshRequest>(
1501-
"Node list refresh",
1502-
scheduledTasksExecutor,
1503-
new NodeListRefreshRequestDeliveryCallback()) {
1504-
1505-
@Override
1506-
int maxPendingEvents() {
1507-
return configuration.getQueryOptions().getMaxPendingRefreshNodeListRequests();
1508-
}
1512+
@Override
1513+
int maxPendingEvents() {
1514+
return configuration.getQueryOptions().getMaxPendingRefreshNodeListRequests();
1515+
}
15091516

1510-
@Override
1511-
long delayMs() {
1512-
return configuration.getQueryOptions().getRefreshNodeListIntervalMillis();
1513-
}
1514-
};
1515-
this.nodeRefreshRequestDebouncer =
1516-
new EventDebouncer<NodeRefreshRequest>(
1517-
"Node refresh", scheduledTasksExecutor, new NodeRefreshRequestDeliveryCallback()) {
1517+
@Override
1518+
long delayMs() {
1519+
return configuration.getQueryOptions().getRefreshNodeListIntervalMillis();
1520+
}
1521+
};
1522+
this.nodeRefreshRequestDebouncer =
1523+
new EventDebouncer<NodeRefreshRequest>(
1524+
"Node refresh", scheduledTasksExecutor, new NodeRefreshRequestDeliveryCallback()) {
15181525

1519-
@Override
1520-
int maxPendingEvents() {
1521-
return configuration.getQueryOptions().getMaxPendingRefreshNodeRequests();
1522-
}
1526+
@Override
1527+
int maxPendingEvents() {
1528+
return configuration.getQueryOptions().getMaxPendingRefreshNodeRequests();
1529+
}
15231530

1524-
@Override
1525-
long delayMs() {
1526-
return configuration.getQueryOptions().getRefreshNodeIntervalMillis();
1527-
}
1528-
};
1529-
this.schemaRefreshRequestDebouncer =
1530-
new EventDebouncer<SchemaRefreshRequest>(
1531-
"Schema refresh",
1532-
scheduledTasksExecutor,
1533-
new SchemaRefreshRequestDeliveryCallback()) {
1531+
@Override
1532+
long delayMs() {
1533+
return configuration.getQueryOptions().getRefreshNodeIntervalMillis();
1534+
}
1535+
};
1536+
this.schemaRefreshRequestDebouncer =
1537+
new EventDebouncer<SchemaRefreshRequest>(
1538+
"Schema refresh",
1539+
scheduledTasksExecutor,
1540+
new SchemaRefreshRequestDeliveryCallback()) {
15341541

1535-
@Override
1536-
int maxPendingEvents() {
1537-
return configuration.getQueryOptions().getMaxPendingRefreshSchemaRequests();
1538-
}
1542+
@Override
1543+
int maxPendingEvents() {
1544+
return configuration.getQueryOptions().getMaxPendingRefreshSchemaRequests();
1545+
}
15391546

1540-
@Override
1541-
long delayMs() {
1542-
return configuration.getQueryOptions().getRefreshSchemaIntervalMillis();
1543-
}
1544-
};
1547+
@Override
1548+
long delayMs() {
1549+
return configuration.getQueryOptions().getRefreshSchemaIntervalMillis();
1550+
}
1551+
};
15451552

1546-
this.scheduledTasksExecutor.scheduleWithFixedDelay(
1547-
new CleanupIdleConnectionsTask(), 10, 10, TimeUnit.SECONDS);
1553+
this.scheduledTasksExecutor.scheduleWithFixedDelay(
1554+
new CleanupIdleConnectionsTask(), 10, 10, TimeUnit.SECONDS);
15481555

1549-
for (InetSocketAddress address : contactPoints) {
1550-
// We don't want to signal -- call onAdd() -- because nothing is ready
1551-
// yet (loadbalancing policy, control connection, ...). All we want is
1552-
// create the Host object so we can initialize the control connection.
1553-
metadata.addIfAbsent(metadata.newHost(address));
1554-
}
1556+
for (InetSocketAddress address : contactPoints) {
1557+
// We don't want to signal -- call onAdd() -- because nothing is ready
1558+
// yet (loadbalancing policy, control connection, ...). All we want is
1559+
// create the Host object so we can initialize the control connection.
1560+
metadata.addIfAbsent(metadata.newHost(address));
1561+
}
15551562

1556-
Collection<Host> allHosts = metadata.allHosts();
1563+
Collection<Host> allHosts = metadata.allHosts();
15571564

1558-
// At this stage, metadata.allHosts() only contains the contact points, that's what we want to
1559-
// pass to LBP.init().
1560-
// But the control connection will initialize first and discover more hosts, so make a copy.
1561-
Set<Host> contactPointHosts = Sets.newHashSet(allHosts);
1565+
// At this stage, metadata.allHosts() only contains the contact points, that's what we want
1566+
// to
1567+
// pass to LBP.init().
1568+
// But the control connection will initialize first and discover more hosts, so make a copy.
1569+
Set<Host> contactPointHosts = Sets.newHashSet(allHosts);
15621570

1563-
try {
15641571
negotiateProtocolVersionAndConnect();
15651572

15661573
// The control connection can mark hosts down if it failed to connect to them, or remove
@@ -1631,7 +1638,8 @@ long delayMs() {
16311638
this.nodeRefreshRequestDebouncer.start();
16321639

16331640
isFullyInit = true;
1634-
} catch (NoHostAvailableException e) {
1641+
} catch (RuntimeException e) {
1642+
initException = e;
16351643
close();
16361644
throw e;
16371645
}
@@ -1717,6 +1725,14 @@ boolean isClosed() {
17171725
return closeFuture.get() != null;
17181726
}
17191727

1728+
boolean errorDuringInit() {
1729+
return (isInit && initException != null);
1730+
}
1731+
1732+
Exception getInitException() {
1733+
return initException;
1734+
}
1735+
17201736
private CloseFuture close() {
17211737

17221738
CloseFuture future = closeFuture.get();
@@ -1726,9 +1742,15 @@ private CloseFuture close() {
17261742
logger.debug("Shutting down");
17271743

17281744
// stop debouncers
1729-
nodeListRefreshRequestDebouncer.stop();
1730-
nodeRefreshRequestDebouncer.stop();
1731-
schemaRefreshRequestDebouncer.stop();
1745+
if (nodeListRefreshRequestDebouncer != null) {
1746+
nodeListRefreshRequestDebouncer.stop();
1747+
}
1748+
if (nodeRefreshRequestDebouncer != null) {
1749+
nodeRefreshRequestDebouncer.stop();
1750+
}
1751+
if (schemaRefreshRequestDebouncer != null) {
1752+
schemaRefreshRequestDebouncer.stop();
1753+
}
17321754

17331755
// If we're shutting down, there is no point in waiting on scheduled reconnections, nor on
17341756
// notifications
@@ -1739,7 +1761,9 @@ private CloseFuture close() {
17391761

17401762
// but for the worker executor, we want to let submitted tasks finish unless the shutdown is
17411763
// forced.
1742-
executor.shutdown();
1764+
if (executor != null) {
1765+
executor.shutdown();
1766+
}
17431767

17441768
// We also close the metrics
17451769
if (metrics != null) metrics.shutdown();
@@ -1756,7 +1780,9 @@ private CloseFuture close() {
17561780

17571781
// Then we shutdown all connections
17581782
List<CloseFuture> futures = new ArrayList<CloseFuture>(sessions.size() + 1);
1759-
futures.add(controlConnection.closeAsync());
1783+
if (controlConnection != null) {
1784+
futures.add(controlConnection.closeAsync());
1785+
}
17601786
for (Session session : sessions) futures.add(session.closeAsync());
17611787

17621788
future = new ClusterCloseFuture(futures);
@@ -1771,11 +1797,13 @@ private CloseFuture close() {
17711797
}
17721798

17731799
private void shutdownNow(ExecutorService executor) {
1774-
List<Runnable> pendingTasks = executor.shutdownNow();
1775-
// If some tasks were submitted to this executor but not yet commenced, make sure the
1776-
// corresponding futures complete
1777-
for (Runnable pendingTask : pendingTasks) {
1778-
if (pendingTask instanceof FutureTask<?>) ((FutureTask<?>) pendingTask).cancel(false);
1800+
if (executor != null) {
1801+
List<Runnable> pendingTasks = executor.shutdownNow();
1802+
// If some tasks were submitted to this executor but not yet commenced, make sure the
1803+
// corresponding futures complete
1804+
for (Runnable pendingTask : pendingTasks) {
1805+
if (pendingTask instanceof FutureTask<?>) ((FutureTask<?>) pendingTask).cancel(false);
1806+
}
17791807
}
17801808
}
17811809

@@ -2729,17 +2757,27 @@ public void run() {
27292757
// user
27302758
// call force(), we'll never really block forever.
27312759
try {
2732-
reconnectionExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
2733-
scheduledTasksExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
2734-
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
2735-
blockingExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
2760+
if (reconnectionExecutor != null) {
2761+
reconnectionExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
2762+
}
2763+
if (scheduledTasksExecutor != null) {
2764+
scheduledTasksExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
2765+
}
2766+
if (executor != null) {
2767+
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
2768+
}
2769+
if (blockingExecutor != null) {
2770+
blockingExecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
2771+
}
27362772

27372773
// Some of the jobs on the executors can be doing query stuff, so close the
27382774
// connectionFactory at the very last
2739-
connectionFactory.shutdown();
2740-
2741-
reaper.shutdown();
2742-
2775+
if (connectionFactory != null) {
2776+
connectionFactory.shutdown();
2777+
}
2778+
if (reaper != null) {
2779+
reaper.shutdown();
2780+
}
27432781
set(null);
27442782
} catch (InterruptedException e) {
27452783
Thread.currentThread().interrupt();

0 commit comments

Comments
 (0)