Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
6defce3
Controller side changes to allow pauseless ingestion:
9aman Dec 19, 2024
54ab7b3
Minor improvements to improve readability
9aman Dec 20, 2024
1e40134
Server side changes to enable pauseless consumption. The changes include
9aman Dec 23, 2024
012da87
Fixes in the commit protocol for pauseless ingestion to complete the …
9aman Dec 24, 2024
a97847f
Changing the way to enable pauseless post introduction of multi-stream…
9aman Dec 30, 2024
2c2ba86
WIP: Changes in the expected state while performing recovery during p…
9aman Dec 30, 2024
4d7c893
Adding changes for ensurePartitionsConsuming function that is utilize…
9aman Dec 30, 2024
a041a75
Add Server side Reingestion API
Dec 30, 2024
b6d0904
run segment level validation
Dec 30, 2024
58f6c51
Add method to trigger reingestion
Dec 30, 2024
d6313b3
Linting fixes
Dec 30, 2024
ca6134a
Adding integration tests for 3 failure scenarios that can occur durin…
9aman Jan 2, 2025
845f616
WIP: Reingestion test
Jan 6, 2025
d2dd313
Fix bug with indexes in reingestion
Jan 6, 2025
fb34fc8
Add tests for reingestion
Jan 8, 2025
50725bd
Fix controller url in Reingestion
Jan 14, 2025
e74d360
Support https and auth in reingestion
Jan 14, 2025
ce3c851
Merge branch 'master' into resolve-failures-pauseless-ingestion
9aman Jan 15, 2025
d6208a6
Removing checks on default crc and replacing them with segment status…
9aman Jan 15, 2025
7f5b720
Formatting improvements
9aman Jan 15, 2025
3f05b2f
Allowing null table config to be passed for checking pauseless consum…
9aman Jan 15, 2025
2012e38
Ensuring upload retries are backward compatible
9aman Jan 15, 2025
7b9da37
Removing unnecessary code
9aman Jan 15, 2025
ded8962
Fixing existing test cases and adding unit tests to check upload to d…
9aman Jan 16, 2025
e703d84
Refactor test cases to reduce repetition
9aman Jan 16, 2025
c2fda4a
Add missing header file
9aman Jan 16, 2025
c836009
Fix reingestion test
Jan 17, 2025
7974ab9
refactoring file upload download client
Jan 17, 2025
c08f841
Removing pauselessConsumptionEnabled from index config
9aman Jan 21, 2025
c4b99bd
Remove reingestion code
Jan 21, 2025
aee514c
Removing check in replaceSegmentIfCrcMismatch
9aman Jan 23, 2025
88a619a
Adding a new class for simple serialization and deserialization of ma…
9aman Jan 24, 2025
791ac21
Removing files related to reingestion tests
9aman Jan 24, 2025
8db5bae
Merging master and including force commit PR changes
9aman Jan 24, 2025
55b2b29
Revert "Remove reingestion code"
Jan 27, 2025
f74df66
Revert "Removing files related to reingestion tests"
Jan 27, 2025
1f4db11
Fix reingestion issue where consuming segments are not replaced
Jan 27, 2025
8e9249c
Copy full segment to deep store before triggering metadata upload
Jan 27, 2025
b804a69
Refactoring: added support for tracking running reingestion jobs
Jan 27, 2025
609942d
Refactor: fix doc comments
Jan 27, 2025
f939714
Make SegmentZKMetadata JSON serializable
Jackie-Jiang Jan 28, 2025
155c49f
Minor API name change
Jackie-Jiang Jan 28, 2025
080ec55
Refactor PinotLLC class to add ability to inject failures
Jan 28, 2025
7e04fa3
Minor improvements
9aman Jan 28, 2025
523913f
Moving ZkMetadaptaUtils to commons and reusing the code in the upload…
9aman Jan 28, 2025
5689333
Fix lint failures
Jan 28, 2025
f42c6b8
Misc fix and cleanup
Jackie-Jiang Jan 29, 2025
a2eebf9
The tests were running slow due to the condition that the IdealState …
9aman Jan 29, 2025
1ba5b6c
Refactor Reingestion integration test
Jan 29, 2025
11aa170
Merge remote-tracking branch 'upstream/master' into pauseless-reinges…
Jan 29, 2025
e84788a
Fix error in tests post rebase
Jan 29, 2025
0d46327
refactoring
Jan 30, 2025
a94c7e3
Remove redundant code
Feb 3, 2025
8b9b8d1
Add support for queue in reingestion
Feb 3, 2025
d035249
Refactoring
Feb 4, 2025
bc8a65b
Refactoring
Feb 4, 2025
f082d24
Honour segment build semaphore during reingestion
Feb 4, 2025
d1ad30b
Fix test
Feb 5, 2025
7e47dd5
Add a separate API to upload reingested segments
Feb 5, 2025
5a42c28
Cleanup code
Feb 5, 2025
1551685
Replace reIngest with reingest
Feb 6, 2025
837aa26
Replace reIngest with reingest
Feb 6, 2025
09e8583
Ensure correctness in reingestion
Feb 6, 2025
3d6fdf4
Decouple reingest completion and upload ZK path
Feb 6, 2025
b525c7e
Refactor Stateless segment writer to have least number of arguments
Feb 6, 2025
f7ae25f
Fix segment reset for pauseless tables
Feb 6, 2025
3c17957
Cleaning up Stateless segment writer
Feb 6, 2025
a3fa25c
Rename metric
Feb 6, 2025
61ebb23
Misc cleanup and logic decoupling
Jackie-Jiang Feb 12, 2025
62316c0
nit
Jackie-Jiang Feb 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,10 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
// segment when the partition is first detected).
COMMITTING_SEGMENT_SIZE("committingSegmentSize", false),

TABLE_REBALANCE_IN_PROGRESS("tableRebalanceInProgress", false);
TABLE_REBALANCE_IN_PROGRESS("tableRebalanceInProgress", false),

// Number of reingested segments getting uploaded
REINGESTED_SEGMENT_UPLOADS_IN_PROGRESS("reingestedSegmentUploadsInProgress", true);

private final String _gaugeName;
private final String _unit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public static FileUploadType getDefaultUploadType() {
private static final String SCHEMA_PATH = "/schemas";
private static final String OLD_SEGMENT_PATH = "/segments";
private static final String SEGMENT_PATH = "/v2/segments";
private static final String REINGESTED_SEGMENT_UPLOAD_PATH = "/segments/reingested";
private static final String BATCH_SEGMENT_UPLOAD_PATH = "/segments/batchUpload";
private static final String TABLES_PATH = "/tables";
private static final String TYPE_DELIMITER = "type=";
Expand Down Expand Up @@ -369,6 +370,12 @@ public static URI getUploadSegmentURI(URI controllerURI)
return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(), SEGMENT_PATH);
}

/**
 * Builds the controller URI used to upload a reingested segment
 * ({@code /segments/reingested}), preserving the scheme, host and port of the
 * given controller URI.
 *
 * @param controllerURI base controller URI supplying scheme/host/port
 * @return URI pointing at the reingested-segment upload endpoint
 * @throws URISyntaxException if the resulting URI is malformed
 */
public static URI getReingestedSegmentUploadURI(URI controllerURI)
    throws URISyntaxException {
  String scheme = controllerURI.getScheme();
  String host = controllerURI.getHost();
  int port = controllerURI.getPort();
  return getURI(scheme, host, port, REINGESTED_SEGMENT_UPLOAD_PATH);
}

public static URI getBatchSegmentUploadURI(URI controllerURI)
throws URISyntaxException {
return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,98 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl
}
}

/**
 * Processes the upload of a reingested realtime segment.
 *
 * <p>The request must be a METADATA-type upload carrying a source download URI and the
 * COPY_SEGMENT_TO_DEEP_STORE header set to true: the segment bytes already exist at the
 * source location and the controller copies them to the deep store. The segment metadata
 * is extracted from the multipart payload, the final deep-store location and client-facing
 * download URI are computed, and the ZK-side completion is delegated to {@link ZKOperator}.
 *
 * @param tableName raw or database-qualified table name from the request
 * @param multiPart multipart body containing the segment (metadata) file
 * @param headers request headers carrying upload type, download URI and copy flag
 * @param request used only to log the client address
 * @return success response naming the uploaded segment and realtime table
 * @throws ControllerApplicationException on validation failure (400), or wrapping any
 *         unexpected error (500)
 */
private SuccessResponse uploadReingestedSegment(String tableName, FormDataMultiPart multiPart, HttpHeaders headers,
    Request request) {
  // Resolve the table name within the caller's database context, then derive both the raw
  // name (used for deep-store paths) and the REALTIME table name (used for ZK operations).
  tableName = DatabaseUtils.translateTableName(tableName, headers);
  String rawTableName = TableNameBuilder.extractRawTableName(tableName);
  String realtimeTableName = TableNameBuilder.REALTIME.tableNameWithType(tableName);

  // TODO: Consider validating the segment name and table name from the header against the actual segment
  extractHttpHeader(headers, CommonConstants.Controller.SEGMENT_NAME_HTTP_HEADER);
  extractHttpHeader(headers, CommonConstants.Controller.TABLE_NAME_HTTP_HEADER);

  // Reingestion only supports METADATA uploads; reject anything else up front.
  String uploadTypeStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE);
  if (!FileUploadType.METADATA.name().equals(uploadTypeStr)) {
    throw new ControllerApplicationException(LOGGER, "Reingestion upload type must be METADATA",
        Response.Status.BAD_REQUEST);
  }
  // The source download URI tells the controller where the full segment tarball lives.
  String sourceDownloadURIStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI);
  if (StringUtils.isEmpty(sourceDownloadURIStr)) {
    throw new ControllerApplicationException(LOGGER, "Source download URI is required", Response.Status.BAD_REQUEST);
  }
  // The controller must be the one copying the segment into the deep store for reingestion.
  String copySegmentToDeepStore =
      extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE);
  if (!Boolean.parseBoolean(copySegmentToDeepStore)) {
    throw new ControllerApplicationException(LOGGER, "COPY_SEGMENT_TO_DEEP_STORE must be true for reingestion upload",
        Response.Status.BAD_REQUEST);
  }

  File tempTarFile = null;
  File tempSegmentDir = null;
  try {
    // Scratch locations for the uploaded tarball and its untarred contents; both are
    // removed in the finally block regardless of outcome.
    ControllerFilePathProvider provider = ControllerFilePathProvider.getInstance();
    String tempFileName = TMP_DIR_PREFIX + UUID.randomUUID();
    tempTarFile = new File(provider.getFileUploadTempDir(), tempFileName);
    tempSegmentDir = new File(provider.getUntarredFileTempDir(), tempFileName);

    long segmentSizeInBytes;
    createSegmentFileFromMultipart(multiPart, tempTarFile);
    // Best-effort: look up the segment size at the source URI via PinotFS. A failure is
    // logged and the size falls back to -1 (unknown) rather than failing the upload.
    PinotFS pinotFS = null;
    try {
      URI segmentURI = new URI(sourceDownloadURIStr);
      pinotFS = PinotFSFactory.create(segmentURI.getScheme());
      segmentSizeInBytes = pinotFS.length(segmentURI);
    } catch (Exception e) {
      segmentSizeInBytes = -1;
      LOGGER.warn("Could not fetch segment size for metadata push", e);
    } finally {
      if (pinotFS != null) {
        pinotFS.close();
      }
    }

    // Extract the segment metadata from the uploaded (metadata) tarball.
    String metadataProviderClass = DefaultMetadataExtractor.class.getName();
    SegmentMetadata segmentMetadata = getSegmentMetadata(tempTarFile, tempSegmentDir, metadataProviderClass);
    String segmentName = segmentMetadata.getName();

    String clientAddress = InetAddress.getByName(request.getRemoteAddr()).getHostName();
    LOGGER.info("Processing upload request for reingested segment: {} of table: {} from client: {}", segmentName,
        realtimeTableName, clientAddress);

    // Update download URI if controller is responsible for moving the segment to the deep store
    URI dataDirURI = provider.getDataDirURI();
    String dataDirPath = dataDirURI.toString();
    String encodedSegmentName = URIUtils.encode(segmentName);
    String finalSegmentLocationPath = URIUtils.getPath(dataDirPath, rawTableName, encodedSegmentName);
    String segmentDownloadURIStr;
    // For a local data dir, clients download through the controller VIP; otherwise they can
    // fetch directly from the deep-store location.
    if (dataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME)) {
      segmentDownloadURIStr = URIUtils.getPath(provider.getVip(), "segments", rawTableName, encodedSegmentName);
    } else {
      segmentDownloadURIStr = finalSegmentLocationPath;
    }
    URI finalSegmentLocationURI = URIUtils.getUri(finalSegmentLocationPath);
    LOGGER.info("Using segment download URI: {} for reingested segment: {} of table: {}", segmentDownloadURIStr,
        segmentName, realtimeTableName);

    // Hand off to ZKOperator: deep-store copy, ZK metadata update and segment reset.
    ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics);
    zkOperator.completeReingestedSegmentOperations(realtimeTableName, segmentMetadata, finalSegmentLocationURI,
        sourceDownloadURIStr, segmentDownloadURIStr, segmentSizeInBytes);

    return new SuccessResponse(
        "Successfully uploaded reingested segment: " + segmentName + " of table: " + realtimeTableName);
  } catch (WebApplicationException e) {
    // Already carries the intended HTTP status; rethrow untouched.
    throw e;
  } catch (Exception e) {
    // Count the failure in both global and per-table upload-error meters before mapping to 500.
    _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_SEGMENT_UPLOAD_ERROR, 1L);
    _controllerMetrics.addMeteredTableValue(tableName, ControllerMeter.CONTROLLER_TABLE_SEGMENT_UPLOAD_ERROR, 1L);
    throw new ControllerApplicationException(LOGGER,
        "Exception while uploading reingested segment: " + e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, e);
  } finally {
    // Always clean up scratch files; deleteQuietly tolerates nulls and missing paths.
    FileUtils.deleteQuietly(tempTarFile);
    FileUtils.deleteQuietly(tempSegmentDir);
  }
}

// Method used to update a list of segments in batch mode with the METADATA upload type.
private SuccessResponse uploadSegments(String tableName, TableType tableType, FormDataMultiPart multiPart,
boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers, Request request) {
Expand Down Expand Up @@ -965,6 +1057,35 @@ public Response revertReplaceSegments(
}
}

/**
 * Async REST endpoint for uploading a reingested realtime segment as a multipart request.
 *
 * <p>Runs on Jersey's managed-async executor and always completes the suspended response:
 * either with the success payload from the private worker, or with the thrown error so the
 * failure propagates to the client instead of leaving the request hanging. In-flight
 * requests are tracked via {@link ControllerGauge#REINGESTED_SEGMENT_UPLOADS_IN_PROGRESS}.
 */
@POST
@ManagedAsync
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.MULTIPART_FORM_DATA)
@Path("segments/reingested")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.UPLOAD_SEGMENT)
@Authenticate(AccessType.CREATE)
@ApiOperation(value = "Reingest a realtime segment", notes = "Reingest a segment as multipart file")
@ApiResponses(value = {
    @ApiResponse(code = 200, message = "Successfully reingested segment"),
    @ApiResponse(code = 400, message = "Bad Request"),
    @ApiResponse(code = 403, message = "Segment validation fails"),
    @ApiResponse(code = 409, message = "Segment already exists or another parallel push in progress"),
    @ApiResponse(code = 412, message = "CRC check fails"),
    @ApiResponse(code = 500, message = "Internal error")
})
@TrackInflightRequestMetrics
@TrackedByGauge(gauge = ControllerGauge.REINGESTED_SEGMENT_UPLOADS_IN_PROGRESS)
public void uploadReingestedSegment(FormDataMultiPart multiPart,
    @ApiParam(value = "Name of the table", required = true)
    @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) String tableName, @Context HttpHeaders headers,
    @Context Request request, @Suspended AsyncResponse asyncResponse) {
  try {
    asyncResponse.resume(uploadReingestedSegment(tableName, multiPart, headers, request));
  } catch (Throwable t) {
    // Resume with the failure (including Errors) so the async response is never left open.
    asyncResponse.resume(t);
  }
}

private static void createSegmentFileFromMultipart(FormDataMultiPart multiPart, File destFile)
throws IOException {
// Read segment file or segment metadata file and directly use that information to update zk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -170,6 +171,44 @@ public void completeSegmentsOperations(String tableNameWithType, FileUploadType
processExistingSegments(tableNameWithType, uploadType, enableParallelPushProtection, headers, existingSegmentsList);
}

/**
 * Completes the controller-side handoff for a reingested segment: copies the segment from
 * its source location to the deep store, updates the segment's ZK metadata, and asks the
 * hosting servers to reset the segment.
 *
 * <p>The segment must currently be in COMMITTING status. The ZK update is optimistic: it is
 * performed against the version read at the start, so a concurrent modification of the
 * record makes the update fail rather than silently overwriting it. The deep-store copy is
 * done before the ZK write, so the download URI recorded in ZK always points at data that
 * already exists.
 *
 * @param realtimeTableName realtime table name with type suffix
 * @param segmentMetadata metadata extracted from the uploaded segment
 * @param finalSegmentLocationURI deep-store destination for the segment
 * @param sourceDownloadURIStr where the segment bytes currently live
 * @param segmentDownloadURIStr download URI to record in ZK metadata
 * @param segmentSizeInBytes segment size, or -1 if unknown
 * @throws ControllerApplicationException 404 if the segment has no ZK metadata, 409 if it
 *         is not in COMMITTING status
 * @throws Exception on copy failure or if the version-checked ZK update does not succeed
 */
public void completeReingestedSegmentOperations(String realtimeTableName, SegmentMetadata segmentMetadata,
    URI finalSegmentLocationURI, String sourceDownloadURIStr, String segmentDownloadURIStr, long segmentSizeInBytes)
    throws Exception {
  String segmentName = segmentMetadata.getName();
  ZNRecord segmentMetadataZNRecord =
      _pinotHelixResourceManager.getSegmentMetadataZnRecord(realtimeTableName, segmentName);
  if (segmentMetadataZNRecord == null) {
    throw new ControllerApplicationException(LOGGER, "Failed to find segment ZK metadata for segment: " + segmentName,
        Response.Status.NOT_FOUND);
  }
  // Remember the ZK record version for the optimistic (compare-and-set) update below.
  int expectedVersion = segmentMetadataZNRecord.getVersion();
  SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(segmentMetadataZNRecord);
  // Only segments stuck in COMMITTING are eligible for reingestion completion.
  if (segmentZKMetadata.getStatus() != CommonConstants.Segment.Realtime.Status.COMMITTING) {
    throw new ControllerApplicationException(LOGGER,
        "Reingested segment: " + segmentName + " must be in COMMITTING status, but found: "
            + segmentZKMetadata.getStatus(), Response.Status.CONFLICT);
  }

  // Copy the segment to the final location
  copyFromSegmentURIToDeepStore(new URI(sourceDownloadURIStr), finalSegmentLocationURI);
  LOGGER.info("Copied reingested segment: {} of table: {} to final location: {}", segmentName, realtimeTableName,
      finalSegmentLocationURI);

  // Update the ZK metadata
  segmentZKMetadata.setCustomMap(segmentMetadata.getCustomMap());
  SegmentZKMetadataUtils.updateCommittingSegmentZKMetadata(realtimeTableName, segmentZKMetadata, segmentMetadata,
      segmentDownloadURIStr, segmentSizeInBytes, segmentZKMetadata.getEndOffset());
  // Version-checked write: fails (returns false) if the record changed since we read it.
  if (!_pinotHelixResourceManager.updateZkMetadata(realtimeTableName, segmentZKMetadata, expectedVersion)) {
    throw new RuntimeException(
        String.format("Failed to update ZK metadata for segment: %s, table: %s, expected version: %d", segmentName,
            realtimeTableName, expectedVersion));
  }
  LOGGER.info("Updated reingested segment: {} of table: {} to property store", segmentName, realtimeTableName);

  // Send a message to servers hosting the table to reset the segment
  // NOTE(review): the null argument presumably targets all instances hosting the segment —
  // confirm against PinotHelixResourceManager#resetSegment.
  _pinotHelixResourceManager.resetSegment(realtimeTableName, segmentName, null);
}

/**
* Returns {@code true} when the segment should be processed as new segment.
* <p>When segment ZK metadata exists, check if segment exists in the ideal state. If the previous upload failed after
Expand Down
Loading
Loading