From 921819880e83aa805b4147b6e1723f5ee2f09876 Mon Sep 17 00:00:00 2001 From: jarvis Date: Thu, 25 Apr 2024 08:33:17 +0800 Subject: [PATCH] [Improve][Zeta] add complete checkpoint won't save file UT --- .../server/AbstractSeaTunnelServerTest.java | 30 +++++ .../checkpoint/CheckpointStorageTest.java | 118 ++++++++++++++++++ .../server/checkpoint/SavePointTest.java | 26 ---- .../engine/server/master/JobMetricsTest.java | 26 ---- .../stream_fake_to_console_biginterval.conf | 52 ++++++++ 5 files changed, 200 insertions(+), 52 deletions(-) create mode 100644 seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java create mode 100644 seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_biginterval.conf diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java index 5a0d288f4a9..2710c2cbd7e 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java @@ -20,6 +20,9 @@ import org.apache.seatunnel.common.utils.ExceptionUtils; import org.apache.seatunnel.engine.common.config.ConfigProvider; import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; +import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; +import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; +import org.apache.seatunnel.engine.core.job.JobImmutableInformation; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -27,10 +30,13 @@ import com.hazelcast.config.Config; import com.hazelcast.instance.impl.HazelcastInstanceImpl; +import com.hazelcast.internal.serialization.Data; import com.hazelcast.logging.ILogger; import com.hazelcast.spi.impl.NodeEngine; import lombok.extern.slf4j.Slf4j; +import java.util.Collections; + @Slf4j @TestInstance(TestInstance.Lifecycle.PER_CLASS) public abstract class AbstractSeaTunnelServerTest { @@ -83,6 +89,30 @@ public void before() { LOGGER = nodeEngine.getLogger(AbstractSeaTunnelServerTest.class); } + public SeaTunnelConfig loadSeaTunnelConfig() { + return ConfigProvider.locateAndGetSeaTunnelConfig(); + } + + protected void startJob(Long jobId, String path, boolean isStartWithSavePoint) { + LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, jobId.toString(), jobId); + + JobImmutableInformation jobImmutableInformation = + new JobImmutableInformation( + jobId, + "Test", + isStartWithSavePoint, + nodeEngine.getSerializationService().toData(testLogicalDag), + testLogicalDag.getJobConfig(), + Collections.emptyList(), + Collections.emptyList()); + + Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation); + + PassiveCompletableFuture voidPassiveCompletableFuture = + server.getCoordinatorService().submitJob(jobId, data); + voidPassiveCompletableFuture.join(); + } + @AfterAll public void after() { try { diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java new file mode 100644 index 00000000000..13d86d011a1 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.engine.server.checkpoint; + +import org.apache.seatunnel.engine.checkpoint.storage.PipelineState; +import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage; +import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory; +import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException; +import org.apache.seatunnel.engine.common.config.SeaTunnelConfig; +import org.apache.seatunnel.engine.common.config.server.CheckpointConfig; +import org.apache.seatunnel.engine.common.utils.FactoryUtil; +import org.apache.seatunnel.engine.core.job.JobStatus; +import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + +import static org.awaitility.Awaitility.await; + +@DisabledOnOs(OS.WINDOWS) +public class CheckpointStorageTest extends AbstractSeaTunnelServerTest { + + public static String STREAM_CONF_PATH = "stream_fake_to_console_biginterval.conf"; + public static String BATCH_CONF_PATH = "batch_fakesource_to_file.conf"; + + @Override + public SeaTunnelConfig loadSeaTunnelConfig() { + SeaTunnelConfig seaTunnelConfig = super.loadSeaTunnelConfig(); + CheckpointConfig checkpointConfig = seaTunnelConfig.getEngineConfig().getCheckpointConfig(); + // set a bigger interval in here and config file to avoid auto trigger checkpoint affect + // test result + checkpointConfig.setCheckpointInterval(Integer.MAX_VALUE); + seaTunnelConfig.getEngineConfig().setCheckpointConfig(checkpointConfig); + return seaTunnelConfig; + } + + @Test + public void testGenerateFileWhenSavepoint() + throws CheckpointStorageException, InterruptedException { + long jobId = System.currentTimeMillis(); + CheckpointConfig checkpointConfig = + server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig(); + server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig); + + CheckpointStorage checkpointStorage = + FactoryUtil.discoverFactory( + Thread.currentThread().getContextClassLoader(), + CheckpointStorageFactory.class, + checkpointConfig.getStorage().getStorage()) + .create(checkpointConfig.getStorage().getStoragePluginConfig()); + startJob(jobId, STREAM_CONF_PATH, false); + await().atMost(120000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertTrue( + server.getCoordinatorService() + .getJobStatus(jobId) + .equals(JobStatus.RUNNING))); + Thread.sleep(1000); + CompletableFuture future1 = + server.getCoordinatorService().getJobMaster(jobId).savePoint(); + future1.join(); + await().atMost(120000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + server.getCoordinatorService().getJobStatus(jobId), + JobStatus.SAVEPOINT_DONE)); + List savepoint1 = checkpointStorage.getAllCheckpoints(String.valueOf(jobId)); + Assertions.assertEquals(1, savepoint1.size()); + } + + @Test + public void testBatchJob() throws CheckpointStorageException { + long jobId = System.currentTimeMillis(); + CheckpointConfig checkpointConfig = + server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig(); + server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig); + + CheckpointStorage checkpointStorage = + FactoryUtil.discoverFactory( + Thread.currentThread().getContextClassLoader(), + CheckpointStorageFactory.class, + checkpointConfig.getStorage().getStorage()) + .create(checkpointConfig.getStorage().getStoragePluginConfig()); + startJob(jobId, BATCH_CONF_PATH, false); + await().atMost(120000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + server.getCoordinatorService().getJobStatus(jobId), + JobStatus.FINISHED)); + List allCheckpoints = + checkpointStorage.getAllCheckpoints(String.valueOf(jobId)); + Assertions.assertEquals(0, allCheckpoints.size()); + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java index 8b2ca9ae358..c062a95941d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java @@ -20,11 +20,8 @@ import org.apache.seatunnel.common.utils.FileUtils; import org.apache.seatunnel.engine.common.exception.SavePointFailedException; import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; -import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; -import org.apache.seatunnel.engine.core.job.JobImmutableInformation; import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest; -import org.apache.seatunnel.engine.server.TestUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Disabled; @@ -32,9 +29,6 @@ import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; -import com.hazelcast.internal.serialization.Data; - -import java.util.Collections; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; @@ -221,24 +215,4 @@ public void savePointAndRestore(boolean needRestart) throws InterruptedException Thread.sleep(1000); } - - private void startJob(Long jobId, String path, boolean isStartWithSavePoint) { - LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, jobId.toString(), jobId); - - JobImmutableInformation jobImmutableInformation = - new JobImmutableInformation( - jobId, - "Test", - isStartWithSavePoint, - nodeEngine.getSerializationService().toData(testLogicalDag), - testLogicalDag.getJobConfig(), - Collections.emptyList(), - Collections.emptyList()); - - Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation); - - PassiveCompletableFuture voidPassiveCompletableFuture = - server.getCoordinatorService().submitJob(jobId, data); - voidPassiveCompletableFuture.join(); - } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java index ed12a565d71..0e6202a0a69 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java @@ -18,13 +18,9 @@ package org.apache.seatunnel.engine.server.master; import org.apache.seatunnel.api.common.metrics.JobMetrics; -import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture; -import org.apache.seatunnel.engine.core.dag.logical.LogicalDag; -import org.apache.seatunnel.engine.core.job.JobImmutableInformation; import org.apache.seatunnel.engine.core.job.JobStatus; import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest; import org.apache.seatunnel.engine.server.CoordinatorService; -import org.apache.seatunnel.engine.server.TestUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -32,10 +28,8 @@ import org.junit.jupiter.api.condition.DisabledOnOs; import org.junit.jupiter.api.condition.OS; -import com.hazelcast.internal.serialization.Data; import lombok.extern.slf4j.Slf4j; -import java.util.Collections; import java.util.concurrent.TimeUnit; import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT; @@ -160,24 +154,4 @@ public void testMetricsOnJobRestart() throws InterruptedException { }); server.getCoordinatorService().cancelJob(jobId3); } - - private void startJob(Long jobId, String path, boolean isStartWithSavePoint) { - LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, jobId.toString(), jobId); - - JobImmutableInformation jobImmutableInformation = - new JobImmutableInformation( - jobId, - "Test", - isStartWithSavePoint, - nodeEngine.getSerializationService().toData(testLogicalDag), - testLogicalDag.getJobConfig(), - Collections.emptyList(), - Collections.emptyList()); - - Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation); - - PassiveCompletableFuture voidPassiveCompletableFuture = - server.getCoordinatorService().submitJob(jobId, data); - voidPassiveCompletableFuture.join(); - } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_biginterval.conf b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_biginterval.conf new file mode 100644 index 00000000000..73d79fa6609 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_biginterval.conf @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in SeaTunnel config +###### + +env { + # You can set SeaTunnel environment configuration here + parallelism = 2 + job.mode = "STREAMING" + checkpoint.interval = 2147483640 +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + parallelism = 2 + result_table_name = "fake" + row.num = 16 + schema = { + fields { + name = "string" + age = "int" + } + } + } + + # If you would like to get more information about how to configure SeaTunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/category/source-v2 +} + +sink { + Console { + } + + # If you would like to get more information about how to configure SeaTunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/category/sink-v2 +}