Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion common/src/test/java/org/apache/drill/test/DrillTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class DrillTest {
static MemWatcher memWatcher;
static String className;

@Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(100000);
@Rule public final TestRule TIMEOUT = TestTools.getTimeoutRule(100_000);

@Rule public final TestLogReporter logOutcome = LOG_OUTCOME;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ public double getAffinityFactor() {
* @param foremanNode The driving/foreman node for this query. (this node)
* @param queryId The queryId for this query.
* @param activeEndpoints The list of endpoints to consider for inclusion in planning this query.
* @param reader Tool used to read JSON plans
* @param rootFragment The root node of the PhysicalPlan that we will be parallelizing.
* @param session UserSession of user who launched this query.
* @param queryContextInfo Info related to the context when query has started.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ public synchronized void close() {
waitForGracePeriod();
stateManager.setState(DrillbitState.DRAINING);
// wait for all the in-flight queries to finish
manager.waitToExit(this, forcefulShutdown);
manager.waitToExit(forcefulShutdown);
//safe to exit
registrationHandle = coord.update(registrationHandle, State.OFFLINE);
stateManager.setState(DrillbitState.OFFLINE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.drill.common.SelfCleaningRunnable;
import org.apache.drill.common.concurrent.ExtendedLatch;
import org.apache.drill.exec.coord.ClusterCoordinator;
import org.apache.drill.exec.metrics.DrillMetrics;
import org.apache.drill.exec.proto.BitControl.FragmentStatus;
Expand All @@ -37,7 +36,6 @@
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.rpc.data.DataConnectionCreator;
import org.apache.drill.exec.server.BootStrapContext;
import org.apache.drill.exec.server.Drillbit;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.store.sys.PersistentStoreProvider;
import org.apache.drill.exec.work.batch.ControlMessageHandler;
Expand All @@ -50,9 +48,12 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Manages the running fragments in a Drillbit. Periodically requests run-time stats updates from fragments
Expand All @@ -61,12 +62,14 @@
public class WorkManager implements AutoCloseable {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WorkManager.class);

public static final int EXIT_TIMEOUT_MS = 5000;

/*
* We use a {@see java.util.concurrent.ConcurrentHashMap} because it promises never to throw a
* {@see java.util.ConcurrentModificationException}; we need that because the statusThread may
* iterate over the map while other threads add FragmentExecutors via the {@see #WorkerBee}.
*/
private final Map<FragmentHandle, FragmentExecutor> runningFragments = new ConcurrentHashMap<>();
private final ConcurrentMap<FragmentHandle, FragmentExecutor> runningFragments = Maps.newConcurrentMap();

private final ConcurrentMap<QueryId, Foreman> queries = Maps.newConcurrentMap();

Expand All @@ -79,8 +82,8 @@ public class WorkManager implements AutoCloseable {
private final WorkEventBus workBus;
private final Executor executor;
private final StatusThread statusThread;
private long numOfRunningQueries;
private long numOfRunningFragments;
private final Lock isEmptyLock = new ReentrantLock();
private final Condition isEmptyCondition = isEmptyLock.newCondition();

/**
* How often the StatusThread collects statistics about running fragments.
Expand Down Expand Up @@ -162,51 +165,67 @@ public DrillbitContext getContext() {
return dContext;
}

private ExtendedLatch exitLatch = null; // used to wait to exit when things are still running
public void waitToExit(final boolean forcefulShutdown) {
isEmptyLock.lock();

/**
* Waits until it is safe to exit. Blocks until all currently running fragments have completed.
*
* <p>This is intended to be used by {@link org.apache.drill.exec.server.Drillbit#close()}.</p>
*/
public void waitToExit(Drillbit bit, boolean forcefulShutdown) {
synchronized(this) {
numOfRunningQueries = queries.size();
numOfRunningFragments = runningFragments.size();
if ( queries.isEmpty() && runningFragments.isEmpty()) {
return;
try {
if (forcefulShutdown) {
final long startTime = System.currentTimeMillis();
final long endTime = startTime + EXIT_TIMEOUT_MS;
long currentTime;

while (!areQueriesAndFragmentsEmpty() && (currentTime = System.currentTimeMillis()) < endTime) {
try {
if (!isEmptyCondition.await(endTime - currentTime, TimeUnit.MILLISECONDS)) {
break;
}
} catch (InterruptedException e) {
logger.error("Interrupted while waiting to exit");
}
}

if (!areQueriesAndFragmentsEmpty()) {
logger.warn("Timed out after {} millis. Shutting down before all fragments and foremen " +
"have completed.", EXIT_TIMEOUT_MS);

for (QueryId queryId: queries.keySet()) {
logger.warn("Query {} is still running.", QueryIdHelper.getQueryId(queryId));
}

for (FragmentHandle fragmentHandle: runningFragments.keySet()) {
logger.warn("Fragment {} is still running.", QueryIdHelper.getQueryIdentifier(fragmentHandle));
}
}
} else {
while (!areQueriesAndFragmentsEmpty()) {
isEmptyCondition.awaitUninterruptibly();
}
}
logger.info("Draining " + queries +" queries and "+ runningFragments+" fragments.");
exitLatch = new ExtendedLatch();
}
// Wait uninterruptibly until all the queries and running fragments on that drillbit goes down
// to zero
if( forcefulShutdown ) {
exitLatch.awaitUninterruptibly(5000);
} else {
exitLatch.awaitUninterruptibly();
} finally {
isEmptyLock.unlock();
}
}

private boolean areQueriesAndFragmentsEmpty() {
return queries.isEmpty() && runningFragments.isEmpty();
}

/**
* If it is safe to exit, and the exitLatch is in use, signals it so that waitToExit() will
* unblock. Logs the number of pending fragments and queries that are running on that
* drillbit to track the progress of shutdown process.
* A thread calling the {@link #waitToExit(boolean)} method is notified when a foreman is retired.
*/
private void indicateIfSafeToExit() {
synchronized(this) {
if (exitLatch != null) {
logger.info("Waiting for "+ queries.size() +" queries to complete before shutting down");
logger.info("Waiting for "+ runningFragments.size() +" running fragments to complete before shutting down");
if(runningFragments.size() > numOfRunningFragments|| queries.size() > numOfRunningQueries) {
logger.info("New Fragments or queries are added while drillbit is Shutting down");
}
if (queries.isEmpty() && runningFragments.isEmpty()) {
// Both Queries and Running fragments are empty.
// So its safe for the drillbit to exit.
exitLatch.countDown();
}
isEmptyLock.lock();
try {
logger.info("Waiting for "+ queries.size() +" queries to complete before shutting down");
logger.info("Waiting for "+ runningFragments.size() +" running fragments to complete before shutting down");

if (!areQueriesAndFragmentsEmpty()) {
logger.info("New Fragments or queries are added while drillbit is Shutting down");
} else {
isEmptyCondition.signal();
}
} finally {
isEmptyLock.unlock();
}
}
/**
Expand Down Expand Up @@ -256,9 +275,9 @@ public void retireForeman(final Foreman foreman) {

final QueryId queryId = foreman.getQueryId();
final boolean wasRemoved = queries.remove(queryId, foreman);

if (!wasRemoved) {
logger.warn("Couldn't find retiring Foreman for query " + queryId);
// throw new IllegalStateException("Couldn't find retiring Foreman for query " + queryId);
}

indicateIfSafeToExit();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -801,13 +801,18 @@ public void close() throws Exception {
// Remove the Foreman from the running query list.
fragmentsRunner.getBee().retireForeman(Foreman.this);

try {
queryContext.close();
} catch (Exception e) {
logger.error("Unable to close query context for query {}", QueryIdHelper.getQueryId(queryId), e);
}

try {
queryManager.close();
} catch (final Exception e) {
logger.warn("unable to close query manager", e);
}


queryStateProcessor.close();

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.drill.exec.proto.UserBitShared.QueryResult.QueryState;
import org.apache.drill.exec.proto.UserProtos.GetQueryPlanFragments;
import org.apache.drill.exec.proto.UserProtos.QueryPlanFragments;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.UserClientConnection;
import org.apache.drill.exec.server.DrillbitContext;
import org.apache.drill.exec.util.Pointer;
Expand Down Expand Up @@ -64,7 +65,7 @@ public class PlanSplitter {
public QueryPlanFragments planFragments(DrillbitContext dContext, QueryId queryId,
GetQueryPlanFragments req, UserClientConnection connection) {
QueryPlanFragments.Builder responseBuilder = QueryPlanFragments.newBuilder();
QueryContext queryContext = new QueryContext(connection.getSession(), dContext, queryId);
final QueryContext queryContext = new QueryContext(connection.getSession(), dContext, queryId);

responseBuilder.setQueryId(queryId);

Expand All @@ -79,6 +80,14 @@ public QueryPlanFragments planFragments(DrillbitContext dContext, QueryId queryI
responseBuilder.setStatus(QueryState.FAILED);
responseBuilder.setError(error);
}

try {
queryContext.close();
} catch (Exception e) {
logger.error("Error closing QueryContext when getting plan fragments for query {}.",
QueryIdHelper.getQueryId(queryId), e);
}

return responseBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ private void reset() {
closeClient();
FileUtils.cleanDirectory(udfDir);
dirTestWatcher.clear();
setupDefaultTestCluster();
setup();
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down
Loading