From 84e2f108945d89bc89e8f1afdb10b28bd20b51a7 Mon Sep 17 00:00:00 2001 From: Adithya Chakilam Date: Mon, 7 Oct 2024 19:39:46 -0500 Subject: [PATCH] Add task status to peon heartbeat dimensions; remove supervisor task count metrics --- docs/operations/metrics.md | 2 - .../druid/indexing/common/task/Task.java | 10 ++++ .../SeekableStreamIndexTask.java | 7 +++ .../supervisor/SeekableStreamSupervisor.java | 32 ------------ .../SeekableStreamSupervisorStateTest.java | 50 ------------------- .../java/org/apache/druid/cli/CliPeon.java | 22 ++++---- 6 files changed, 28 insertions(+), 95 deletions(-) diff --git a/docs/operations/metrics.md b/docs/operations/metrics.md index 2ef5d4b79282..4df9e7987ccc 100644 --- a/docs/operations/metrics.md +++ b/docs/operations/metrics.md @@ -262,8 +262,6 @@ batch ingestion emit the following metrics. These metrics are deltas for each em |`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds| |`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the coordinator cycle time.| |`task/autoScaler/requiredCount`|Count of required tasks based on the calculations of `lagBased` auto scaler.|`dataSource`, `stream`, `scalingSkipReason`|Depends on auto scaler config.| -|`task/supervisor/active/count`|Count of the active task groups for a supervisor.|`datasource`, `stream`|Depends on state of supervisor.| -|`task/supervisor/publishing/count`|Count of the publishing task groups for a supervisor.|`datasource`, `stream`|Depends on state of supervisor.| If the JVM does not support CPU time measurement for the current thread, `ingest/merge/cpu` and `ingest/persists/cpu` will be 0. 
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java index cacdc47f520a..1fadd4f6ae25 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java @@ -343,4 +343,14 @@ default BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec() { return BroadcastDatasourceLoadingSpec.createFromContext(getContext(), BroadcastDatasourceLoadingSpec.ALL); } + + /** + * Returns the current status of this task. + * + * @return the status name, or {@code "UNKNOWN"} for task types that do not report one + */ + default String status() + { + return "UNKNOWN"; + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java index 41cd084cd960..606ce6334923 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java @@ -311,4 +311,11 @@ public Appenderator getAppenderator() { return runnerSupplier.get(); } + + /** Reports the name of the status currently held by this task's runner. */ + @Override + public String status() + { + return getRunner().getStatus().name(); + } } diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index e223bbf0535e..1da5b4fbb9cc 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -4362,12 +4362,6 @@ protected void 
scheduleReporting(ScheduledExecutorService reportingExec) spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(), TimeUnit.MILLISECONDS ); - reportingExec.scheduleAtFixedRate( - this::emitTaskCount, - ioConfig.getStartDelay().getMillis(), - spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(), - TimeUnit.MILLISECONDS - ); } /** @@ -4470,32 +4464,6 @@ protected void emitNoticesQueueSize() } } - public void emitTaskCount() - { - try { - ServiceMetricEvent.Builder eventBuilder = ServiceMetricEvent.builder() - .setDimension(DruidMetrics.DATASOURCE, dataSource) - .setDimensionIfNotNull( - DruidMetrics.TAGS, - spec.getContextValue(DruidMetrics.TAGS) - ) - .setDimension( - DruidMetrics.STREAM, - getIoConfig().getStream() - ); - emitter.emit(eventBuilder.setMetric("task/supervisor/active/count", activelyReadingTaskGroups.size())); - emitter.emit(eventBuilder.setMetric("task/supervisor/publishing/count", - pendingCompletionTaskGroups.values() - .stream() - .filter(list -> !list.isEmpty()) - .count() - )); - } - catch (Exception e) { - log.warn(e, "Unable to publish active/publisihing task count"); - } - } - protected void emitLag() { SupervisorStateManager.State basicState = stateManager.getSupervisorState().getBasicState(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 97a4b5905d57..91b5c30482ce 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -1800,32 +1800,6 @@ public void testEmitNoLagWhenSuspended() throws Exception verifyAll(); } - @Test - public void testEmitTaskCounts() throws Exception - { - 
expectEmitterSupervisor(false); - - CountDownLatch latch = new CountDownLatch(1); - TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor( - latch, - TestEmittingTestSeekableStreamSupervisor.TASK_COUNTS, - ImmutableMap.of("1", 100L, "2", 250L, "3", 500L), - ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L) - ); - supervisor.start(); - - Assert.assertTrue(supervisor.stateManager.isHealthy()); - Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState()); - Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState()); - Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty()); - Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun()); - - latch.await(); - emitter.verifyEmitted("task/supervisor/active/count", 1); - emitter.verifyEmitted("task/supervisor/active/count", 1); - verifyAll(); - } - @Test public void testGetStats() { @@ -2518,16 +2492,9 @@ public void testScheduleReporting() { EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); DruidMonitorSchedulerConfig config = new DruidMonitorSchedulerConfig(); - EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(config).times(3); ScheduledExecutorService executorService = EasyMock.createMock(ScheduledExecutorService.class); EasyMock.expect(executorService.scheduleWithFixedDelay(EasyMock.anyObject(), EasyMock.eq(86415000L), EasyMock.eq(300000L), EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(EasyMock.createMock(ScheduledFuture.class)).once(); EasyMock.expect(executorService.scheduleAtFixedRate(EasyMock.anyObject(), EasyMock.eq(86425000L), EasyMock.eq(config.getEmissionDuration().getMillis()), EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(EasyMock.createMock(ScheduledFuture.class)).times(2); - EasyMock.expect(executorService.scheduleAtFixedRate( - EasyMock.anyObject(), - EasyMock.eq(86400000L), - EasyMock.eq(config.getEmissionDuration().getMillis()), - 
EasyMock.eq(TimeUnit.MILLISECONDS) - )).andReturn(EasyMock.createMock(ScheduledFuture.class)).times(1); EasyMock.replay(executorService, spec); final BaseTestSeekableStreamSupervisor supervisor = new BaseTestSeekableStreamSupervisor() @@ -3027,7 +2994,6 @@ private class TestEmittingTestSeekableStreamSupervisor extends BaseTestSeekableS private static final byte LAG = 0x01; private static final byte NOTICE_QUEUE = 0x02; private static final byte NOTICE_PROCESS = 0x04; - private static final byte TASK_COUNTS = 0x08; TestEmittingTestSeekableStreamSupervisor( @@ -3089,16 +3055,6 @@ public void emitNoticeProcessTime(String noticeType, long timeInMillis) latch.countDown(); } - @Override - public void emitTaskCount() - { - if ((metricFlag & TASK_COUNTS) == 0) { - return; - } - super.emitTaskCount(); - latch.countDown(); - } - @Override public LagStats computeLagStats() { @@ -3121,12 +3077,6 @@ protected void scheduleReporting(ScheduledExecutorService reportingExec) spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(), TimeUnit.MILLISECONDS ); - reportingExec.scheduleAtFixedRate( - this::emitTaskCount, - ioConfig.getStartDelay().getMillis(), - spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(), - TimeUnit.MILLISECONDS - ); } } diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java b/services/src/main/java/org/apache/druid/cli/CliPeon.java index 15374625d301..cf0403f58e3d 100644 --- a/services/src/main/java/org/apache/druid/cli/CliPeon.java +++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java @@ -25,10 +25,8 @@ import com.github.rvesse.airline.annotations.Option; import com.github.rvesse.airline.annotations.restrictions.Required; import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import 
com.google.inject.Binder; import com.google.inject.Inject; @@ -141,6 +139,7 @@ import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Paths; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; @@ -303,19 +302,23 @@ public void configure(Binder binder) @Named(ServiceStatusMonitor.HEARTBEAT_TAGS_BINDING) public Supplier<Map<String, Object>> heartbeatDimensions(Task task) { - ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder(); - builder.put(DruidMetrics.TASK_ID, task.getId()); - builder.put(DruidMetrics.DATASOURCE, task.getDataSource()); - builder.put(DruidMetrics.TASK_TYPE, task.getType()); - builder.put(DruidMetrics.GROUP_ID, task.getGroupId()); + final Map<String, Object> baseDimensions = new HashMap<>(); + baseDimensions.put(DruidMetrics.TASK_ID, task.getId()); + baseDimensions.put(DruidMetrics.DATASOURCE, task.getDataSource()); + baseDimensions.put(DruidMetrics.TASK_TYPE, task.getType()); + baseDimensions.put(DruidMetrics.GROUP_ID, task.getGroupId()); Map<String, Object> tags = task.getContextValue(DruidMetrics.TAGS); if (tags != null && !tags.isEmpty()) { - builder.put(DruidMetrics.TAGS, tags); + baseDimensions.put(DruidMetrics.TAGS, tags); } - return Suppliers.ofInstance( - builder.build() - ); + // The status is looked up on every call rather than once at binding time. Each call gets its + // own map so that a map handed to an earlier caller is never mutated afterwards. + return () -> { + final Map<String, Object> dimensions = new HashMap<>(baseDimensions); + dimensions.put(DruidMetrics.STATUS, task.status()); + return dimensions; + }; } @Provides