Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,20 @@
import sleeper.configuration.properties.S3InstanceProperties;
import sleeper.configuration.properties.S3TableProperties;
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.properties.table.TableProperties;
import sleeper.core.properties.table.TablePropertiesProvider;
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.StateStoreProvider;
import sleeper.core.statestore.commit.StateStoreCommitRequest;
import sleeper.core.statestore.commit.StateStoreCommitRequestSerDe;
import sleeper.core.statestore.transactionlog.AddTransactionRequest;
import sleeper.core.statestore.transactionlog.transaction.TransactionSerDeProvider;
import sleeper.core.util.LoggedDuration;
import sleeper.core.util.PollWithRetries;
import sleeper.dynamodb.tools.DynamoDBUtils;
import sleeper.statestore.StateStoreFactory;
import sleeper.statestore.committer.StateStoreCommitter.RequestHandle;
import sleeper.statestore.committer.StateStoreCommitter.RetryOnThrottling;
import sleeper.statestore.transactionlog.S3TransactionBodyStore;

import java.time.Duration;
Expand Down Expand Up @@ -72,9 +76,9 @@ public class MultiThreadedStateStoreCommitter {
private String qUrl;
private StateStoreCommitRequestSerDe serDe;
private S3TransactionBodyStore transactionBodyStore;
private TablePropertiesProvider tablePropertiesProvider;
private StateStoreProvider stateStoreProvider;
private StateStoreCommitter committer;
private PollWithRetries throttlingRetriesConfig;
private RetryOnThrottling retryOnThrottling;
private Map<String, CompletableFuture<Instant>> tableFutures = new HashMap<>();

public MultiThreadedStateStoreCommitter(S3Client s3Client, DynamoDbClient dynamoClient, SqsClient sqsClient, String configBucketName, String qUrl) {
Expand All @@ -93,17 +97,15 @@ private void init() {
qUrl = instanceProperties.get(STATESTORE_COMMITTER_QUEUE_URL);
}

TablePropertiesProvider tablePropertiesProvider = S3TableProperties.createProvider(instanceProperties, s3Client, dynamoClient);
tablePropertiesProvider = S3TableProperties.createProvider(instanceProperties, s3Client, dynamoClient);
serDe = new StateStoreCommitRequestSerDe(tablePropertiesProvider);

StateStoreFactory stateStoreFactory = StateStoreFactory.forCommitterProcess(instanceProperties, s3Client, dynamoClient);
stateStoreProvider = StateStoreProvider.memoryLimitOnly(instanceProperties, stateStoreFactory);
transactionBodyStore = new S3TransactionBodyStore(instanceProperties, s3Client, TransactionSerDeProvider.from(tablePropertiesProvider));
committer = new StateStoreCommitter(
tablePropertiesProvider,
stateStoreProvider,
transactionBodyStore);
throttlingRetriesConfig = PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(5), Duration.ofMinutes(10));

PollWithRetries throttlingRetriesConfig = PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(5), Duration.ofMinutes(10));
retryOnThrottling = operation -> DynamoDBUtils.retryOnThrottlingException(throttlingRetriesConfig, operation);
}

/**
Expand Down Expand Up @@ -143,7 +145,7 @@ public void runUntil(int waitTimeSeconds, Function<ReceiveMessageResponse, Boole
// been written to the config bucket yet when we first start up.
if (response.hasMessages()) {
lastReceivedCommitsAt = Instant.now();
if (committer == null) {
if (stateStoreProvider == null) {
init();
}
}
Expand All @@ -153,7 +155,7 @@ public void runUntil(int waitTimeSeconds, Function<ReceiveMessageResponse, Boole
LoggedDuration.withShortOutput(startedAt, Instant.now()),
LoggedDuration.withShortOutput(lastReceivedCommitsAt, Instant.now()));

Map<String, List<StateStoreCommitRequestWithSqsReceipt>> messagesByTableId = response.messages().stream()
Map<String, List<StateStoreCommitRequestWithSqsReceipt>> commitRequestsByTableId = response.messages().stream()
.map(message -> {
LOGGER.trace("Received message: {}", message);
StateStoreCommitRequest request = serDe.fromJson(message.body());
Expand All @@ -163,11 +165,11 @@ public void runUntil(int waitTimeSeconds, Function<ReceiveMessageResponse, Boole
.collect(Collectors.groupingBy(request -> request.getCommitRequest().getTableId()));

// Try to make sure there is going to be enough heap space available to process these commits
ensureEnoughHeapSpaceAvailable(messagesByTableId.keySet());
ensureEnoughHeapSpaceAvailable(commitRequestsByTableId.keySet());

messagesByTableId.entrySet().forEach(tableMessages -> {
String tableId = tableMessages.getKey();
List<StateStoreCommitRequestWithSqsReceipt> requestsWithHandle = tableMessages.getValue();
commitRequestsByTableId.entrySet().forEach(tableCommitRequests -> {
String tableId = tableCommitRequests.getKey();
List<StateStoreCommitRequestWithSqsReceipt> requestsWithHandle = tableCommitRequests.getValue();
LOGGER.info("Received {} requests for table: {}", requestsWithHandle.size(), tableId);

// Wait until processing of previous commits for this table has finished
Expand All @@ -179,9 +181,12 @@ public void runUntil(int waitTimeSeconds, Function<ReceiveMessageResponse, Boole
}
}

TableProperties tableProperties = tablePropertiesProvider.getById(tableId);
StateStore stateStore = stateStoreProvider.getStateStore(tableProperties);

// Apply the commits for each table on a separate thread
CompletableFuture<Instant> task = CompletableFuture.supplyAsync(() -> {
processMessagesForTable(tableId, requestsWithHandle);
processCommitRequestsForTable(tableId, stateStore, requestsWithHandle);
return Instant.now();
});

Expand Down Expand Up @@ -216,15 +221,58 @@ private void ensureEnoughHeapSpaceAvailable(Set<String> tableIds) {
}
}

private boolean processMessagesForTable(String tableId, List<StateStoreCommitRequestWithSqsReceipt> requestsWithHandle) {
private void processCommitRequestsForTable(String tableId, StateStore stateStore, List<StateStoreCommitRequestWithSqsReceipt> requests) {
Instant startedAt = Instant.now();
LOGGER.info("Processing {} requests for table: {} ...", requestsWithHandle.size(), tableId);
LOGGER.info("Processing {} requests for table: {} ...", requests.size(), tableId);
applyBatchOfCommits(retryOnThrottling, stateStore, requests);
reportCommitOutcomesToSqs(tableId, requests);
LOGGER.info("Finished applying batch of {} commit requests for table {} in {}",
requests.size(),
tableId,
LoggedDuration.withShortOutput(startedAt, Instant.now()));
}

/**
 * Applies a batch of state store commit requests, one at a time and in list order.
 * <p>
 * A request that fails with a runtime exception is marked as failed and the remaining requests are still
 * attempted. If the thread is interrupted, the request being applied and every request after it in the
 * list are marked as failed, the thread's interrupt status is restored, and no further requests are
 * attempted.
 *
 * @param retryOnThrottling function to apply retries due to DynamoDB API throttling
 * @param stateStore        state store of the Sleeper table that the commit requests should be applied to
 * @param requests          the commit requests
 */
private void applyBatchOfCommits(RetryOnThrottling retryOnThrottling, StateStore stateStore, List<StateStoreCommitRequestWithSqsReceipt> requests) {
    for (int i = 0; i < requests.size(); i++) {
        StateStoreCommitRequestWithSqsReceipt request = requests.get(i);
        try {
            retryOnThrottling.doWithRetries(() -> applyCommit(stateStore, request));
        } catch (InterruptedException e) {
            LOGGER.error("Interrupted applying commit request", e);
            // The interrupted request and everything after it were not applied
            requests.subList(i, requests.size())
                    .forEach(failed -> failed.setFailed(e));
            // Catching InterruptedException clears the interrupt status, so set it again
            // to keep the interruption visible to whatever runs next on this thread
            Thread.currentThread().interrupt();
            return;
        } catch (RuntimeException e) {
            LOGGER.error("Failed commit request", e);
            request.setFailed(e);
        }
    }
}

committer.applyBatch(
operation -> DynamoDBUtils.retryOnThrottlingException(throttlingRetriesConfig, operation),
requestsWithHandle.stream().map(StateStoreCommitRequestWithSqsReceipt::getHandle).toList());
/**
 * Adds the transaction for a single commit request to the state store, then logs that it was applied.
 * The transaction is read from the transaction body store, and the request's body key is passed along
 * with it.
 *
 * @param  stateStore          state store of the Sleeper table that the commit request should be applied to
 * @param  request             the commit request
 * @throws StateStoreException propagated from reading or adding the transaction
 */
private void applyCommit(StateStore stateStore, StateStoreCommitRequestWithSqsReceipt request) throws StateStoreException {
    StateStoreCommitRequest commitRequest = request.getCommitRequest();
    stateStore.addTransaction(
            AddTransactionRequest.withTransaction(transactionBodyStore.getTransaction(commitRequest))
                    .bodyKey(commitRequest.getBodyKey())
                    .build());
    LOGGER.info("Applied request to table ID {} with type {} at time {}",
            commitRequest.getTableId(),
            commitRequest.getTransactionType(),
            Instant.now());
}

Map<Boolean, List<StateStoreCommitRequestWithSqsReceipt>> requestResults = requestsWithHandle.stream().collect(Collectors.partitioningBy(StateStoreCommitRequestWithSqsReceipt::failed));
private void reportCommitOutcomesToSqs(String tableId, List<StateStoreCommitRequestWithSqsReceipt> requests) {
Map<Boolean, List<StateStoreCommitRequestWithSqsReceipt>> requestResults = requests.stream().collect(Collectors.partitioningBy(StateStoreCommitRequestWithSqsReceipt::failed));
List<StateStoreCommitRequestWithSqsReceipt> failedRequests = requestResults.get(true);
List<StateStoreCommitRequestWithSqsReceipt> successfulRequests = requestResults.get(false);

Expand Down Expand Up @@ -287,12 +335,6 @@ private boolean processMessagesForTable(String tableId, List<StateStoreCommitReq
LOGGER.debug("Successfully changed visibility of {} requests for table {} in SQS queue", changeVisibilityResponse.successful().size(), tableId);
}
}

LOGGER.info("Finished processing {} messages for table {} in {} ...",
requestsWithHandle.size(),
tableId,
LoggedDuration.withShortOutput(startedAt, Instant.now()));
return true;
}

/**
Expand All @@ -313,11 +355,9 @@ private StateStoreCommitRequest getCommitRequest() {
return commitRequest;
}

private RequestHandle getHandle() {
return RequestHandle.withCallbackOnFail(commitRequest, e -> {
LOGGER.error("Error whilst processing state store commit request for table: {}", commitRequest.getTableId(), e);
failed = true;
});
/**
 * Marks this commit request as failed, logging the cause against the ID of the table it was for.
 *
 * @param e the exception that caused the failure
 */
private void setFailed(Exception e) {
    LOGGER.error("Error whilst processing state store commit request for table: {}", commitRequest.getTableId(), e);
    this.failed = true;
}

private String getSqsReceipt() {
Expand Down