Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@
<version>${project.version}</version>
</dependency>
<!-- inter-project -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,15 @@
*/
package org.apache.hadoop.hive.common.log;

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import jline.TerminalFactory;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.fusesource.jansi.Ansi;

import javax.annotation.Nullable;
import java.io.PrintStream;
import java.io.StringWriter;
import java.text.DecimalFormat;
import java.util.List;
import java.util.stream.Collectors;

import static org.fusesource.jansi.Ansi.ansi;
import static org.fusesource.jansi.internal.CLibrary.*;
Expand Down Expand Up @@ -158,12 +155,8 @@ public void render(ProgressMonitor monitor) {


// Map 1 .......... container SUCCEEDED 7 7 0 0 0 0
List<String> printReady = Lists.transform(monitor.rows(), new Function<List<String>, String>() {
@Override
public String apply(List<String> row) {
return String.format(VERTEX_FORMAT, row.toArray());
}
});
List<String> printReady =
monitor.rows().stream().map(row -> String.format(VERTEX_FORMAT, row.toArray())).collect(Collectors.toList());
reprintMultiLine(StringUtils.join(printReady, "\n"));

// -------------------------------------------------------------------------------
Expand Down
4 changes: 4 additions & 0 deletions llap-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@
</dependency>

<!-- inter-project -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>io.jsonwebtoken</groupId>
<artifactId>jjwt-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ public void onFailure(Throwable t) {
LOG.warn("RequestManager shutdown with error", t);
}
}
});
}, MoreExecutors.directExecutor());
}

@Override
Expand Down
4 changes: 4 additions & 0 deletions llap-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@
<version>${project.version}</version>
</dependency>
<!-- inter-project -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ public void onFailure(Throwable t) {
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), t);
}
}
});
}, MoreExecutors.directExecutor());
// TODO: why is this needed? we could just save the host and port?
nodeId = LlapNodeId.getInstance(localAddress.get().getHostName(), localAddress.get().getPort());
LOG.info("AMReporter running with DaemonId: {}, NodeId: {}", daemonId, nodeId);
Expand Down Expand Up @@ -276,7 +276,7 @@ public void onFailure(Throwable t) {
LOG.warn("Failed to send taskKilled for {}. The attempt will likely time out.",
taskAttemptId);
}
});
}, MoreExecutors.directExecutor());
}

public void queryComplete(QueryIdentifier queryIdentifier) {
Expand Down Expand Up @@ -344,7 +344,7 @@ public void onFailure(Throwable t) {
amNodeInfo.amNodeId, currentQueryIdentifier, t);
queryFailedHandler.queryFailed(currentQueryIdentifier);
}
});
}, MoreExecutors.directExecutor());
}
}
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public synchronized void registerTask(RuntimeTask task,
sendCounterInterval, maxEventsToGet, requestCounter, containerIdStr, initialEvent,
fragmentRequestId, wmCounters);
ListenableFuture<Boolean> future = heartbeatExecutor.submit(currentCallable);
Futures.addCallback(future, new HeartbeatCallback(errorReporter));
Futures.addCallback(future, new HeartbeatCallback(errorReporter), MoreExecutors.directExecutor());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public TaskExecutorService(int numExecutors, int waitQueueSize,
executionCompletionExecutorService = MoreExecutors.listeningDecorator(
executionCompletionExecutorServiceRaw);
ListenableFuture<?> future = waitQueueExecutorService.submit(new WaitQueueWorker());
Futures.addCallback(future, new WaitQueueWorkerCallback());
Futures.addCallback(future, new WaitQueueWorkerCallback(), MoreExecutors.directExecutor());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -806,15 +806,17 @@ public void run() {
}, 0, 10000L, TimeUnit.MILLISECONDS);

nodeEnablerFuture = nodeEnabledExecutor.submit(nodeEnablerCallable);
Futures.addCallback(nodeEnablerFuture, new LoggingFutureCallback("NodeEnablerThread", LOG));
Futures.addCallback(nodeEnablerFuture, new LoggingFutureCallback("NodeEnablerThread", LOG),
MoreExecutors.directExecutor());

delayedTaskSchedulerFuture =
delayedTaskSchedulerExecutor.submit(delayedTaskSchedulerCallable);
Futures.addCallback(delayedTaskSchedulerFuture,
new LoggingFutureCallback("DelayedTaskSchedulerThread", LOG));
Futures.addCallback(delayedTaskSchedulerFuture, new LoggingFutureCallback("DelayedTaskSchedulerThread", LOG),
MoreExecutors.directExecutor());

schedulerFuture = schedulerExecutor.submit(schedulerCallable);
Futures.addCallback(schedulerFuture, new LoggingFutureCallback("SchedulerThread", LOG));
Futures.addCallback(schedulerFuture, new LoggingFutureCallback("SchedulerThread", LOG),
MoreExecutors.directExecutor());

registry.start();
activeInstances = registry.getInstances();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

Expand Down Expand Up @@ -1102,7 +1103,7 @@ private static int transferSessionsToDestroy(Collection<WmTezSession> source,
}

private void failOnFutureFailure(ListenableFuture<?> future) {
Futures.addCallback(future, FATAL_ERROR_CALLBACK);
Futures.addCallback(future, FATAL_ERROR_CALLBACK, MoreExecutors.directExecutor());
}

private void queueGetRequestOnMasterThread(
Expand Down Expand Up @@ -1936,7 +1937,7 @@ public SessionInitContext(SettableFuture<WmTezSession> future,

public void start() throws Exception {
ListenableFuture<WmTezSession> getFuture = tezAmPool.getSessionAsync();
Futures.addCallback(getFuture, this);
Futures.addCallback(getFuture, this, MoreExecutors.directExecutor());
}

@Override
Expand Down Expand Up @@ -1990,7 +1991,7 @@ public void onSuccess(WmTezSession session) {
case GETTING: {
ListenableFuture<WmTezSession> waitFuture = session.waitForAmRegistryAsync(
amRegistryTimeoutMs, timeoutPool);
Futures.addCallback(waitFuture, this);
Futures.addCallback(waitFuture, this, MoreExecutors.directExecutor());
break;
}
case WAITING_FOR_REGISTRY: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import java.io.IOException;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -128,7 +129,7 @@ public void onSuccess(Boolean result) {
public void onFailure(Throwable t) {
future.setException(t);
}
});
}, MoreExecutors.directExecutor());
return future;
}

Expand Down
4 changes: 4 additions & 0 deletions standalone-metastore/metastore-tools/tools-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@
<groupId>org.apache.hive</groupId>
<artifactId>hive-common</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
Expand Down
5 changes: 2 additions & 3 deletions storage-api/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,13 @@
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>

<!-- test inter-project -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
<scope>test</scope>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, All. This seems to cause a regression at hive-storage-api 2.8.0.

</dependency>

<!-- test inter-project -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down