Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,17 +99,21 @@ public void write(ShardSessionRecord message) {
*
* <p>Returns the updated record if the update succeeded.
*
* <p>Returns {@code Optional.empty()} if the update could not be executed, either because
* the rules of the passed {@code RecordUpdate} prevented it, or due to a concurrent changes
* which have happened to the corresponding Datastore entity.
* <p>Returns {@code Optional.empty()} if the update could not be executed, because
* the rules of the passed {@code RecordUpdate} prevented it.
*
* @param index
* index of a record to execute an update for
* @param update
* an update to perform
* @return a modified record, or {@code Optional.empty()} if the update could not be executed
* @throws DatastoreException
* if there is a problem communicating with Datastore, or if the entity could not
* be updated due to a concurrent changes which have happened to the corresponding
* Datastore entity.
*/
Optional<ShardSessionRecord> updateTransactionally(ShardIndex index, RecordUpdate update) {
Optional<ShardSessionRecord> updateTransactionally(ShardIndex index, RecordUpdate update)
throws DatastoreException {
try (TransactionWrapper tx = newTransaction()) {
Key key = key(index);
Optional<Entity> result = tx.read(key);
Expand All @@ -125,7 +129,7 @@ Optional<ShardSessionRecord> updateTransactionally(ShardIndex index, RecordUpdat
}
return updated;
} catch (DatastoreException e) {
return Optional.empty();
throw e;
} catch (RuntimeException e) {
throw newIllegalStateException(
e, "Cannot update the `ShardSessionRecord` with index `%s` in a transaction.",
Expand Down Expand Up @@ -168,7 +172,8 @@ private enum Column implements MessageColumn<ShardSessionRecord> {

worker((m) -> {
WorkerId worker = m.getWorker();
String value = worker.getNodeId().getValue() + '-' + worker.getValue();
String value = worker.getNodeId()
.getValue() + '-' + worker.getValue();
return StringValue.of(value);

}),
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@

package io.spine.server.storage.datastore;

import com.google.cloud.datastore.DatastoreException;
import com.google.protobuf.Duration;
import io.spine.logging.Logging;
import io.spine.server.NodeId;
import io.spine.server.delivery.AbstractWorkRegistry;
import io.spine.server.delivery.PickUpOutcome;
import io.spine.server.delivery.ShardIndex;
import io.spine.server.delivery.ShardProcessingSession;
import io.spine.server.delivery.ShardSessionRecord;
import io.spine.server.delivery.WorkerId;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand All @@ -41,6 +42,8 @@

import static com.google.common.base.Preconditions.checkNotNull;
import static io.spine.base.Time.currentTime;
import static io.spine.server.delivery.PickUpOutcomeMixin.alreadyPicked;
import static io.spine.server.delivery.PickUpOutcomeMixin.pickedUp;

/**
* A {@link io.spine.server.delivery.ShardedWorkRegistry} based on the Google Datastore storage.
Expand Down Expand Up @@ -75,16 +78,33 @@ public DsShardedWorkRegistry(DatastoreStorageFactory factory) {
* <p>The potential concurrent access to the same record is handled by using the Datastore
* transaction mechanism. In case of any parallel executions of {@code pickUp} operation,
* the one started earlier wins.
*
* @throws DatastoreException
* if there is a problem updating an entity in Datastore. This exception may signal
* about a technical issue communicating with Datastore, or about a concurrent
* change of a corresponding entity.
*/
@Override
public synchronized Optional<ShardProcessingSession> pickUp(ShardIndex index, NodeId nodeId) {
public synchronized PickUpOutcome pickUp(ShardIndex index, NodeId nodeId)
throws DatastoreException {
checkNotNull(index);
checkNotNull(nodeId);

WorkerId worker = currentWorkerFor(nodeId);
Optional<ShardSessionRecord> result =
storage.updateTransactionally(index, new UpdateWorkerIfAbsent(index, worker));
return result.map(this::asSession);
UpdateWorkerIfAbsent updateAction = new UpdateWorkerIfAbsent(index, worker);
Optional<ShardSessionRecord> result = storage.updateTransactionally(index, updateAction);
if (result.isPresent()) {
return pickedUp(result.get());
} else {
ShardSessionRecord notUpdated = updateAction.previous()
.get();
return alreadyPicked(notUpdated.getWorker(), notUpdated.getWhenLastPicked());
}
}

@Override
public void release(ShardSessionRecord session) {
clearNode(session);
}

/**
Expand All @@ -93,11 +113,12 @@ public synchronized Optional<ShardProcessingSession> pickUp(ShardIndex index, No
*/
@Override
protected WorkerId currentWorkerFor(NodeId id) {
long threadId = Thread.currentThread().getId();
long threadId = Thread.currentThread()
.getId();
return WorkerId.newBuilder()
.setNodeId(id)
.setValue(Long.toString(threadId))
.vBuild();
.setNodeId(id)
.setValue(Long.toString(threadId))
.vBuild();
}

@Override
Expand Down Expand Up @@ -126,11 +147,6 @@ protected Optional<ShardSessionRecord> find(ShardIndex index) {
return read;
}

@Override
protected ShardProcessingSession asSession(ShardSessionRecord record) {
return new DsShardProcessingSession(record, () -> clearNode(record));
}

/**
* Obtains the session storage which persists the session records.
*/
Expand All @@ -143,11 +159,14 @@ protected DsSessionStorage storage() {
* {@link ShardIndex} if the record has not been picked by anyone.
*
* <p>If there is no such a record, creates a new record.
*
* <p>Preserves the record state before updating if the supplied record is not {@code null}.
*/
private static class UpdateWorkerIfAbsent implements DsSessionStorage.RecordUpdate {

private final ShardIndex index;
private final WorkerId workerToSet;
private ShardSessionRecord previous;

private UpdateWorkerIfAbsent(ShardIndex index, WorkerId worker) {
this.index = index;
Expand All @@ -156,8 +175,11 @@ private UpdateWorkerIfAbsent(ShardIndex index, WorkerId worker) {

@Override
public Optional<ShardSessionRecord> createOrUpdate(@Nullable ShardSessionRecord previous) {
if (previous != null && previous.hasWorker()) {
return Optional.empty();
if (previous != null) {
this.previous = previous;
if (previous.hasWorker()) {
return Optional.empty();
}
}
ShardSessionRecord.Builder builder =
previous == null
Expand All @@ -171,5 +193,14 @@ public Optional<ShardSessionRecord> createOrUpdate(@Nullable ShardSessionRecord
.vBuild();
return Optional.of(updated);
}

/**
* Returns the {@code ShardSessionRecord} state before the update is executed, or empty
* {@code Optional} if the is no previous record or
* the {@linkplain #createOrUpdate(ShardSessionRecord) createOrUpdate()} is not called yet.
*/
private Optional<ShardSessionRecord> previous() {
return Optional.ofNullable(previous);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@
import com.google.protobuf.util.Timestamps;
import io.spine.base.Identifier;
import io.spine.server.NodeId;
import io.spine.server.delivery.PickUpOutcome;
import io.spine.server.delivery.ShardAlreadyPickedUp;
import io.spine.server.delivery.ShardIndex;
import io.spine.server.delivery.ShardProcessingSession;
import io.spine.server.delivery.ShardSessionRecord;
import io.spine.server.delivery.ShardedWorkRegistry;
import io.spine.server.delivery.ShardedWorkRegistryTest;
Expand Down Expand Up @@ -80,11 +81,11 @@ protected ShardedWorkRegistry registry() {
@Test
@DisplayName("pick up the shard and write a corresponding record to the storage")
void pickUp() {
Optional<ShardProcessingSession> session = registry.pickUp(index, nodeId);
PickUpOutcome outcome = registry.pickUp(index, nodeId);
WorkerId expectedWorker = registry.currentWorkerFor(nodeId);
assertThat(session).isPresent();
assertThat(session.get()
.shardIndex()).isEqualTo(index);
assertThat(outcome.hasSession()).isTrue();
assertThat(outcome.getSession()
.getIndex()).isEqualTo(index);

ShardSessionRecord record = readSingleRecord(index);
assertThat(record.getIndex()).isEqualTo(index);
Expand All @@ -95,42 +96,54 @@ void pickUp() {
@DisplayName("not be able to pick up the shard if it's already picked up")
void cannotPickUpIfTaken() {

Optional<ShardProcessingSession> session = registry.pickUp(index, nodeId);
assertThat(session).isPresent();
PickUpOutcome outcome = registry.pickUp(index, nodeId);
assertThat(outcome.hasSession()).isTrue();
ShardSessionRecord session = outcome.getSession();

Optional<ShardProcessingSession> sameIdxSameNode = registry.pickUp(index, nodeId);
assertThat(sameIdxSameNode).isEmpty();
PickUpOutcome sameIdxSameNode = registry.pickUp(index, nodeId);
assertThat(sameIdxSameNode.hasSession()).isFalse();
assertThat(sameIdxSameNode.hasAlreadyPicked()).isTrue();

Optional<ShardProcessingSession> sameIdxAnotherNode = registry.pickUp(index, newNode());
assertThat(sameIdxAnotherNode).isEmpty();
ShardAlreadyPickedUp alreadyPicked = sameIdxSameNode.getAlreadyPicked();
assertThat(alreadyPicked.getWorker()).isEqualTo(session.getWorker());

PickUpOutcome sameIdxAnotherNode = registry.pickUp(index, newNode());
assertThat(sameIdxAnotherNode.hasSession()).isFalse();
assertThat(sameIdxAnotherNode.hasAlreadyPicked()).isTrue();

ShardAlreadyPickedUp anotherAlreadyPicked = sameIdxAnotherNode.getAlreadyPicked();
assertThat(anotherAlreadyPicked.getWorker()).isEqualTo(session.getWorker());

ShardIndex anotherIdx = newIndex(24, 100);
Optional<ShardProcessingSession> anotherIdxSameNode = registry.pickUp(anotherIdx, nodeId);
assertThat(anotherIdxSameNode).isPresent();
PickUpOutcome anotherIdxSameNode = registry.pickUp(anotherIdx, nodeId);
assertThat(anotherIdxSameNode.hasSession()).isTrue();
ShardSessionRecord anotherSession = anotherIdxSameNode.getSession();

PickUpOutcome anotherIdxAnotherNode = registry.pickUp(anotherIdx, newNode());
assertThat(anotherIdxAnotherNode.hasSession()).isFalse();
assertThat(anotherIdxAnotherNode.hasAlreadyPicked()).isTrue();

Optional<ShardProcessingSession> anotherIdxAnotherNode =
registry.pickUp(anotherIdx, newNode());
assertThat(anotherIdxAnotherNode).isEmpty();
ShardAlreadyPickedUp oneMoreAnotherPicked = anotherIdxAnotherNode.getAlreadyPicked();
assertThat(oneMoreAnotherPicked.getWorker()).isEqualTo(anotherSession.getWorker());
}

@Test
@DisplayName("complete the shard session (once picked up) and make it available for picking up")
void completeSessionAndMakeItAvailable() {
Optional<ShardProcessingSession> optional = registry.pickUp(index, nodeId);
assertThat(optional).isPresent();
@DisplayName("release the shard session (once picked up) and make it available for picking up")
void releaseSessionAndMakeItAvailable() {
PickUpOutcome outcome = registry.pickUp(index, nodeId);
assertThat(outcome.hasSession()).isTrue();

Timestamp whenPickedFirst = readSingleRecord(index).getWhenLastPicked();

DsShardProcessingSession session = (DsShardProcessingSession) optional.get();
session.complete();
registry.release(outcome.getSession());

ShardSessionRecord completedRecord = readSingleRecord(index);
assertThat(completedRecord.hasWorker()).isFalse();

NodeId anotherNode = newNode();
WorkerId anotherWorker = registry.currentWorkerFor(anotherNode);
Optional<ShardProcessingSession> anotherOptional = registry.pickUp(index, anotherNode);
assertThat(anotherOptional).isPresent();
PickUpOutcome anotherOutcome = registry.pickUp(index, anotherNode);
assertThat(anotherOutcome.hasSession()).isTrue();

ShardSessionRecord secondSessionRecord = readSingleRecord(index);
assertThat(secondSessionRecord.getWorker()).isEqualTo(anotherWorker);
Expand All @@ -152,7 +165,8 @@ void notAcceptNulls() {
}

private ShardSessionRecord readSingleRecord(ShardIndex index) {
Optional<ShardSessionRecord> record = registry.storage().read(index);
Optional<ShardSessionRecord> record = registry.storage()
.read(index);
assertThat(record).isPresent();
return record.get();
}
Expand Down
4 changes: 2 additions & 2 deletions version.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -36,5 +36,5 @@ val cloudPubsubV1Version: String by extra("1.105.8")
val cloudTraceVersion: String by extra("2.14.0")

val spineBaseVersion: String by extra("1.9.0-SNAPSHOT.5")
val spineCoreVersion: String by extra("1.9.0-SNAPSHOT.10")
val versionToPublish: String by extra("1.9.0-SNAPSHOT.3")
val spineCoreVersion: String by extra("1.9.0-SNAPSHOT.11")
val versionToPublish: String by extra("1.9.0-SNAPSHOT.4")