Skip to content

Commit

Permalink
[Improve][Zeta] when job finished, the checkpoint won't write to file (
Browse files Browse the repository at this point in the history
  • Loading branch information
liunaijie authored Apr 26, 2024
1 parent 058f559 commit 7763541
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ public boolean notSchemaChangeCheckpoint() {
return !isSchemaChangeCheckpoint();
}

/** only batch job FINISHED will return true. other case all return false. */
public boolean notCompletedCheckpoint() {
return this != COMPLETED_POINT_TYPE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -767,14 +767,16 @@ public synchronized void completePendingCheckpoint(CompletedCheckpoint completed
final long checkpointId = completedCheckpoint.getCheckpointId();
completedCheckpointIds.addLast(String.valueOf(completedCheckpoint.getCheckpointId()));
try {
byte[] states = serializer.serialize(completedCheckpoint);
checkpointStorage.storeCheckPoint(
PipelineState.builder()
.checkpointId(checkpointId)
.jobId(String.valueOf(jobId))
.pipelineId(pipelineId)
.states(states)
.build());
if (completedCheckpoint.getCheckpointType().notCompletedCheckpoint()) {
byte[] states = serializer.serialize(completedCheckpoint);
checkpointStorage.storeCheckPoint(
PipelineState.builder()
.checkpointId(checkpointId)
.jobId(String.valueOf(jobId))
.pipelineId(pipelineId)
.states(states)
.build());
}
if (completedCheckpointIds.size()
% coordinatorConfig.getStorage().getMaxRetainedCheckpoints()
== 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,23 @@
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInstance;

import com.hazelcast.config.Config;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.NodeEngine;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;

@Slf4j
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class AbstractSeaTunnelServerTest<T extends AbstractSeaTunnelServerTest> {
Expand Down Expand Up @@ -83,6 +89,30 @@ public void before() {
LOGGER = nodeEngine.getLogger(AbstractSeaTunnelServerTest.class);
}

public SeaTunnelConfig loadSeaTunnelConfig() {
return ConfigProvider.locateAndGetSeaTunnelConfig();
}

protected void startJob(Long jobId, String path, boolean isStartWithSavePoint) {
LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, jobId.toString(), jobId);

JobImmutableInformation jobImmutableInformation =
new JobImmutableInformation(
jobId,
"Test",
isStartWithSavePoint,
nodeEngine.getSerializationService().toData(testLogicalDag),
testLogicalDag.getJobConfig(),
Collections.emptyList(),
Collections.emptyList());

Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);

PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
server.getCoordinatorService().submitJob(jobId, data);
voidPassiveCompletableFuture.join();
}

@AfterAll
public void after() {
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.server.checkpoint;

import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.utils.FactoryUtil;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import static org.awaitility.Awaitility.await;

@DisabledOnOs(OS.WINDOWS)
public class CheckpointStorageTest extends AbstractSeaTunnelServerTest {

public static String STREAM_CONF_PATH = "stream_fake_to_console_biginterval.conf";
public static String BATCH_CONF_PATH = "batch_fakesource_to_file.conf";

@Override
public SeaTunnelConfig loadSeaTunnelConfig() {
SeaTunnelConfig seaTunnelConfig = super.loadSeaTunnelConfig();
CheckpointConfig checkpointConfig = seaTunnelConfig.getEngineConfig().getCheckpointConfig();
// set a bigger interval in here and config file to avoid auto trigger checkpoint affect
// test result
checkpointConfig.setCheckpointInterval(Integer.MAX_VALUE);
seaTunnelConfig.getEngineConfig().setCheckpointConfig(checkpointConfig);
return seaTunnelConfig;
}

@Test
public void testGenerateFileWhenSavepoint()
throws CheckpointStorageException, InterruptedException {
long jobId = System.currentTimeMillis();
CheckpointConfig checkpointConfig =
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);

CheckpointStorage checkpointStorage =
FactoryUtil.discoverFactory(
Thread.currentThread().getContextClassLoader(),
CheckpointStorageFactory.class,
checkpointConfig.getStorage().getStorage())
.create(checkpointConfig.getStorage().getStoragePluginConfig());
startJob(jobId, STREAM_CONF_PATH, false);
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertTrue(
server.getCoordinatorService()
.getJobStatus(jobId)
.equals(JobStatus.RUNNING)));
Thread.sleep(1000);
CompletableFuture<Void> future1 =
server.getCoordinatorService().getJobMaster(jobId).savePoint();
future1.join();
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
server.getCoordinatorService().getJobStatus(jobId),
JobStatus.SAVEPOINT_DONE));
List<PipelineState> savepoint1 = checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
Assertions.assertEquals(1, savepoint1.size());
}

@Test
public void testBatchJob() throws CheckpointStorageException {
long jobId = System.currentTimeMillis();
CheckpointConfig checkpointConfig =
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);

CheckpointStorage checkpointStorage =
FactoryUtil.discoverFactory(
Thread.currentThread().getContextClassLoader(),
CheckpointStorageFactory.class,
checkpointConfig.getStorage().getStorage())
.create(checkpointConfig.getStorage().getStoragePluginConfig());
startJob(jobId, BATCH_CONF_PATH, false);
await().atMost(120000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertEquals(
server.getCoordinatorService().getJobStatus(jobId),
JobStatus.FINISHED));
List<PipelineState> allCheckpoints =
checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
Assertions.assertEquals(0, allCheckpoints.size());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,15 @@
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.engine.common.exception.SavePointFailedException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.TestUtils;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

import com.hazelcast.internal.serialization.Data;

import java.util.Collections;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -221,24 +215,4 @@ public void savePointAndRestore(boolean needRestart) throws InterruptedException

Thread.sleep(1000);
}

private void startJob(Long jobId, String path, boolean isStartWithSavePoint) {
LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, jobId.toString(), jobId);

JobImmutableInformation jobImmutableInformation =
new JobImmutableInformation(
jobId,
"Test",
isStartWithSavePoint,
nodeEngine.getSerializationService().toData(testLogicalDag),
testLogicalDag.getJobConfig(),
Collections.emptyList(),
Collections.emptyList());

Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);

PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
server.getCoordinatorService().submitJob(jobId, data);
voidPassiveCompletableFuture.join();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,18 @@
package org.apache.seatunnel.engine.server.master;

import org.apache.seatunnel.api.common.metrics.JobMetrics;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.CoordinatorService;
import org.apache.seatunnel.engine.server.TestUtils;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;

import com.hazelcast.internal.serialization.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

import static org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
Expand Down Expand Up @@ -160,24 +154,4 @@ public void testMetricsOnJobRestart() throws InterruptedException {
});
server.getCoordinatorService().cancelJob(jobId3);
}

private void startJob(Long jobId, String path, boolean isStartWithSavePoint) {
LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, jobId.toString(), jobId);

JobImmutableInformation jobImmutableInformation =
new JobImmutableInformation(
jobId,
"Test",
isStartWithSavePoint,
nodeEngine.getSerializationService().toData(testLogicalDag),
testLogicalDag.getJobConfig(),
Collections.emptyList(),
Collections.emptyList());

Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);

PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
server.getCoordinatorService().submitJob(jobId, data);
voidPassiveCompletableFuture.join();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in SeaTunnel config
######

env {
# You can set SeaTunnel environment configuration here
parallelism = 2
job.mode = "STREAMING"
checkpoint.interval = 2147483640
}

source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
parallelism = 2
result_table_name = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}

# If you would like to get more information about how to configure SeaTunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/category/source-v2
}

sink {
Console {
}

# If you would like to get more information about how to configure SeaTunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/category/sink-v2
}

0 comments on commit 7763541

Please sign in to comment.