Skip to content

DynamicProxyable for ReactiveCouchbaseRepository using ThreadLocal  #1838

Closed
@RittikeGhosh

Description

@RittikeGhosh

Spring Boot Version: 3.1.4
Spring Dependency Management Version: 1.1.3
Couchbase Enterprise: 7.2.2
Project Link: couchbasedynamicproxyable-demo.zip

DynamicProxyable

Support for Couchbase-specific options, scope and collections The three "with" methods will return a new proxy
instance with the specified options, scope, or collections set. The setters are called with the corresponding
options, scope and collection to set the ThreadLocal fields on the CouchbaseOperations of the repository just
before the call is made to the repository, and called again with 'null' just after the call is made. The repository
method will fetch those values to use in the call.

As mentioned in DynamicProxyable it is mentioned that, it uses ThreadLocal, is it correct in reactive paradigm ?

That's why when I am trying this in reactive programming it is not consistent rather I could say, not supporting.

I have created a demo project recreating this issue.

 var user = new User("hw" + random.nextInt(0, 100), "Hello World");
 userRepository.withScope(TENANT_SCOPE).findById(user.id())
   .doOnNext(u -> {
       throw new RuntimeException("User already Exists!");
   })
   .then(userRepository.withScope(TENANT_SCOPE).save(user))
   .subscribe(u -> log.info("User Persisted Successfully! {}", u));

This piece of code has two separate repository call to the same repository mentioning withScope().

In this example if both the repository calls happen in the same thread, then there is no issue but if the threads are different, the call fails as it fails to pick the correct scope from the ThreadLocal because it is different thread.

This is the log:

2023-10-08T11:59:51.850+05:30 DEBUG 1050611 --- [           main] s.d.c.c.ReactiveFindByIdOperationSupport : findById key=hw49 scope: tenant collection: user options: com.couchbase.client.java.kv.GetOptions@7db40fd5
2023-10-08T11:59:51.920+05:30 DEBUG 1050611 --- [   cb-io-kv-5-2] d.c.c.ReactiveUpsertByIdOperationSupport : upsertById object=User[id=hw49, name=Hello World] scope: _default collection: user options: null
2023-10-08T11:59:51.956+05:30 DEBUG 1050611 --- [   cb-io-kv-5-2] o.s.d.c.core.query.OptionsBuilder        : upsert options: {}{durabilityLevel: Optional.empty, persistTo: NONE, replicateTo: NONE, timeout: Optional.empty, retryStrategy: Optional.empty, clientContext: null, parentSpan: Optional.empty}
reactor.core.Exceptions$ErrorCallbackNotImplemented: com.couchbase.client.core.error.AmbiguousTimeoutException: UpsertRequest, Reason: TIMEOUT {"cancelled":true,"completed":true,"coreId":"0x548ece300000001","idempotent":false,"lastChannelId":"0548ECE300000001/000000006D07D16B","lastDispatchedFrom":"127.0.0.1:55462","lastDispatchedTo":"localhost:11210","reason":"TIMEOUT","requestId":7,"requestType":"UpsertRequest","retried":14,"retryReasons":["COLLECTION_MAP_REFRESH_IN_PROGRESS"],"service":{"bucket":"myService","collection":"user","documentId":"hw49","opaque":"0x10","scope":"_default","type":"kv","vbucket":583},"timeoutMs":2500,"timings":{"encodingMicros":4405,"totalMicros":2513777}}
...Stacktrace skipped
com.couchbase.client.core.error.UnambiguousTimeoutException: GetCollectionIdRequest, Reason: TIMEOUT {"cancelled":true,"completed":true,"coreId":"0x548ece300000001","idempotent":true,"lastChannelId":"0548ECE300000001/000000006D07D16B","lastDispatchedFrom":"127.0.0.1:55462","lastDispatchedTo":"localhost:11210","reason":"TIMEOUT","requestId":6,"requestType":"GetCollectionIdRequest","retried":14,"retryReasons":["KV_COLLECTION_OUTDATED"],"service":{"bucket":"myService","collection":"user","errorCode":{"description":"Operation specified an unknown collection.","name":"UNKNOWN_COLLECTION"},"opaque":"0xf","scope":"_default","type":"kv","vbucket":0},"timeoutMs":2500,"timings":{"dispatchMicros":1345,"totalDispatchMicros":23755,"totalServerMicros":0,"serverMicros":0}}
...Stacktrace skipped

As can be seen in second line of the log, the scope set is _default, which is incorrect as it should be tenant.

Instead of plain then() operator, if I try deferring it works,
.then(Mono.defer(() -> userRepository.withScope(TENANT_SCOPE).save(user).log("2")))

Any appropriate solution ? This is important for multi tenant implementation.

Metadata

Metadata

Assignees

No one assigned

    Labels

    Type

    No type

    Projects

    No projects

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions