Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ public static class CustomHeaders {
public static final String UPLOAD_TYPE = "UPLOAD_TYPE";
public static final String REFRESH_ONLY = "REFRESH_ONLY";
public static final String DOWNLOAD_URI = "DOWNLOAD_URI";

/**
* This header is only used for METADATA push, to allow controller to copy segment to deep store,
* if segment was not placed in the deep store to begin with
*/
public static final String COPY_SEGMENT_TO_DEEP_STORE = "COPY_SEGMENT_TO_DEEP_STORE";
public static final String SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER = "Pinot-SegmentZKMetadataCustomMapModifier";
public static final String CRYPTER = "CRYPTER";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public Response downloadSegment(
}

private SuccessResponse uploadSegment(@Nullable String tableName, TableType tableType,
@Nullable FormDataMultiPart multiPart, boolean moveSegmentToFinalLocation, boolean enableParallelPushProtection,
@Nullable FormDataMultiPart multiPart, boolean copySegmentToFinalLocation, boolean enableParallelPushProtection,
boolean allowRefresh, HttpHeaders headers, Request request) {
if (StringUtils.isNotEmpty(tableName)) {
TableType tableTypeFromTableName = TableNameBuilder.getTableTypeFromTableName(tableName);
Expand All @@ -213,13 +213,15 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl
extractHttpHeader(headers, CommonConstants.Controller.TABLE_NAME_HTTP_HEADER);

String uploadTypeStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE);
String downloadURI = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI);
String sourceDownloadURIStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI);
String crypterClassNameInHeader = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.CRYPTER);
String ingestionDescriptor = extractHttpHeader(headers, CommonConstants.Controller.INGESTION_DESCRIPTOR);

File tempEncryptedFile = null;
File tempDecryptedFile = null;
File tempSegmentDir = null;
// The downloadUri for putting into segment zk metadata
String segmentDownloadURIStr = sourceDownloadURIStr;
try {
ControllerFilePathProvider provider = ControllerFilePathProvider.getInstance();
String tempFileName = TMP_DIR_PREFIX + UUID.randomUUID();
Expand All @@ -238,20 +240,22 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl
"Segment file (as multipart/form-data) is required for SEGMENT upload mode",
Response.Status.BAD_REQUEST);
}
if (!moveSegmentToFinalLocation && StringUtils.isEmpty(downloadURI)) {
if (!copySegmentToFinalLocation && StringUtils.isEmpty(sourceDownloadURIStr)) {
throw new ControllerApplicationException(LOGGER,
"Download URI is required if segment should not be copied to the deep store",
"Source download URI is required in header field 'DOWNLOAD_URI' if segment should not be copied to "
+ "the deep store",
Response.Status.BAD_REQUEST);
}
createSegmentFileFromMultipart(multiPart, destFile);
segmentSizeInBytes = destFile.length();
break;
case URI:
if (StringUtils.isEmpty(downloadURI)) {
throw new ControllerApplicationException(LOGGER, "Download URI is required for URI upload mode",
if (StringUtils.isEmpty(sourceDownloadURIStr)) {
throw new ControllerApplicationException(LOGGER,
"Source download URI is required in header field 'DOWNLOAD_URI' for URI upload mode",
Response.Status.BAD_REQUEST);
}
downloadSegmentFileFromURI(downloadURI, destFile, tableName);
downloadSegmentFileFromURI(sourceDownloadURIStr, destFile, tableName);
segmentSizeInBytes = destFile.length();
break;
case METADATA:
Expand All @@ -260,14 +264,19 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl
"Segment metadata file (as multipart/form-data) is required for METADATA upload mode",
Response.Status.BAD_REQUEST);
}
if (StringUtils.isEmpty(downloadURI)) {
throw new ControllerApplicationException(LOGGER, "Download URI is required for METADATA upload mode",
if (StringUtils.isEmpty(sourceDownloadURIStr)) {
throw new ControllerApplicationException(LOGGER,
"Source download URI is required in header field 'DOWNLOAD_URI' for METADATA upload mode",
Response.Status.BAD_REQUEST);
}
moveSegmentToFinalLocation = false;
// override copySegmentToFinalLocation if override provided in headers:COPY_SEGMENT_TO_DEEP_STORE
// else set to false for backward compatibility
String copySegmentToDeepStore =
extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE);
copySegmentToFinalLocation = Boolean.parseBoolean(copySegmentToDeepStore);
createSegmentFileFromMultipart(multiPart, destFile);
try {
URI segmentURI = new URI(downloadURI);
URI segmentURI = new URI(sourceDownloadURIStr);
PinotFS pinotFS = PinotFSFactory.create(segmentURI.getScheme());
segmentSizeInBytes = pinotFS.length(segmentURI);
} catch (Exception e) {
Expand Down Expand Up @@ -332,24 +341,25 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl

// Update download URI if controller is responsible for moving the segment to the deep store
URI finalSegmentLocationURI = null;
if (moveSegmentToFinalLocation) {
if (copySegmentToFinalLocation) {
URI dataDirURI = provider.getDataDirURI();
String dataDirPath = dataDirURI.toString();
String encodedSegmentName = URIUtils.encode(segmentName);
String finalSegmentLocationPath = URIUtils.getPath(dataDirPath, rawTableName, encodedSegmentName);
if (dataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME)) {
downloadURI = URIUtils.getPath(provider.getVip(), "segments", rawTableName, encodedSegmentName);
segmentDownloadURIStr = URIUtils.getPath(provider.getVip(), "segments", rawTableName, encodedSegmentName);
} else {
downloadURI = finalSegmentLocationPath;
segmentDownloadURIStr = finalSegmentLocationPath;
}
finalSegmentLocationURI = URIUtils.getUri(finalSegmentLocationPath);
}
LOGGER.info("Using download URI: {} for segment: {} of table: {} (move segment: {})", downloadURI, segmentFile,
tableNameWithType, moveSegmentToFinalLocation);
LOGGER.info("Using segment download URI: {} for segment: {} of table: {} (move segment: {})",
segmentDownloadURIStr, segmentFile, tableNameWithType, copySegmentToFinalLocation);

ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics);
zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, finalSegmentLocationURI, segmentFile,
downloadURI, crypterName, segmentSizeInBytes, enableParallelPushProtection, allowRefresh, headers);
zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, uploadType, finalSegmentLocationURI,
segmentFile, sourceDownloadURIStr, segmentDownloadURIStr, crypterName, segmentSizeInBytes,
enableParallelPushProtection, allowRefresh, headers);

return new SuccessResponse("Successfully uploaded segment: " + segmentName + " of table: " + tableNameWithType);
} catch (WebApplicationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.controller.api.upload;

import com.google.common.base.Preconditions;
import java.io.File;
import java.net.URI;
import javax.annotation.Nullable;
Expand All @@ -29,6 +30,7 @@
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.FileUploadDownloadClient.FileUploadType;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
Expand Down Expand Up @@ -60,7 +62,8 @@ public ZKOperator(PinotHelixResourceManager pinotHelixResourceManager, Controlle
}

public void completeSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata,
@Nullable URI finalSegmentLocationURI, File segmentFile, String downloadUrl, @Nullable String crypterName,
FileUploadType uploadType, @Nullable URI finalSegmentLocationURI, File segmentFile,
@Nullable String sourceDownloadURIStr, String segmentDownloadURIStr, @Nullable String crypterName,
long segmentSizeInBytes, boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers)
throws Exception {
String segmentName = segmentMetadata.getName();
Expand All @@ -76,8 +79,9 @@ public void completeSegmentOperations(String tableNameWithType, SegmentMetadata
Response.Status.GONE);
}
LOGGER.info("Adding new segment: {} to table: {}", segmentName, tableNameWithType);
processNewSegment(tableNameWithType, segmentMetadata, finalSegmentLocationURI, segmentFile, downloadUrl,
crypterName, segmentSizeInBytes, enableParallelPushProtection, headers);
processNewSegment(tableNameWithType, segmentMetadata, uploadType, finalSegmentLocationURI, segmentFile,
sourceDownloadURIStr, segmentDownloadURIStr, crypterName, segmentSizeInBytes, enableParallelPushProtection,
headers);
} else {
// Refresh an existing segment
if (!allowRefresh) {
Expand All @@ -89,16 +93,16 @@ public void completeSegmentOperations(String tableNameWithType, SegmentMetadata
tableNameWithType), Response.Status.CONFLICT);
}
LOGGER.info("Segment: {} already exists in table: {}, refreshing it", segmentName, tableNameWithType);
processExistingSegment(tableNameWithType, segmentMetadata, existingSegmentMetadataZNRecord,
finalSegmentLocationURI, segmentFile, downloadUrl, crypterName, segmentSizeInBytes,
enableParallelPushProtection, headers);
processExistingSegment(tableNameWithType, segmentMetadata, uploadType, existingSegmentMetadataZNRecord,
finalSegmentLocationURI, segmentFile, sourceDownloadURIStr, segmentDownloadURIStr, crypterName,
segmentSizeInBytes, enableParallelPushProtection, headers);
}
}

private void processExistingSegment(String tableNameWithType, SegmentMetadata segmentMetadata,
ZNRecord existingSegmentMetadataZNRecord, @Nullable URI finalSegmentLocationURI, File segmentFile,
String downloadUrl, @Nullable String crypterName, long segmentSizeInBytes, boolean enableParallelPushProtection,
HttpHeaders headers)
FileUploadType uploadType, ZNRecord existingSegmentMetadataZNRecord, @Nullable URI finalSegmentLocationURI,
File segmentFile, @Nullable String sourceDownloadURIStr, String segmentDownloadURIStr,
@Nullable String crypterName, long segmentSizeInBytes, boolean enableParallelPushProtection, HttpHeaders headers)
throws Exception {
String segmentName = segmentMetadata.getName();
int expectedVersion = existingSegmentMetadataZNRecord.getVersion();
Expand Down Expand Up @@ -179,8 +183,7 @@ private void processExistingSegment(String tableNameWithType, SegmentMetadata se
"New segment crc {} is different than the existing segment crc {}. Updating ZK metadata and refreshing "
+ "segment {}", newCrc, existingCrc, segmentName);
if (finalSegmentLocationURI != null) {
moveSegmentToPermanentDirectory(segmentFile, finalSegmentLocationURI);
LOGGER.info("Moved segment: {} of table: {} to final location: {}", segmentName, tableNameWithType,
copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr,
finalSegmentLocationURI);
}

Expand All @@ -191,12 +194,12 @@ private void processExistingSegment(String tableNameWithType, SegmentMetadata se
if (customMapModifier == null) {
// If no modifier is provided, use the custom map from the segment metadata
segmentZKMetadata.setCustomMap(null);
ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, downloadUrl,
crypterName, segmentSizeInBytes);
ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata,
segmentDownloadURIStr, crypterName, segmentSizeInBytes);
} else {
// If modifier is provided, first set the custom map from the segment metadata, then apply the modifier
ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, downloadUrl,
crypterName, segmentSizeInBytes);
ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata,
segmentDownloadURIStr, crypterName, segmentSizeInBytes);
segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap()));
}
if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) {
Expand Down Expand Up @@ -237,16 +240,17 @@ private void checkCRC(HttpHeaders headers, String tableNameWithType, String segm
}
}

private void processNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata,
@Nullable URI finalSegmentLocationURI, File segmentFile, String downloadUrl, @Nullable String crypterName,
long segmentSizeInBytes, boolean enableParallelPushProtection, HttpHeaders headers)
private void processNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata, FileUploadType uploadType,
@Nullable URI finalSegmentLocationURI, File segmentFile, @Nullable String sourceDownloadURIStr,
String segmentDownloadURIStr, @Nullable String crypterName, long segmentSizeInBytes,
boolean enableParallelPushProtection, HttpHeaders headers)
throws Exception {
String segmentName = segmentMetadata.getName();
SegmentZKMetadata newSegmentZKMetadata;
try {
newSegmentZKMetadata =
ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, segmentMetadata, downloadUrl, crypterName,
segmentSizeInBytes);
ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, segmentMetadata, segmentDownloadURIStr,
crypterName, segmentSizeInBytes);
} catch (IllegalArgumentException e) {
throw new ControllerApplicationException(LOGGER,
String.format("Got invalid segment metadata when adding segment: %s for table: %s, reason: %s", segmentName,
Expand Down Expand Up @@ -274,8 +278,7 @@ private void processNewSegment(String tableNameWithType, SegmentMetadata segment

if (finalSegmentLocationURI != null) {
try {
moveSegmentToPermanentDirectory(segmentFile, finalSegmentLocationURI);
LOGGER.info("Moved segment: {} of table: {} to final location: {}", segmentName, tableNameWithType,
copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr,
finalSegmentLocationURI);
} catch (Exception e) {
// Cleanup the Zk entry and the segment from the permanent directory if it exists.
Expand Down Expand Up @@ -310,9 +313,39 @@ private void processNewSegment(String tableNameWithType, SegmentMetadata segment
}
}

private void moveSegmentToPermanentDirectory(File segmentFile, URI finalSegmentLocationURI)
private void copySegmentToDeepStore(String tableNameWithType, String segmentName, FileUploadType uploadType,
File segmentFile, String sourceDownloadURIStr, URI finalSegmentLocationURI)
throws Exception {
if (uploadType == FileUploadType.METADATA) {
// In Metadata push, local segmentFile only contains metadata.
// Copy segment over from sourceDownloadURI to final location.
copyFromSegmentURIToDeepStore(new URI(sourceDownloadURIStr), finalSegmentLocationURI);
LOGGER.info("Copied segment: {} of table: {} to final location: {}", segmentName, tableNameWithType,
finalSegmentLocationURI);
} else {
// In push types other than METADATA, local segmentFile contains the complete segment.
// Move local segment to final location
copyFromSegmentFileToDeepStore(segmentFile, finalSegmentLocationURI);
LOGGER.info("Copied segment: {} of table: {} to final location: {}", segmentName, tableNameWithType,
finalSegmentLocationURI);
}
}

private void copyFromSegmentFileToDeepStore(File segmentFile, URI finalSegmentLocationURI)
throws Exception {
LOGGER.info("Copying segment from: {} to: {}", segmentFile.getAbsolutePath(), finalSegmentLocationURI);
PinotFSFactory.create(finalSegmentLocationURI.getScheme()).copyFromLocalFile(segmentFile, finalSegmentLocationURI);
}

private void copyFromSegmentURIToDeepStore(URI sourceDownloadURI, URI finalSegmentLocationURI)
throws Exception {
if (sourceDownloadURI.equals(finalSegmentLocationURI)) {
LOGGER.info("Skip copying segment as sourceDownloadURI: {} is the same as finalSegmentLocationURI",
sourceDownloadURI);
} else {
Preconditions.checkState(sourceDownloadURI.getScheme().equals(finalSegmentLocationURI.getScheme()));
LOGGER.info("Copying segment from: {} to: {}", sourceDownloadURI, finalSegmentLocationURI);
PinotFSFactory.create(finalSegmentLocationURI.getScheme()).copy(sourceDownloadURI, finalSegmentLocationURI);
}
}
}
Loading