Skip to content

Commit

Permalink
[HUDI-7112] Reuse existing timeline server and performance improvemen…
Browse files Browse the repository at this point in the history
…ts (apache#10122)

- Reuse timeline server across tables. 

---------

Co-authored-by: sivabalan <n.siva.b@gmail.com>
  • Loading branch information
the-other-tim-brown and nsivabalan authored Nov 23, 2023
1 parent 72ff9a7 commit 3d21285
Show file tree
Hide file tree
Showing 25 changed files with 566 additions and 173 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private synchronized void stopEmbeddedServerView(boolean resetViewStorageConfig)
if (timelineServer.isPresent() && shouldStopTimelineServer) {
// Stop only if owner
LOG.info("Stopping Timeline service !!");
timelineServer.get().stop();
timelineServer.get().stopForBasePath(basePath);
}

timelineServer = Option.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,66 +23,42 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
* Helper class to instantiate embedded timeline service.
*/
public class EmbeddedTimelineServerHelper {

private static final Logger LOG = LoggerFactory.getLogger(EmbeddedTimelineService.class);

private static Option<EmbeddedTimelineService> TIMELINE_SERVER = Option.empty();

/**
* Instantiate Embedded Timeline Server.
* @param context Hoodie Engine Context
* @param config Hoodie Write Config
* @return TimelineServer if configured to run
* @throws IOException
*/
public static synchronized Option<EmbeddedTimelineService> createEmbeddedTimelineService(
public static Option<EmbeddedTimelineService> createEmbeddedTimelineService(
HoodieEngineContext context, HoodieWriteConfig config) throws IOException {
if (config.isEmbeddedTimelineServerReuseEnabled()) {
if (!TIMELINE_SERVER.isPresent() || !TIMELINE_SERVER.get().canReuseFor(config.getBasePath())) {
TIMELINE_SERVER = Option.of(startTimelineService(context, config));
} else {
updateWriteConfigWithTimelineServer(TIMELINE_SERVER.get(), config);
}
return TIMELINE_SERVER;
}
if (config.isEmbeddedTimelineServerEnabled()) {
return Option.of(startTimelineService(context, config));
Option<String> hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST);
EmbeddedTimelineService timelineService = EmbeddedTimelineService.getOrStartEmbeddedTimelineService(context, hostAddr.orElse(null), config);
updateWriteConfigWithTimelineServer(timelineService, config);
return Option.of(timelineService);
} else {
return Option.empty();
}
}

private static EmbeddedTimelineService startTimelineService(
HoodieEngineContext context, HoodieWriteConfig config) throws IOException {
// Run Embedded Timeline Server
LOG.info("Starting Timeline service !!");
Option<String> hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST);
EmbeddedTimelineService timelineService = new EmbeddedTimelineService(
context, hostAddr.orElse(null), config);
timelineService.startServer();
updateWriteConfigWithTimelineServer(timelineService, config);
return timelineService;
}

/**
* Adjusts hoodie write config with timeline server settings.
* @param timelineServer Embedded Timeline Server
* @param config Hoodie Write Config
*/
public static void updateWriteConfigWithTimelineServer(EmbeddedTimelineService timelineServer,
HoodieWriteConfig config) {
HoodieWriteConfig config) {
// Allow executor to find this newly instantiated timeline service
if (config.isEmbeddedTimelineServerEnabled()) {
config.setViewStorageConfig(timelineServer.getRemoteFileSystemViewConfig());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.table.marker.MarkerType;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
Expand All @@ -29,37 +30,109 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.timeline.service.TimelineService;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Timeline Service that runs as part of write client.
*/
public class EmbeddedTimelineService {
// lock used when starting/stopping/modifying embedded services
private static final Object SERVICE_LOCK = new Object();

private static final Logger LOG = LoggerFactory.getLogger(EmbeddedTimelineService.class);

private static final AtomicInteger NUM_SERVERS_RUNNING = new AtomicInteger(0);
// Map of TimelineServiceIdentifier to existing timeline service running
private static final Map<TimelineServiceIdentifier, EmbeddedTimelineService> RUNNING_SERVICES = new HashMap<>();
private static final Registry METRICS_REGISTRY = Registry.getRegistry("TimelineService");
private static final String NUM_EMBEDDED_TIMELINE_SERVERS = "numEmbeddedTimelineServers";
private int serverPort;
private String hostAddr;
private HoodieEngineContext context;
private final HoodieEngineContext context;
private final SerializableConfiguration hadoopConf;
private final HoodieWriteConfig writeConfig;
private final String basePath;
private TimelineService.Config serviceConfig;
private final TimelineServiceIdentifier timelineServiceIdentifier;
private final Set<String> basePaths; // the set of base paths using this EmbeddedTimelineService

private transient FileSystemViewManager viewManager;
private transient TimelineService server;

public EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig) {
private EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig,
TimelineServiceIdentifier timelineServiceIdentifier) {
setHostAddr(embeddedTimelineServiceHostAddr);
this.context = context;
this.writeConfig = writeConfig;
this.basePath = writeConfig.getBasePath();
this.timelineServiceIdentifier = timelineServiceIdentifier;
this.basePaths = new HashSet<>();
this.basePaths.add(writeConfig.getBasePath());
this.hadoopConf = context.getHadoopConf();
this.viewManager = createViewManager();
}

/**
 * Entry point used by write clients to obtain an embedded timeline service. The underlying server is built with the
 * {@link TimelineService} constructor; all other work is delegated to the overload that accepts a {@link TimelineServiceCreator}.
 * @param context The {@link HoodieEngineContext} of the calling client
 * @param embeddedTimelineServiceHostAddr Host address the service should use, may be {@code null}
 * @param writeConfig The {@link HoodieWriteConfig} of the calling client
 * @return An {@link EmbeddedTimelineService} whose server has been started
 * @throws IOException if the service cannot be started
 */
public static EmbeddedTimelineService getOrStartEmbeddedTimelineService(HoodieEngineContext context,
                                                                        String embeddedTimelineServiceHostAddr,
                                                                        HoodieWriteConfig writeConfig) throws IOException {
  TimelineServiceCreator defaultCreator = TimelineService::new;
  return getOrStartEmbeddedTimelineService(context, embeddedTimelineServiceHostAddr, writeConfig, defaultCreator);
}

/**
 * Returns a running embedded timeline service, building the underlying server through the supplied creator.
 * When reuse is enabled in the write config, a service already registered under an equal identifier is returned
 * (after recording the caller's base path on it); otherwise a new service is started and, for the reuse case,
 * registered for later callers.
 */
static EmbeddedTimelineService getOrStartEmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig,
                                                                 TimelineServiceCreator timelineServiceCreator) throws IOException {
  TimelineServiceIdentifier identifier = getTimelineServiceIdentifier(embeddedTimelineServiceHostAddr, writeConfig);

  // Without reuse every caller gets its own instance, which is never tracked in RUNNING_SERVICES.
  if (!writeConfig.isEmbeddedTimelineServerReuseEnabled()) {
    return createAndStartService(context, embeddedTimelineServiceHostAddr, writeConfig, timelineServiceCreator, identifier);
  }

  synchronized (SERVICE_LOCK) {
    EmbeddedTimelineService existing = RUNNING_SERVICES.get(identifier);
    if (existing != null) {
      // A compatible service is already up: attach this table to it.
      existing.addBasePath(writeConfig.getBasePath());
      LOG.info("Reusing existing embedded timeline server with configuration: " + existing.serviceConfig);
      return existing;
    }
    // Nothing compatible is registered yet, so start one and remember it.
    EmbeddedTimelineService created = createAndStartService(context, embeddedTimelineServiceHostAddr, writeConfig,
        timelineServiceCreator, identifier);
    RUNNING_SERVICES.put(identifier, created);
    return created;
  }
}

/**
 * Builds a new {@link EmbeddedTimelineService}, starts its server through the given creator and
 * publishes the incremented running-server count to the metrics registry.
 */
private static EmbeddedTimelineService createAndStartService(HoodieEngineContext context,
                                                             String embeddedTimelineServiceHostAddr,
                                                             HoodieWriteConfig writeConfig,
                                                             TimelineServiceCreator timelineServiceCreator,
                                                             TimelineServiceIdentifier timelineServiceIdentifier) throws IOException {
  EmbeddedTimelineService startedService =
      new EmbeddedTimelineService(context, embeddedTimelineServiceHostAddr, writeConfig, timelineServiceIdentifier);
  startedService.startServer(timelineServiceCreator);

  // The counter is only bumped once the server has started without throwing.
  int runningServers = NUM_SERVERS_RUNNING.incrementAndGet();
  METRICS_REGISTRY.set(NUM_EMBEDDED_TIMELINE_SERVERS, runningServers);
  return startedService;
}

/**
 * Closes the server of every service currently registered in {@code RUNNING_SERVICES} and empties the registry.
 * The registry is snapshotted and cleared while holding {@code SERVICE_LOCK}, the same lock used by the code that
 * registers and unregisters services, so the plain {@link HashMap} is never iterated while another thread mutates it.
 * The servers themselves are closed after the lock is released to keep the critical section short.
 */
public static void shutdownAllTimelineServers() {
  final Map<TimelineServiceIdentifier, EmbeddedTimelineService> servicesToClose;
  synchronized (SERVICE_LOCK) {
    servicesToClose = new HashMap<>(RUNNING_SERVICES);
    RUNNING_SERVICES.clear();
  }
  servicesToClose.values().forEach(service -> {
    LOG.info("Closing Timeline server");
    service.server.close();
    METRICS_REGISTRY.set(NUM_EMBEDDED_TIMELINE_SERVERS, NUM_SERVERS_RUNNING.decrementAndGet());
    LOG.info("Closed Timeline server");
  });
}

private FileSystemViewManager createViewManager() {
// Using passed-in configs to build view storage configs
FileSystemViewStorageConfig.Builder builder =
Expand All @@ -73,7 +146,7 @@ private FileSystemViewManager createViewManager() {
return FileSystemViewManager.createViewManagerWithTableMetadata(context, writeConfig.getMetadataConfig(), builder.build(), writeConfig.getCommonConfig());
}

public void startServer() throws IOException {
private void startServer(TimelineServiceCreator timelineServiceCreator) throws IOException {
TimelineService.Config.Builder timelineServiceConfBuilder = TimelineService.Config.builder()
.serverPort(writeConfig.getEmbeddedTimelineServerPort())
.numThreads(writeConfig.getEmbeddedTimelineServerThreads())
Expand Down Expand Up @@ -106,12 +179,20 @@ public void startServer() throws IOException {
.enableInstantStateRequests(true);
}

server = new TimelineService(context, hadoopConf.newCopy(), timelineServiceConfBuilder.build(),
FSUtils.getFs(basePath, hadoopConf.newCopy()), viewManager);
this.serviceConfig = timelineServiceConfBuilder.build();

server = timelineServiceCreator.create(context, hadoopConf.newCopy(), serviceConfig,
FSUtils.getFs(writeConfig.getBasePath(), hadoopConf.newCopy()), createViewManager());
serverPort = server.startService();
LOG.info("Started embedded timeline server at " + hostAddr + ":" + serverPort);
}

/**
 * Factory hook for the {@link TimelineService} instance backing an embedded timeline service.
 * Its single method takes the same arguments that are handed to the creator when the server is started,
 * which lets callers inside this package substitute their own construction logic.
 */
@FunctionalInterface
interface TimelineServiceCreator {
  TimelineService create(
      HoodieEngineContext context,
      Configuration hadoopConf,
      TimelineService.Config timelineServerConf,
      FileSystem fileSystem,
      FileSystemViewManager globalFileSystemViewManager) throws IOException;
}

private void setHostAddr(String embeddedTimelineServiceHostAddr) {
if (embeddedTimelineServiceHostAddr != null) {
LOG.info("Overriding hostIp to (" + embeddedTimelineServiceHostAddr + ") found in spark-conf. It was " + this.hostAddr);
Expand Down Expand Up @@ -146,19 +227,80 @@ public FileSystemViewManager getViewManager() {
return viewManager;
}

public boolean canReuseFor(String basePath) {
return this.server != null
&& this.viewManager != null
&& this.basePath.equals(basePath);
/**
 * Records one more table base path as being served by this instance.
 * @param basePath base path of the table that starts using this service
 */
private void addBasePath(String basePath) {
  this.basePaths.add(basePath);
}

public void stop() {
if (null != server) {
/**
 * Stops the embedded timeline service for the given base path. If a timeline service is managing multiple tables,
 * it is only shut down once all tables have been stopped.
 *
 * <p>The decision whether this call removed the last base path is taken while holding {@code SERVICE_LOCK} and kept
 * in a local, so two threads stopping different base paths cannot both observe an empty set afterwards and close the
 * server (and decrement the running-server count) twice.
 * @param basePath For the table to stop the service for
 */
public void stopForBasePath(String basePath) {
  final boolean lastBasePathStopped;
  synchronized (SERVICE_LOCK) {
    basePaths.remove(basePath);
    lastBasePathStopped = basePaths.isEmpty();
    if (lastBasePathStopped) {
      // Only drop the registry entry if it points at this instance: a service started without reuse is never
      // registered, and must not evict a different, still running service that has an equal identifier.
      RUNNING_SERVICES.remove(timelineServiceIdentifier, this);
    }
  }
  if (this.server != null) {
    this.server.unregisterBasePath(basePath);
  }
  // continue rest of shutdown outside of the synchronized block to avoid excess blocking
  if (lastBasePathStopped && null != server) {
    LOG.info("Closing Timeline server");
    this.server.close();
    METRICS_REGISTRY.set(NUM_EMBEDDED_TIMELINE_SERVERS, NUM_SERVERS_RUNNING.decrementAndGet());
    this.server = null;
    this.viewManager = null;
    LOG.info("Closed Timeline server");
  }
}
}

/**
 * Derives the identifier for a timeline service from the host address and the write config settings
 * (marker type, metadata table, early conflict detection, timeline-server-based instant state).
 */
private static TimelineServiceIdentifier getTimelineServiceIdentifier(String hostAddr, HoodieWriteConfig writeConfig) {
  MarkerType markerType = writeConfig.getMarkersType();
  boolean metadataEnabled = writeConfig.isMetadataTableEnabled();
  boolean earlyConflictDetection = writeConfig.isEarlyConflictDetectionEnable();
  boolean instantStateEnabled = writeConfig.isTimelineServerBasedInstantStateEnabled();
  return new TimelineServiceIdentifier(hostAddr, markerType, metadataEnabled, earlyConflictDetection, instantStateEnabled);
}

/**
 * Value object used as the key under which running timeline services are registered.
 * Two identifiers are equal only when the host address (which may be {@code null}), the marker type and all three
 * feature flags match; {@link #hashCode()} is computed from the same fields so the class is safe to use as a hash key.
 */
static class TimelineServiceIdentifier {
  private final String hostAddr;
  private final MarkerType markerType;
  private final boolean isMetadataEnabled;
  private final boolean isEarlyConflictDetectionEnable;
  private final boolean isTimelineServerBasedInstantStateEnabled;

  public TimelineServiceIdentifier(String hostAddr, MarkerType markerType, boolean isMetadataEnabled, boolean isEarlyConflictDetectionEnable,
                                   boolean isTimelineServerBasedInstantStateEnabled) {
    this.hostAddr = hostAddr;
    this.markerType = markerType;
    this.isMetadataEnabled = isMetadataEnabled;
    this.isEarlyConflictDetectionEnable = isEarlyConflictDetectionEnable;
    this.isTimelineServerBasedInstantStateEnabled = isTimelineServerBasedInstantStateEnabled;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof TimelineServiceIdentifier)) {
      return false;
    }
    TimelineServiceIdentifier that = (TimelineServiceIdentifier) o;
    // Every field takes part in the comparison, including when hostAddr is null on both sides; otherwise
    // identifiers with different settings would be "equal" while producing different hash codes.
    return isMetadataEnabled == that.isMetadataEnabled
        && isEarlyConflictDetectionEnable == that.isEarlyConflictDetectionEnable
        && isTimelineServerBasedInstantStateEnabled == that.isTimelineServerBasedInstantStateEnabled
        && markerType == that.markerType
        && Objects.equals(hostAddr, that.hostAddr);
  }

  @Override
  public int hashCode() {
    return Objects.hash(hostAddr, markerType, isMetadataEnabled, isEarlyConflictDetectionEnable, isTimelineServerBasedInstantStateEnabled);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -476,8 +476,8 @@ public class HoodieWriteConfig extends HoodieConfig {
.key("hoodie.embed.timeline.server.reuse.enabled")
.defaultValue(false)
.markAdvanced()
.withDocumentation("Controls whether the timeline server instance should be cached and reused across the JVM (across task lifecycles)"
+ "to avoid startup costs. This should rarely be changed.");
.withDocumentation("Controls whether the timeline server instance should be cached and reused across the tables"
+ "to avoid startup costs and server overhead. This should only be used if you are running multiple writers in the same JVM.");

public static final ConfigProperty<String> EMBEDDED_TIMELINE_SERVER_PORT_NUM = ConfigProperty
.key("hoodie.embed.timeline.server.port")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
*/
public class TimelineServerBasedWriteMarkers extends WriteMarkers {
private static final Logger LOG = LoggerFactory.getLogger(TimelineServerBasedWriteMarkers.class);
private static final TypeReference<Boolean> BOOLEAN_TYPE_REFERENCE = new TypeReference<Boolean>() {};
private static final TypeReference<Set<String>> STRING_TYPE_REFERENCE = new TypeReference<Set<String>>() {};

private final HttpRequestClient httpRequestClient;

Expand All @@ -84,7 +86,7 @@ public boolean deleteMarkerDir(HoodieEngineContext context, int parallelism) {
Map<String, String> paramsMap = Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
try {
return httpRequestClient.executeRequest(
DELETE_MARKER_DIR_URL, paramsMap, new TypeReference<Boolean>() {}, RequestMethod.POST);
DELETE_MARKER_DIR_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST);
} catch (IOException e) {
throw new HoodieRemoteException("Failed to delete marker directory " + markerDirPath.toString(), e);
}
Expand All @@ -95,7 +97,7 @@ public boolean doesMarkerDirExist() {
Map<String, String> paramsMap = Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
try {
return httpRequestClient.executeRequest(
MARKERS_DIR_EXISTS_URL, paramsMap, new TypeReference<Boolean>() {}, RequestMethod.GET);
MARKERS_DIR_EXISTS_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.GET);
} catch (IOException e) {
throw new HoodieRemoteException("Failed to check marker directory " + markerDirPath.toString(), e);
}
Expand All @@ -106,7 +108,7 @@ public Set<String> createdAndMergedDataPaths(HoodieEngineContext context, int pa
Map<String, String> paramsMap = Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
try {
Set<String> markerPaths = httpRequestClient.executeRequest(
CREATE_AND_MERGE_MARKERS_URL, paramsMap, new TypeReference<Set<String>>() {}, RequestMethod.GET);
CREATE_AND_MERGE_MARKERS_URL, paramsMap, STRING_TYPE_REFERENCE, RequestMethod.GET);
return markerPaths.stream().map(WriteMarkers::stripMarkerSuffix).collect(Collectors.toSet());
} catch (IOException e) {
throw new HoodieRemoteException("Failed to get CREATE and MERGE data file paths in "
Expand All @@ -119,7 +121,7 @@ public Set<String> allMarkerFilePaths() {
Map<String, String> paramsMap = Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
try {
return httpRequestClient.executeRequest(
ALL_MARKERS_URL, paramsMap, new TypeReference<Set<String>>() {}, RequestMethod.GET);
ALL_MARKERS_URL, paramsMap, STRING_TYPE_REFERENCE, RequestMethod.GET);
} catch (IOException e) {
throw new HoodieRemoteException("Failed to get all markers in " + markerDirPath.toString(), e);
}
Expand Down Expand Up @@ -173,8 +175,7 @@ private boolean executeCreateMarkerRequest(Map<String, String> paramsMap, String
boolean success;
try {
success = httpRequestClient.executeRequest(
CREATE_MARKER_URL, paramsMap, new TypeReference<Boolean>() {
}, HttpRequestClient.RequestMethod.POST);
CREATE_MARKER_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, HttpRequestClient.RequestMethod.POST);
} catch (IOException e) {
throw new HoodieRemoteException("Failed to create marker file " + partitionPath + "/" + markerFileName, e);
}
Expand Down
Loading

0 comments on commit 3d21285

Please sign in to comment.