Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,16 @@ Gradle:
```kotlin
dependencies {
// Datastore Storage support library.
implementation("io.spine.gcloud:spine-datastore:1.7.0")
implementation("io.spine.gcloud:spine-datastore:1.8.0")

// Pub/Sub messaging support library.
implementation("io.spine.gcloud:spine-pubsub:1.7.0")
implementation("io.spine.gcloud:spine-pubsub:1.8.0")

// Stackdriver Trace support library.
implementation("io.spine.gcloud:spine-stackdriver-trace:1.7.0")
implementation("io.spine.gcloud:spine-stackdriver-trace:1.8.0")

// Datastore-related test utilities (if needed).
implementation("io.spine.gcloud:testutil-gcloud:1.7.0")
implementation("io.spine.gcloud:testutil-gcloud:1.8.0")
}
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.cloud.datastore.TimestampValue;
import io.spine.server.delivery.ShardIndex;
import io.spine.server.delivery.ShardSessionRecord;
import io.spine.server.delivery.WorkerId;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.Iterator;
Expand Down Expand Up @@ -165,9 +166,11 @@ private enum Column implements MessageColumn<ShardSessionRecord> {
.getOfTotal());
}),

node((m) -> {
return StringValue.of(m.getPickedBy()
.getValue());
worker((m) -> {
WorkerId worker = m.getWorker();
String value = worker.getNodeId().getValue() + '-' + worker.getValue();
return StringValue.of(value);

}),

when_last_picked((m) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.spine.server.delivery.ShardIndex;
import io.spine.server.delivery.ShardProcessingSession;
import io.spine.server.delivery.ShardSessionRecord;
import io.spine.server.delivery.WorkerId;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.Iterator;
Expand Down Expand Up @@ -79,11 +80,26 @@ public DsShardedWorkRegistry(DatastoreStorageFactory factory) {
public synchronized Optional<ShardProcessingSession> pickUp(ShardIndex index, NodeId nodeId) {
checkNotNull(index);
checkNotNull(nodeId);

WorkerId worker = currentWorkerFor(nodeId);
Optional<ShardSessionRecord> result =
storage.updateTransactionally(index, new UpdateNodeIfAbsent(index, nodeId));
storage.updateTransactionally(index, new UpdateWorkerIfAbsent(index, worker));
return result.map(this::asSession);
}

/**
* Creates a worker ID by combining the given node ID with the ID of the current Java thread,
* in which the execution in performed.
*/
@Override
protected WorkerId currentWorkerFor(NodeId id) {
long threadId = Thread.currentThread().getId();
return WorkerId.newBuilder()
.setNodeId(id)
.setValue(Long.toString(threadId))
.vBuild();
}

@Override
public synchronized Iterable<ShardIndex> releaseExpiredSessions(Duration inactivityPeriod) {
return super.releaseExpiredSessions(inactivityPeriod);
Expand Down Expand Up @@ -123,24 +139,24 @@ protected DsSessionStorage storage() {
}

/**
* Updates the {@code nodeId} for the {@link ShardSessionRecord} with the specified
* Updates the {@code workerId} for the {@link ShardSessionRecord} with the specified
* {@link ShardIndex} if the record has not been picked by anyone.
*
* <p>If there is no such a record, creates a new record.
*/
private static class UpdateNodeIfAbsent implements DsSessionStorage.RecordUpdate {
private static class UpdateWorkerIfAbsent implements DsSessionStorage.RecordUpdate {

private final ShardIndex index;
private final NodeId nodeToSet;
private final WorkerId workerToSet;

private UpdateNodeIfAbsent(ShardIndex index, NodeId set) {
private UpdateWorkerIfAbsent(ShardIndex index, WorkerId worker) {
this.index = index;
nodeToSet = set;
workerToSet = worker;
}

@Override
public Optional<ShardSessionRecord> createOrUpdate(@Nullable ShardSessionRecord previous) {
if (previous != null && previous.hasPickedBy()) {
if (previous != null && previous.hasWorker()) {
return Optional.empty();
}
ShardSessionRecord.Builder builder =
Expand All @@ -150,7 +166,7 @@ public Optional<ShardSessionRecord> createOrUpdate(@Nullable ShardSessionRecord
: previous.toBuilder();

ShardSessionRecord updated =
builder.setPickedBy(nodeToSet)
builder.setWorker(workerToSet)
.setWhenLastPicked(currentTime())
.vBuild();
return Optional.of(updated);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.spine.server.delivery.ShardSessionRecord;
import io.spine.server.delivery.ShardedWorkRegistry;
import io.spine.server.delivery.ShardedWorkRegistryTest;
import io.spine.server.delivery.WorkerId;
import io.spine.testing.server.storage.datastore.TestDatastoreStorageFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -47,8 +48,6 @@
import static com.google.common.truth.Truth.assertThat;
import static com.google.common.truth.Truth8.assertThat;
import static io.spine.server.storage.datastore.given.TestShardIndex.newIndex;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

@DisplayName("`DsShardedWorkRegistry` should")
class DsShardedWorkRegistryTest extends ShardedWorkRegistryTest {
Expand Down Expand Up @@ -82,60 +81,62 @@ protected ShardedWorkRegistry registry() {
@DisplayName("pick up the shard and write a corresponding record to the storage")
void pickUp() {
Optional<ShardProcessingSession> session = registry.pickUp(index, nodeId);
assertTrue(session.isPresent());
WorkerId expectedWorker = registry.currentWorkerFor(nodeId);
assertThat(session).isPresent();
assertThat(session.get()
.shardIndex()).isEqualTo(index);

ShardSessionRecord record = readSingleRecord(index);
assertThat(record.getIndex()).isEqualTo(index);
assertThat(record.getPickedBy()).isEqualTo(nodeId);
assertThat(record.getWorker()).isEqualTo(expectedWorker);
}

@Test
@DisplayName("not be able to pick up the shard if it's already picked up")
void cannotPickUpIfTaken() {

Optional<ShardProcessingSession> session = registry.pickUp(index, nodeId);
assertTrue(session.isPresent());
assertThat(session).isPresent();

Optional<ShardProcessingSession> sameIdxSameNode = registry.pickUp(index, nodeId);
assertFalse(sameIdxSameNode.isPresent());
assertThat(sameIdxSameNode).isEmpty();

Optional<ShardProcessingSession> sameIdxAnotherNode = registry.pickUp(index, newNode());
assertFalse(sameIdxAnotherNode.isPresent());
assertThat(sameIdxAnotherNode).isEmpty();

ShardIndex anotherIdx = newIndex(24, 100);
Optional<ShardProcessingSession> anotherIdxSameNode = registry.pickUp(anotherIdx, nodeId);
assertTrue(anotherIdxSameNode.isPresent());
assertThat(anotherIdxSameNode).isPresent();

Optional<ShardProcessingSession> anotherIdxAnotherNode =
registry.pickUp(anotherIdx, newNode());
assertFalse(anotherIdxAnotherNode.isPresent());
assertThat(anotherIdxAnotherNode).isEmpty();
}

@Test
@DisplayName("complete the shard session (once picked up) and make it available for picking up")
void completeSessionAndMakeItAvailable() {
Optional<ShardProcessingSession> optional = registry.pickUp(index, nodeId);
assertTrue(optional.isPresent());
assertThat(optional).isPresent();

Timestamp whenPickedFirst = readSingleRecord(index).getWhenLastPicked();

DsShardProcessingSession session = (DsShardProcessingSession) optional.get();
session.complete();

ShardSessionRecord completedRecord = readSingleRecord(index);
assertFalse(completedRecord.hasPickedBy());
assertThat(completedRecord.hasWorker()).isFalse();

NodeId anotherNode = newNode();
WorkerId anotherWorker = registry.currentWorkerFor(anotherNode);
Optional<ShardProcessingSession> anotherOptional = registry.pickUp(index, anotherNode);
assertTrue(anotherOptional.isPresent());
assertThat(anotherOptional).isPresent();

ShardSessionRecord secondSessionRecord = readSingleRecord(index);
assertThat(secondSessionRecord.getPickedBy()).isEqualTo(anotherNode);
assertThat(secondSessionRecord.getWorker()).isEqualTo(anotherWorker);

Timestamp whenPickedSecond = secondSessionRecord.getWhenLastPicked();
assertTrue(Timestamps.compare(whenPickedFirst, whenPickedSecond) < 0);
assertThat(Timestamps.compare(whenPickedFirst, whenPickedSecond) < 0).isTrue();
}

@Test
Expand Down
16 changes: 8 additions & 8 deletions license-report.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@


# Dependencies of `io.spine.gcloud:spine-datastore:1.7.0`
# Dependencies of `io.spine.gcloud:spine-datastore:1.8.0`

## Runtime
1. **Group:** com.fasterxml.jackson **Name:** jackson-bom **Version:** 2.12.0 **No license information found**
Expand Down Expand Up @@ -592,12 +592,12 @@
The dependencies distributed under several licenses, are used according their commercial-use-friendly license.


This report was generated on **Tue Dec 15 12:20:09 EET 2020** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).
This report was generated on **Thu Dec 16 16:38:53 EET 2021** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).




# Dependencies of `io.spine.gcloud:spine-pubsub:1.7.0`
# Dependencies of `io.spine.gcloud:spine-pubsub:1.8.0`

## Runtime
1. **Group:** com.google.android **Name:** annotations **Version:** 4.1.1.4
Expand Down Expand Up @@ -1003,12 +1003,12 @@ This report was generated on **Tue Dec 15 12:20:09 EET 2020** using [Gradle-Lice
The dependencies distributed under several licenses, are used according their commercial-use-friendly license.


This report was generated on **Tue Dec 15 12:20:20 EET 2020** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).
This report was generated on **Thu Dec 16 16:38:57 EET 2021** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).




# Dependencies of `io.spine.gcloud:spine-stackdriver-trace:1.7.0`
# Dependencies of `io.spine.gcloud:spine-stackdriver-trace:1.8.0`

## Runtime
1. **Group:** com.fasterxml.jackson **Name:** jackson-bom **Version:** 2.12.0 **No license information found**
Expand Down Expand Up @@ -1594,12 +1594,12 @@ This report was generated on **Tue Dec 15 12:20:20 EET 2020** using [Gradle-Lice
The dependencies distributed under several licenses, are used according their commercial-use-friendly license.


This report was generated on **Tue Dec 15 12:20:43 EET 2020** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).
This report was generated on **Thu Dec 16 16:39:04 EET 2021** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).




# Dependencies of `io.spine.gcloud:spine-testutil-gcloud:1.7.0`
# Dependencies of `io.spine.gcloud:spine-testutil-gcloud:1.8.0`

## Runtime
1. **Group:** com.fasterxml.jackson **Name:** jackson-bom **Version:** 2.12.0 **No license information found**
Expand Down Expand Up @@ -2191,4 +2191,4 @@ This report was generated on **Tue Dec 15 12:20:43 EET 2020** using [Gradle-Lice
The dependencies distributed under several licenses, are used according their commercial-use-friendly license.


This report was generated on **Tue Dec 15 12:20:48 EET 2020** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).
This report was generated on **Thu Dec 16 16:39:07 EET 2021** using [Gradle-License-Report plugin](https://github.com/jk1/Gradle-License-Report) by Evgeny Naumenko, licensed under [Apache 2.0 License](https://github.com/jk1/Gradle-License-Report/blob/master/LICENSE).
52 changes: 18 additions & 34 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ all modules and does not describe the project structure per-subproject.

<groupId>io.spine.gcloud</groupId>
<artifactId>spine-gcloud-java</artifactId>
<version>1.7.0</version>
<version>1.8.0</version>

<inceptionYear>2015</inceptionYear>

Expand Down Expand Up @@ -46,7 +46,7 @@ all modules and does not describe the project structure per-subproject.
<dependency>
<groupId>io.spine</groupId>
<artifactId>spine-server</artifactId>
<version>1.7.0</version>
<version>1.8.0</version>
<scope>compile</scope>
</dependency>
<dependency>
Expand All @@ -70,7 +70,7 @@ all modules and does not describe the project structure per-subproject.
<dependency>
<groupId>io.spine</groupId>
<artifactId>spine-testutil-server</artifactId>
<version>1.7.0</version>
<version>1.8.0</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -98,44 +98,28 @@ all modules and does not describe the project structure per-subproject.
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_core</artifactId>
<version>2.4.0</version>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
<version>3.0.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>javac</artifactId>
<version>9+181-r4173-1</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protoc</artifactId>
<version>3.13.0</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>protoc-gen-grpc-java</artifactId>
<version>1.28.1</version>
</dependency>
<dependency>
<groupId>io.spine.tools</groupId>
<artifactId>spine-protoc-plugin</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>net.sourceforge.pmd</groupId>
<artifactId>pmd-java</artifactId>
<version>6.24.0</version>
<artifactId>error_prone_annotations</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>org.jacoco.agent</artifactId>
<version>0.8.5</version>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_type_annotations</artifactId>
<version>2.4.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.jacoco</groupId>
<artifactId>org.jacoco.ant</artifactId>
<version>0.8.5</version>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
<version>3.7.1</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Expand Down
6 changes: 3 additions & 3 deletions version.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,6 @@
* `.config/gradle/dependencies.gradle`.
*/

val spineBaseVersion: String by extra("1.7.0")
val spineCoreVersion: String by extra("1.7.0")
val versionToPublish: String by extra("1.7.0")
val spineBaseVersion: String by extra("1.8.0")
val spineCoreVersion: String by extra("1.8.0")
val versionToPublish: String by extra("1.8.0")