Skip to content

Commit

Permalink
addressing feedback comments
Browse files Browse the repository at this point in the history
  • Loading branch information
nsivabalan committed Nov 21, 2023
1 parent 7ece54c commit 697114b
Showing 1 changed file with 61 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.hudi.client.embedded;

import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.SerializableConfiguration;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
Expand All @@ -40,7 +39,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -53,8 +52,8 @@ public class EmbeddedTimelineService {

private static final Logger LOG = LoggerFactory.getLogger(EmbeddedTimelineService.class);
private static final AtomicInteger NUM_SERVERS_RUNNING = new AtomicInteger(0);
// Map of port to existing timeline service running on that port
private static final Map<Integer, EmbeddedTimelineService> RUNNING_SERVICES = new HashMap<>();
// Map of TimelineServiceIdentifier to existing timeline service running
private static final Map<TimelineServiceIdentifier, EmbeddedTimelineService> RUNNING_SERVICES = new HashMap<>();
private static final Registry METRICS_REGISTRY = Registry.getRegistry("TimelineService");
private static final String NUM_EMBEDDED_TIMELINE_SERVERS = "numEmbeddedTimelineServers";
private int serverPort;
Expand All @@ -63,15 +62,18 @@ public class EmbeddedTimelineService {
private final SerializableConfiguration hadoopConf;
private final HoodieWriteConfig writeConfig;
private TimelineService.Config serviceConfig;
private final TimelineServiceIdentifier timelineServiceIdentifier;
private final Set<String> basePaths; // the set of base paths using this EmbeddedTimelineService

private transient FileSystemViewManager viewManager;
private transient TimelineService server;

private EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig) {
private EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig,
TimelineServiceIdentifier timelineServiceIdentifier) {
setHostAddr(embeddedTimelineServiceHostAddr);
this.context = context;
this.writeConfig = writeConfig;
this.timelineServiceIdentifier = timelineServiceIdentifier;
this.basePaths = new HashSet<>();
this.basePaths.add(writeConfig.getBasePath());
this.hadoopConf = context.getHadoopConf();
Expand All @@ -92,29 +94,32 @@ public static EmbeddedTimelineService getOrStartEmbeddedTimelineService(HoodieEn

static EmbeddedTimelineService getOrStartEmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig,
TimelineServiceCreator timelineServiceCreator) throws IOException {
TimelineServiceIdentifier timelineServiceIdentifier = getTimelineServiceIdentifier(embeddedTimelineServiceHostAddr, writeConfig);
// if reuse is enabled, check if any existing instances are compatible
if (writeConfig.isEmbeddedTimelineServerReuseEnabled()) {
synchronized (SERVICE_LOCK) {
for (EmbeddedTimelineService service : RUNNING_SERVICES.values()) {
if (service.canReuseFor(writeConfig, embeddedTimelineServiceHostAddr)) {
service.addBasePath(writeConfig.getBasePath());
LOG.info("Reusing existing embedded timeline server with configuration: " + service.serviceConfig);
return service;
for (Map.Entry<TimelineServiceIdentifier, EmbeddedTimelineService> entry: RUNNING_SERVICES.entrySet()) {
if (entry.getKey().equals(timelineServiceIdentifier)) {
entry.getValue().addBasePath(writeConfig.getBasePath());
LOG.info("Reusing existing embedded timeline server with configuration: " + entry.getValue().serviceConfig);
return entry.getValue();
}
}
// if no compatible instance is found, create a new one
EmbeddedTimelineService service = createAndStartService(context, embeddedTimelineServiceHostAddr, writeConfig, timelineServiceCreator);
RUNNING_SERVICES.put(service.serverPort, service);
EmbeddedTimelineService service = createAndStartService(context, embeddedTimelineServiceHostAddr, writeConfig,
timelineServiceCreator, timelineServiceIdentifier);
RUNNING_SERVICES.put(timelineServiceIdentifier, service);
return service;
}
}
// if not, create a new instance. If reuse is not enabled, there is no need to add it to RUNNING_SERVICES
return createAndStartService(context, embeddedTimelineServiceHostAddr, writeConfig, timelineServiceCreator);
return createAndStartService(context, embeddedTimelineServiceHostAddr, writeConfig, timelineServiceCreator, timelineServiceIdentifier);
}

private static EmbeddedTimelineService createAndStartService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig,
TimelineServiceCreator timelineServiceCreator) throws IOException {
EmbeddedTimelineService service = new EmbeddedTimelineService(context, embeddedTimelineServiceHostAddr, writeConfig);
TimelineServiceCreator timelineServiceCreator,
TimelineServiceIdentifier timelineServiceIdentifier) throws IOException {
EmbeddedTimelineService service = new EmbeddedTimelineService(context, embeddedTimelineServiceHostAddr, writeConfig, timelineServiceIdentifier);
service.startServer(timelineServiceCreator);
METRICS_REGISTRY.set(NUM_EMBEDDED_TIMELINE_SERVERS, NUM_SERVERS_RUNNING.incrementAndGet());
return service;
Expand Down Expand Up @@ -222,35 +227,6 @@ private void addBasePath(String basePath) {
basePaths.add(basePath);
}

private boolean canReuseFor(HoodieWriteConfig newWriteConfig, String newHostAddr) {
if (server == null || viewManager == null) {
return false; // service is not running
}
if (basePaths.contains(newWriteConfig.getBasePath())) {
return true; // already running for this base path
}
if (newHostAddr != null && !newHostAddr.equals(this.hostAddr)) {
return false; // different host address
}
if (writeConfig.getMarkersType() != newWriteConfig.getMarkersType()) {
return false; // different marker type
}
return metadataConfigsAreEquivalent(writeConfig.getMetadataConfig().getProps(), newWriteConfig.getMetadataConfig().getProps());
}

private boolean metadataConfigsAreEquivalent(Properties properties1, Properties properties2) {
Set<Object> metadataConfigs = new HashSet<>(properties1.keySet());
metadataConfigs.addAll(properties2.keySet());
return metadataConfigs.stream()
.filter(key -> ((String) key).startsWith(HoodieMetadataConfig.METADATA_PREFIX))
.allMatch(key -> {
String value1 = properties1.getProperty((String) key, "");
String value2 = properties2.getProperty((String) key, "");
return value1.equals(value2);
});

}

/**
* Stops the embedded timeline service for the given base path. If a timeline service is managing multiple tables, it will only be shutdown once all tables have been stopped.
* @param basePath For the table to stop the service for
Expand All @@ -259,7 +235,7 @@ public void stopForBasePath(String basePath) {
synchronized (SERVICE_LOCK) {
basePaths.remove(basePath);
if (basePaths.isEmpty()) {
RUNNING_SERVICES.remove(serverPort);
RUNNING_SERVICES.remove(timelineServiceIdentifier);
}
}
if (this.server != null) {
Expand All @@ -275,4 +251,44 @@ public void stopForBasePath(String basePath) {
LOG.info("Closed Timeline server");
}
}

private static TimelineServiceIdentifier getTimelineServiceIdentifier(String hostAddr, HoodieWriteConfig writeConfig) {
return new TimelineServiceIdentifier(hostAddr, writeConfig.getMarkersType(), writeConfig.isMetadataTableEnabled(),
writeConfig.isEarlyConflictDetectionEnable(), writeConfig.isTimelineServerBasedInstantStateEnabled());
}

static class TimelineServiceIdentifier {
private final String hostAddr;
private final MarkerType markerType;
private final boolean isMetadataEnabled;
private final boolean isEarlyConflictDetectionEnable;
private final boolean isTimelineServerBasedInstantStateEnabled;

public TimelineServiceIdentifier(String hostAddr, MarkerType markerType, boolean isMetadataEnabled, boolean isEarlyConflictDetectionEnable,
boolean isTimelineServerBasedInstantStateEnabled) {
this.hostAddr = hostAddr;
this.markerType = markerType;
this.isMetadataEnabled = isMetadataEnabled;
this.isEarlyConflictDetectionEnable = isEarlyConflictDetectionEnable;
this.isTimelineServerBasedInstantStateEnabled = isTimelineServerBasedInstantStateEnabled;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof TimelineServiceIdentifier)) {
return false;
}
TimelineServiceIdentifier that = (TimelineServiceIdentifier) o;
return isMetadataEnabled == that.isMetadataEnabled && isEarlyConflictDetectionEnable == that.isEarlyConflictDetectionEnable
&& isTimelineServerBasedInstantStateEnabled == that.isTimelineServerBasedInstantStateEnabled && hostAddr.equals(that.hostAddr) && markerType == that.markerType;
}

@Override
public int hashCode() {
return Objects.hash(hostAddr, markerType, isMetadataEnabled, isEarlyConflictDetectionEnable, isTimelineServerBasedInstantStateEnabled);
}
}
}

0 comments on commit 697114b

Please sign in to comment.