Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -435,17 +435,24 @@ private static HttpUriRequest getSendSegmentJsonRequest(URI uri, String jsonStri
return requestBuilder.build();
}

private static HttpUriRequest getStartReplaceSegmentsRequest(URI uri, String jsonRequestBody, int socketTimeoutMs) {
private static HttpUriRequest getStartReplaceSegmentsRequest(URI uri, String jsonRequestBody, int socketTimeoutMs,
@Nullable String authToken) {
RequestBuilder requestBuilder =
RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1).setHeader(HttpHeaders.CONTENT_TYPE, JSON_CONTENT_TYPE)
.setEntity(new StringEntity(jsonRequestBody, ContentType.APPLICATION_JSON));
if (StringUtils.isNotBlank(authToken)) {
requestBuilder.addHeader("Authorization", authToken);
}
setTimeout(requestBuilder, socketTimeoutMs);
return requestBuilder.build();
}

private static HttpUriRequest getEndReplaceSegmentsRequest(URI uri, int socketTimeoutMs) {
private static HttpUriRequest getEndReplaceSegmentsRequest(URI uri, int socketTimeoutMs, @Nullable String authToken) {
RequestBuilder requestBuilder = RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1)
.setHeader(HttpHeaders.CONTENT_TYPE, JSON_CONTENT_TYPE);
if (StringUtils.isNotBlank(authToken)) {
requestBuilder.addHeader("Authorization", authToken);
}
setTimeout(requestBuilder, socketTimeoutMs);
return requestBuilder.build();
}
Expand Down Expand Up @@ -1018,28 +1025,31 @@ public SimpleHttpResponse sendSegmentJson(URI uri, String jsonString)
*
* @param uri URI
* @param startReplaceSegmentsRequest request
* @param authToken auth token
* @return Response
* @throws IOException
* @throws HttpErrorStatusException
*/
public SimpleHttpResponse startReplaceSegments(URI uri, StartReplaceSegmentsRequest startReplaceSegmentsRequest)
public SimpleHttpResponse startReplaceSegments(URI uri, StartReplaceSegmentsRequest startReplaceSegmentsRequest,
@Nullable String authToken)
throws IOException, HttpErrorStatusException {
return sendRequest(getStartReplaceSegmentsRequest(uri, JsonUtils.objectToString(startReplaceSegmentsRequest),
DEFAULT_SOCKET_TIMEOUT_MS));
DEFAULT_SOCKET_TIMEOUT_MS, authToken));
}

/**
* End replace segments with default settings.
*
* @param uri URI
* @oaram socketTimeoutMs Socket timeout in milliseconds
* @param authToken auth token
* @return Response
* @throws IOException
* @throws HttpErrorStatusException
*/
public SimpleHttpResponse endReplaceSegments(URI uri, int socketTimeoutMs)
public SimpleHttpResponse endReplaceSegments(URI uri, int socketTimeoutMs, @Nullable String authToken)
throws IOException, HttpErrorStatusException {
return sendRequest(getEndReplaceSegmentsRequest(uri, socketTimeoutMs));
return sendRequest(getEndReplaceSegmentsRequest(uri, socketTimeoutMs, authToken));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
List<String> segmentsTo =
segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
lineageEntryId = SegmentConversionUtils.startSegmentReplace(tableNameWithType, uploadURL,
new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo), authToken);
}

// Upload the tarred segments
Expand Down Expand Up @@ -213,9 +213,8 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig

// Update the segment lineage to indicate that the segment replacement is done.
if (replaceSegmentsEnabled) {
SegmentConversionUtils
.endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId,
_minionConf.getEndReplaceSegmentsTimeoutMs());
SegmentConversionUtils.endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId,
_minionConf.getEndReplaceSegmentsTimeoutMs(), authToken);
}

String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.net.URI;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import org.apache.http.Header;
import org.apache.http.HttpHeaders;
Expand Down Expand Up @@ -122,15 +123,16 @@ public static void uploadSegment(Map<String, String> configs, List<Header> httpH
}

public static String startSegmentReplace(String tableNameWithType, String uploadURL,
StartReplaceSegmentsRequest startReplaceSegmentsRequest)
StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Nullable String authToken)
throws Exception {
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
SSLContext sslContext = MinionContext.getInstance().getSSLContext();
try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) {
URI uri =
FileUploadDownloadClient.getStartReplaceSegmentsURI(new URI(uploadURL), rawTableName, tableType.name(), true);
SimpleHttpResponse response = fileUploadDownloadClient.startReplaceSegments(uri, startReplaceSegmentsRequest);
SimpleHttpResponse response =
fileUploadDownloadClient.startReplaceSegments(uri, startReplaceSegmentsRequest, authToken);
String responseString = response.getResponse();
LOGGER.info(
"Got response {}: {} while sending start replace segment request for table: {}, uploadURL: {}, request: {}",
Expand All @@ -140,15 +142,15 @@ public static String startSegmentReplace(String tableNameWithType, String upload
}

public static void endSegmentReplace(String tableNameWithType, String uploadURL, String segmentLineageEntryId,
int socketTimeoutMs)
int socketTimeoutMs, @Nullable String authToken)
throws Exception {
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
SSLContext sslContext = MinionContext.getInstance().getSSLContext();
try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) {
URI uri = FileUploadDownloadClient
.getEndReplaceSegmentsURI(new URI(uploadURL), rawTableName, tableType.name(), segmentLineageEntryId);
SimpleHttpResponse response = fileUploadDownloadClient.endReplaceSegments(uri, socketTimeoutMs);
SimpleHttpResponse response = fileUploadDownloadClient.endReplaceSegments(uri, socketTimeoutMs, authToken);
LOGGER.info("Got response {}: {} while sending end replace segment request for table: {}, uploadURL: {}",
response.getStatusCode(), response.getResponse(), tableNameWithType, uploadURL);
}
Expand Down