Commit

changes
adithyachakilam committed Oct 8, 2024
1 parent 38ea363 commit 84e2f10
Showing 6 changed files with 28 additions and 95 deletions.
docs/operations/metrics.md (0 additions, 2 deletions)
@@ -262,8 +262,6 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
|`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
|`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the coordinator cycle time.|
|`task/autoScaler/requiredCount`|Count of required tasks based on the calculations of `lagBased` auto scaler.|`dataSource`, `stream`, `scalingSkipReason`|Depends on auto scaler config.|
|`task/supervisor/active/count`|Count of the active task groups for a supervisor.|`datasource`, `stream`|Depends on state of supervisor.|
|`task/supervisor/publishing/count`|Count of the publishing task groups for a supervisor.|`datasource`, `stream`|Depends on state of supervisor.|

If the JVM does not support CPU time measurement for the current thread, `ingest/merge/cpu` and `ingest/persists/cpu` will be 0.

@@ -343,4 +343,14 @@ default BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec()
{
return BroadcastDatasourceLoadingSpec.createFromContext(getContext(), BroadcastDatasourceLoadingSpec.ALL);
}

/**
* Specifies the current status of the task.
*
* @return the task's current status as a string
*/
default String status()
{
return "UNKONWN";
}
}
@@ -311,4 +311,11 @@ public Appenderator getAppenderator()
{
return runnerSupplier.get();
}

@Override
@VisibleForTesting
public String status()
{
return getRunner().getStatus().name();
}
}
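For readers skimming the diff, the override above follows a simple pattern: the task's status() just reports the name of its runner's current state enum. Below is a minimal, self-contained sketch of that pattern; the class and enum names are hypothetical stand-ins, not Druid's actual types.

```java
// Hypothetical sketch of the "task reports its runner's enum state" pattern shown above.
enum RunnerState
{
  NOT_STARTED, READING, PAUSED, PUBLISHING
}

class ExampleRunner
{
  private volatile RunnerState state = RunnerState.NOT_STARTED;

  RunnerState getStatus()
  {
    return state;
  }

  void setStatus(RunnerState newState)
  {
    state = newState;
  }
}

class ExampleTask
{
  private final ExampleRunner runner = new ExampleRunner();

  // Mirrors the override above: expose the runner's current state by enum name.
  public String status()
  {
    return runner.getStatus().name();
  }
}
```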
@@ -4362,12 +4362,6 @@ protected void scheduleReporting(ScheduledExecutorService reportingExec)
spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
TimeUnit.MILLISECONDS
);
reportingExec.scheduleAtFixedRate(
this::emitTaskCount,
ioConfig.getStartDelay().getMillis(),
spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
TimeUnit.MILLISECONDS
);
}
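The six lines removed in this hunk scheduled the now-deleted task-count emission via the standard ScheduledExecutorService.scheduleAtFixedRate contract: first run after an initial delay, then one run per fixed period. A small self-contained sketch of that contract with made-up timings (not the supervisor's real delays):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedRateSketch
{
  public static void main(String[] args) throws InterruptedException
  {
    ScheduledExecutorService reportingExec = Executors.newSingleThreadScheduledExecutor();

    // First run after the start delay (1 s here), then one run per emission period (2 s here).
    reportingExec.scheduleAtFixedRate(
        () -> System.out.println("emitting at " + System.currentTimeMillis()),
        1_000L,
        2_000L,
        TimeUnit.MILLISECONDS
    );

    Thread.sleep(7_000L); // let a few runs happen, then stop
    reportingExec.shutdownNow();
  }
}
```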

/**
@@ -4470,32 +4464,6 @@ protected void emitNoticesQueueSize()
}
}

public void emitTaskCount()
{
try {
ServiceMetricEvent.Builder eventBuilder = ServiceMetricEvent.builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimensionIfNotNull(
DruidMetrics.TAGS,
spec.getContextValue(DruidMetrics.TAGS)
)
.setDimension(
DruidMetrics.STREAM,
getIoConfig().getStream()
);
emitter.emit(eventBuilder.setMetric("task/supervisor/active/count", activelyReadingTaskGroups.size()));
emitter.emit(eventBuilder.setMetric("task/supervisor/publishing/count",
pendingCompletionTaskGroups.values()
.stream()
.filter(list -> !list.isEmpty())
.count()
));
}
catch (Exception e) {
log.warn(e, "Unable to publish active/publisihing task count");
}
}

protected void emitLag()
{
SupervisorStateManager.State basicState = stateManager.getSupervisorState().getBasicState();
@@ -1800,32 +1800,6 @@ public void testEmitNoLagWhenSuspended() throws Exception
verifyAll();
}

@Test
public void testEmitTaskCounts() throws Exception
{
expectEmitterSupervisor(false);

CountDownLatch latch = new CountDownLatch(1);
TestEmittingTestSeekableStreamSupervisor supervisor = new TestEmittingTestSeekableStreamSupervisor(
latch,
TestEmittingTestSeekableStreamSupervisor.TASK_COUNTS,
ImmutableMap.of("1", 100L, "2", 250L, "3", 500L),
ImmutableMap.of("1", 10000L, "2", 15000L, "3", 20000L)
);
supervisor.start();

Assert.assertTrue(supervisor.stateManager.isHealthy());
Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState());
Assert.assertEquals(BasicState.PENDING, supervisor.stateManager.getSupervisorState().getBasicState());
Assert.assertTrue(supervisor.stateManager.getExceptionEvents().isEmpty());
Assert.assertFalse(supervisor.stateManager.isAtLeastOneSuccessfulRun());

latch.await();
emitter.verifyEmitted("task/supervisor/active/count", 1);
emitter.verifyEmitted("task/supervisor/active/count", 1);
verifyAll();
}

@Test
public void testGetStats()
{
@@ -2518,16 +2492,9 @@ public void testScheduleReporting()
{
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
DruidMonitorSchedulerConfig config = new DruidMonitorSchedulerConfig();
EasyMock.expect(spec.getMonitorSchedulerConfig()).andReturn(config).times(3);
ScheduledExecutorService executorService = EasyMock.createMock(ScheduledExecutorService.class);
EasyMock.expect(executorService.scheduleWithFixedDelay(EasyMock.anyObject(), EasyMock.eq(86415000L), EasyMock.eq(300000L), EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(EasyMock.createMock(ScheduledFuture.class)).once();
EasyMock.expect(executorService.scheduleAtFixedRate(EasyMock.anyObject(), EasyMock.eq(86425000L), EasyMock.eq(config.getEmissionDuration().getMillis()), EasyMock.eq(TimeUnit.MILLISECONDS))).andReturn(EasyMock.createMock(ScheduledFuture.class)).times(2);
EasyMock.expect(executorService.scheduleAtFixedRate(
EasyMock.anyObject(),
EasyMock.eq(86400000L),
EasyMock.eq(config.getEmissionDuration().getMillis()),
EasyMock.eq(TimeUnit.MILLISECONDS)
)).andReturn(EasyMock.createMock(ScheduledFuture.class)).times(1);

EasyMock.replay(executorService, spec);
final BaseTestSeekableStreamSupervisor supervisor = new BaseTestSeekableStreamSupervisor()
@@ -3027,7 +2994,6 @@ private class TestEmittingTestSeekableStreamSupervisor extends BaseTestSeekableS
private static final byte LAG = 0x01;
private static final byte NOTICE_QUEUE = 0x02;
private static final byte NOTICE_PROCESS = 0x04;
private static final byte TASK_COUNTS = 0x08;


TestEmittingTestSeekableStreamSupervisor(
@@ -3089,16 +3055,6 @@ public void emitNoticeProcessTime(String noticeType, long timeInMillis)
latch.countDown();
}

@Override
public void emitTaskCount()
{
if ((metricFlag & TASK_COUNTS) == 0) {
return;
}
super.emitTaskCount();
latch.countDown();
}

@Override
public LagStats computeLagStats()
{
@@ -3121,12 +3077,6 @@ protected void scheduleReporting(ScheduledExecutorService reportingExec)
spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
TimeUnit.MILLISECONDS
);
reportingExec.scheduleAtFixedRate(
this::emitTaskCount,
ioConfig.getStartDelay().getMillis(),
spec.getMonitorSchedulerConfig().getEmissionDuration().getMillis(),
TimeUnit.MILLISECONDS
);
}
}

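The overrides deleted from this test helper all follow the same gating pattern visible in the surrounding context: each metric family is enabled by a bit in metricFlag, and a CountDownLatch is counted down once the metric has actually been emitted so the test can wait for it. A condensed, hypothetical illustration of that pattern (these are not the real test classes):

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical illustration of the bit-flag + latch gating used by the test supervisor above.
class MetricGatingExample
{
  static final byte LAG = 0x01;
  static final byte NOTICE_QUEUE = 0x02;

  private final byte metricFlag;      // which metric families this instance may emit
  private final CountDownLatch latch; // lets the test block until emission has happened

  MetricGatingExample(byte metricFlag, CountDownLatch latch)
  {
    this.metricFlag = metricFlag;
    this.latch = latch;
  }

  void emitLag()
  {
    if ((metricFlag & LAG) == 0) {
      return; // this test run is not interested in lag metrics
    }
    // ... emit the metric here ...
    latch.countDown(); // signal the waiting test thread
  }
}
```

A test constructs the helper with the relevant bit set and a latch of 1, triggers emission, then calls latch.await() before asserting on the emitted metrics.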
services/src/main/java/org/apache/druid/cli/CliPeon.java (11 additions, 11 deletions)
@@ -25,10 +25,8 @@
import com.github.rvesse.airline.annotations.Option;
import com.github.rvesse.airline.annotations.restrictions.Required;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Binder;
import com.google.inject.Inject;
@@ -141,6 +139,7 @@
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -303,19 +302,20 @@ public void configure(Binder binder)
@Named(ServiceStatusMonitor.HEARTBEAT_TAGS_BINDING)
public Supplier<Map<String, Object>> heartbeatDimensions(Task task)
{
ImmutableMap.Builder<String, Object> builder = ImmutableMap.builder();
builder.put(DruidMetrics.TASK_ID, task.getId());
builder.put(DruidMetrics.DATASOURCE, task.getDataSource());
builder.put(DruidMetrics.TASK_TYPE, task.getType());
builder.put(DruidMetrics.GROUP_ID, task.getGroupId());
Map<String, Object> map = new HashMap<>();
map.put(DruidMetrics.TASK_ID, task.getId());
map.put(DruidMetrics.DATASOURCE, task.getDataSource());
map.put(DruidMetrics.TASK_TYPE, task.getType());
map.put(DruidMetrics.GROUP_ID, task.getGroupId());
Map<String, Object> tags = task.getContextValue(DruidMetrics.TAGS);
if (tags != null && !tags.isEmpty()) {
builder.put(DruidMetrics.TAGS, tags);
map.put(DruidMetrics.TAGS, tags);
}

return Suppliers.ofInstance(
builder.build()
);
return () -> {
map.put(DruidMetrics.STATUS, task.status());
return map;
};
}

@Provides
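The rewritten heartbeatDimensions provider above swaps an eagerly built ImmutableMap returned through Suppliers.ofInstance for a lambda that re-adds the task's current status() each time the supplier is queried, so the heartbeat dimensions track status changes over the task's lifetime. A minimal sketch of the difference between the two supplier styles, using java.util.function.Supplier and made-up values rather than the Druid types:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class SupplierSketch
{
  public static void main(String[] args)
  {
    // Stand-in for a value that changes over the task's lifetime.
    String[] currentStatus = {"READING"};

    // Eager style: the map is built once, so later status changes are never visible.
    Map<String, Object> snapshot = new HashMap<>();
    snapshot.put("status", currentStatus[0]);
    Supplier<Map<String, Object>> eager = () -> snapshot;

    // Deferred style: the current status is re-read on every get(), as in the new code above.
    Supplier<Map<String, Object>> deferred = () -> {
      Map<String, Object> map = new HashMap<>();
      map.put("status", currentStatus[0]);
      return map;
    };

    currentStatus[0] = "PUBLISHING";
    System.out.println(eager.get().get("status"));    // READING (stale snapshot)
    System.out.println(deferred.get().get("status")); // PUBLISHING (current value)
  }
}
```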
