Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public static class CustomHeaders {
public static final String UPLOAD_TYPE = "UPLOAD_TYPE";
public static final String REFRESH_ONLY = "REFRESH_ONLY";
public static final String DOWNLOAD_URI = "DOWNLOAD_URI";

public static final String MOVE_SEGMENT_TO_DEEP_STORE = "MOVE_SEGMENT_TO_DEEP_STORE";
public static final String SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER = "Pinot-SegmentZKMetadataCustomMapModifier";
public static final String CRYPTER = "CRYPTER";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,15 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl
throw new ControllerApplicationException(LOGGER, "Download URI is required for METADATA upload mode",
Response.Status.BAD_REQUEST);
}
moveSegmentToFinalLocation = false;
// override moveSegmentToFinalLocation if override provided in headers:moveSegmentToDeepStore
// else set to false for backward compatibility
String moveSegmentToDeepStore =
extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.MOVE_SEGMENT_TO_DEEP_STORE);
if (moveSegmentToDeepStore != null) {
moveSegmentToFinalLocation = Boolean.parseBoolean(moveSegmentToDeepStore);
} else {
moveSegmentToFinalLocation = false;
}
createSegmentFileFromMultipart(multiPart, destFile);
try {
URI segmentURI = new URI(downloadURI);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,10 @@ public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, Pino
headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath));
headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
FileUploadDownloadClient.FileUploadType.METADATA.toString()));
if (spec.getPushJobSpec() != null) {
headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.MOVE_SEGMENT_TO_DEEP_STORE,
String.valueOf(spec.getPushJobSpec().getMoveToDeepStoreForMetadataPush())));
}
headers.addAll(AuthProviderUtils.toRequestHeaders(authProvider));

SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public class PushJobSpec implements Serializable {
*/
private long _pushRetryIntervalMillis = 1000;

/**
* Applicable for URI and METADATA push types.
* If true, and if segment was not already in the deep store, move it to deep store.
*/
private boolean _moveToDeepStoreForMetadataPush;
/**
* Used in SegmentUriPushJobRunner, which is used to composite the segment uri to send to pinot controller.
* The URI sends to controller is in the format ${segmentUriPrefix}${segmentPath}${segmentUriSuffix}
Expand Down Expand Up @@ -121,4 +126,12 @@ public int getPushParallelism() {
public void setPushParallelism(int pushParallelism) {
_pushParallelism = pushParallelism;
}

public boolean getMoveToDeepStoreForMetadataPush() {
return _moveToDeepStoreForMetadataPush;
}

public void setMoveToDeepStoreForMetadataPush(boolean moveToDeepStoreForMetadataPush) {
_moveToDeepStoreForMetadataPush = moveToDeepStoreForMetadataPush;
}
}