Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddata.AsyncUpdate replaces old data with new instead of mergin them #86

Open
vasily-kirichenko opened this issue Jan 7, 2018 · 2 comments
Labels

Comments

@vasily-kirichenko
Copy link
Contributor

I'm trying to use PNCounterDictionary:
(node 1)

let cluster = Cluster.Get system
let ddata = DistributedData.Get system
let key = PNCounterDictionaryKey "test"
    
async {
    for n in 1..10_000_000 do
        let counterKey = string (n % 10)
        do! ddata.AsyncUpdate(key, PNCounterDictionary.Empty.Increment(cluster, counterKey), Consistency.writeLocal)  
        do! Async.Sleep 500
} 
|> Async.RunSynchronously

(node 2)

let ddata = DistributedData.Get system
let key: PNCounterDictionaryKey<string> = PNCounterDictionaryKey "test"

props (
    let loop (ctx: Actor<Msg>) =
        function
        | Tick ->
            ActorTaskScheduler.RunTask (fun () ->
                async {
                    let! counters = ddata.AsyncGet(key, Consistency.readLocal)
                    ctx.Self <! GotCounter counters
                }
                |> Async.StartAsTask
                |> fun x -> x :> Task)
            ignored()
        
        | GotCounter counter ->
            logInfof ctx "Counters! %A" counter
            ctx.Schedule (TimeSpan.FromSeconds 1.0) ctx.Self Tick |> ignore
            ignored()
            
    fun (ctx: Actor<Msg>) ->
        ctx.Self <! Tick
        become (loop ctx)
) 
|> spawnAnonymous system
|> ignore

output (node 2):

[INFO][05-Jan-18 10:18:42][Thread 0019][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[6, 1]])
[INFO][05-Jan-18 10:18:43][Thread 0019][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[6, 1]])
[INFO][05-Jan-18 10:18:44][Thread 0020][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[0, 1]])
[INFO][05-Jan-18 10:18:45][Thread 0005][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[0, 1]])
[INFO][05-Jan-18 10:18:46][Thread 0019][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[4, 1]])
[INFO][05-Jan-18 10:18:47][Thread 0005][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[4, 1]])
[INFO][05-Jan-18 10:18:48][Thread 0026][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[8, 1]])
[INFO][05-Jan-18 10:18:49][Thread 0003][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[8, 1]])
[INFO][05-Jan-18 10:18:50][Thread 0019][[akka://TestSystem/user/$a#92655313]] Counters! Some (seq [[2, 1]])

As I understand, the map is replaced in every ddata.AsyncUpdate(key, PNCounterDictionary.Empty.Increment(cluster, counterKey), Consistency.writeLocal) with a new PNCounterDictionary (containing single key). I expect it to be merged with the previous map.

The questions are:

  • am I doing updating right?
  • if I am, is it a bug?

Also, it would be very useful if some ddata usage examples were added in form of docs or example snippets.

C# DData DSL works:

async {
    for n in 1..10_000_000 do
        let counterKey = string (n % 10)
        
        ddata.Replicator.Tell(
            Dsl.Update(
                key, 
                PNCounterDictionary.Empty, 
                Consistency.writeLocal, 
                fun existing -> existing.Increment(cluster, counterKey)))
       
        do! Async.Sleep 500
} 
function
| Tick ->
    ActorTaskScheduler.RunTask (fun () ->
        async {
            let! counters = ddata.AsyncGet(key, Consistency.readLocal)
            ctx.Self <! GotCounter counters
        }
        |> Async.StartAsTask
        |> fun x -> x :> Task)
    ignored()

| GotCounter counters ->
    match counters with
    | Some counters ->
        counters.Entries
        |> Seq.map (fun (KeyValue(k, v)) -> sprintf "%s -> %O" k v)
        |> String.concat "\n"
        |> logInfof ctx "Counters!\n%s"
    | None ->
        logInfof ctx "Got null counters"
        
    ctx.Schedule (TimeSpan.FromSeconds 1.0) ctx.Self Tick |> ignore
    ignored()
[INFO][05-Jan-18 12:51:20][Thread 0003][[akka://TestSystem/user/$a#1434317428]] Counters!
5 -> 65
4 -> 65
7 -> 65
6 -> 65
1 -> 65
0 -> 64
3 -> 65
2 -> 65
9 -> 64
8 -> 64
@cotyar
Copy link
Contributor

cotyar commented Jan 8, 2018

Not 100% sure it's related but found interesting behaviour on the ORSet with lmdb enabled.
The AsyncGet (and GetAsync directly - same story) doesn't load the saved state from the db but AsyncGet followed by AsyncUpdate (with returned state) followed by AsyncGet gets everything loaded.

@cotyar
Copy link
Contributor

cotyar commented Jan 10, 2018

ORMultiMap has the same behaviour - AsyncGet doesn't read the storage until AsyncUpdate.
Do I need to send some kind of a warm-up message to the replicator maybe?
Sorry if I'm missing something obvious.
Running it all with the latest Akka beta

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants