The purpose of this doc is to supplement the code in the crdt-example-app
with notes that help describe how things work. While the example app uses a todo list as a use case, the real focus is on demonstrating the underlying distributed database and CRDT concepts; that is also the focus of these notes.
In a nutshell, the UI sits on top of a distributed database that is kept in sync using CRDT techniques:
- CRUD operations do not modify an underlying data store directly as would be the case with a typical database.
- Instead, when a change is made to a specific field, a message is created that logs which field was changed, what the new value is for that field, and when the change happened,.
- The "when" is a hybrid logical clock timestamp.
- Each agent maintains its own collection of these messages; in effect, they create an "operation journal / log".
- An accompanying data structure, a merkle tree, is also maintained that makes it possible to quickly compare different message collections and figure out (roughly) which messages need to be exchanged to sync the collections.
- The merkle tree only stores what it needs to answer the question "at what time did the collections begin to have different messages?": time (as keys) and hashes (as values) made from all known messages at those times.
- These messages are shared among agents, including a server agent that just acts as a centralized "message buffer" for syncing agents that can't connect directly.
- When an agent (A₁) syncs with another agent (A₂):
- A₁ sends its own merkle tree (JSON) to A₂ (which could be a server).
- A₂ can compare the incoming merkle tree to its own tree to quickly establish a rough time at which the trees started to differ (T₁).
- A₂ can then send all of its messages that have a timestamps >= T₁.
- A₁ receives these messages from A₂ and applies them to its local database.
- Note that it's possible that it will apply some messages that it already has--that's ok. The main goal of the merkle tree approach is to quickly figure out which messages need to be exchanged based if they were created after some time, not the exact messages that need to be exchanged (which could result in an expensive process of lookig up a lot of individual messages by some identifier before exchanging them).
- If, in the set of incoming messages, there is more than one message targeting the same field, only the most recent one (as determined from the message's timestamp) is used to update the field value.
- Each message timestamp is based on a hybrid logical clock.
-
This combines a "physical" time (i.e., a normal-looking date/time) and a counter that can never go backwards (a.k.a., a "monotonic" clock)
-
HLC timestamps make it possible to determine the order of events amongst a group of agents whose physical clocks might not be in sync (i.e., the goal is to establish causality--A happened before B--NOT the actual time things happened).
-
- Each message timestamp is based on a hybrid logical clock.
-
When UI widgest are used to create/edit/delete todos, the following
db.*
functions are called with data params. Example:db.insert('todos', { name, type, order: getNumTodos() }); db.update('todos', { id: uiState.editingTodo.id, name: value }); db.delete_('todos', e.target.dataset.id);
-
The
db.*
method creates one or more messages from the object that is passed in and then "sends" them:// db.js function update(table, params) { let fields = Object.keys(params).filter(k => k !== 'id'); sync.sendMessages( fields.map(k => { return { dataset: table, row: params.id, column: k, value: params[k], // Note that every message we create/send gets its own, globally- // unique timestamp. In effect, there is a 1-1 relationship between // the timestamp and this specific message. timestamp: Timestamp.send(getClock()).toString() }; }) ); }
-
sync.sendMessages()
applies the message to the local data store first, then attempts to initiate a sync.// sync.js function sendMessages(messages) { applyMessages(messages); sync(messages); }
-
sync.applyMessages()
looks at each incoming message (which will be aimed at changing data for a specific dataset + row + field) and does the following:
- Do we have any local messages for the same field?
- If so, is the incoming message for that field newer than ours?
- If so,
apply()
the incoming message's value for the specified dataset/row/field to our own datastore.
- If so,
- If so, is the incoming message for that field newer than ours?
- Do we already have a copy of this message in our local store of messages and merkle tree?
- If not, add it.
The main goal with clocks as they pertain to distributed databases is to be able to order events. In other words, knowing which event was the last to happen is more of a concern than knowing exactly when they happened, because in this context, the question being asked is: what is the most recent message that set a value for a field?
A logical clock is a mechanism that makes it possible to determine "one-way causality": if "A happened before B". It cannot, however, determine if "B happened after A". (Note that two-way causality is supported by a logical clock variant called a "vector clock.")
Commonly used in distributed systems where actual clocks may not be in sync
- Main goal is to be able to determine the "causal relationship" between events (i.e., A happened before B), NOT the actual time things happened.
The Lamport clock was the first implementation of a logical clock. It is basically just a counter that is shared among all nodes/processes in a distributed system. The counter gets incremented every time:
- A process event occurs (e.g., some data is updated)
- A process receives message with counter from another process.
- i.e., when nodes exchanges messages, the receiver re-synchronizes its logical clock with that sender
In other words, every node/process maintains its own counter/timestamp and always ensures that it re-synchronizes with the counter when messages are received from other nodes. Specifically, it'll make sure to reset its own counter to whichever counter is greater--possibly the one from another node--and then increment.
A Lamport timestamp is just a "monotonically" increasing (i.e., never decreasing) counter.
// Sending is an event. Any time an event happens, ensure time moves forward (i.e., increment the timestamp/counter)
time = time + 1
// Send the message with the incremented timestamp
send(message, time)
// Receiving is an event. Any time an event happens, ensure time moves forward
function receive(message, time_stamp) {
// We'll always increment time, but if the sender's timestamp is greater than ours, use that as the new basis. This is
// how we ensure that the counter is moving forward throughout the distributed system.
time = max(time_stamp, time) + 1;
}
An HLC combines both a physical and logical clock. It was designed to provide one-way (as with LC rather than VC) causality detection while maintaining a clock value close to the physical clock, so one can use HLC timestamp as a drop-in replacement for a physical clock timestamp. Rules:
- Each node maintain its own monotonic counter,
c
(just like with logical clocks) - Each node keeps track of the largest physical time it has encountered so far
- this is called the "logical" time (
l
) - When a message is received:
- The receiving node updates its own logical clock to ensure that it moves forward by picking whichever of the following is greater:
a. the current physical time (e.g.,
Date.now()
), or b. the logical time stored in the message - If the logical times are all equal, increment the counter (c
)
In other words, if the physical clocks on all nodes are in perfect sync, then the counter is the only thing that gets incremented each time a message is received. However, it's more common that a node is always going to reset its logical time and counter each time a message is received.
- Extremely helpful: http://sergeiturukin.com/2017/06/26/hybrid-logical-clocks.html\
- MongoDB 3.6 (released ~2017) uses HLC's and "oplogs" (i.e., a log of operations, much like messages in this app): https://www.mongodb.com/blog/post/transactions-background-part-4-the-global-logical-clock
- https://www.youtube.com/watch?v=CMBjvCzDVkY
- Creates
- loads the following:
murmurhash.js
: a library for generating hashes quickly (not crypto-grade).uuidv4
: library for genearting v4 UUIDsshared/timestamp.js
:shared/merkle.js
clock.js
sync.js
db.js
main.js
This file exposes functions for creating/getting/setting the singleton app "clock" object (a timestamp, really, but we called a "clock" because it will periodically be updated when events occur and continues to move "forward" in time). The "clock" object consists of two things:
- a mutable timestamp
- a merkle tree
Note that there are also functions for serializing/deserializing clock objects (i.e., to/from JSON).
- Initializes the "clock" object when loaded (by calling
clock.js:setClock()
).- This is just an object with two props: a
MutableTimestamp
and a merkle tree - When we talk about the clock, we're really talking about the
MutableTimestamp
in this object - It's more like a counter... It gets "incremented" every time a message is sent or received
- This is just an object with two props: a
-
Creates a
uiState
variable:- offline: false,
- editingTodo: null,
- isAddingType: false,
- isDeletingType: false
-
render()
- Uses
append()
to insert HTML into - Renders todos and deleted todos
db.js:getTodos()
anddb.js:getDeletedTodos()
return todos from in-memory array
- if
uiState.editingTodo
, renders HTML for editing todo - if
uiState.isAddingType
, renders HTML for adding new todo - if
uiState.isDeletingType
, renders HTML for adding new todo - Sets up event listeners
#add-form
submit#btn-sync
click`#btn-offline-simulate
click- etc.
- Uses
-
Registers
onSync()
callback- Every time
sync.js:applyMessages()
finishes, it will trigger callback - callback just re-renders all the HTML
- Every time
-
Calls
sync.js:sync()
to start the first sync, then- If there aren't any todo types after the sync, it creates some default ones via
db.js:insertTodoType()
- If there aren't any todo types after the sync, it creates some default ones via
-
Sets up timer to call
sync()
every 4 seconds- 👉 Note that this initializes the clock:
setClock(makeClock(new Timestamp(0, 0, makeClientId())));
makeClientId()
is just part of a UUID (specifically, the last 16 chars).- UUID:
37c2877f-fbf4-40f3-bdb7-87f4536dc989
- client ID:
bdb7-87f4536dc989
(without the hyphen)
- UUID:
- 👉 Note that this initializes the clock:
Defines Timestamp
and MutableTimestamp
classes. Comprised of millis
, counter
, and node
.
The stringified timestamps are FIXED LENGTH in the format <date/time>-<counter>-<client ID>
, where:
-
<date/time>
is ISO 8601 date string viaDate.toISOString()
-
<counter>
is a hexadecimal encoded version of the counter, always 4 chars in length- ensuring that we never have more that 4 chars means there is a limit to how big the counter can be: 65535.
- (65533).toString(16) -> fffd (4 chars)
- (65534).toString(16) -> fffe
- (65535).toString(16) -> ffff
- (65536).toString(16) -> 10000 -- oops, this is 5 chars
-
<client ID>
is the last 16 chars of a UUID (with hyphen removed):- UUID:
xxxxxxxx-xxxx-xxxx-bdb7-87f4536dc989
, client/node:bdb787f4536dc989
- UUID:
-
millis
: milliseconds- the Timestamp used to init the clock at startup has this set to
0
Timestamp.parse()
sets this to elapsed msecs since 1/1/70 (e.g., when receiving a message)
- the Timestamp used to init the clock at startup has this set to
-
counter
- the Timestamp used to init the clock at startup has this set to
0
- the Timestamp used to init the clock at startup has this set to
-
node
- identifies the client, or node, that created the timestamp
Examples:
2020-02-02T16:29:22.946Z-0000-97bf28e64e4128b0
(millis = 1580660962946, counter = 0, node = 97bf28e64e4128b0)2020-02-02T16:30:12.281Z-0001-bc5fd821dc0e3653
(millis = 1580661012281, counter = 1, node = bc5fd821dc0e3653)- Note that
<ISO 8601 date string>
is viaDate.toISOString()
- Note that
Timestamp.send(clock)
- This function is used to create a new timestamp every time a message is sent (i.e., every time a database CRUD operation causes a new message to be created/sent)
- Creates/returns a new
Timestamp
using theclock
arg.
This file exposes functions that resemble a database API. It sets up a couple of global variables that are in-memory data stores for messages and todo objects, and creates global functions for CRUD operations on those stores. In a more realistic app, one might use something like IndexedDB or SQLite as the underlying storage mechanism.
Each data store is comparable to a database table:
todo
: an array of{ name: string, type: string, order: number }
objectstodoTypes
: an array of{ name: string, color: string }
objectstodoTypeMappings
: an array of{ id: <typeId>, targetId: <typeId>} }
objects
GET functions all return objects from the various in-memory arrays:
getTodos()
returns_data.todos
(filters.tombstone !== true
)getDeletedTodos()
returns_data.todos
(filters.tombstone === true
)getTodoTypes()
returns_data.todoTypes
(filters.tombstone !== true
)- etc.
INSERT/UPDATE functions don't modify the in-memory stores; instead, they create and send a message for each property/value pair of the object being inserted/updated/deleted.
// For example, inserting the following to-do object:
{
"name": "Make dinner",
// This is an ID that points to the 'Personal todo' type
"type": "570694fc-6e30-496a-8a37-95ab5bec0311",
"order": 5
}
// Results in code like this running in db.insert():
id = uuidv4(); // Comparable to creating our own primary key if it were an RDBMS,
sendMessages([{
dataset: 'todos',
row: id
column: 'name',
value: 'Make dinner',
// Timestamp.send() returns a new, "next" time each time it's called, and since each time is unique, this basically
// means that every message gets its own, globally-unique timestamp. In other words: there is a 1-1 relationship
// between each message and its HLC timestamp.
timestamp: Timestamp.send(getClock()).toString()
},{
dataset: 'todos',
type: '570694fc-6e30-496a-8a37-95ab5bec0311',
row: id,
column: 'type',
value: '570694fc-6e30-496a-8a37-95ab5bec0311',
timestamp: Timestamp.send(getClock()).toString()
}, ...])
// Which results in JSON messages like this:
[
{
"dataset": "todos",
"row": "5a9c7c59-3a73-455c-8c5b-49a03a09c852",
"column": "name",
"value": "Make dinner",
"timestamp": "2020-02-09T20:28:21.212Z-0000-87854eaf99288a48"
},
{
"dataset": "todos",
"row": "5a9c7c59-3a73-455c-8c5b-49a03a09c852",
"column": "type",
"value": "570694fc-6e30-496a-8a37-95ab5bec0311",
"timestamp": "2020-02-09T20:28:21.212Z-0001-87854eaf99288a48"
},
{
"dataset": "todos",
"row": "5a9c7c59-3a73-455c-8c5b-49a03a09c852",
"column": "order",
"value": 4,
"timestamp": "2020-02-09T20:28:21.212Z-0002-87854eaf99288a48"
}
]
In other words, when inserting a new object:
- the object gets a unique ID (UUID).
- This is comparable to an auto-incremented ROWID in a database table.
- A message is sent for setting each key/value pair in the object
- This is comparable to issuing a SET statement for each column in the table
- Each message gets a unique timestamp (i.e., unique because it is an HLC
timestamp that includes the current node's UUID).
- In effect, the timestamp is a unique identifier for the message.
DELETING means setting the tombstone
column to true:
// Delete the "Groceries" type
sendMessages([{
dataset: 'todoTypes', row: id, column: 'tombstone', value: 1, timestamp: Timestamp.send(getClock()).toString()
}])
Every message includes a timestamp generated via Timestamp.send(getClock()).toString()
.
Timestamp.send()
: generates a unique, "monotonic" timestamp as a stringgetClock()
In a nutshell, the application uses a merkle tree to quickly figure out if two clients have the same collection of operation log messages (e.g., a message saying a specific table -> row -> column should be set to some value).
Each client maintains its own merkle tree. The merkle tree is a hierarchy of nodes, where each node contains a string value: a hash made from the hashes of its children (i.e., a "rolling hash" that is derived from all its children). This includes the root node, which means the root hash for two merkle trees will be the same only if the same set of messages have been inserted into both trees. You can tell if two clients have the same messages just by comparing the root hashes of their merkle trees.
Every time a field in the data store is changed, a message (oplog entry) is created with a corresponding HLC timestamp. Whenever a client "applies" a data change message, it needs to also add that message's HLC timestamp to its merkle tree. This is how the client maintains a rolling hash of all the messages it has encountered.
When a message is inserted into the merkle tree, its HLC timestamp's physical time (minutes since 1970--see the section on "merkle tree keys/paths" to understand why minutes are used instead of seconds or milliseconds) is used as the "path" for navigating down the tree and inserting the new "leaf node" with the hash. That new node might be inserted several levels down in the tree; the intermediate node at each level (including at the root node) will have its rolling hash updated with new hash made from the current value and the new node's hash.
Note: the merkle tree depends on hashing being commutative (
hash(C, hash(A, B)) === hash(A, hash(B, C))
) so that the order in which things are inserted doesn't matter. In other words, merkle trees on two different clients will eventually have the same root hash once the same things are inserted into both trees.
Taking a step back, consider that a really simple way to tell if two clients have the same messages would be: each client maintains a "rolling" hash of their messages. Each time a new message is added to the log, the rolling hash would be re-calculated by combining it with a hash of the new message. To tell if the clients have the same messages, you'd just compare their rolling hash values. However, this only tells you if the clients have encountered the same messages (i.e., if their rolling hashes were derived from the same set of message hashes).
The merkle tree used by this app indexes rolling hashes of "known messages" by the times for those messages. This means you can quickly compare two merkle trees, and if they differ, find the most recent "message time" when they were the same.
Knowing when the collections began to differ (i.e., that one node has messages after that time which the other node lacks) makes it possible for a more efficient sync to be done by exchanging only the messages that occurred after that time.
Note: the merkle tree doesn't store hashes of full oplog messages--only the timestamp is used for the hash (i.e.,
Timestamp.hash()
. But since each message's timestamp is unique (they incorporate time, a counter, and a UUID), the timestamp becomes a unique identifier for the message.
As previously stated, the keys for the merkle tree are the times for each message in "minutes since 1970." The minutes are base-3 encoded, so numbers only consist of digits 0, 1, or 2 (also, note that the base-3 encoded minutes are converted to strings). This means that you end up with keys like "1211121022121121".
Note: you could use a more precise unit of time (e.g., milliseconds instead of minutes since 1970). However, a more precise time means a bigger number, which means a longer string, which means more nodes and hashes in the merkle tree. In other words, more precision means a bigger data structure. That may be necessary in an application where the syncing performance is greatly improved if the "time of divergence" can be more precise (e.g., an application that creates many oplog entries per minute/second/msec). But for most applications whose data changes less frequently, being able to find the minute at which collections began to differ is good enough and helps keep the tree smaller and can make the diffing algorithm faster.
Each character in the string is used to access the next child node. In other words, each node in this application's merkle tree is an object with 1-4 things:
- A
hash
property. This is a hash of the Timestamp (as calculated byTimestamp.hash()
). - (maybe) a
"0"
property referencing a child node - (maybe) a
"1"
property referencing a child node - (maybe) a
"2"
property referencing a child node
This means that each node can have, at most, 3 children. In other words, this is a ternary tree structure (vs. a binary tree, for example).
If you visualize each node's children as being sorted from left-to-right, this means a tree that might look something like this:
Root
┌─────────────────────────┼─────────────────────────┐
0 1 2
┌────────┼───────┐ ┌────────┼───────┐ ┌────────┼───────┐
0 1 2 0 1 2 0 1 2
┌──┼──┐ ┌──┼──┐ ┌──┼──┐ ┌──┼──┐ ┌──┼──┐ ┌──┼──┐ ┌──┼──┐ ┌──┼──┐ ┌──┼──┐
0 1 2 0 1 2 0 1 2 0 1 2 0 1 2 0 1 2 0 1 2 0 1 2 0 1 2
For an over-simplified example, a "time" (in base-3) like 120 becomes "120". So first you'd use '1' to get the first node, then '2' to get the next node, and finally '0' to get the last node.
Note: the minutes could use any base (e.g., it could be in hex) as long as the individual characters in the stringified version of a key can be sorted/ordered (which is a key part of the
merkle.js:diff()
function).
It's important that whatever number system is used for the keys (e.g., base-10, base-16, etc.), the "stringified" version of the keys should naturally sort in the same order as the times they represent. In other words, when the keys are converted to strings, it should be possible to sort them chronologically.
If each "step" (i.e., character) in the tree path uses a characters system where and "smaller" values represent older times and "greater" represents more recent times, then the overall merkle tree will be ordered and it's possible to more efficiently find the last time when two trees were "equal."
If time starts at t₀ (000), then the path to the first time is the branch furthest to the right. As the integer for time increases, you're basically moving from left to right. Use the diagram above to walk through each of the paths for the following sequence of time (which is basically counting up in base-3):
- t₀ = 000
- t₁ = 001
- t₂ = 002
- t₃ = 010
- t₄ = 011
- t₅ = 012
- t₆ = 020
The merkle.js:insertKey()
function implements the "insert" operation. Here's an example that shows how it works
(where h(...)
is shorthand for "hash of ..."):
- Two clients #1 and #2 both record a message timestamp: { time: '000', hash: A }
root: h(A)
│
0: h(A)
│
0: h(A)
│
0: h(A)
- Both clients record a message timestamp: { time: '101', hash: B }
root: h(A,B)
│
0: h(A,B)
┌──────┴─────┐
0: h(A) 1: h(B)
│ │
0: h(A) 0: h(B)
- Both clients record a message timestamp: { time: '011', hash: C }
root: h(A,B,C)
│
0: h(A,B,C)
┌───────┴───────┐
0: h(A) 1: h(B,C)
│ ┌─────┴─────┐
0: h(A) 0: h(B) 1: h(C)
- New messages:
- Client #1 records message timestamp: { time: '012', hash: 🍏 }.
- Client #2 records message timestamp: { time: '012', hash: 🍊 }
- Both clients record message timestamp: { time: '100', hash: 🍓 }
Now they have different merkle trees:
Client #1 Client #2
root:h(A,B,C,🍏,🍓) root:h(A,B,C,🍊,🍓)
┌─────────┴────────────┐ ┌──────────┴────────────┐
0:h(A,B,C,🍏) 1:h(🍓) 0:h(A,B,C,🍊) 1:h(🍓)
┌───────┴──────┐ │ ┌───────┴──────┐ │
0:h(A) 1:h(B,C,🍏) 0:h(🍓) 0:h(A) 1:h(B,C,🍊) 0:h(🍓)
│ ┌──────┼──────┐ │ │ ┌──────┼──────┐ │
0:h(A) 0:h(B) 1:h(C) 2:h(🍏) 0:h(🍓) 0:h(A) 0:h(B) 1:h(C) 2:h(🍊) 0:h(🍓)
The merkle.js:diff()
function implements an algorithm for finding the approximate "time of divergence" which, at a
high level, works like this:
- Compare the top-level hashes. A^B^C^🍏^🍓 !== A^B^C^🍊^🍓 so we immediately know the trees are different.
- Get get all the keys from the nodes in both trees, at the 1st level: ['0', '1']
- Sort the keys alphabetically, then for each key (k), find the first key where
node1[k].hash !== node2[k].hash
- In this case,
k === '0'
because A^B^C^🍏 !== A^B^C^🍊; setkey = '0'
- Repeat the process. Get all the keys for
node1['0']
andnode2['0']
: ['0', '1']] - Iterate over the sorted keys (0, 1) until you find a pair of nodes with different hashes.
- In this case,
k === '1'
because B^C^🍏 !== B^C^🍊; setkey += '1'
(i.e., key is now '01') - Repeat the process. Get all the keys for
node1['1']
andnode2['1']
: ['0', '1', '2']] - Iterate over the sorted keys (0, 1) until you find a pair of nodes with different hashes.
- In this case,
k === '2'
because 🍏 !== 🍊; setkey += '2'
(i.e., key is now '012') - Repeat the process. . Get all the keys for
node1['2']
andnode2['2']
. - Neither of the nodes have children, so they are equal: exit.
- We now have a
key
that is a base-3 encoded integer012
(minutes since 1970).
To sync, Client #1 can ask for all of Client #2's messages with timestamps >= 012
: this means the 🍊 and 🍓 messages.
Client #1 already knows about the 🍓 message, so this shows that the mechanism isn't going to result in only unknown messages being sync'ed; there will be dupes. But the trade-off for complete efficiency is speed.
TODO
- A unique, 128-bit number
- In string form, represented as 36 chars: 32 hex digits (i.e., 0-f) + 4 hyphen separators
- Format: {8 chars}-{4 chars}-{4 chars}-{4 chars}-{12 chars}
- Example:
123e4567-e89b-12d3-a456-426655440000
TODO
- Hybrid Logical Clocks by Jared Forsyth