From 3d212853724f98080430dfb267a77f1ff9c9f6c4 Mon Sep 17 00:00:00 2001 From: Tim Brown Date: Wed, 22 Nov 2023 22:51:14 -0600 Subject: [PATCH] [HUDI-7112] Reuse existing timeline server and performance improvements (#10122) - Reuse timeline server across tables. --------- Co-authored-by: sivabalan --- .../apache/hudi/client/BaseHoodieClient.java | 2 +- .../EmbeddedTimelineServerHelper.java | 38 +--- .../embedded/EmbeddedTimelineService.java | 172 ++++++++++++++-- .../apache/hudi/config/HoodieWriteConfig.java | 4 +- .../TimelineServerBasedWriteMarkers.java | 13 +- .../apache/hudi/util/HttpRequestClient.java | 12 +- .../embedded/TestEmbeddedTimelineService.java | 189 ++++++++++++++++++ .../TestHoodieJavaWriteClientInsert.java | 6 +- .../client/TestHoodieClientMultiWriter.java | 35 +++- .../hudi/client/TestSparkRDDWriteClient.java | 6 +- ...RemoteFileSystemViewWithMetadataTable.java | 42 ++-- hudi-common/pom.xml | 4 + .../common/table/timeline/dto/DTOUtils.java | 4 +- .../view/RemoteHoodieTableFileSystemView.java | 70 ++++--- .../hudi/sink/TestWriteCopyOnWrite.java | 89 ++++++--- .../sink/TestWriteMergeOnReadWithCompact.java | 8 + .../apache/hudi/sink/utils/TestWriteBase.java | 6 +- .../apache/hudi/HoodieSparkSqlWriter.scala | 1 + .../hudi/timeline/service/RequestHandler.java | 5 +- .../timeline/service/TimelineService.java | 8 +- .../service/handlers/BaseFileHandler.java | 11 +- .../handlers/marker/MarkerDirState.java | 3 +- .../hudi/utilities/streamer/StreamSync.java | 2 +- .../TestHoodieDeltaStreamer.java | 1 - pom.xml | 8 + 25 files changed, 566 insertions(+), 173 deletions(-) create mode 100644 hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/embedded/TestEmbeddedTimelineService.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java index 533635ad774a..19faa8f4dbc0 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java @@ -120,7 +120,7 @@ private synchronized void stopEmbeddedServerView(boolean resetViewStorageConfig) if (timelineServer.isPresent() && shouldStopTimelineServer) { // Stop only if owner LOG.info("Stopping Timeline service !!"); - timelineServer.get().stop(); + timelineServer.get().stopForBasePath(basePath); } timelineServer = Option.empty(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java index b5f67fadec4c..47e1b9ee459f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineServerHelper.java @@ -23,9 +23,6 @@ import org.apache.hudi.common.util.Option; import org.apache.hudi.config.HoodieWriteConfig; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.io.IOException; /** @@ -33,10 +30,6 @@ */ public class EmbeddedTimelineServerHelper { - private static final Logger LOG = LoggerFactory.getLogger(EmbeddedTimelineService.class); - - private static Option TIMELINE_SERVER = Option.empty(); - /** * Instantiate Embedded Timeline Server. 
* @param context Hoodie Engine Context @@ -44,45 +37,28 @@ public class EmbeddedTimelineServerHelper { * @return TimelineServer if configured to run * @throws IOException */ - public static synchronized Option createEmbeddedTimelineService( + public static Option createEmbeddedTimelineService( HoodieEngineContext context, HoodieWriteConfig config) throws IOException { - if (config.isEmbeddedTimelineServerReuseEnabled()) { - if (!TIMELINE_SERVER.isPresent() || !TIMELINE_SERVER.get().canReuseFor(config.getBasePath())) { - TIMELINE_SERVER = Option.of(startTimelineService(context, config)); - } else { - updateWriteConfigWithTimelineServer(TIMELINE_SERVER.get(), config); - } - return TIMELINE_SERVER; - } if (config.isEmbeddedTimelineServerEnabled()) { - return Option.of(startTimelineService(context, config)); + Option hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST); + EmbeddedTimelineService timelineService = EmbeddedTimelineService.getOrStartEmbeddedTimelineService(context, hostAddr.orElse(null), config); + updateWriteConfigWithTimelineServer(timelineService, config); + return Option.of(timelineService); } else { return Option.empty(); } } - private static EmbeddedTimelineService startTimelineService( - HoodieEngineContext context, HoodieWriteConfig config) throws IOException { - // Run Embedded Timeline Server - LOG.info("Starting Timeline service !!"); - Option hostAddr = context.getProperty(EngineProperty.EMBEDDED_SERVER_HOST); - EmbeddedTimelineService timelineService = new EmbeddedTimelineService( - context, hostAddr.orElse(null), config); - timelineService.startServer(); - updateWriteConfigWithTimelineServer(timelineService, config); - return timelineService; - } - /** * Adjusts hoodie write config with timeline server settings. 
* @param timelineServer Embedded Timeline Server * @param config Hoodie Write Config */ public static void updateWriteConfigWithTimelineServer(EmbeddedTimelineService timelineServer, - HoodieWriteConfig config) { + HoodieWriteConfig config) { // Allow executor to find this newly instantiated timeline service if (config.isEmbeddedTimelineServerEnabled()) { config.setViewStorageConfig(timelineServer.getRemoteFileSystemViewConfig()); } } -} +} \ No newline at end of file diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java index feda7d2b1859..5432e9b34efd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java @@ -21,6 +21,7 @@ import org.apache.hudi.common.config.SerializableConfiguration; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.metrics.Registry; import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.view.FileSystemViewManager; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; @@ -29,37 +30,109 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.timeline.service.TimelineService; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; /** * Timeline Service that runs as part of write client. 
*/ public class EmbeddedTimelineService { + // lock used when starting/stopping/modifying embedded services + private static final Object SERVICE_LOCK = new Object(); private static final Logger LOG = LoggerFactory.getLogger(EmbeddedTimelineService.class); - + private static final AtomicInteger NUM_SERVERS_RUNNING = new AtomicInteger(0); + // Map of TimelineServiceIdentifier to existing timeline service running + private static final Map RUNNING_SERVICES = new HashMap<>(); + private static final Registry METRICS_REGISTRY = Registry.getRegistry("TimelineService"); + private static final String NUM_EMBEDDED_TIMELINE_SERVERS = "numEmbeddedTimelineServers"; private int serverPort; private String hostAddr; - private HoodieEngineContext context; + private final HoodieEngineContext context; private final SerializableConfiguration hadoopConf; private final HoodieWriteConfig writeConfig; - private final String basePath; + private TimelineService.Config serviceConfig; + private final TimelineServiceIdentifier timelineServiceIdentifier; + private final Set basePaths; // the set of base paths using this EmbeddedTimelineService private transient FileSystemViewManager viewManager; private transient TimelineService server; - public EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig) { + private EmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig, + TimelineServiceIdentifier timelineServiceIdentifier) { setHostAddr(embeddedTimelineServiceHostAddr); this.context = context; this.writeConfig = writeConfig; - this.basePath = writeConfig.getBasePath(); + this.timelineServiceIdentifier = timelineServiceIdentifier; + this.basePaths = new HashSet<>(); + this.basePaths.add(writeConfig.getBasePath()); this.hadoopConf = context.getHadoopConf(); this.viewManager = createViewManager(); } + /** + * Returns an existing embedded timeline service if one is running for the given configuration and reuse is enabled, or starts a new one. 
+ * @param context The {@link HoodieEngineContext} for the client + * @param embeddedTimelineServiceHostAddr The host address to use for the service (nullable) + * @param writeConfig The {@link HoodieWriteConfig} for the client + * @return A running {@link EmbeddedTimelineService} + * @throws IOException if an error occurs while starting the service + */ + public static EmbeddedTimelineService getOrStartEmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig) throws IOException { + return getOrStartEmbeddedTimelineService(context, embeddedTimelineServiceHostAddr, writeConfig, TimelineService::new); + } + + static EmbeddedTimelineService getOrStartEmbeddedTimelineService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig, + TimelineServiceCreator timelineServiceCreator) throws IOException { + TimelineServiceIdentifier timelineServiceIdentifier = getTimelineServiceIdentifier(embeddedTimelineServiceHostAddr, writeConfig); + // if reuse is enabled, check if any existing instances are compatible + if (writeConfig.isEmbeddedTimelineServerReuseEnabled()) { + synchronized (SERVICE_LOCK) { + if (RUNNING_SERVICES.containsKey(timelineServiceIdentifier)) { + RUNNING_SERVICES.get(timelineServiceIdentifier).addBasePath(writeConfig.getBasePath()); + LOG.info("Reusing existing embedded timeline server with configuration: " + RUNNING_SERVICES.get(timelineServiceIdentifier).serviceConfig); + return RUNNING_SERVICES.get(timelineServiceIdentifier); + } + // if no compatible instance is found, create a new one + EmbeddedTimelineService service = createAndStartService(context, embeddedTimelineServiceHostAddr, writeConfig, + timelineServiceCreator, timelineServiceIdentifier); + RUNNING_SERVICES.put(timelineServiceIdentifier, service); + return service; + } + } + // if not, create a new instance. 
If reuse is not enabled, there is no need to add it to RUNNING_SERVICES + return createAndStartService(context, embeddedTimelineServiceHostAddr, writeConfig, timelineServiceCreator, timelineServiceIdentifier); + } + + private static EmbeddedTimelineService createAndStartService(HoodieEngineContext context, String embeddedTimelineServiceHostAddr, HoodieWriteConfig writeConfig, + TimelineServiceCreator timelineServiceCreator, + TimelineServiceIdentifier timelineServiceIdentifier) throws IOException { + EmbeddedTimelineService service = new EmbeddedTimelineService(context, embeddedTimelineServiceHostAddr, writeConfig, timelineServiceIdentifier); + service.startServer(timelineServiceCreator); + METRICS_REGISTRY.set(NUM_EMBEDDED_TIMELINE_SERVERS, NUM_SERVERS_RUNNING.incrementAndGet()); + return service; + } + + public static void shutdownAllTimelineServers() { + RUNNING_SERVICES.entrySet().forEach(entry -> { + LOG.info("Closing Timeline server"); + entry.getValue().server.close(); + METRICS_REGISTRY.set(NUM_EMBEDDED_TIMELINE_SERVERS, NUM_SERVERS_RUNNING.decrementAndGet()); + LOG.info("Closed Timeline server"); + }); + RUNNING_SERVICES.clear(); + } + private FileSystemViewManager createViewManager() { // Using passed-in configs to build view storage configs FileSystemViewStorageConfig.Builder builder = @@ -73,7 +146,7 @@ private FileSystemViewManager createViewManager() { return FileSystemViewManager.createViewManagerWithTableMetadata(context, writeConfig.getMetadataConfig(), builder.build(), writeConfig.getCommonConfig()); } - public void startServer() throws IOException { + private void startServer(TimelineServiceCreator timelineServiceCreator) throws IOException { TimelineService.Config.Builder timelineServiceConfBuilder = TimelineService.Config.builder() .serverPort(writeConfig.getEmbeddedTimelineServerPort()) .numThreads(writeConfig.getEmbeddedTimelineServerThreads()) @@ -106,12 +179,20 @@ public void startServer() throws IOException { .enableInstantStateRequests(true); } - server = new TimelineService(context, hadoopConf.newCopy(), timelineServiceConfBuilder.build(), - FSUtils.getFs(basePath, hadoopConf.newCopy()), viewManager); + this.serviceConfig = timelineServiceConfBuilder.build(); + + server = timelineServiceCreator.create(context, hadoopConf.newCopy(), serviceConfig, + FSUtils.getFs(writeConfig.getBasePath(), hadoopConf.newCopy()), createViewManager()); serverPort = server.startService(); LOG.info("Started embedded timeline server at " + hostAddr + ":" + serverPort); } + @FunctionalInterface + interface TimelineServiceCreator { + TimelineService create(HoodieEngineContext context, Configuration hadoopConf, TimelineService.Config timelineServerConf, + FileSystem fileSystem, FileSystemViewManager globalFileSystemViewManager) throws IOException; + } + private void setHostAddr(String embeddedTimelineServiceHostAddr) { if (embeddedTimelineServiceHostAddr != null) { LOG.info("Overriding hostIp to (" + embeddedTimelineServiceHostAddr + ") found in spark-conf. It was " + this.hostAddr); @@ -146,19 +227,80 @@ public FileSystemViewManager getViewManager() { return viewManager; } - public boolean canReuseFor(String basePath) { - return this.server != null - && this.viewManager != null - && this.basePath.equals(basePath); + /** + * Adds a new base path to the set that are managed by this instance. 
+   * @param basePath the new base path to add
+   */
+  private void addBasePath(String basePath) {
+    basePaths.add(basePath);
+  }

-  public void stop() {
-    if (null != server) {
+  /**
+   * Stops the embedded timeline service for the given base path. If a timeline service is managing multiple tables,
+   * it will only be shut down once all tables have been stopped.
+   * @param basePath base path of the table to stop the service for
+   */
+  public void stopForBasePath(String basePath) {
+    synchronized (SERVICE_LOCK) {
+      basePaths.remove(basePath);
+      if (basePaths.isEmpty()) {
+        RUNNING_SERVICES.remove(timelineServiceIdentifier);
+      }
+    }
+    if (this.server != null) {
+      this.server.unregisterBasePath(basePath);
+    }
+    // continue rest of shutdown outside of the synchronized block to avoid excess blocking
+    if (basePaths.isEmpty() && null != server) {
       LOG.info("Closing Timeline server");
       this.server.close();
+      METRICS_REGISTRY.set(NUM_EMBEDDED_TIMELINE_SERVERS, NUM_SERVERS_RUNNING.decrementAndGet());
       this.server = null;
       this.viewManager = null;
       LOG.info("Closed Timeline server");
     }
   }
-}
+
+  private static TimelineServiceIdentifier getTimelineServiceIdentifier(String hostAddr, HoodieWriteConfig writeConfig) {
+    return new TimelineServiceIdentifier(hostAddr, writeConfig.getMarkersType(), writeConfig.isMetadataTableEnabled(),
+        writeConfig.isEarlyConflictDetectionEnable(), writeConfig.isTimelineServerBasedInstantStateEnabled());
+  }
+
+  static class TimelineServiceIdentifier {
+    private final String hostAddr;
+    private final MarkerType markerType;
+    private final boolean isMetadataEnabled;
+    private final boolean isEarlyConflictDetectionEnable;
+    private final boolean isTimelineServerBasedInstantStateEnabled;
+
+    public TimelineServiceIdentifier(String hostAddr, MarkerType markerType, boolean isMetadataEnabled, boolean isEarlyConflictDetectionEnable,
+                                     boolean isTimelineServerBasedInstantStateEnabled) {
+      this.hostAddr = hostAddr;
+      this.markerType = markerType;
+      this.isMetadataEnabled = isMetadataEnabled;
+      this.isEarlyConflictDetectionEnable = isEarlyConflictDetectionEnable;
+      this.isTimelineServerBasedInstantStateEnabled = isTimelineServerBasedInstantStateEnabled;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof TimelineServiceIdentifier)) {
+        return false;
+      }
+      // compare all fields so that equals stays consistent with hashCode, including when hostAddr is null
+      TimelineServiceIdentifier that = (TimelineServiceIdentifier) o;
+      return isMetadataEnabled == that.isMetadataEnabled
+          && isEarlyConflictDetectionEnable == that.isEarlyConflictDetectionEnable
+          && isTimelineServerBasedInstantStateEnabled == that.isTimelineServerBasedInstantStateEnabled
+          && markerType == that.markerType
+          && Objects.equals(hostAddr, that.hostAddr);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(hostAddr, markerType, isMetadataEnabled, isEarlyConflictDetectionEnable, isTimelineServerBasedInstantStateEnabled);
+    }
+  }
+}
\ No newline at end of file
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 6a36e5025bcc..51895751101c 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -476,8 +476,8 @@ public class HoodieWriteConfig extends HoodieConfig {
       .key("hoodie.embed.timeline.server.reuse.enabled")
       .defaultValue(false)
       .markAdvanced()
-      .withDocumentation("Controls whether the timeline server instance should be cached and reused across the JVM (across task lifecycles)"
-          + "to avoid startup costs. This should rarely be changed.");
+      .withDocumentation("Controls whether the timeline server instance should be cached and reused across tables "
+          + "to avoid startup costs and server overhead. This should only be used if you are running multiple writers in the same JVM.");
 
   public static final ConfigProperty<Integer> EMBEDDED_TIMELINE_SERVER_PORT_NUM = ConfigProperty
       .key("hoodie.embed.timeline.server.port")
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java
index 2306763beb80..e6391673e7de 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java
@@ -62,6 +62,8 @@
  */
 public class TimelineServerBasedWriteMarkers extends WriteMarkers {
   private static final Logger LOG = LoggerFactory.getLogger(TimelineServerBasedWriteMarkers.class);
+  private static final TypeReference<Boolean> BOOLEAN_TYPE_REFERENCE = new TypeReference<Boolean>() {};
+  private static final TypeReference<Set<String>> STRING_TYPE_REFERENCE = new TypeReference<Set<String>>() {};
 
   private final HttpRequestClient httpRequestClient;
 
@@ -84,7 +86,7 @@ public boolean deleteMarkerDir(HoodieEngineContext context, int parallelism) {
     Map<String, String> paramsMap = Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
     try {
       return httpRequestClient.executeRequest(
-          DELETE_MARKER_DIR_URL, paramsMap, new TypeReference<Boolean>() {}, RequestMethod.POST);
+          DELETE_MARKER_DIR_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST);
     } catch (IOException e) {
       throw new HoodieRemoteException("Failed to delete marker directory " + markerDirPath.toString(), e);
     }
@@ -95,7 +97,7 @@ public boolean doesMarkerDirExist() {
     Map<String, String> paramsMap = Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
     try {
       return httpRequestClient.executeRequest(
-          MARKERS_DIR_EXISTS_URL, paramsMap, new TypeReference<Boolean>() {}, RequestMethod.GET);
+          MARKERS_DIR_EXISTS_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.GET);
     } catch (IOException e) {
       throw new HoodieRemoteException("Failed to check marker directory " + markerDirPath.toString(), e);
     }
@@ -106,7 +108,7 @@ public Set<String> createdAndMergedDataPaths(HoodieEngineContext context, int pa
     Map<String, String> paramsMap = Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
     try {
       Set<String> markerPaths = httpRequestClient.executeRequest(
-          CREATE_AND_MERGE_MARKERS_URL, paramsMap, new TypeReference<Set<String>>() {}, RequestMethod.GET);
+          CREATE_AND_MERGE_MARKERS_URL, paramsMap, STRING_TYPE_REFERENCE, RequestMethod.GET);
       return markerPaths.stream().map(WriteMarkers::stripMarkerSuffix).collect(Collectors.toSet());
     } catch (IOException e) {
       throw new HoodieRemoteException("Failed to get CREATE and MERGE data file paths in "
@@ -119,7 +121,7 @@ public Set<String> allMarkerFilePaths() {
     Map<String, String> paramsMap = Collections.singletonMap(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
     try {
       return httpRequestClient.executeRequest(
-          ALL_MARKERS_URL, paramsMap, new TypeReference<Set<String>>() {}, RequestMethod.GET);
+          ALL_MARKERS_URL, paramsMap, STRING_TYPE_REFERENCE, RequestMethod.GET);
} catch (IOException e) { throw new HoodieRemoteException("Failed to get all markers in " + markerDirPath.toString(), e); } @@ -173,8 +175,7 @@ private boolean executeCreateMarkerRequest(Map paramsMap, String boolean success; try { success = httpRequestClient.executeRequest( - CREATE_MARKER_URL, paramsMap, new TypeReference() { - }, HttpRequestClient.RequestMethod.POST); + CREATE_MARKER_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, HttpRequestClient.RequestMethod.POST); } catch (IOException e) { throw new HoodieRemoteException("Failed to create marker file " + partitionPath + "/" + markerFileName, e); } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/HttpRequestClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/HttpRequestClient.java index 65131cc77428..8f758b935ceb 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/HttpRequestClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/util/HttpRequestClient.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.afterburner.AfterburnerModule; import org.apache.http.client.fluent.Request; import org.apache.http.client.fluent.Response; import org.apache.http.client.utils.URIBuilder; @@ -37,7 +38,7 @@ */ public class HttpRequestClient { private static final Logger LOG = LoggerFactory.getLogger(HttpRequestClient.class); - private final ObjectMapper mapper; + private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new AfterburnerModule()); private final String serverHost; private final int serverPort; private final int timeoutSecs; @@ -51,7 +52,6 @@ public HttpRequestClient(HoodieWriteConfig writeConfig) { } public HttpRequestClient(String serverHost, int serverPort, int timeoutSecs, int maxRetry) { - this.mapper = new ObjectMapper(); this.serverHost = serverHost; this.serverPort = serverPort; this.timeoutSecs = timeoutSecs; @@ -59,7 +59,7 @@ public HttpRequestClient(String serverHost, int serverPort, int timeoutSecs, int } public T executeRequestWithRetry(String requestPath, Map queryParameters, - TypeReference reference, RequestMethod method) { + TypeReference reference, RequestMethod method) { int retry = maxRetry; while (--retry >= 0) { try { @@ -72,14 +72,14 @@ public T executeRequestWithRetry(String requestPath, Map que } public T executeRequest(String requestPath, Map queryParameters, - TypeReference reference, RequestMethod method) throws IOException { + TypeReference reference, RequestMethod method) throws IOException { URIBuilder builder = new URIBuilder().setHost(serverHost).setPort(serverPort).setPath(requestPath).setScheme("http"); queryParameters.forEach(builder::addParameter); String url = builder.toString(); - LOG.debug("Sending request : (" + url + ")"); + LOG.debug("Sending request : ( {} )", url); Response response; int timeout = this.timeoutSecs * 1000; // msec switch (method) { @@ -92,7 +92,7 @@ public T executeRequest(String requestPath, Map queryParamet break; } String content = response.returnContent().asString(); - return (T) mapper.readValue(content, reference); + return MAPPER.readValue(content, reference); } public enum RequestMethod { diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/embedded/TestEmbeddedTimelineService.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/embedded/TestEmbeddedTimelineService.java new file mode 100644 
index 000000000000..f863316bc088 --- /dev/null +++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/embedded/TestEmbeddedTimelineService.java @@ -0,0 +1,189 @@ +package org.apache.hudi.client.embedded; +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.engine.HoodieLocalEngineContext; +import org.apache.hudi.common.testutils.HoodieCommonTestHarness; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.timeline.service.TimelineService; + +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * These tests are mainly focused on testing the creation and reuse of the embedded timeline server. 
+ */ +public class TestEmbeddedTimelineService extends HoodieCommonTestHarness { + + @Test + public void embeddedTimelineServiceReused() throws Exception { + HoodieEngineContext engineContext = new HoodieLocalEngineContext(new Configuration()); + HoodieWriteConfig writeConfig1 = HoodieWriteConfig.newBuilder() + .withPath(tempDir.resolve("table1").toString()) + .withEmbeddedTimelineServerEnabled(true) + .withEmbeddedTimelineServerReuseEnabled(true) + .build(); + EmbeddedTimelineService.TimelineServiceCreator mockCreator = Mockito.mock(EmbeddedTimelineService.TimelineServiceCreator.class); + TimelineService mockService = Mockito.mock(TimelineService.class); + when(mockCreator.create(any(), any(), any(), any(), any())).thenReturn(mockService); + when(mockService.startService()).thenReturn(123); + EmbeddedTimelineService service1 = EmbeddedTimelineService.getOrStartEmbeddedTimelineService(engineContext, null, writeConfig1, mockCreator); + + HoodieWriteConfig writeConfig2 = HoodieWriteConfig.newBuilder() + .withPath(tempDir.resolve("table2").toString()) + .withEmbeddedTimelineServerEnabled(true) + .withEmbeddedTimelineServerReuseEnabled(true) + .build(); + EmbeddedTimelineService.TimelineServiceCreator mockCreator2 = Mockito.mock(EmbeddedTimelineService.TimelineServiceCreator.class); + // do not mock the create method since that should never be called + EmbeddedTimelineService service2 = EmbeddedTimelineService.getOrStartEmbeddedTimelineService(engineContext, null, writeConfig2, mockCreator2); + assertSame(service1, service2); + + // test shutdown happens after the last path is removed + service1.stopForBasePath(writeConfig2.getBasePath()); + verify(mockService, never()).close(); + verify(mockService, times(1)).unregisterBasePath(writeConfig2.getBasePath()); + + service2.stopForBasePath(writeConfig1.getBasePath()); + verify(mockService, times(1)).unregisterBasePath(writeConfig1.getBasePath()); + verify(mockService, times(1)).close(); + } + + @Test + public void embeddedTimelineServiceCreatedForDifferentMetadataConfig() throws Exception { + HoodieEngineContext engineContext = new HoodieLocalEngineContext(new Configuration()); + HoodieWriteConfig writeConfig1 = HoodieWriteConfig.newBuilder() + .withPath(tempDir.resolve("table1").toString()) + .withEmbeddedTimelineServerEnabled(true) + .withEmbeddedTimelineServerReuseEnabled(true) + .build(); + EmbeddedTimelineService.TimelineServiceCreator mockCreator = Mockito.mock(EmbeddedTimelineService.TimelineServiceCreator.class); + TimelineService mockService = Mockito.mock(TimelineService.class); + when(mockCreator.create(any(), any(), any(), any(), any())).thenReturn(mockService); + when(mockService.startService()).thenReturn(321); + EmbeddedTimelineService service1 = EmbeddedTimelineService.getOrStartEmbeddedTimelineService(engineContext, null, writeConfig1, mockCreator); + + HoodieWriteConfig writeConfig2 = HoodieWriteConfig.newBuilder() + .withPath(tempDir.resolve("table2").toString()) + .withEmbeddedTimelineServerEnabled(true) + .withEmbeddedTimelineServerReuseEnabled(true) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(false) + .build()) + .build(); + EmbeddedTimelineService.TimelineServiceCreator mockCreator2 = Mockito.mock(EmbeddedTimelineService.TimelineServiceCreator.class); + TimelineService mockService2 = Mockito.mock(TimelineService.class); + when(mockCreator2.create(any(), any(), any(), any(), any())).thenReturn(mockService2); + when(mockService2.startService()).thenReturn(456); + EmbeddedTimelineService service2 = 
EmbeddedTimelineService.getOrStartEmbeddedTimelineService(engineContext, null, writeConfig2, mockCreator2); + assertNotSame(service1, service2); + + // test shutdown happens immediately since each server has only one path associated with it + service1.stopForBasePath(writeConfig1.getBasePath()); + verify(mockService, times(1)).close(); + + service2.stopForBasePath(writeConfig2.getBasePath()); + verify(mockService2, times(1)).close(); + } + + @Test + public void embeddedTimelineServerNotReusedIfReuseDisabled() throws Exception { + HoodieEngineContext engineContext = new HoodieLocalEngineContext(new Configuration()); + HoodieWriteConfig writeConfig1 = HoodieWriteConfig.newBuilder() + .withPath(tempDir.resolve("table1").toString()) + .withEmbeddedTimelineServerEnabled(true) + .withEmbeddedTimelineServerReuseEnabled(true) + .build(); + EmbeddedTimelineService.TimelineServiceCreator mockCreator = Mockito.mock(EmbeddedTimelineService.TimelineServiceCreator.class); + TimelineService mockService = Mockito.mock(TimelineService.class); + when(mockCreator.create(any(), any(), any(), any(), any())).thenReturn(mockService); + when(mockService.startService()).thenReturn(789); + EmbeddedTimelineService service1 = EmbeddedTimelineService.getOrStartEmbeddedTimelineService(engineContext, null, writeConfig1, mockCreator); + + HoodieWriteConfig writeConfig2 = HoodieWriteConfig.newBuilder() + .withPath(tempDir.resolve("table2").toString()) + .withEmbeddedTimelineServerEnabled(true) + .withEmbeddedTimelineServerReuseEnabled(false) + .build(); + EmbeddedTimelineService.TimelineServiceCreator mockCreator2 = Mockito.mock(EmbeddedTimelineService.TimelineServiceCreator.class); + TimelineService mockService2 = Mockito.mock(TimelineService.class); + when(mockCreator2.create(any(), any(), any(), any(), any())).thenReturn(mockService2); + when(mockService2.startService()).thenReturn(987); + EmbeddedTimelineService service2 = EmbeddedTimelineService.getOrStartEmbeddedTimelineService(engineContext, null, writeConfig2, mockCreator2); + assertNotSame(service1, service2); + + // test shutdown happens immediately since each server has only one path associated with it + service1.stopForBasePath(writeConfig1.getBasePath()); + verify(mockService, times(1)).unregisterBasePath(writeConfig1.getBasePath()); + verify(mockService, times(1)).close(); + + service2.stopForBasePath(writeConfig2.getBasePath()); + verify(mockService2, times(1)).unregisterBasePath(writeConfig2.getBasePath()); + verify(mockService2, times(1)).close(); + } + + @Test + public void embeddedTimelineServerIsNotReusedAfterStopped() throws Exception { + HoodieEngineContext engineContext = new HoodieLocalEngineContext(new Configuration()); + HoodieWriteConfig writeConfig1 = HoodieWriteConfig.newBuilder() + .withPath(tempDir.resolve("table1").toString()) + .withEmbeddedTimelineServerEnabled(true) + .withEmbeddedTimelineServerReuseEnabled(true) + .build(); + EmbeddedTimelineService.TimelineServiceCreator mockCreator = Mockito.mock(EmbeddedTimelineService.TimelineServiceCreator.class); + TimelineService mockService = Mockito.mock(TimelineService.class); + when(mockCreator.create(any(), any(), any(), any(), any())).thenReturn(mockService); + when(mockService.startService()).thenReturn(555); + EmbeddedTimelineService service1 = EmbeddedTimelineService.getOrStartEmbeddedTimelineService(engineContext, null, writeConfig1, mockCreator); + + service1.stopForBasePath(writeConfig1.getBasePath()); + + HoodieWriteConfig writeConfig2 = HoodieWriteConfig.newBuilder() + 
.withPath(tempDir.resolve("table2").toString()) + .withEmbeddedTimelineServerEnabled(true) + .withEmbeddedTimelineServerReuseEnabled(true) + .build(); + EmbeddedTimelineService.TimelineServiceCreator mockCreator2 = Mockito.mock(EmbeddedTimelineService.TimelineServiceCreator.class); + TimelineService mockService2 = Mockito.mock(TimelineService.class); + when(mockCreator2.create(any(), any(), any(), any(), any())).thenReturn(mockService2); + when(mockService2.startService()).thenReturn(111); + EmbeddedTimelineService service2 = EmbeddedTimelineService.getOrStartEmbeddedTimelineService(engineContext, null, writeConfig2, mockCreator2); + // a new service will be started since the original was shutdown already + assertNotSame(service1, service2); + + // test shutdown happens immediately since each server has only one path associated with it + service1.stopForBasePath(writeConfig1.getBasePath()); + verify(mockService, times(1)).unregisterBasePath(writeConfig1.getBasePath()); + verify(mockService, times(1)).close(); + + service2.stopForBasePath(writeConfig2.getBasePath()); + verify(mockService2, times(1)).unregisterBasePath(writeConfig2.getBasePath()); + verify(mockService2, times(1)).close(); + } +} \ No newline at end of file diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java index ea13939ad2e6..f6d984f83f8c 100644 --- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java +++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestHoodieJavaWriteClientInsert.java @@ -116,9 +116,7 @@ public void testWriteClientAndTableServiceClientWithTimelineServer( HoodieJavaWriteClient writeClient; if (passInTimelineServer) { - EmbeddedTimelineService timelineService = - new EmbeddedTimelineService(context, null, writeConfig); - timelineService.startServer(); + EmbeddedTimelineService timelineService = EmbeddedTimelineService.getOrStartEmbeddedTimelineService(context, null, writeConfig); writeConfig.setViewStorageConfig(timelineService.getRemoteFileSystemViewConfig()); writeClient = new HoodieJavaWriteClient(context, writeConfig, true, Option.of(timelineService)); // Both the write client and the table service client should use the same passed-in @@ -127,7 +125,7 @@ public void testWriteClientAndTableServiceClientWithTimelineServer( assertEquals(timelineService, writeClient.getTableServiceClient().getTimelineServer().get()); // Write config should not be changed assertEquals(writeConfig, writeClient.getConfig()); - timelineService.stop(); + timelineService.stopForBasePath(writeConfig.getBasePath()); } else { writeClient = new HoodieJavaWriteClient(context, writeConfig); // Only one timeline server should be instantiated, and the same timeline server diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java index 88ca30bd6e43..d0896645427e 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java @@ -164,6 +164,18 @@ private static Iterable providerClassResolutionStrategyAndTableType() return opts; } + @ParameterizedTest + @MethodSource("configParamsDirectBased") + 
public void testHoodieClientBasicMultiWriterWithEarlyConflictDetectionDirect(String tableType, String earlyConflictDetectionStrategy) throws Exception { + testHoodieClientBasicMultiWriterWithEarlyConflictDetection(tableType, MarkerType.DIRECT.name(), earlyConflictDetectionStrategy); + } + + @ParameterizedTest + @MethodSource("configParamsTimelineServerBased") + public void testHoodieClientBasicMultiWriterWithEarlyConflictDetectionTimelineServerBased(String tableType, String earlyConflictDetectionStrategy) throws Exception { + testHoodieClientBasicMultiWriterWithEarlyConflictDetection(tableType, MarkerType.TIMELINE_SERVER_BASED.name(), earlyConflictDetectionStrategy); + } + /** * Test multi-writers with early conflict detect enable, including * 1. MOR + Direct marker @@ -184,9 +196,7 @@ private static Iterable providerClassResolutionStrategyAndTableType() * @param markerType * @throws Exception */ - @ParameterizedTest - @MethodSource("configParams") - public void testHoodieClientBasicMultiWriterWithEarlyConflictDetection(String tableType, String markerType, String earlyConflictDetectionStrategy) throws Exception { + private void testHoodieClientBasicMultiWriterWithEarlyConflictDetection(String tableType, String markerType, String earlyConflictDetectionStrategy) throws Exception { if (tableType.equalsIgnoreCase(HoodieTableType.MERGE_ON_READ.name())) { setUpMORTestTable(); } @@ -955,14 +965,21 @@ private JavaRDD startCommitForUpdate(HoodieWriteConfig writeConfig, return result; } - public static Stream configParams() { + public static Stream configParamsTimelineServerBased() { + Object[][] data = + new Object[][] { + {"COPY_ON_WRITE", AsyncTimelineServerBasedDetectionStrategy.class.getName()}, + {"MERGE_ON_READ", AsyncTimelineServerBasedDetectionStrategy.class.getName()} + }; + return Stream.of(data).map(Arguments::of); + } + + public static Stream configParamsDirectBased() { Object[][] data = new Object[][] { - {"COPY_ON_WRITE", MarkerType.TIMELINE_SERVER_BASED.name(), AsyncTimelineServerBasedDetectionStrategy.class.getName()}, - {"MERGE_ON_READ", MarkerType.TIMELINE_SERVER_BASED.name(), AsyncTimelineServerBasedDetectionStrategy.class.getName()}, - {"MERGE_ON_READ", MarkerType.DIRECT.name(), SimpleDirectMarkerBasedDetectionStrategy.class.getName()}, - {"COPY_ON_WRITE", MarkerType.DIRECT.name(), SimpleDirectMarkerBasedDetectionStrategy.class.getName()}, - {"COPY_ON_WRITE", MarkerType.DIRECT.name(), SimpleTransactionDirectMarkerBasedDetectionStrategy.class.getName()} + {"MERGE_ON_READ", SimpleDirectMarkerBasedDetectionStrategy.class.getName()}, + {"COPY_ON_WRITE", SimpleDirectMarkerBasedDetectionStrategy.class.getName()}, + {"COPY_ON_WRITE", SimpleTransactionDirectMarkerBasedDetectionStrategy.class.getName()} }; return Stream.of(data).map(Arguments::of); } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java index 9cffce2b07bb..784c3a3b7844 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java @@ -84,9 +84,7 @@ public void testWriteClientAndTableServiceClientWithTimelineServer( SparkRDDWriteClient writeClient; if (passInTimelineServer) { - EmbeddedTimelineService timelineService = - new EmbeddedTimelineService(context(), null, writeConfig); - timelineService.startServer(); + 
EmbeddedTimelineService timelineService = EmbeddedTimelineService.getOrStartEmbeddedTimelineService(context(), null, writeConfig); writeConfig.setViewStorageConfig(timelineService.getRemoteFileSystemViewConfig()); writeClient = new SparkRDDWriteClient(context(), writeConfig, Option.of(timelineService)); // Both the write client and the table service client should use the same passed-in @@ -95,7 +93,7 @@ public void testWriteClientAndTableServiceClientWithTimelineServer( assertEquals(timelineService, writeClient.getTableServiceClient().getTimelineServer().get()); // Write config should not be changed assertEquals(writeConfig, writeClient.getConfig()); - timelineService.stop(); + timelineService.stopForBasePath(writeConfig.getBasePath()); } else { writeClient = new SparkRDDWriteClient(context(), writeConfig); // Only one timeline server should be instantiated, and the same timeline server diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java index 3b4a39c72c07..86cc078e9894 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestRemoteFileSystemViewWithMetadataTable.java @@ -53,7 +53,7 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.EnumSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,7 +86,6 @@ public void setUp() throws Exception { initPath(); initSparkContexts(); initFileSystem(); - initTimelineService(); dataGen = new HoodieTestDataGenerator(0x1f86); } @@ -129,30 +128,46 @@ public void initTimelineService() { } } + private enum TestCase { + USE_EXISTING_TIMELINE_SERVER(true, false), + EMBEDDED_TIMELINE_SERVER_PER_TABLE(false, false), + SINGLE_EMBEDDED_TIMELINE_SERVER(false, true); + + private final boolean useExistingTimelineServer; + private final boolean reuseTimelineServer; + + TestCase(boolean useExistingTimelineServer, boolean reuseTimelineServer) { + this.useExistingTimelineServer = useExistingTimelineServer; + this.reuseTimelineServer = reuseTimelineServer; + } + } + @ParameterizedTest - @ValueSource(booleans = {true, false}) - public void testMORGetLatestFileSliceWithMetadataTable(boolean useExistingTimelineServer) throws IOException { + @EnumSource(value = TestCase.class) + public void testMORGetLatestFileSliceWithMetadataTable(TestCase testCase) throws IOException { + if (testCase.useExistingTimelineServer) { + initTimelineService(); + } // This test utilizes the `HoodieBackedTestDelayedTableMetadata` to make sure the // synced file system view is always served. // Create two tables to guarantee the timeline server can properly handle multiple base paths with metadata table enabled String basePathStr1 = initializeTable("dataset1"); String basePathStr2 = initializeTable("dataset2"); - try (SparkRDDWriteClient writeClient1 = createWriteClient(basePathStr1, "test_mor_table1", - useExistingTimelineServer ? Option.of(timelineService) : Option.empty()); - SparkRDDWriteClient writeClient2 = createWriteClient(basePathStr2, "test_mor_table2", - useExistingTimelineServer ? 
Option.of(timelineService) : Option.empty())) { + try (SparkRDDWriteClient writeClient1 = createWriteClient(basePathStr1, "test_mor_table1", testCase.reuseTimelineServer, + testCase.useExistingTimelineServer ? Option.of(timelineService) : Option.empty()); + SparkRDDWriteClient writeClient2 = createWriteClient(basePathStr2, "test_mor_table2", testCase.reuseTimelineServer, + testCase.useExistingTimelineServer ? Option.of(timelineService) : Option.empty())) { for (int i = 0; i < 3; i++) { writeToTable(i, writeClient1); } - for (int i = 0; i < 3; i++) { writeToTable(i, writeClient2); } - runAssertionsForBasePath(useExistingTimelineServer, basePathStr1, writeClient1); - runAssertionsForBasePath(useExistingTimelineServer, basePathStr2, writeClient2); + runAssertionsForBasePath(testCase.useExistingTimelineServer, basePathStr1, writeClient1); + runAssertionsForBasePath(testCase.useExistingTimelineServer, basePathStr2, writeClient2); } } @@ -229,7 +244,7 @@ protected HoodieTableType getTableType() { return HoodieTableType.MERGE_ON_READ; } - private SparkRDDWriteClient createWriteClient(String basePath, String tableName, Option timelineService) { + private SparkRDDWriteClient createWriteClient(String basePath, String tableName, boolean reuseTimelineServer, Option timelineService) { HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder() .withPath(basePath) .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA) @@ -247,6 +262,7 @@ private SparkRDDWriteClient createWriteClient(String basePath, String tableName, .withRemoteServerPort(timelineService.isPresent() ? timelineService.get().getServerPort() : REMOTE_PORT_NUM.defaultValue()) .build()) + .withEmbeddedTimelineServerReuseEnabled(reuseTimelineServer) .withAutoCommit(false) .forTable(tableName) .build(); @@ -302,4 +318,4 @@ public Boolean call() throws Exception { return result; } } -} +} \ No newline at end of file diff --git a/hudi-common/pom.xml b/hudi-common/pom.xml index b217d892eb61..79aed528079c 100644 --- a/hudi-common/pom.xml +++ b/hudi-common/pom.xml @@ -127,6 +127,10 @@ com.fasterxml.jackson.datatype jackson-datatype-jsr310 + + com.fasterxml.jackson.module + jackson-module-afterburner + diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/DTOUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/DTOUtils.java index ef5a88694876..4399860d6b4b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/DTOUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/DTOUtils.java @@ -41,9 +41,9 @@ public static List fileGroupDTOsfromFileGroups(List fileGroupDTOS = new ArrayList<>(); + List fileGroupDTOS = new ArrayList<>(fileGroups.size()); fileGroupDTOS.add(FileGroupDTO.fromFileGroup(fileGroups.get(0), true)); - fileGroupDTOS.addAll(fileGroups.subList(1, fileGroups.size()).stream() + fileGroupDTOS.addAll(fileGroups.stream().skip(1) .map(fg -> FileGroupDTO.fromFileGroup(fg, false)).collect(Collectors.toList())); return fileGroupDTOS; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java index f42f9bf2216c..b225e1b85b0b 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java @@ -43,6 +43,7 @@ import 
com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.afterburner.AfterburnerModule; import org.apache.http.Consts; import org.apache.http.client.fluent.Request; import org.apache.http.client.fluent.Response; @@ -136,13 +137,23 @@ public class RemoteHoodieTableFileSystemView implements SyncableFileSystemView, private static final Logger LOG = LoggerFactory.getLogger(RemoteHoodieTableFileSystemView.class); + private static final TypeReference> FILE_SLICE_DTOS_REFERENCE = new TypeReference>() {}; + private static final TypeReference> FILE_GROUP_DTOS_REFERENCE = new TypeReference>() {}; + private static final TypeReference BOOLEAN_TYPE_REFERENCE = new TypeReference() {}; + private static final TypeReference> COMPACTION_OP_DTOS_REFERENCE = new TypeReference>() {}; + private static final TypeReference> CLUSTERING_OP_DTOS_REFERENCE = new TypeReference>() {}; + private static final TypeReference> INSTANT_DTOS_REFERENCE = new TypeReference>() {}; + private static final TypeReference TIMELINE_DTO_REFERENCE = new TypeReference() {}; + private static final TypeReference> BASE_FILE_DTOS_REFERENCE = new TypeReference>() {}; + private static final TypeReference>> BASE_FILE_MAP_REFERENCE = new TypeReference>>() {}; + private static final TypeReference>> FILE_SLICE_MAP_REFERENCE = new TypeReference>>() {}; + private static final ObjectMapper MAPPER = new ObjectMapper().registerModule(new AfterburnerModule()); private final String serverHost; private final int serverPort; private final String basePath; private final HoodieTableMetaClient metaClient; private HoodieTimeline timeline; - private final ObjectMapper mapper; private final int timeoutMs; private boolean closed = false; @@ -159,7 +170,6 @@ public RemoteHoodieTableFileSystemView(String server, int port, HoodieTableMetaC public RemoteHoodieTableFileSystemView(HoodieTableMetaClient metaClient, FileSystemViewStorageConfig viewConf) { this.basePath = metaClient.getBasePath(); - this.mapper = new ObjectMapper(); this.metaClient = metaClient; this.timeline = metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(); this.serverHost = viewConf.getRemoteViewServerHost(); @@ -175,7 +185,7 @@ public RemoteHoodieTableFileSystemView(HoodieTableMetaClient metaClient, FileSys } } - private T executeRequest(String requestPath, Map queryParameters, TypeReference reference, + private T executeRequest(String requestPath, Map queryParameters, TypeReference reference, RequestMethod method) throws IOException { ValidationUtils.checkArgument(!closed, "View already closed"); @@ -192,7 +202,7 @@ private T executeRequest(String requestPath, Map queryParame LOG.info("Sending request : (" + url + ")"); Response response = retryHelper != null ? 
     retryHelper.start(() -> get(timeoutMs, url, method)) : get(timeoutMs, url, method);
     String content = response.returnContent().asString(Consts.UTF_8);
-    return (T) mapper.readValue(content, reference);
+    return MAPPER.readValue(content, reference);
   }
 
   private Map<String, String> getParamsWithPartitionPath(String partitionPath) {
@@ -250,7 +260,7 @@ public Stream<HoodieBaseFile> getLatestBaseFiles() {
   private Stream<HoodieBaseFile> getLatestBaseFilesFromParams(Map<String, String> paramsMap, String requestPath) {
     try {
       List<BaseFileDTO> dataFiles = executeRequest(requestPath, paramsMap,
-          new TypeReference<List<BaseFileDTO>>() {}, RequestMethod.GET);
+          BASE_FILE_DTOS_REFERENCE, RequestMethod.GET);
       return dataFiles.stream().map(BaseFileDTO::toHoodieBaseFile);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -273,8 +283,7 @@ public Map<String, Stream<HoodieBaseFile>> getAllLatestBaseFilesBeforeOrOn(Strin
     Map<String, List<BaseFileDTO>> dataFileMap = executeRequest(
         ALL_LATEST_BASE_FILES_BEFORE_ON_INSTANT_URL, paramsMap,
-        new TypeReference<Map<String, List<BaseFileDTO>>>() {
-        },
+        BASE_FILE_MAP_REFERENCE,
         RequestMethod.GET);
     return dataFileMap.entrySet().stream().collect(
         Collectors.toMap(
@@ -291,8 +300,7 @@ public Option<HoodieBaseFile> getBaseFileOn(String partitionPath, String instant
         new String[] {INSTANT_PARAM, FILEID_PARAM}, new String[] {instantTime, fileId});
     try {
       List<BaseFileDTO> dataFiles = executeRequest(LATEST_DATA_FILE_ON_INSTANT_URL, paramsMap,
-          new TypeReference<List<BaseFileDTO>>() {
-          }, RequestMethod.GET);
+          BASE_FILE_DTOS_REFERENCE, RequestMethod.GET);
       return Option.fromJavaOptional(dataFiles.stream().map(BaseFileDTO::toHoodieBaseFile).findFirst());
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -317,7 +325,7 @@ public Stream<FileSlice> getLatestFileSlices(String partitionPath) {
     Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
     try {
       List<FileSliceDTO> dataFiles = executeRequest(LATEST_PARTITION_SLICES_URL, paramsMap,
-          new TypeReference<List<FileSliceDTO>>() {}, RequestMethod.GET);
+          FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
       return dataFiles.stream().map(FileSliceDTO::toFileSlice);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -329,7 +337,7 @@ public Option<FileSlice> getLatestFileSlice(String partitionPath, String fileId)
     Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, FILEID_PARAM, fileId);
     try {
       List<FileSliceDTO> dataFiles = executeRequest(LATEST_PARTITION_SLICE_URL, paramsMap,
-          new TypeReference<List<FileSliceDTO>>() {}, RequestMethod.GET);
+          FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
       return Option.fromJavaOptional(dataFiles.stream().map(FileSliceDTO::toFileSlice).findFirst());
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -341,7 +349,7 @@ public Stream<FileSlice> getLatestUnCompactedFileSlices(String partitionPath) {
     Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
     try {
       List<FileSliceDTO> dataFiles = executeRequest(LATEST_PARTITION_UNCOMPACTED_SLICES_URL, paramsMap,
-          new TypeReference<List<FileSliceDTO>>() {}, RequestMethod.GET);
+          FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
       return dataFiles.stream().map(FileSliceDTO::toFileSlice);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -355,8 +363,7 @@ public Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionPath, Str
         new String[] {MAX_INSTANT_PARAM, INCLUDE_FILES_IN_PENDING_COMPACTION_PARAM},
         new String[] {maxCommitTime, String.valueOf(includeFileSlicesInPendingCompaction)});
     try {
-      List<FileSliceDTO> dataFiles = executeRequest(LATEST_SLICES_BEFORE_ON_INSTANT_URL, paramsMap,
-          new TypeReference<List<FileSliceDTO>>() {}, RequestMethod.GET);
+      List<FileSliceDTO> dataFiles = executeRequest(LATEST_SLICES_BEFORE_ON_INSTANT_URL, paramsMap, FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
       return dataFiles.stream().map(FileSliceDTO::toFileSlice);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -371,7 +378,7 @@ public Map<String, Stream<FileSlice>> getAllLatestFileSlicesBeforeOrOn(String ma
     try {
       Map<String, List<FileSliceDTO>> fileSliceMap = executeRequest(ALL_LATEST_SLICES_BEFORE_ON_INSTANT_URL, paramsMap,
-          new TypeReference<Map<String, List<FileSliceDTO>>>() {}, RequestMethod.GET);
+          FILE_SLICE_MAP_REFERENCE, RequestMethod.GET);
       return fileSliceMap.entrySet().stream().collect(
           Collectors.toMap(
               Map.Entry::getKey,
@@ -386,7 +393,7 @@ public Stream<FileSlice> getLatestMergedFileSlicesBeforeOrOn(String partitionPat
     Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxInstantTime);
     try {
       List<FileSliceDTO> dataFiles = executeRequest(LATEST_SLICES_MERGED_BEFORE_ON_INSTANT_URL, paramsMap,
-          new TypeReference<List<FileSliceDTO>>() {}, RequestMethod.GET);
+          FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
       return dataFiles.stream().map(FileSliceDTO::toFileSlice);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -399,7 +406,7 @@ public Stream<FileSlice> getLatestFileSliceInRange(List<String> commitsToReturn)
         getParams(INSTANTS_PARAM, StringUtils.join(commitsToReturn.toArray(new String[0]), ","));
     try {
       List<FileSliceDTO> dataFiles = executeRequest(LATEST_SLICES_RANGE_INSTANT_URL, paramsMap,
-          new TypeReference<List<FileSliceDTO>>() {}, RequestMethod.GET);
+          FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
       return dataFiles.stream().map(FileSliceDTO::toFileSlice);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -411,7 +418,7 @@ public Stream<FileSlice> getAllFileSlices(String partitionPath) {
     Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
     try {
       List<FileSliceDTO> dataFiles =
-          executeRequest(ALL_SLICES_URL, paramsMap, new TypeReference<List<FileSliceDTO>>() {}, RequestMethod.GET);
+          executeRequest(ALL_SLICES_URL, paramsMap, FILE_SLICE_DTOS_REFERENCE, RequestMethod.GET);
       return dataFiles.stream().map(FileSliceDTO::toFileSlice);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -423,7 +430,7 @@ public Stream<HoodieFileGroup> getAllFileGroups(String partitionPath) {
     Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
     try {
       List<FileGroupDTO> fileGroups = executeRequest(ALL_FILEGROUPS_FOR_PARTITION_URL, paramsMap,
-          new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
+          FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
       return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -435,7 +442,7 @@ public Stream<HoodieFileGroup> getReplacedFileGroupsBeforeOrOn(String maxCommitT
     Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
     try {
       List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_BEFORE_OR_ON, paramsMap,
-          new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
+          FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
       return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -447,7 +454,7 @@ public Stream<HoodieFileGroup> getReplacedFileGroupsBefore(String maxCommitTime,
     Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime);
     try {
       List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_BEFORE, paramsMap,
-          new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
+          FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
       return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -459,7 +466,7 @@ public Stream<HoodieFileGroup> getReplacedFileGroupsAfterOrOn(String minCommitTi
     Map<String, String> paramsMap = getParamsWithAdditionalParam(partitionPath, MIN_INSTANT_PARAM, minCommitTime);
     try {
       List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_AFTER_OR_ON, paramsMap,
-          new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
+          FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
       return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -471,7 +478,7 @@ public Stream<HoodieFileGroup> getAllReplacedFileGroups(String partitionPath) {
     Map<String, String> paramsMap = getParamsWithPartitionPath(partitionPath);
     try {
       List<FileGroupDTO> fileGroups = executeRequest(ALL_REPLACED_FILEGROUPS_PARTITION, paramsMap,
-          new TypeReference<List<FileGroupDTO>>() {}, RequestMethod.GET);
+          FILE_GROUP_DTOS_REFERENCE, RequestMethod.GET);
       return DTOUtils.fileGroupDTOsToFileGroups(fileGroups, metaClient);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -483,7 +490,7 @@ public boolean refresh() {
     try {
       // refresh the local timeline first.
       this.timeline = metaClient.reloadActiveTimeline().filterCompletedAndCompactionInstants();
-      return executeRequest(REFRESH_TABLE, paramsMap, new TypeReference<Boolean>() {}, RequestMethod.POST);
+      return executeRequest(REFRESH_TABLE, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
     }
@@ -493,7 +500,7 @@ public boolean refresh() {
   public Void loadAllPartitions() {
     Map<String, String> paramsMap = getParams();
     try {
-      executeRequest(LOAD_ALL_PARTITIONS_URL, paramsMap, new TypeReference<Boolean>() {}, RequestMethod.POST);
+      executeRequest(LOAD_ALL_PARTITIONS_URL, paramsMap, BOOLEAN_TYPE_REFERENCE, RequestMethod.POST);
       return null;
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -505,7 +512,7 @@ public Stream<Pair<String, CompactionOperation>> getPendingCompactionOperations(
     Map<String, String> paramsMap = getParams();
     try {
       List<CompactionOpDTO> dtos = executeRequest(PENDING_COMPACTION_OPS, paramsMap,
-          new TypeReference<List<CompactionOpDTO>>() {}, RequestMethod.GET);
+          COMPACTION_OP_DTOS_REFERENCE, RequestMethod.GET);
       return dtos.stream().map(CompactionOpDTO::toCompactionOperation);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -517,7 +524,7 @@ public Stream<Pair<String, CompactionOperation>> getPendingLogCompactionOperatio
     Map<String, String> paramsMap = getParams();
     try {
       List<CompactionOpDTO> dtos = executeRequest(PENDING_LOG_COMPACTION_OPS, paramsMap,
-          new TypeReference<List<CompactionOpDTO>>() {}, RequestMethod.GET);
+          COMPACTION_OP_DTOS_REFERENCE, RequestMethod.GET);
       return dtos.stream().map(CompactionOpDTO::toCompactionOperation);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -529,7 +536,7 @@ public Stream<Pair<HoodieFileGroupId, HoodieInstant>> getFileGroupsInPendingClus
     Map<String, String> paramsMap = getParams();
     try {
       List<ClusteringOpDTO> dtos = executeRequest(PENDING_CLUSTERING_FILEGROUPS, paramsMap,
-          new TypeReference<List<ClusteringOpDTO>>() {}, RequestMethod.GET);
+          CLUSTERING_OP_DTOS_REFERENCE, RequestMethod.GET);
       return dtos.stream().map(ClusteringOpDTO::toClusteringOperation);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -551,7 +558,7 @@ public Option<HoodieInstant> getLastInstant() {
     Map<String, String> paramsMap = getParams();
     try {
       List<InstantDTO> instants =
-          executeRequest(LAST_INSTANT, paramsMap, new TypeReference<List<InstantDTO>>() {}, RequestMethod.GET);
+          executeRequest(LAST_INSTANT, paramsMap, INSTANT_DTOS_REFERENCE, RequestMethod.GET);
       return Option.fromJavaOptional(instants.stream().map(InstantDTO::toInstant).findFirst());
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -563,7 +570,7 @@ public HoodieTimeline getTimeline() {
     Map<String, String> paramsMap = getParams();
     try {
       TimelineDTO timeline =
-          executeRequest(TIMELINE, paramsMap, new TypeReference<TimelineDTO>() {}, RequestMethod.GET);
+          executeRequest(TIMELINE, paramsMap, TIMELINE_DTO_REFERENCE, RequestMethod.GET);
       return TimelineDTO.toTimeline(timeline, metaClient);
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
@@ -580,8 +587,7 @@ public Option<HoodieBaseFile> getLatestBaseFile(String partitionPath, String fil
     Map<String, String> paramsMap =
        getParamsWithAdditionalParam(partitionPath, FILEID_PARAM, fileId);
     try {
       List<BaseFileDTO> dataFiles = executeRequest(LATEST_PARTITION_DATA_FILE_URL, paramsMap,
-          new TypeReference<List<BaseFileDTO>>() {
-          }, RequestMethod.GET);
+          BASE_FILE_DTOS_REFERENCE, RequestMethod.GET);
       return Option.fromJavaOptional(dataFiles.stream().map(BaseFileDTO::toHoodieBaseFile).findFirst());
     } catch (IOException e) {
       throw new HoodieRemoteException(e);
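The constants this file now references (BASE_FILE_DTOS_REFERENCE, FILE_SLICE_DTOS_REFERENCE, FILE_GROUP_DTOS_REFERENCE, and friends) hoist the anonymous TypeReference subclasses out of the request methods. Jackson's TypeReference is immutable and thread-safe, so one shared instance per DTO shape suffices; the old code allocated a fresh anonymous-class instance on every remote call. A minimal sketch of the pattern, using List<String> as a stand-in for the DTO lists (hypothetical class, not patch code):

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.io.IOException;
    import java.util.List;

    public class TypeReferenceReuse {
      // One shared, thread-safe instance each: no per-request allocation
      // of an anonymous TypeReference subclass.
      private static final ObjectMapper MAPPER = new ObjectMapper();
      private static final TypeReference<List<String>> STRING_LIST_REFERENCE =
          new TypeReference<List<String>>() {};

      public static List<String> parse(String json) throws IOException {
        return MAPPER.readValue(json, STRING_LIST_REFERENCE);
      }
    }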
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
index 9364528eb339..0f8651b4b05e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteCopyOnWrite.java
@@ -545,9 +545,11 @@ public void testWriteMultiWriterInvolved(WriteConcurrencyMode writeConcurrencyMo
         .checkpoint(1)
         .assertNextEvent()
         .checkpointComplete(1)
-        .checkWrittenData(EXPECTED3, 1);
+        .checkWrittenData(EXPECTED3, 1)
+        .end();
     // step to commit the 2nd txn
     validateConcurrentCommit(pipeline1);
+    pipeline1.end();
   }
 }
@@ -587,43 +589,66 @@ public void testWriteMultiWriterPartialOverlapping(WriteConcurrencyMode writeCon
     if (OptionsResolver.isCowTable(conf) && OptionsResolver.isNonBlockingConcurrencyControl(conf)) {
       validateNonBlockingConcurrencyControlConditions();
     } else {
-      TestHarness pipeline1 = preparePipeline(conf)
-          .consume(TestData.DATA_SET_INSERT_DUPLICATES)
-          .assertEmptyDataFiles();
-      // now start pipeline2 and suspend the txn commit
-      Configuration conf2 = conf.clone();
-      conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
-      TestHarness pipeline2 = preparePipeline(conf2)
-          .consume(TestData.DATA_SET_INSERT_DUPLICATES)
-          .assertEmptyDataFiles();
-
-      // step to commit the 1st txn, should succeed
-      pipeline1.checkpoint(1)
-          .assertNextEvent()
-          .checkpointComplete(1)
-          .checkWrittenData(EXPECTED3, 1);
-
-      // step to commit the 2nd txn
-      // should success for concurrent modification of same fileGroups if using non-blocking concurrency control
-      // should throw exception otherwise
-      validateConcurrentCommit(pipeline2);
+      TestHarness pipeline1 = null;
+      TestHarness pipeline2 = null;
+      try {
+        pipeline1 = preparePipeline(conf)
+            .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+            .assertEmptyDataFiles();
+        // now start pipeline2 and suspend the txn commit
+        Configuration conf2 = conf.clone();
+        conf2.setString(FlinkOptions.WRITE_CLIENT_ID, "2");
+        pipeline2 = preparePipeline(conf2)
+            .consume(TestData.DATA_SET_INSERT_DUPLICATES)
+            .assertEmptyDataFiles();
+
+        // step to commit the 1st txn, should succeed
+        pipeline1.checkpoint(1)
+            .assertNextEvent()
+            .checkpointComplete(1)
+            .checkWrittenData(EXPECTED3, 1);
+
+        // step to commit the 2nd txn
+        // should succeed for concurrent modification of the same file groups if using non-blocking concurrency control
+        // should throw an exception otherwise
+        validateConcurrentCommit(pipeline2);
+      } finally {
+        if (pipeline1 != null) {
+          pipeline1.end();
+        }
+        if (pipeline2 != null) {
+          pipeline2.end();
+        }
+      }
     }
   }
 
   @Test
   public void testReuseEmbeddedServer() throws IOException {
     conf.setInteger("hoodie.filesystem.view.remote.timeout.secs", 500);
-    HoodieFlinkWriteClient writeClient = FlinkWriteClients.createWriteClient(conf);
-    FileSystemViewStorageConfig viewStorageConfig = writeClient.getConfig().getViewStorageConfig();
-
-    assertSame(viewStorageConfig.getStorageType(), FileSystemViewStorageType.REMOTE_FIRST);
-
-    // get another write client
-    writeClient = FlinkWriteClients.createWriteClient(conf);
-    assertSame(writeClient.getConfig().getViewStorageConfig().getStorageType(), FileSystemViewStorageType.REMOTE_FIRST);
-    assertEquals(viewStorageConfig.getRemoteViewServerPort(), writeClient.getConfig().getViewStorageConfig().getRemoteViewServerPort());
-    assertEquals(viewStorageConfig.getRemoteTimelineClientTimeoutSecs(), 500);
-    writeClient.close();
+    conf.setString("hoodie.metadata.enable", "true");
+    HoodieFlinkWriteClient writeClient = null;
+    HoodieFlinkWriteClient writeClient2 = null;
+
+    try {
+      writeClient = FlinkWriteClients.createWriteClient(conf);
+      FileSystemViewStorageConfig viewStorageConfig = writeClient.getConfig().getViewStorageConfig();
+
+      assertSame(viewStorageConfig.getStorageType(), FileSystemViewStorageType.REMOTE_FIRST);
+
+      // get another write client
+      writeClient2 = FlinkWriteClients.createWriteClient(conf);
+      assertSame(writeClient2.getConfig().getViewStorageConfig().getStorageType(), FileSystemViewStorageType.REMOTE_FIRST);
+      assertEquals(viewStorageConfig.getRemoteViewServerPort(), writeClient2.getConfig().getViewStorageConfig().getRemoteViewServerPort());
+      assertEquals(viewStorageConfig.getRemoteTimelineClientTimeoutSecs(), 500);
+    } finally {
+      if (writeClient != null) {
+        writeClient.close();
+      }
+      if (writeClient2 != null) {
+        writeClient2.close();
+      }
    }
   }
 
   @Test
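The test rework above trades straight-line cleanup for try/finally so that write clients and pipelines are released even when an assertion fails, which matters now that closing a client unregisters its base path from the shared timeline server. Since the write clients extend BaseHoodieClient, which is AutoCloseable, the same guarantee can also be written with try-with-resources; a sketch under that assumption, with a hypothetical createClient() factory standing in for FlinkWriteClients.createWriteClient(conf):

    public class ClientCleanup {
      // Hypothetical stand-in for the real write-client factory.
      static AutoCloseable createClient() {
        return () -> System.out.println("client closed");
      }

      public static void main(String[] args) throws Exception {
        // try-with-resources closes both clients in reverse declaration order,
        // even if the body throws -- the same guarantee the try/finally blocks
        // in the tests above provide by hand.
        try (AutoCloseable writeClient = createClient();
             AutoCloseable writeClient2 = createClient()) {
          // ... exercise both clients, run assertions ...
        }
      }
    }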
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
index e07118cb0521..d7460ecfb050 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnReadWithCompact.java
@@ -159,6 +159,8 @@ public void testNonBlockingConcurrencyControlWithPartialUpdatePayload() throws E
     // because the data files belongs 3rd commit is not included in the last compaction.
     Map<String, String> readOptimizedResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
     TestData.checkWrittenData(tempFile, readOptimizedResult, 1);
+    pipeline1.end();
+    pipeline2.end();
   }
 
   @Test
@@ -227,6 +229,8 @@ public void testNonBlockingConcurrencyControlWithInflightInstant() throws Except
     // the data files belongs 3rd commit is not included in the last compaction.
     Map<String, String> readOptimizedResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,1,par1]");
     TestData.checkWrittenData(tempFile, readOptimizedResult, 1);
+    pipeline1.end();
+    pipeline2.end();
   }
 
   // case1: txn1 is upsert writer, txn2 is bulk_insert writer.
@@ -269,6 +273,8 @@ public void testBulkInsertWithNonBlockingConcurrencyControl() throws Exception {
 
     // step to commit the 2nd txn, should throw exception
     pipeline2.endInputThrows(HoodieWriteConflictException.class, "Cannot resolve conflicts");
+    pipeline1.end();
+    pipeline2.end();
   }
 
   // case1: txn1 is upsert writer, txn2 is bulk_insert writer.
@@ -342,6 +348,8 @@ public void testBulkInsertInSequenceWithNonBlockingConcurrencyControl() throws E
     // because the data files belongs 3rd commit is not included in the last compaction.
     Map<String, String> readOptimizedResult = Collections.singletonMap("par1", "[id1,par1,id1,Danny,23,2,par1]");
     TestData.checkWrittenData(tempFile, readOptimizedResult, 1);
+    pipeline1.end();
+    pipeline2.end();
   }
 
   @Override
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
index 4d9ff3226b06..65d3f176d6a3 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestWriteBase.java
@@ -18,8 +18,8 @@
 
 package org.apache.hudi.sink.utils;
 
+import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -268,7 +268,8 @@ public TestHarness checkpoint(long checkpointId) throws Exception {
      * Stop the timeline server.
      */
     public TestHarness stopTimelineServer() {
-      pipeline.getCoordinator().getWriteClient().getTimelineServer().ifPresent(EmbeddedTimelineService::stop);
+      HoodieFlinkWriteClient client = pipeline.getCoordinator().getWriteClient();
+      client.getTimelineServer().ifPresent(embeddedTimelineService -> embeddedTimelineService.stopForBasePath(client.getConfig().getBasePath()));
       return this;
     }
 
@@ -521,6 +522,7 @@ public TestHarness coordinatorFails() throws Exception {
 
     public void end() throws Exception {
       this.pipeline.close();
+      this.pipeline = null;
     }
 
     private String lastPendingInstant() {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index fda3156740d4..7f28e190e950 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -32,6 +32,7 @@ import org.apache.hudi.HoodieWriterUtils._
 import org.apache.hudi.avro.AvroSchemaUtils.resolveNullableSchema
 import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.client.embedded.EmbeddedTimelineService
 import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient}
 import org.apache.hudi.commit.{DatasetBulkInsertCommitActionExecutor, DatasetBulkInsertOverwriteCommitActionExecutor, DatasetBulkInsertOverwriteTableCommitActionExecutor}
 import org.apache.hudi.common.config._
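The test utilities above move from EmbeddedTimelineService::stop to stopForBasePath(...): with one embedded server shared across tables, a writer may only unregister the base path it owns and must leave the server running for everyone else. One way to picture the bookkeeping (purely illustrative sketch; field and method names are hypothetical, not the actual EmbeddedTimelineService internals, cf. TimelineService#unregisterBasePath later in this patch):

    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    class SharedTimelineServerSketch {
      // Base paths currently served by this shared server (illustrative only).
      private final Set<String> registeredBasePaths = ConcurrentHashMap.newKeySet();

      void register(String basePath) {
        registeredBasePaths.add(basePath);
      }

      void stopForBasePath(String basePath) {
        // Release this table's server-side state ...
        registeredBasePaths.remove(basePath);
        // ... but shut the server down only when no table is left using it.
        if (registeredBasePaths.isEmpty()) {
          shutdownServer();
        }
      }

      private void shutdownServer() {
        // stop the HTTP server, clear cached file system views, etc.
      }
    }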
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
index 345070fbe5c0..91adda4ee858 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/RequestHandler.java
@@ -24,12 +24,12 @@
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.dto.BaseFileDTO;
-import org.apache.hudi.common.table.timeline.dto.InstantStateDTO;
 import org.apache.hudi.common.table.timeline.dto.ClusteringOpDTO;
 import org.apache.hudi.common.table.timeline.dto.CompactionOpDTO;
 import org.apache.hudi.common.table.timeline.dto.FileGroupDTO;
 import org.apache.hudi.common.table.timeline.dto.FileSliceDTO;
 import org.apache.hudi.common.table.timeline.dto.InstantDTO;
+import org.apache.hudi.common.table.timeline.dto.InstantStateDTO;
 import org.apache.hudi.common.table.timeline.dto.TimelineDTO;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.RemoteHoodieTableFileSystemView;
@@ -45,6 +45,7 @@
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
 import io.javalin.Javalin;
 import io.javalin.http.BadRequestResponse;
 import io.javalin.http.Context;
@@ -69,7 +70,7 @@
  */
 public class RequestHandler {
 
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new AfterburnerModule());
   private static final Logger LOG = LoggerFactory.getLogger(RequestHandler.class);
 
   private final TimelineService.Config timelineServiceConfig;
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
index 790d476311ff..f215c26bc151 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
@@ -52,8 +52,8 @@ public class TimelineService {
   private static final int DEFAULT_NUM_THREADS = 250;
 
   private int serverPort;
-  private Config timelineServerConf;
-  private Configuration conf;
+  private final Config timelineServerConf;
+  private final Configuration conf;
   private transient HoodieEngineContext context;
   private transient FileSystem fs;
   private transient Javalin app = null;
@@ -434,6 +434,10 @@ public void close() {
     LOG.info("Closed Timeline Service");
   }
 
+  public void unregisterBasePath(String basePath) {
+    fsViewsManager.clearFileSystemView(basePath);
+  }
+
   public Configuration getConf() {
     return conf;
   }
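Both ObjectMapper changes in this patch (here and in MarkerDirState below) register Jackson's Afterburner module, which swaps reflection-based property access for generated bytecode; on the timeline server's hot serialization path this is a cheap win. The setup is a one-time cost on a shared mapper; a minimal self-contained sketch (the module comes from the jackson-module-afterburner artifact added to the root POM at the end of this patch):

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.module.afterburner.AfterburnerModule;

    public class AfterburnerSetup {
      // Register the module once and reuse the mapper everywhere.
      private static final ObjectMapper MAPPER =
          new ObjectMapper().registerModule(new AfterburnerModule());

      public static String toJson(Object value) throws JsonProcessingException {
        return MAPPER.writeValueAsString(value);
      }
    }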
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/BaseFileHandler.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/BaseFileHandler.java
index a34b49843fac..5a5fa00b0de9 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/BaseFileHandler.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/BaseFileHandler.java
@@ -26,8 +26,7 @@
 import org.apache.hadoop.fs.FileSystem;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
@@ -49,7 +48,7 @@ public List<BaseFileDTO> getLatestDataFiles(String basePath, String partitionPat
 
   public List<BaseFileDTO> getLatestDataFile(String basePath, String partitionPath, String fileId) {
     return viewManager.getFileSystemView(basePath).getLatestBaseFile(partitionPath, fileId)
-        .map(BaseFileDTO::fromHoodieBaseFile).map(Arrays::asList).orElse(new ArrayList<>());
+        .map(BaseFileDTO::fromHoodieBaseFile).map(Collections::singletonList).orElse(Collections.emptyList());
   }
 
   public List<BaseFileDTO> getLatestDataFiles(String basePath) {
@@ -74,10 +73,8 @@ public Map<String, List<BaseFileDTO>> getAllLatestDataFilesBeforeOrOn(String bas
 
   public List<BaseFileDTO> getLatestDataFileOn(String basePath, String partitionPath, String instantTime, String fileId) {
-    List<BaseFileDTO> result = new ArrayList<>();
-    viewManager.getFileSystemView(basePath).getBaseFileOn(partitionPath, instantTime, fileId)
-        .map(BaseFileDTO::fromHoodieBaseFile).ifPresent(result::add);
-    return result;
+    return viewManager.getFileSystemView(basePath).getBaseFileOn(partitionPath, instantTime, fileId)
+        .map(BaseFileDTO::fromHoodieBaseFile).map(Collections::singletonList).orElse(Collections.emptyList());
   }
 
   public List<BaseFileDTO> getLatestDataFilesInRange(String basePath, List<String> instants) {
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java
index 9f8ed5d84cfe..05551dc42dde 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerDirState.java
@@ -32,6 +32,7 @@
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.module.afterburner.AfterburnerModule;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -64,7 +65,7 @@
  */
 public class MarkerDirState implements Serializable {
   private static final Logger LOG = LoggerFactory.getLogger(MarkerDirState.class);
-  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().registerModule(new AfterburnerModule());
   // Marker directory
   private final String markerDirPath;
   private final FileSystem fileSystem;
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index fc0160f57e23..6074c4ec6fdc 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -1219,7 +1219,7 @@ public void close() {
     LOG.info("Shutting down embedded timeline server");
     if (embeddedTimelineService.isPresent()) {
-      embeddedTimelineService.get().stop();
+      embeddedTimelineService.get().stopForBasePath(cfg.targetBasePath);
     }
 
     if (metrics != null) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 67bf90a98538..f5304cce8082 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -531,7 +531,6 @@ public void testModifiedTableConfigs() throws Exception {
     List<Row> counts = countsPerCommit(tableBasePath, sqlContext);
     assertEquals(1000, counts.stream().mapToLong(entry -> entry.getLong(1)).sum());
 
-    //perform the upsert and now with the original config, the commit should go through
     HoodieDeltaStreamer.Config newCfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.BULK_INSERT);
     newCfg.sourceLimit = 2000;
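The BaseFileHandler cleanup above collapses the optional-to-list conversion into a single expression: Collections.singletonList and Collections.emptyList return immutable lists and avoid allocating a mutable ArrayList just to hold at most one element. The same shape in plain java.util.Optional terms (illustrative sketch, not patch code):

    import java.util.Collections;
    import java.util.List;
    import java.util.Optional;

    public class OptionalToList {
      static List<String> asList(Optional<String> value) {
        // singletonList/emptyList are immutable and allocation-light,
        // unlike building a mutable ArrayList for zero-or-one elements.
        return value.map(Collections::singletonList).orElse(Collections.emptyList());
      }
    }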
diff --git a/pom.xml b/pom.xml
index fe49188a2f82..973ddccb645e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -471,6 +471,8 @@
                         <include>org.apache.hbase.thirdparty:hbase-shaded-netty</include>
                         <include>org.apache.hbase.thirdparty:hbase-shaded-protobuf</include>
                         <include>org.apache.htrace:htrace-core4</include>
+
+                        <include>com.fasterxml.jackson.module:jackson-module-afterburner</include>
@@ -872,6 +874,12 @@
         <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
         <version>${fasterxml.jackson.module.scala.version}</version>
       </dependency>
+
+      <dependency>
+        <groupId>com.fasterxml.jackson.module</groupId>
+        <artifactId>jackson-module-afterburner</artifactId>
+        <version>${fasterxml.version}</version>
+      </dependency>