Skip to content

Commit 7053b69

Browse files
author
Nick Dolgiy
authored
Merge pull request #181 from SpineEventEngine/new-pickup-api
Implement new shard pick-up API introduced by `core-java`.
2 parents 5ccee00 + 3c03e1e commit 7053b69

File tree

5 files changed

+99
-100
lines changed

5 files changed

+99
-100
lines changed

datastore/src/main/java/io/spine/server/storage/datastore/DsSessionStorage.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -99,17 +99,21 @@ public void write(ShardSessionRecord message) {
9999
*
100100
* <p>Returns the updated record if the update succeeded.
101101
*
102-
* <p>Returns {@code Optional.empty()} if the update could not be executed, either because
103-
* the rules of the passed {@code RecordUpdate} prevented it, or due to a concurrent changes
104-
* which have happened to the corresponding Datastore entity.
102+
* <p>Returns {@code Optional.empty()} if the update could not be executed, because
103+
* the rules of the passed {@code RecordUpdate} prevented it.
105104
*
106105
* @param index
107106
* index of a record to execute an update for
108107
* @param update
109108
* an update to perform
110109
* @return a modified record, or {@code Optional.empty()} if the update could not be executed
110+
* @throws DatastoreException
111+
* if there is a problem communicating with Datastore, or if the entity could not
112+
* be updated due to a concurrent changes which have happened to the corresponding
113+
* Datastore entity.
111114
*/
112-
Optional<ShardSessionRecord> updateTransactionally(ShardIndex index, RecordUpdate update) {
115+
Optional<ShardSessionRecord> updateTransactionally(ShardIndex index, RecordUpdate update)
116+
throws DatastoreException {
113117
try (TransactionWrapper tx = newTransaction()) {
114118
Key key = key(index);
115119
Optional<Entity> result = tx.read(key);
@@ -125,7 +129,7 @@ Optional<ShardSessionRecord> updateTransactionally(ShardIndex index, RecordUpdat
125129
}
126130
return updated;
127131
} catch (DatastoreException e) {
128-
return Optional.empty();
132+
throw e;
129133
} catch (RuntimeException e) {
130134
throw newIllegalStateException(
131135
e, "Cannot update the `ShardSessionRecord` with index `%s` in a transaction.",
@@ -168,7 +172,8 @@ private enum Column implements MessageColumn<ShardSessionRecord> {
168172

169173
worker((m) -> {
170174
WorkerId worker = m.getWorker();
171-
String value = worker.getNodeId().getValue() + '-' + worker.getValue();
175+
String value = worker.getNodeId()
176+
.getValue() + '-' + worker.getValue();
172177
return StringValue.of(value);
173178

174179
}),

datastore/src/main/java/io/spine/server/storage/datastore/DsShardProcessingSession.java

Lines changed: 0 additions & 51 deletions
This file was deleted.

datastore/src/main/java/io/spine/server/storage/datastore/DsShardedWorkRegistry.java

Lines changed: 47 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,13 @@
2626

2727
package io.spine.server.storage.datastore;
2828

29+
import com.google.cloud.datastore.DatastoreException;
2930
import com.google.protobuf.Duration;
3031
import io.spine.logging.Logging;
3132
import io.spine.server.NodeId;
3233
import io.spine.server.delivery.AbstractWorkRegistry;
34+
import io.spine.server.delivery.PickUpOutcome;
3335
import io.spine.server.delivery.ShardIndex;
34-
import io.spine.server.delivery.ShardProcessingSession;
3536
import io.spine.server.delivery.ShardSessionRecord;
3637
import io.spine.server.delivery.WorkerId;
3738
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -41,6 +42,8 @@
4142

4243
import static com.google.common.base.Preconditions.checkNotNull;
4344
import static io.spine.base.Time.currentTime;
45+
import static io.spine.server.delivery.PickUpOutcomeMixin.alreadyPicked;
46+
import static io.spine.server.delivery.PickUpOutcomeMixin.pickedUp;
4447

4548
/**
4649
* A {@link io.spine.server.delivery.ShardedWorkRegistry} based on the Google Datastore storage.
@@ -75,16 +78,33 @@ public DsShardedWorkRegistry(DatastoreStorageFactory factory) {
7578
* <p>The potential concurrent access to the same record is handled by using the Datastore
7679
* transaction mechanism. In case of any parallel executions of {@code pickUp} operation,
7780
* the one started earlier wins.
81+
*
82+
* @throws DatastoreException
83+
* if there is a problem updating an entity in Datastore. This exception may signal
84+
* about a technical issue communicating with Datastore, or about a concurrent
85+
* change of a corresponding entity.
7886
*/
7987
@Override
80-
public synchronized Optional<ShardProcessingSession> pickUp(ShardIndex index, NodeId nodeId) {
88+
public synchronized PickUpOutcome pickUp(ShardIndex index, NodeId nodeId)
89+
throws DatastoreException {
8190
checkNotNull(index);
8291
checkNotNull(nodeId);
8392

8493
WorkerId worker = currentWorkerFor(nodeId);
85-
Optional<ShardSessionRecord> result =
86-
storage.updateTransactionally(index, new UpdateWorkerIfAbsent(index, worker));
87-
return result.map(this::asSession);
94+
UpdateWorkerIfAbsent updateAction = new UpdateWorkerIfAbsent(index, worker);
95+
Optional<ShardSessionRecord> result = storage.updateTransactionally(index, updateAction);
96+
if (result.isPresent()) {
97+
return pickedUp(result.get());
98+
} else {
99+
ShardSessionRecord notUpdated = updateAction.previous()
100+
.get();
101+
return alreadyPicked(notUpdated.getWorker(), notUpdated.getWhenLastPicked());
102+
}
103+
}
104+
105+
@Override
106+
public void release(ShardSessionRecord session) {
107+
clearNode(session);
88108
}
89109

90110
/**
@@ -93,11 +113,12 @@ public synchronized Optional<ShardProcessingSession> pickUp(ShardIndex index, No
93113
*/
94114
@Override
95115
protected WorkerId currentWorkerFor(NodeId id) {
96-
long threadId = Thread.currentThread().getId();
116+
long threadId = Thread.currentThread()
117+
.getId();
97118
return WorkerId.newBuilder()
98-
.setNodeId(id)
99-
.setValue(Long.toString(threadId))
100-
.vBuild();
119+
.setNodeId(id)
120+
.setValue(Long.toString(threadId))
121+
.vBuild();
101122
}
102123

103124
@Override
@@ -126,11 +147,6 @@ protected Optional<ShardSessionRecord> find(ShardIndex index) {
126147
return read;
127148
}
128149

129-
@Override
130-
protected ShardProcessingSession asSession(ShardSessionRecord record) {
131-
return new DsShardProcessingSession(record, () -> clearNode(record));
132-
}
133-
134150
/**
135151
* Obtains the session storage which persists the session records.
136152
*/
@@ -143,11 +159,14 @@ protected DsSessionStorage storage() {
143159
* {@link ShardIndex} if the record has not been picked by anyone.
144160
*
145161
* <p>If there is no such a record, creates a new record.
162+
*
163+
* <p>Preserves the record state before updating if the supplied record is not {@code null}.
146164
*/
147165
private static class UpdateWorkerIfAbsent implements DsSessionStorage.RecordUpdate {
148166

149167
private final ShardIndex index;
150168
private final WorkerId workerToSet;
169+
private ShardSessionRecord previous;
151170

152171
private UpdateWorkerIfAbsent(ShardIndex index, WorkerId worker) {
153172
this.index = index;
@@ -156,8 +175,11 @@ private UpdateWorkerIfAbsent(ShardIndex index, WorkerId worker) {
156175

157176
@Override
158177
public Optional<ShardSessionRecord> createOrUpdate(@Nullable ShardSessionRecord previous) {
159-
if (previous != null && previous.hasWorker()) {
160-
return Optional.empty();
178+
if (previous != null) {
179+
this.previous = previous;
180+
if (previous.hasWorker()) {
181+
return Optional.empty();
182+
}
161183
}
162184
ShardSessionRecord.Builder builder =
163185
previous == null
@@ -171,5 +193,14 @@ public Optional<ShardSessionRecord> createOrUpdate(@Nullable ShardSessionRecord
171193
.vBuild();
172194
return Optional.of(updated);
173195
}
196+
197+
/**
198+
* Returns the {@code ShardSessionRecord} state before the update is executed, or empty
199+
* {@code Optional} if the is no previous record or
200+
* the {@linkplain #createOrUpdate(ShardSessionRecord) createOrUpdate()} is not called yet.
201+
*/
202+
private Optional<ShardSessionRecord> previous() {
203+
return Optional.ofNullable(previous);
204+
}
174205
}
175206
}

datastore/src/test/java/io/spine/server/storage/datastore/DsShardedWorkRegistryTest.java

Lines changed: 39 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,9 @@
3131
import com.google.protobuf.util.Timestamps;
3232
import io.spine.base.Identifier;
3333
import io.spine.server.NodeId;
34+
import io.spine.server.delivery.PickUpOutcome;
35+
import io.spine.server.delivery.ShardAlreadyPickedUp;
3436
import io.spine.server.delivery.ShardIndex;
35-
import io.spine.server.delivery.ShardProcessingSession;
3637
import io.spine.server.delivery.ShardSessionRecord;
3738
import io.spine.server.delivery.ShardedWorkRegistry;
3839
import io.spine.server.delivery.ShardedWorkRegistryTest;
@@ -80,11 +81,11 @@ protected ShardedWorkRegistry registry() {
8081
@Test
8182
@DisplayName("pick up the shard and write a corresponding record to the storage")
8283
void pickUp() {
83-
Optional<ShardProcessingSession> session = registry.pickUp(index, nodeId);
84+
PickUpOutcome outcome = registry.pickUp(index, nodeId);
8485
WorkerId expectedWorker = registry.currentWorkerFor(nodeId);
85-
assertThat(session).isPresent();
86-
assertThat(session.get()
87-
.shardIndex()).isEqualTo(index);
86+
assertThat(outcome.hasSession()).isTrue();
87+
assertThat(outcome.getSession()
88+
.getIndex()).isEqualTo(index);
8889

8990
ShardSessionRecord record = readSingleRecord(index);
9091
assertThat(record.getIndex()).isEqualTo(index);
@@ -95,42 +96,54 @@ void pickUp() {
9596
@DisplayName("not be able to pick up the shard if it's already picked up")
9697
void cannotPickUpIfTaken() {
9798

98-
Optional<ShardProcessingSession> session = registry.pickUp(index, nodeId);
99-
assertThat(session).isPresent();
99+
PickUpOutcome outcome = registry.pickUp(index, nodeId);
100+
assertThat(outcome.hasSession()).isTrue();
101+
ShardSessionRecord session = outcome.getSession();
100102

101-
Optional<ShardProcessingSession> sameIdxSameNode = registry.pickUp(index, nodeId);
102-
assertThat(sameIdxSameNode).isEmpty();
103+
PickUpOutcome sameIdxSameNode = registry.pickUp(index, nodeId);
104+
assertThat(sameIdxSameNode.hasSession()).isFalse();
105+
assertThat(sameIdxSameNode.hasAlreadyPicked()).isTrue();
103106

104-
Optional<ShardProcessingSession> sameIdxAnotherNode = registry.pickUp(index, newNode());
105-
assertThat(sameIdxAnotherNode).isEmpty();
107+
ShardAlreadyPickedUp alreadyPicked = sameIdxSameNode.getAlreadyPicked();
108+
assertThat(alreadyPicked.getWorker()).isEqualTo(session.getWorker());
109+
110+
PickUpOutcome sameIdxAnotherNode = registry.pickUp(index, newNode());
111+
assertThat(sameIdxAnotherNode.hasSession()).isFalse();
112+
assertThat(sameIdxAnotherNode.hasAlreadyPicked()).isTrue();
113+
114+
ShardAlreadyPickedUp anotherAlreadyPicked = sameIdxAnotherNode.getAlreadyPicked();
115+
assertThat(anotherAlreadyPicked.getWorker()).isEqualTo(session.getWorker());
106116

107117
ShardIndex anotherIdx = newIndex(24, 100);
108-
Optional<ShardProcessingSession> anotherIdxSameNode = registry.pickUp(anotherIdx, nodeId);
109-
assertThat(anotherIdxSameNode).isPresent();
118+
PickUpOutcome anotherIdxSameNode = registry.pickUp(anotherIdx, nodeId);
119+
assertThat(anotherIdxSameNode.hasSession()).isTrue();
120+
ShardSessionRecord anotherSession = anotherIdxSameNode.getSession();
121+
122+
PickUpOutcome anotherIdxAnotherNode = registry.pickUp(anotherIdx, newNode());
123+
assertThat(anotherIdxAnotherNode.hasSession()).isFalse();
124+
assertThat(anotherIdxAnotherNode.hasAlreadyPicked()).isTrue();
110125

111-
Optional<ShardProcessingSession> anotherIdxAnotherNode =
112-
registry.pickUp(anotherIdx, newNode());
113-
assertThat(anotherIdxAnotherNode).isEmpty();
126+
ShardAlreadyPickedUp oneMoreAnotherPicked = anotherIdxAnotherNode.getAlreadyPicked();
127+
assertThat(oneMoreAnotherPicked.getWorker()).isEqualTo(anotherSession.getWorker());
114128
}
115129

116130
@Test
117-
@DisplayName("complete the shard session (once picked up) and make it available for picking up")
118-
void completeSessionAndMakeItAvailable() {
119-
Optional<ShardProcessingSession> optional = registry.pickUp(index, nodeId);
120-
assertThat(optional).isPresent();
131+
@DisplayName("release the shard session (once picked up) and make it available for picking up")
132+
void releaseSessionAndMakeItAvailable() {
133+
PickUpOutcome outcome = registry.pickUp(index, nodeId);
134+
assertThat(outcome.hasSession()).isTrue();
121135

122136
Timestamp whenPickedFirst = readSingleRecord(index).getWhenLastPicked();
123137

124-
DsShardProcessingSession session = (DsShardProcessingSession) optional.get();
125-
session.complete();
138+
registry.release(outcome.getSession());
126139

127140
ShardSessionRecord completedRecord = readSingleRecord(index);
128141
assertThat(completedRecord.hasWorker()).isFalse();
129142

130143
NodeId anotherNode = newNode();
131144
WorkerId anotherWorker = registry.currentWorkerFor(anotherNode);
132-
Optional<ShardProcessingSession> anotherOptional = registry.pickUp(index, anotherNode);
133-
assertThat(anotherOptional).isPresent();
145+
PickUpOutcome anotherOutcome = registry.pickUp(index, anotherNode);
146+
assertThat(anotherOutcome.hasSession()).isTrue();
134147

135148
ShardSessionRecord secondSessionRecord = readSingleRecord(index);
136149
assertThat(secondSessionRecord.getWorker()).isEqualTo(anotherWorker);
@@ -152,7 +165,8 @@ void notAcceptNulls() {
152165
}
153166

154167
private ShardSessionRecord readSingleRecord(ShardIndex index) {
155-
Optional<ShardSessionRecord> record = registry.storage().read(index);
168+
Optional<ShardSessionRecord> record = registry.storage()
169+
.read(index);
156170
assertThat(record).isPresent();
157171
return record.get();
158172
}

version.gradle.kts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,5 +36,5 @@ val cloudPubsubV1Version: String by extra("1.105.8")
3636
val cloudTraceVersion: String by extra("2.14.0")
3737

3838
val spineBaseVersion: String by extra("1.9.0-SNAPSHOT.5")
39-
val spineCoreVersion: String by extra("1.9.0-SNAPSHOT.10")
40-
val versionToPublish: String by extra("1.9.0-SNAPSHOT.3")
39+
val spineCoreVersion: String by extra("1.9.0-SNAPSHOT.11")
40+
val versionToPublish: String by extra("1.9.0-SNAPSHOT.4")

0 commit comments

Comments
 (0)