Skip to content

Commit

Permalink
[Improve][Zeta] Add an interface for batch retrieval of JobMetrics (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
ic4y committed Jun 25, 2023
1 parent 5ef4e1a commit c211235
Show file tree
Hide file tree
Showing 18 changed files with 710 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ public class ClientCommandArgs extends AbstractCommandArgs {
description = "Get job metrics by JobId")
private String metricsJobId;

@Parameter(
names = {"--get_running_job_metrics"},
description = "Gets metrics for running jobs")
private boolean getRunningJobMetrics = false;

@Parameter(
names = {"-l", "--list"},
description = "list job status")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ public void execute() throws CommandExecuteException {
if (clientCommandArgs.isListJob()) {
String jobStatus = engineClient.getJobClient().listJobStatus(true);
System.out.println(jobStatus);
} else if (clientCommandArgs.isGetRunningJobMetrics()) {
String runningJobMetrics = engineClient.getJobClient().getRunningJobMetrics();
System.out.println(runningJobMetrics);
} else if (null != clientCommandArgs.getJobId()) {
String jobState =
engineClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobInfoCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetRunningJobMetricsCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSavePointJobCodec;

Expand Down Expand Up @@ -117,6 +118,12 @@ public String getJobMetrics(Long jobId) {
SeaTunnelGetJobMetricsCodec::decodeResponse);
}

public String getRunningJobMetrics() {
return hazelcastClient.requestOnMasterAndDecodeResponse(
SeaTunnelGetRunningJobMetricsCodec.encodeRequest(),
SeaTunnelGetRunningJobMetricsCodec::decodeResponse);
}

public void savePointJob(Long jobId) {
PassiveCompletableFuture<Void> cancelFuture =
hazelcastClient.requestOnMasterAndGetCompletableFuture(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ public void testGetJobMetrics() {

String jobMetrics = jobClient.getJobMetrics(jobId);

System.out.println(jobMetrics);

Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT));
Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_QPS));
Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT));
Expand All @@ -214,6 +216,58 @@ public void testGetJobMetrics() {
}
}

@Test
public void testGetRunningJobMetrics() throws ExecutionException, InterruptedException {
Common.setDeployMode(DeployMode.CLUSTER);
String filePath = TestUtils.getResource("/batch_fake_to_console.conf");
JobConfig jobConfig = new JobConfig();
jobConfig.setName("fake_to_console1");

SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
JobClient jobClient = seaTunnelClient.getJobClient();

ClientJobProxy execute1 =
seaTunnelClient.createExecutionContext(filePath, jobConfig).execute();
long jobId1 = execute1.getJobId();

execute1.waitForJobComplete();

filePath = TestUtils.getResource("streaming_fake_to_console.conf");
jobConfig = new JobConfig();
jobConfig.setName("fake_to_console2");
ClientJobProxy execute2 =
seaTunnelClient.createExecutionContext(filePath, jobConfig).execute();
ClientJobProxy execute3 =
seaTunnelClient.createExecutionContext(filePath, jobConfig).execute();

long jobId2 = execute2.getJobId();
long jobId3 = execute3.getJobId();

await().atMost(30000, TimeUnit.MILLISECONDS)
.untilAsserted(
() ->
Assertions.assertTrue(
jobClient.getJobStatus(jobId1).equals("FINISHED")
&& jobClient.getJobStatus(jobId2).equals("RUNNING")
&& jobClient
.getJobStatus(jobId3)
.equals("RUNNING")));

System.out.println(jobClient.getRunningJobMetrics());

await().atMost(30000, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
String runningJobMetrics = jobClient.getRunningJobMetrics();
Assertions.assertTrue(
runningJobMetrics.contains(jobId2 + "")
&& runningJobMetrics.contains(jobId3 + ""));
});

jobClient.cancelJob(jobId2);
jobClient.cancelJob(jobId3);
}

@Test
public void testCancelJob() throws ExecutionException, InterruptedException {
Common.setDeployMode(DeployMode.CLIENT);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of streaming processing in seatunnel config
######

env {
# You can set engine configuration here
execution.parallelism = 1
job.mode = "BATCH"
}

source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
result_table_name = "fake"
parallelism = 1
schema = {
fields {
name = "string"
age = "int"
}
}
}
}

transform {
}

sink {
console {
source_table_name="fake"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.engine.core.protocol.codec;

import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.client.impl.protocol.Generated;
import com.hazelcast.client.impl.protocol.codec.builtin.StringCodec;

import static com.hazelcast.client.impl.protocol.ClientMessage.PARTITION_ID_FIELD_OFFSET;
import static com.hazelcast.client.impl.protocol.ClientMessage.RESPONSE_BACKUP_ACKS_FIELD_OFFSET;
import static com.hazelcast.client.impl.protocol.ClientMessage.TYPE_FIELD_OFFSET;
import static com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESSAGE;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt;

/*
* This file is auto-generated by the Hazelcast Client Protocol Code Generator.
* To change this file, edit the templates or the protocol
* definitions on the https://github.com/hazelcast/hazelcast-client-protocol
* and regenerate it.
*/

/** */
@Generated("2a54110c40297eed90df5f79bde1171d")
public final class SeaTunnelGetRunningJobMetricsCodec {
// hex: 0xDE0C00
public static final int REQUEST_MESSAGE_TYPE = 14552064;
// hex: 0xDE0C01
public static final int RESPONSE_MESSAGE_TYPE = 14552065;
private static final int REQUEST_INITIAL_FRAME_SIZE =
PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
private static final int RESPONSE_INITIAL_FRAME_SIZE =
RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;

private SeaTunnelGetRunningJobMetricsCodec() {}

public static ClientMessage encodeRequest() {
ClientMessage clientMessage = ClientMessage.createForEncode();
clientMessage.setRetryable(true);
clientMessage.setOperationName("SeaTunnel.GetRunningJobMetrics");
ClientMessage.Frame initialFrame =
new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
clientMessage.add(initialFrame);
return clientMessage;
}

public static ClientMessage encodeResponse(java.lang.String response) {
ClientMessage clientMessage = ClientMessage.createForEncode();
ClientMessage.Frame initialFrame =
new ClientMessage.Frame(
new byte[RESPONSE_INITIAL_FRAME_SIZE], UNFRAGMENTED_MESSAGE);
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, RESPONSE_MESSAGE_TYPE);
clientMessage.add(initialFrame);

StringCodec.encode(clientMessage, response);
return clientMessage;
}

/** */
public static java.lang.String decodeResponse(ClientMessage clientMessage) {
ClientMessage.ForwardFrameIterator iterator = clientMessage.frameIterator();
// empty initial frame
iterator.next();
return StringCodec.decode(iterator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,22 @@ methods:
retryable: true
partitionIdentifier: -1
params: []
response:
params:
- name: response
type: String
nullable: false
since: 2.0
doc: ''

- id: 12
name: getRunningJobMetrics
since: 2.0
doc: ''
request:
retryable: true
partitionIdentifier: -1
params: [ ]
response:
params:
- name: response
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package org.apache.seatunnel.engine.server;

import org.apache.seatunnel.api.common.metrics.JobMetrics;
import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.common.utils.StringFormatUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.EngineConfig;
Expand All @@ -43,18 +45,24 @@
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
import org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
import org.apache.seatunnel.engine.server.task.operation.GetMetricsOperation;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.hazelcast.cluster.Address;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.services.MembershipServiceEvent;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngineImpl;
import lombok.NonNull;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
Expand All @@ -65,6 +73,9 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static org.apache.seatunnel.api.common.metrics.MetricTags.JOB_ID;
import static org.apache.seatunnel.engine.server.metrics.JobMetricsUtil.toJobMetricsMap;

public class CoordinatorService {
private final NodeEngineImpl nodeEngine;
private final ILogger logger;
Expand Down Expand Up @@ -522,6 +533,73 @@ public JobMetrics getJobMetrics(long jobId) {
return jobMetricsImap != null ? jobMetricsImap.merge(jobMetrics) : jobMetrics;
}

public Map<Long, JobMetrics> getRunningJobMetrics() {
final Set<Long> runningJobIds = runningJobMasterMap.keySet();

Set<Address> addresses = new HashSet<>();
ownedSlotProfilesIMap.forEach(
(pipelineLocation, ownedSlotProfilesIMap) -> {
if (runningJobIds.contains(pipelineLocation.getJobId())) {
ownedSlotProfilesIMap
.values()
.forEach(
ownedSlotProfile -> {
addresses.add(ownedSlotProfile.getWorker());
});
}
});

List<RawJobMetrics> metrics = new ArrayList<>();

addresses.forEach(
address -> {
try {
if (nodeEngine.getClusterService().getMember(address) != null) {
RawJobMetrics rawJobMetrics =
(RawJobMetrics)
NodeEngineUtil.sendOperationToMemberNode(
nodeEngine,
new GetMetricsOperation(
dis ->
(dis.tagValue(JOB_ID)
!= null
&& runningJobIds
.contains(
Long
.parseLong(
dis
.tagValue(
JOB_ID))))),
address)
.get();
metrics.add(rawJobMetrics);
}
}
// HazelcastInstanceNotActiveException. It means that the node is
// offline, so waiting for the taskGroup to restore can be successful
catch (HazelcastInstanceNotActiveException e) {
logger.warning(
String.format(
"get metrics with exception: %s.",
ExceptionUtils.getMessage(e)));
} catch (Exception e) {
throw new SeaTunnelException(e.getMessage());
}
});

Map<Long, JobMetrics> longJobMetricsMap = toJobMetricsMap(metrics);

longJobMetricsMap.forEach(
(jobId, jobMetrics) -> {
JobMetrics jobMetricsImap = jobHistoryService.getJobMetrics(jobId);
if (jobMetricsImap != null) {
longJobMetricsMap.put(jobId, jobMetricsImap.merge(jobMetrics));
}
});

return longJobMetricsMap;
}

public JobDAGInfo getJobInfo(long jobId) {
JobDAGInfo jobInfo = jobHistoryService.getJobDAGInfo(jobId);
if (jobInfo != null) {
Expand Down
Loading

0 comments on commit c211235

Please sign in to comment.