# Add new compaction config and status APIs served by the Overlord #17834

**Status:** Open · 19 commits · base: `master`
---

`docs/api-reference/automatic-compaction-api.md` (67 additions, 0 deletions)
@@ -190,6 +190,10 @@ A successful request returns an HTTP `200 OK` message code and an empty response

### Update capacity for compaction tasks

:::info
This API is now deprecated. Use [Update cluster-level compaction config](#update-cluster-level-compaction-config) instead.
:::

Updates the capacity for compaction tasks. The minimum number of compaction tasks is 1 and the maximum is 2147483647.

Note that while the maximum number of compaction tasks can theoretically be set to 2147483647, the practical limit is determined by the available cluster capacity and is capped at 10% of the cluster's total capacity.
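For example, on a cluster with 1,000 total task slots, at most 100 slots (10% of 1,000) are used for compaction, no matter how high this value is set.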
@@ -356,6 +360,69 @@ A successful request returns an HTTP `200 OK` message code and an empty response

## View automatic compaction configuration

### Get cluster-level compaction config

Retrieves the cluster-level compaction configuration that applies to all datasources unless explicitly overridden in a datasource's compaction config.
This includes all the fields listed in [Update cluster-level compaction config](#update-cluster-level-compaction-config).

#### URL

`GET` `/druid/coordinator/v1/config/compaction/cluster`

#### Responses

<Tabs>

<TabItem value="8" label="200 SUCCESS">

*Successfully retrieved cluster compaction configuration*

</TabItem>
</Tabs>

---

#### Sample request

<Tabs>

<TabItem value="10" label="cURL">

```shell
curl "http://ROUTER_IP:ROUTER_PORT/druid/coordinator/v1/config/compaction/cluster"
```
</TabItem>

<TabItem value="11" label="HTTP">

```HTTP
GET /druid/coordinator/v1/config/compaction/cluster HTTP/1.1
Host: http://ROUTER_IP:ROUTER_PORT
```

</TabItem>
</Tabs>

#### Sample response

<details>
<summary>View the response</summary>

```json
{
  "compactionTaskSlotRatio": 0.5,
  "maxCompactionTaskSlots": 1500,
  "compactionPolicy": {
    "type": "newestSegmentFirst",
    "priorityDatasource": "wikipedia"
  },
  "useSupervisors": true,
  "engine": "msq"
}
```

</details>
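
For reference, the corresponding update call accepts these same fields in the request body. The following is a minimal sketch; it assumes the endpoint described in [Update cluster-level compaction config](#update-cluster-level-compaction-config) is a `POST` to the same path:

```shell
# Assumed endpoint shape: POST with the same fields as the GET response above.
curl --request POST "http://ROUTER_IP:ROUTER_PORT/druid/coordinator/v1/config/compaction/cluster" \
  --header 'Content-Type: application/json' \
  --data '{
    "compactionTaskSlotRatio": 0.5,
    "maxCompactionTaskSlots": 1500,
    "useSupervisors": true,
    "engine": "msq"
  }'
```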

### Get all automatic compaction configurations

Retrieves all automatic compaction configurations. Returns a `compactionConfigs` object containing the active automatic compaction configurations of all datasources.
---

`CompactionScheduler.java`

```diff
@@ -44,24 +44,46 @@
  */
 public interface CompactionScheduler
 {
-  void start();
+  void becomeLeader();
 
-  void stop();
+  void stopBeingLeader();
```
A review thread on this rename:

**Contributor:** I see that the only case in which the scheduler stops is when it loses leadership, but would it be better to leave these as the general `start()` and `stop()`?

**Author:** I am not sure I follow. Could you please elaborate?

**Contributor:** `stop()` and `start()` seemed to be better names, from my understanding. `stopBeingLeader()` ties the reason the scheduler is stopping into the name, but that doesn't need to be part of the interface.

**Author:** Oh, the problem is that `becomeLeader()` and `stopBeingLeader()` don't exactly start or stop the scheduler anymore. They just notify the scheduler that something has changed, i.e. leadership. In fact, `stopBeingLeader()` just sets a flag and returns. The scheduled job handles the change and stops scheduling further jobs. Also, there are now two separate methods `start()` and `stop()` which are associated with lifecycle start and stop.

The rest of the interface, including the new status and snapshot methods:

```java
  /**
   * @return true if the scheduler is enabled, i.e. when
   * {@link DruidCompactionConfig#isUseSupervisors()} is true.
   */
  boolean isEnabled();

  /**
   * @return true if the scheduler is currently running and submitting compaction
   * tasks.
   */
  boolean isRunning();

  CompactionConfigValidationResult validateCompactionConfig(DataSourceCompactionConfig compactionConfig);

  /**
   * Starts compaction for a datasource if not already running.
   */
  void startCompaction(String dataSourceName, DataSourceCompactionConfig compactionConfig);

  /**
   * Stops compaction for a datasource if currently running.
   */
  void stopCompaction(String dataSourceName);

  Map<String, AutoCompactionSnapshot> getAllCompactionSnapshots();

  /**
   * @return Non-null snapshot of the current status of compaction for the datasource.
   */
  AutoCompactionSnapshot getCompactionSnapshot(String dataSource);

  /**
   * Simulates a compaction run with the given cluster config.
   *
   * @return Result of the simulation
   */
  CompactionSimulateResult simulateRunWithConfigUpdate(ClusterCompactionConfig updateRequest);
}
```
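To illustrate the split between JVM lifecycle (`start()`/`stop()`) and leadership changes (`becomeLeader()`/`stopBeingLeader()`), here is a hypothetical caller; the class and method names below are invented for illustration and are not part of this PR:

```java
// Hypothetical caller, invented for illustration; the PR's real call sites
// are in the Overlord leadership lifecycle shown further below.
public class SchedulerDriver
{
  private final CompactionScheduler scheduler;

  public SchedulerDriver(CompactionScheduler scheduler)
  {
    this.scheduler = scheduler;
  }

  public void onLeaderStart()
  {
    // Notify only; the scheduler's periodic job checks isEnabled()
    // (i.e. useSupervisors) before doing any work.
    scheduler.becomeLeader();
  }

  public void onLeaderStop()
  {
    // Sets a flag and returns; the scheduled job observes the change
    // and stops scheduling further runs.
    scheduler.stopBeingLeader();
  }
}
```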
---

```diff
@@ -83,7 +83,7 @@ public SupervisorReport<AutoCompactionSnapshot> getStatus()
     final AutoCompactionSnapshot snapshot;
     if (supervisorSpec.isSuspended()) {
       snapshot = AutoCompactionSnapshot.builder(dataSource)
-                                       .withStatus(AutoCompactionSnapshot.AutoCompactionScheduleStatus.NOT_ENABLED)
+                                       .withStatus(AutoCompactionSnapshot.ScheduleStatus.NOT_ENABLED)
                                        .build();
     } else if (!supervisorSpec.getValidationResult().isValid()) {
       snapshot = AutoCompactionSnapshot.builder(dataSource)
```
---

`CompactionSupervisorSpec.java`

```diff
@@ -30,6 +30,7 @@
 import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 public class CompactionSupervisorSpec implements SupervisorSpec
 {
@@ -41,6 +42,11 @@ public class CompactionSupervisorSpec implements SupervisorSpec
   private final CompactionScheduler scheduler;
   private final CompactionConfigValidationResult validationResult;
 
+  public static String getSupervisorIdForDatasource(String dataSource)
+  {
+    return ID_PREFIX + dataSource;
+  }
+
   @JsonCreator
   public CompactionSupervisorSpec(
       @JsonProperty("spec") DataSourceCompactionConfig spec,
@@ -70,7 +76,7 @@ public boolean isSuspended()
   @Override
   public String getId()
   {
-    return ID_PREFIX + spec.getDataSource();
+    return getSupervisorIdForDatasource(spec.getDataSource());
   }
 
   public CompactionConfigValidationResult getValidationResult()
@@ -113,4 +119,23 @@ public String getSource()
   {
     return "";
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    CompactionSupervisorSpec that = (CompactionSupervisorSpec) o;
+    return suspended == that.suspended && Objects.equals(spec, that.spec);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(suspended, spec);
+  }
 }
```
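The new `equals()`/`hashCode()` pair gives the spec value semantics: two specs are equal when their `spec` and `suspended` fields match. A minimal sketch of that contract; the constructor argument order here is an assumption, since the constructor is truncated in the diff above:

```java
// Illustrative only: the (spec, suspended, scheduler) argument order is
// assumed, and nulls stand in for real objects.
DataSourceCompactionConfig config = null;   // placeholder datasource config
CompactionScheduler scheduler = null;       // placeholder scheduler

CompactionSupervisorSpec a = new CompactionSupervisorSpec(config, false, scheduler);
CompactionSupervisorSpec b = new CompactionSupervisorSpec(config, false, scheduler);

// Distinct instances compare equal when spec and suspended match,
// e.g. so a resubmitted, unchanged spec can be recognized as a no-op.
assert a.equals(b) && a.hashCode() == b.hashCode();
```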
---

`OverlordCompactionScheduler.java`

```diff
@@ -34,6 +34,8 @@
 import org.apache.druid.java.util.common.Stopwatch;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
+import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
@@ -103,6 +105,7 @@ public class OverlordCompactionScheduler implements CompactionScheduler
    */
   private final TaskRunnerListener taskRunnerListener;
 
+  private final AtomicBoolean isLeader = new AtomicBoolean(false);
   private final AtomicBoolean started = new AtomicBoolean(false);
   private final CompactSegments duty;
 
@@ -165,29 +168,36 @@ public void statusChanged(String taskId, TaskStatus status)
     };
   }
 
+  @LifecycleStart
+  public synchronized void start()
+  {
+    // Do nothing
+  }
+
+  @LifecycleStop
+  public synchronized void stop()
+  {
+    executor.shutdownNow();
+  }
+
   @Override
-  public void start()
+  public void becomeLeader()
   {
-    if (isEnabled() && started.compareAndSet(false, true)) {
-      log.info("Starting compaction scheduler.");
-      initState();
-      scheduleOnExecutor(this::scheduledRun);
+    if (isLeader.compareAndSet(false, true)) {
+      scheduleOnExecutor(this::scheduledRun, SCHEDULE_PERIOD_SECONDS);
     }
   }
 
   @Override
-  public void stop()
+  public void stopBeingLeader()
   {
-    if (isEnabled() && started.compareAndSet(true, false)) {
-      log.info("Stopping compaction scheduler.");
-      cleanupState();
-    }
+    isLeader.set(false);
   }
 
   @Override
   public boolean isRunning()
   {
-    return isEnabled() && started.get();
+    return started.get();
   }
 
@@ -220,8 +230,16 @@ public void stopCompaction(String dataSourceName)
     statusTracker.removeDatasource(dataSourceName);
   }
 
+  /**
+   * Initializes scheduler state if required.
+   */
   private synchronized void initState()
   {
+    if (!started.compareAndSet(false, true)) {
+      return;
+    }
+
+    log.info("Starting compaction scheduler.");
     final Optional<TaskRunner> taskRunnerOptional = taskMaster.getTaskRunner();
     if (taskRunnerOptional.isPresent()) {
       taskRunnerOptional.get().registerListener(taskRunnerListener, Execs.directExecutor());
@@ -231,8 +249,16 @@ private synchronized void initState()
     }
   }
 
+  /**
+   * Cleans up scheduler state if required.
+   */
   private synchronized void cleanupState()
   {
+    if (!started.compareAndSet(true, false)) {
+      return;
+    }
+
+    log.info("Stopping compaction scheduler.");
     final Optional<TaskRunner> taskRunnerOptional = taskMaster.getTaskRunner();
     if (taskRunnerOptional.isPresent()) {
       taskRunnerOptional.get().unregisterListener(taskRunnerListener.getListenerId());
@@ -251,21 +277,36 @@ public boolean isEnabled()
     return compactionConfigSupplier.get().isUseSupervisors();
   }
 
+  /**
+   * Periodic task which runs the compaction duty if we are leader and
+   * useSupervisors is true. Otherwise, the scheduler state is cleaned up.
+   */
   private synchronized void scheduledRun()
   {
-    if (isRunning()) {
+    if (!isLeader.get()) {
+      cleanupState();
+      return;
+    }
+
+    if (isEnabled()) {
+      initState();
       try {
         runCompactionDuty();
       }
       catch (Exception e) {
         log.error(e, "Error processing compaction queue. Continuing schedule.");
       }
-      scheduleOnExecutor(this::scheduledRun);
+      scheduleOnExecutor(this::scheduledRun, SCHEDULE_PERIOD_SECONDS);
     } else {
       cleanupState();
+      scheduleOnExecutor(this::scheduledRun, SCHEDULE_PERIOD_SECONDS * 4);
     }
   }
 
+  /**
+   * Runs the compaction duty and emits stats if {@link #METRIC_EMISSION_PERIOD}
+   * has elapsed.
+   */
   private synchronized void runCompactionDuty()
   {
     final CoordinatorRunStats stats = new CoordinatorRunStats();
@@ -291,7 +332,22 @@ private synchronized void runCompactionDuty()
   @Override
   public AutoCompactionSnapshot getCompactionSnapshot(String dataSource)
   {
-    return duty.getAutoCompactionSnapshot(dataSource);
+    if (!activeDatasourceConfigs.containsKey(dataSource)) {
+      return AutoCompactionSnapshot.builder(dataSource)
+                                   .withStatus(AutoCompactionSnapshot.ScheduleStatus.NOT_ENABLED)
+                                   .build();
+    }
+
+    final AutoCompactionSnapshot snapshot = duty.getAutoCompactionSnapshot(dataSource);
+    if (snapshot == null) {
+      final AutoCompactionSnapshot.ScheduleStatus status =
+          isEnabled()
+          ? AutoCompactionSnapshot.ScheduleStatus.AWAITING_FIRST_RUN
+          : AutoCompactionSnapshot.ScheduleStatus.NOT_ENABLED;
+      return AutoCompactionSnapshot.builder(dataSource).withStatus(status).build();
+    } else {
+      return snapshot;
+    }
   }
 
@@ -336,7 +392,7 @@ private DataSourcesSnapshot getDatasourceSnapshot()
     return segmentManager.getSnapshotOfDataSourcesWithAllUsedSegments();
   }
 
-  private void scheduleOnExecutor(Runnable runnable)
+  private void scheduleOnExecutor(Runnable runnable, long delaySeconds)
   {
     executor.schedule(
         () -> {
@@ -347,7 +403,7 @@ private void scheduleOnExecutor(Runnable runnable)
             log.error(t, "Error while executing runnable");
           }
         },
-        SCHEDULE_PERIOD_SECONDS,
+        delaySeconds,
         TimeUnit.SECONDS
     );
   }
```
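One detail worth noting in `scheduledRun()` above: the loop is self-scheduling, and when compaction supervisors are disabled it keeps polling at a reduced cadence (4x the period) so it can pick up a config change later. A standalone sketch of that pattern, with all names invented for illustration:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch of the self-rescheduling loop used above; not Druid code.
public class SelfReschedulingLoop
{
  private static final long PERIOD_SECONDS = 10;

  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
  private final AtomicBoolean isLeader = new AtomicBoolean(false);

  public void becomeLeader()
  {
    if (isLeader.compareAndSet(false, true)) {
      schedule(PERIOD_SECONDS);
    }
  }

  public void stopBeingLeader()
  {
    // Just flip the flag; the next scheduled run observes it and stops rescheduling.
    isLeader.set(false);
  }

  private void schedule(long delaySeconds)
  {
    executor.schedule(this::run, delaySeconds, TimeUnit.SECONDS);
  }

  private void run()
  {
    if (!isLeader.get()) {
      return;  // lost leadership: let the loop die out
    }
    if (enabled()) {
      doWork();
      schedule(PERIOD_SECONDS);      // normal cadence
    } else {
      schedule(PERIOD_SECONDS * 4);  // disabled: poll config less often
    }
  }

  private boolean enabled()
  {
    return true;  // stand-in for a dynamic config check like isUseSupervisors()
  }

  private void doWork()
  {
    // stand-in for running the compaction duty
  }
}
```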
---

```diff
@@ -168,7 +168,7 @@ public void stop()
 public void start()
 {
   taskMaster.becomeFullLeader();
-  compactionScheduler.start();
+  compactionScheduler.becomeLeader();
   scheduledBatchTaskManager.start();
 
   // Announce the node only after all the services have been initialized
@@ -181,7 +181,7 @@
 {
   serviceAnnouncer.unannounce(node);
   scheduledBatchTaskManager.stop();
-  compactionScheduler.stop();
+  compactionScheduler.stopBeingLeader();
   taskMaster.downgradeToHalfLeader();
 }
```