Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
333a942
Adding major changes on the server and controller side to allow exist…
Sep 25, 2024
4686e30
Adding pauseless flag for commit end path:
9aman Sep 25, 2024
271dda5
Resolving Checkstyle issues
9aman Sep 25, 2024
2a36c3d
Adding pauseless to the stream config
9aman Sep 25, 2024
76c091c
fixing a test case
9aman Sep 26, 2024
79c4558
Adding checks to ensure the stream config is not null
9aman Sep 26, 2024
6fe9d3a
Adding support for legacy table configs
9aman Sep 30, 2024
16b9664
Revert "Group commit IdealState updates (#13976)"
9aman Oct 8, 2024
4dca71e
synchronising calls for ideal state updates
9aman Oct 8, 2024
e1dfc1c
Setting a fixed time for the download to happen
9aman Oct 23, 2024
0a5375e
Fixing minor bug in timeout
9aman Oct 23, 2024
bcf250b
The changes prevent the server from trying indefinitely in case there…
9aman Oct 24, 2024
7eb74ff
Fixing state machine issues that we encountered for lease extension d…
9aman Oct 24, 2024
d4fc408
Changes to allow extending RealtimeSegmentValidationManager and PinotLLC
9aman Oct 23, 2024
1f67e75
Introduce an API to upload and update the metadata for a failed segment
9aman Oct 27, 2024
ab75843
Reverting changes to helix helper in order to use IdealState group co…
9aman Nov 4, 2024
9084f17
Adding changes for group commit in base controller starter
9aman Nov 4, 2024
3160b3b
Minor linting issues
9aman Nov 4, 2024
949c948
reverting changes in PinotHelixResourceManager.java to allow group co…
9aman Nov 4, 2024
5fa1b4f
Removing changes that conflict with the group commit changes
9aman Nov 4, 2024
56e9e3f
Minor additional reversals related to PinotHelixResourceManager
9aman Nov 4, 2024
b53ee5e
reverting synchronization changes in PinotLLCRealtimeSegmentManager
9aman Nov 4, 2024
c53c669
fixing minor issues in the code
9aman Nov 4, 2024
6c13f04
Improving the download logic a bit
9aman Nov 4, 2024
599ca33
WIP: Fixing minor issues in the code
9aman Nov 4, 2024
1804c91
Fixing issues in the PinotHelixResourceManager
9aman Nov 5, 2024
28f9d0f
Improving upload segment API to return segment ZK metadata instead of…
9aman Nov 5, 2024
a147b3b
Adding flags in table config to inject failures
9aman Nov 15, 2024
809e4f0
Adding support for pauselessConsumptionEnabled in TableConfigBuilder
9aman Nov 15, 2024
e5ba623
Reverting unnecessary changes to the branch
9aman Nov 15, 2024
9dfcc4e
Reverting failure scenarios
9aman Nov 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ public enum ControllerResponseStatus {
public static final String PARAM_MEMORY_USED_BYTES = "memoryUsedBytes";
public static final String PARAM_SEGMENT_SIZE_BYTES = "segmentSizeBytes";
public static final String PARAM_REASON = "reason";
public static final String PARAM_PAUSELESS_CONSUMPTION_ENABLED = "pauselessConsumptionEnabled";
// Sent by servers to request additional time to build
public static final String PARAM_EXTRA_TIME_SEC = "extraTimeSec";
// Sent by servers to indicate the number of rows read so far
Expand Down Expand Up @@ -199,6 +200,7 @@ public String getUrl(String hostPort, String protocol) {
params.put(PARAM_SEGMENT_NAME, _params.getSegmentName());
params.put(PARAM_OFFSET, String.valueOf(_params.getOffset()));
params.put(PARAM_INSTANCE_ID, _params.getInstanceId());
params.put(PARAM_PAUSELESS_CONSUMPTION_ENABLED, String.valueOf(_params.getPauselessConsumptionEnabled()));
if (_params.getReason() != null) {
params.put(PARAM_REASON, _params.getReason());
}
Expand Down Expand Up @@ -242,6 +244,7 @@ public static class Params {
private long _memoryUsedBytes;
private long _segmentSizeBytes;
private String _streamPartitionMsgOffset;
private boolean _pauselessConsumptionEnabled;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of changing the existing protocol, please add a new controller endpoint that the server calls when it needs the IdealState modified before segment upload.

This will avoid a lot of `if (pauselessEnabled)` statements in the code path.


public Params() {
_offset = -1L;
Expand All @@ -256,6 +259,7 @@ public Params() {
_segmentSizeBytes = SEGMENT_SIZE_BYTES_DEFAULT;
_streamPartitionMsgOffset = null;
_reason = null;
_pauselessConsumptionEnabled = false;
}

public Params(Params params) {
Expand All @@ -271,6 +275,7 @@ public Params(Params params) {
_segmentSizeBytes = params.getSegmentSizeBytes();
_streamPartitionMsgOffset = params.getStreamPartitionMsgOffset();
_reason = params.getReason();
_pauselessConsumptionEnabled = params.getPauselessConsumptionEnabled();
}

@Deprecated
Expand Down Expand Up @@ -334,6 +339,11 @@ public Params withStreamPartitionMsgOffset(String offset) {
return this;
}

/**
 * Records whether pauseless consumption is enabled for this request.
 *
 * @param pauselessConsumptionEnabled the flag value to store on these params
 * @return this {@code Params} instance, so calls can be chained
 */
public Params withPauselessConsumptionEnabled(boolean pauselessConsumptionEnabled) {
  this._pauselessConsumptionEnabled = pauselessConsumptionEnabled;
  return this;
}

public String getSegmentName() {
return _segmentName;
}
Expand Down Expand Up @@ -383,6 +393,10 @@ public String getStreamPartitionMsgOffset() {
return _streamPartitionMsgOffset;
}

/**
 * Returns the pauseless-consumption flag stored on these params.
 *
 * @return the current value of the pauseless-consumption flag
 */
public boolean getPauselessConsumptionEnabled() {
  return this._pauselessConsumptionEnabled;
}

public String toString() {
return "Offset: " + _offset + ",Segment name: " + _segmentName + ",Instance Id: " + _instanceId + ",Reason: "
+ _reason + ",NumRows: " + _numRows + ",BuildTimeMillis: " + _buildTimeMillis + ",WaitTimeMillis: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,16 @@ public void testRequestURL()
SegmentCompletionProtocol.SegmentCommitStartRequest segmentCommitStartRequest =
new SegmentCompletionProtocol.SegmentCommitStartRequest(params);
String url = segmentCommitStartRequest.getUrl("localhost:8080", "http");
// Assert.assertEquals(url,
// // CHECKSTYLE:OFF
// "http://localhost:8080/segmentCommitStart?extraTimeSec=3000&segmentSizeBytes=5000&reason=%7B%22type%22%3A%22ROW_LIMIT%22%2C%20%22value%22%3A1000%7D&buildTimeMillis=1000&streamPartitionMsgOffset=%7B%22shardId-000000000001%22%3A%2249615238429973311938200772279310862572716999467690098706%22%7D&instance=Server_localhost_8099&waitTimeMillis=2000&offset=-1&name=foo%25%25__0__0__12345Z&location=s3%3A%2F%2Fmy.bucket%2Fsegment&rowCount=6000&memoryUsedBytes=4000");
// // CHECKSTYLE:ON

// Adding the case for test checks
Assert.assertEquals(url,
// CHECKSTYLE:OFF
"http://localhost:8080/segmentCommitStart?extraTimeSec=3000&segmentSizeBytes=5000&reason=%7B%22type%22%3A%22ROW_LIMIT%22%2C%20%22value%22%3A1000%7D&buildTimeMillis=1000&streamPartitionMsgOffset=%7B%22shardId-000000000001%22%3A%2249615238429973311938200772279310862572716999467690098706%22%7D&instance=Server_localhost_8099&waitTimeMillis=2000&offset=-1&name=foo%25%25__0__0__12345Z&location=s3%3A%2F%2Fmy.bucket%2Fsegment&rowCount=6000&memoryUsedBytes=4000");
// CHECKSTYLE:OFF
"http://localhost:8080/segmentCommitStart?segmentSizeBytes=5000&reason=%7B%22type%22%3A%22ROW_LIMIT%22%2C%20%22value%22%3A1000%7D&buildTimeMillis=1000&streamPartitionMsgOffset=%7B%22shardId-000000000001%22%3A%2249615238429973311938200772279310862572716999467690098706%22%7D&instance=Server_localhost_8099&offset=-1&pauselessConsumptionEnabled=false&memoryUsedBytes=4000&extraTimeSec=3000&waitTimeMillis=2000&name=foo%25%25__0__0__12345Z&location=s3%3A%2F%2Fmy.bucket%2Fsegment&rowCount=6000");
// CHECKSTYLE:ON

paramsMap = Arrays.stream(url.split("\\?")[1].split("&"))
.collect(Collectors.toMap(e -> e.split("=")[0], e -> e.split("=")[1]));
Assert.assertEquals(paramsMap.get(SegmentCompletionProtocol.PARAM_SEGMENT_NAME),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,7 @@ private void setUpPinotController() {

// Helix resource manager must be started in order to create PinotLLCRealtimeSegmentManager
LOGGER.info("Starting realtime segment manager");
_pinotLLCRealtimeSegmentManager =
new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics);
_pinotLLCRealtimeSegmentManager = createPinotLLCRealtimeSegmentManager();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes in the ControllerStarter make it easy to override two crucial classes:
PinotLLCRealtimeSegmentManager and RealtimeSegmentValidationManager.

// TODO: Need to put this inside HelixResourceManager when HelixControllerLeadershipManager is removed.
_helixResourceManager.registerPinotLLCRealtimeSegmentManager(_pinotLLCRealtimeSegmentManager);
_segmentCompletionManager =
Expand Down Expand Up @@ -616,6 +615,10 @@ protected void configure() {
_serviceStatusCallbackList.add(generateServiceStatusCallback(_helixParticipantManager));
}

/**
 * Factory hook that builds the {@link PinotLLCRealtimeSegmentManager} for this controller.
 * Declared {@code protected} so a subclass can override it and return a different manager.
 *
 * @return a new manager built from the controller's Helix resource manager, config and metrics
 */
protected PinotLLCRealtimeSegmentManager createPinotLLCRealtimeSegmentManager() {
  PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager =
      new PinotLLCRealtimeSegmentManager(_helixResourceManager, _config, _controllerMetrics);
  return llcRealtimeSegmentManager;
}

/**
* This method is used to fix table/schema names.
* TODO: in the next release, maybe 2.0.0, we can remove this method. Meanwhile we can delete the orphan schemas
Expand Down Expand Up @@ -861,9 +864,7 @@ protected List<PeriodicTask> setupControllerPeriodicTasks() {
new OfflineSegmentIntervalChecker(_config, _helixResourceManager, _leadControllerManager,
new ValidationMetrics(_metricsRegistry), _controllerMetrics);
periodicTasks.add(_offlineSegmentIntervalChecker);
_realtimeSegmentValidationManager =
new RealtimeSegmentValidationManager(_config, _helixResourceManager, _leadControllerManager,
_pinotLLCRealtimeSegmentManager, _validationMetrics, _controllerMetrics, _storageQuotaChecker);
_realtimeSegmentValidationManager = createRealtimeSegmentValidationManager();
periodicTasks.add(_realtimeSegmentValidationManager);
_brokerResourceValidationManager =
new BrokerResourceValidationManager(_config, _helixResourceManager, _leadControllerManager, _controllerMetrics);
Expand Down Expand Up @@ -892,6 +893,11 @@ protected List<PeriodicTask> setupControllerPeriodicTasks() {
return periodicTasks;
}

/**
 * Factory hook that builds the {@link RealtimeSegmentValidationManager} periodic task.
 * Declared {@code protected} so a subclass can override it and return a different manager.
 *
 * @return a new validation manager wired to this controller's config, resource manager,
 *     lead-controller manager, LLC realtime segment manager, metrics and storage quota checker
 */
protected RealtimeSegmentValidationManager createRealtimeSegmentValidationManager() {
  RealtimeSegmentValidationManager validationManager =
      new RealtimeSegmentValidationManager(_config, _helixResourceManager, _leadControllerManager,
          _pinotLLCRealtimeSegmentManager, _validationMetrics, _controllerMetrics, _storageQuotaChecker);
  return validationManager;
}

@Override
public void stop() {
switch (_controllerMode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ public String segmentConsumed(@QueryParam(SegmentCompletionProtocol.PARAM_INSTAN
@QueryParam(SegmentCompletionProtocol.PARAM_STREAM_PARTITION_MSG_OFFSET) String streamPartitionMsgOffset,
@QueryParam(SegmentCompletionProtocol.PARAM_REASON) String stopReason,
@QueryParam(SegmentCompletionProtocol.PARAM_MEMORY_USED_BYTES) long memoryUsedBytes,
@QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows) {
@QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
@QueryParam(SegmentCompletionProtocol.PARAM_PAUSELESS_CONSUMPTION_ENABLED) boolean pauselessConsumptionEnabled) {

if (instanceId == null || segmentName == null || (offset == -1 && streamPartitionMsgOffset == null)) {
LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, streamPartitionMsgOffset={}", offset,
Expand All @@ -143,7 +144,8 @@ public String segmentConsumed(@QueryParam(SegmentCompletionProtocol.PARAM_INSTAN
}
SegmentCompletionProtocol.Request.Params requestParams = new SegmentCompletionProtocol.Request.Params();
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withReason(stopReason)
.withMemoryUsedBytes(memoryUsedBytes).withNumRows(numRows);
.withMemoryUsedBytes(memoryUsedBytes).withNumRows(numRows)
.withPauselessConsumptionEnabled(pauselessConsumptionEnabled);
extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
LOGGER.info("Processing segmentConsumed:{}", requestParams.toString());

Expand Down Expand Up @@ -191,7 +193,8 @@ public String segmentCommitStart(@QueryParam(SegmentCompletionProtocol.PARAM_INS
@QueryParam(SegmentCompletionProtocol.PARAM_BUILD_TIME_MILLIS) long buildTimeMillis,
@QueryParam(SegmentCompletionProtocol.PARAM_WAIT_TIME_MILLIS) long waitTimeMillis,
@QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long segmentSizeBytes) {
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long segmentSizeBytes,
@QueryParam(SegmentCompletionProtocol.PARAM_PAUSELESS_CONSUMPTION_ENABLED) boolean pauselessConsumptionEnabled) {

if (instanceId == null || segmentName == null || (offset == -1 && streamPartitionMsgOffset == null)) {
LOGGER.error("Invalid call: offset={}, segmentName={}, instanceId={}, streamPartitionMsgOffset={}", offset,
Expand All @@ -203,7 +206,7 @@ public String segmentCommitStart(@QueryParam(SegmentCompletionProtocol.PARAM_INS
SegmentCompletionProtocol.Request.Params requestParams = new SegmentCompletionProtocol.Request.Params();
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withMemoryUsedBytes(memoryUsedBytes)
.withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis).withNumRows(numRows)
.withSegmentSizeBytes(segmentSizeBytes);
.withSegmentSizeBytes(segmentSizeBytes).withPauselessConsumptionEnabled(pauselessConsumptionEnabled);
extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);

LOGGER.info("Processing segmentCommitStart:{}", requestParams.toString());
Expand Down Expand Up @@ -364,6 +367,7 @@ public String segmentCommitEndWithMetadata(@QueryParam(SegmentCompletionProtocol
@QueryParam(SegmentCompletionProtocol.PARAM_ROW_COUNT) int numRows,
@QueryParam(SegmentCompletionProtocol.PARAM_SEGMENT_SIZE_BYTES) long segmentSizeBytes,
@QueryParam(SegmentCompletionProtocol.PARAM_REASON) String stopReason,
@QueryParam(SegmentCompletionProtocol.PARAM_PAUSELESS_CONSUMPTION_ENABLED) boolean pauselessConsumptionEnabled,
FormDataMultiPart metadataFiles) {
if (instanceId == null || segmentName == null || segmentLocation == null || metadataFiles == null || (offset == -1
&& streamPartitionMsgOffset == null)) {
Expand All @@ -377,7 +381,8 @@ public String segmentCommitEndWithMetadata(@QueryParam(SegmentCompletionProtocol
SegmentCompletionProtocol.Request.Params requestParams = new SegmentCompletionProtocol.Request.Params();
requestParams.withInstanceId(instanceId).withSegmentName(segmentName).withSegmentLocation(segmentLocation)
.withSegmentSizeBytes(segmentSizeBytes).withBuildTimeMillis(buildTimeMillis).withWaitTimeMillis(waitTimeMillis)
.withNumRows(numRows).withMemoryUsedBytes(memoryUsedBytes).withReason(stopReason);
.withNumRows(numRows).withMemoryUsedBytes(memoryUsedBytes).withReason(stopReason)
.withPauselessConsumptionEnabled(pauselessConsumptionEnabled);
extractOffsetFromParams(requestParams, streamPartitionMsgOffset, offset);
LOGGER.info("Processing segmentCommitEndWithMetadata:{}", requestParams.toString());

Expand Down
Loading