|
| 1 | +## Streams |
| 2 | + |
| 3 | +Synapse has a concept of "streams", which are roughly described in [`id_generators.py`]( |
| 4 | + https://github.com/matrix-org/synapse/blob/develop/synapse/storage/util/id_generators.py |
| 5 | +). |
| 6 | +Generally speaking, streams are a series of notifications that something in Synapse's database has changed that the application might need to respond to. |
| 7 | +For example: |
| 8 | + |
| 9 | +- The events stream reports new events (PDUs) that Synapse creates, or that Synapse accepts from another homeserver. |
| 10 | +- The account data stream reports changes to users' [account data](https://spec.matrix.org/v1.7/client-server-api/#client-config). |
| 11 | +- The to-device stream reports when a device has a new [to-device message](https://spec.matrix.org/v1.7/client-server-api/#send-to-device-messaging). |
| 12 | + |
| 13 | +See [`synapse.replication.tcp.streams`]( |
| 14 | + https://github.com/matrix-org/synapse/blob/develop/synapse/replication/tcp/streams/__init__.py |
| 15 | +) for the full list of streams. |
| 16 | + |
| 17 | +It is very helpful to understand the streams mechanism when working on any part of Synapse that needs to respond to changes—especially if those changes are made by different workers. |
| 18 | +To that end, let's describe streams formally, paraphrasing from the docstring of [`AbstractStreamIdGenerator`]( |
| 19 | + https://github.com/matrix-org/synapse/blob/a719b703d9bd0dade2565ddcad0e2f3a7a9d4c37/synapse/storage/util/id_generators.py#L96 |
| 20 | +). |
| 21 | + |
| 22 | +### Definition |
| 23 | + |
| 24 | +A stream is an append-only log `T1, T2, ..., Tn, ...` of facts[^1] which grows over time. |
| 25 | +Only "writers" can add facts to a stream, and there may be multiple writers. |
| 26 | + |
| 27 | +Each fact has an ID, called its "stream ID". |
| 28 | +Readers should only process facts in ascending stream ID order. |
| 29 | + |
| 30 | +Roughly speaking, each stream is backed by a database table. |
| 31 | +It should have a `stream_id` (or similar) bigint column holding stream IDs, plus additional columns as necessary to describe the fact. |
| 32 | +Typically, a fact is expressed with a single row in its backing table.[^2] |
| 33 | +Within a stream, no two facts may have the same stream_id. |
| 34 | + |
| 35 | +> _Aside_. Some additional notes on streams' backing tables. |
| 36 | +> |
| 37 | +> 1. Rich would like to [ditch the backing tables](https://github.com/matrix-org/synapse/issues/13456). |
| 38 | +> 2. The backing tables may have other uses. |
| 39 | + > For example, the events table serves backs the events stream, and is read when processing new events. |
| 40 | + > But old rows are read from the table all the time, whenever Synapse needs to lookup some facts about an event. |
| 41 | +> 3. Rich suspects that sometimes the stream is backed by multiple tables, so the stream proper is the union of those tables. |
| 42 | +
|
| 43 | +Stream writers can "reserve" a stream ID, and then later mark it as having being completed. |
| 44 | +Stream writers need to track the completion of each stream fact. |
| 45 | +In the happy case, completion means a fact has been written to the stream table. |
| 46 | +But unhappy cases (e.g. transaction rollback due to an error) also count as completion. |
| 47 | +Once completed, the rows written with that stream ID are fixed, and no new rows |
| 48 | +will be inserted with that ID. |
| 49 | + |
| 50 | +### Current stream ID |
| 51 | + |
| 52 | +For any given stream reader (including writers themselves), we may define a per-writer current stream ID: |
| 53 | + |
| 54 | +> The current stream ID _for a writer W_ is the largest stream ID such that |
| 55 | +> all transactions added by W with equal or smaller ID have completed. |
| 56 | +
|
| 57 | +Similarly, there is a "linear" notion of current stream ID: |
| 58 | + |
| 59 | +> The "linear" current stream ID is the largest stream ID such that |
| 60 | +> all facts (added by any writer) with equal or smaller ID have completed. |
| 61 | +
|
| 62 | +Because different stream readers A and B learn about new facts at different times, A and B may disagree about current stream IDs. |
| 63 | +Put differently: we should think of stream readers as being independent of each other, proceeding through a stream of facts at different rates. |
| 64 | + |
| 65 | +**NB.** For both senses of "current", that if a writer opens a transaction that never completes, the current stream ID will never advance beyond that writer's last written stream ID. |
| 66 | + |
| 67 | +For single-writer streams, the per-writer current ID and the linear current ID are the same. |
| 68 | +Both senses of current ID are monotonic, but they may "skip" or jump over IDs because facts complete out of order. |
| 69 | + |
| 70 | + |
| 71 | +_Example_. |
| 72 | +Consider a single-writer stream which is initially at ID 1. |
| 73 | + |
| 74 | +| Action | Current stream ID | Notes | |
| 75 | +|------------|-------------------|-------------------------------------------------| |
| 76 | +| | 1 | | |
| 77 | +| Reserve 2 | 1 | | |
| 78 | +| Reserve 3 | 1 | | |
| 79 | +| Complete 3 | 1 | current ID unchanged, waiting for 2 to complete | |
| 80 | +| Complete 2 | 3 | current ID jumps from 1 -> 3 | |
| 81 | +| Reserve 4 | 3 | | |
| 82 | +| Reserve 5 | 3 | | |
| 83 | +| Reserve 6 | 3 | | |
| 84 | +| Complete 5 | 3 | | |
| 85 | +| Complete 4 | 5 | current ID jumps 3->5, even though 6 is pending | |
| 86 | +| Complete 6 | 6 | | |
| 87 | + |
| 88 | + |
| 89 | +### Multi-writer streams |
| 90 | + |
| 91 | +There are two ways to view a multi-writer stream. |
| 92 | + |
| 93 | +1. Treat it as a collection of distinct single-writer streams, one |
| 94 | + for each writer. |
| 95 | +2. Treat it as a single stream. |
| 96 | + |
| 97 | +The single stream (option 2) is conceptually simpler, and easier to represent (a single stream id). |
| 98 | +However, it requires each reader to know about the entire set of writers, to ensures that readers don't erroneously advance their current stream position too early and miss a fact from an unknown writer. |
| 99 | +In contrast, multiple parallel streams (option 1) are more complex, requiring more state to represent (map from writer to stream id). |
| 100 | +The payoff for doing so is that readers can "peek" ahead to facts that completed on one writer no matter the state of the others, reducing latency. |
| 101 | + |
| 102 | +Note that a multi-writer stream can be viewed in both ways. |
| 103 | +For example, the events stream is treated as multiple single-writer streams (option 1) by the sync handler, so that events are sent to clients as soon as possible. |
| 104 | +But the background process that works through events treats them as a single linear stream. |
| 105 | + |
| 106 | +Another useful example is the cache invalidation stream. |
| 107 | +The facts this stream holds are instructions to "you should now invalidate these cache entries". |
| 108 | +We only ever treat this as a multiple single-writer streams as there is no important ordering between cache invalidations. |
| 109 | +(Invalidations are self-contained facts; and the invalidations commute/are idempotent). |
| 110 | + |
| 111 | +### Writing to streams |
| 112 | + |
| 113 | +Writers need to track: |
| 114 | + - track their current position (i.e. its own per-writer stream ID). |
| 115 | + - their facts currently awaiting completion. |
| 116 | + |
| 117 | +At startup, |
| 118 | + - the current position of that writer can be found by querying the database (which suggests that facts need to be written to the database atomically, in a transaction); and |
| 119 | + - there are no facts awaiting completion. |
| 120 | + |
| 121 | +To reserve a stream ID, call [`nextval`](https://www.postgresql.org/docs/current/functions-sequence.html) on the appropriate postgres sequence. |
| 122 | + |
| 123 | +To write a fact to the stream: insert the appropriate rows to the appropriate backing table. |
| 124 | + |
| 125 | +To complete a fact, first remove it from your map of facts currently awaiting completion. |
| 126 | +Then, if no earlier fact is awaiting completion, the writer can advance its current position in that stream. |
| 127 | +Upon doing so it should emit an `RDATA` message[^3], once for every fact between the old and the new stream ID. |
| 128 | + |
| 129 | +### Subscribing to streams |
| 130 | + |
| 131 | +Readers need to track the current position of every writer. |
| 132 | + |
| 133 | +At startup, they can find this by contacting each writer with a `REPLICATE` message, |
| 134 | +requesting that all writers reply describing their current position in their streams. |
| 135 | +Writers reply with a `POSITION` message. |
| 136 | + |
| 137 | +To learn about new facts, readers should listen for `RDATA` messages and process them to respond to the new fact. |
| 138 | +The `RDATA` itself is not a self-contained representation of the fact; |
| 139 | +readers will have to query the stream tables for the full details. |
| 140 | +Readers must also advance their record of the writer's current position for that stream. |
| 141 | + |
| 142 | +# Summary |
| 143 | + |
| 144 | +In a nutshell: we have an append-only log with a "buffer/scratchpad" at the end where we have to wait for the sequence to be linear and contiguous. |
| 145 | + |
| 146 | + |
| 147 | +--- |
| 148 | + |
| 149 | +[^1]: we use the word _fact_ here for two reasons. |
| 150 | +Firstly, the word "event" is already heavily overloaded (PDUs, EDUs, account data, ...) and we don't need to make that worse. |
| 151 | +Secondly, "fact" emphasises that the things we append to a stream cannot change after the fact. |
| 152 | + |
| 153 | +[^2]: A fact might be expressed with 0 rows, e.g. if we opened a transaction to persist an event, but failed and rolled the transaction back before marking the fact as completed. |
| 154 | +In principle a fact might be expressed with 2 or more rows; if so, each of those rows should share the fact's stream ID. |
| 155 | + |
| 156 | +[^3]: This communication used to happen directly with the writers [over TCP](../../tcp_replication.md); |
| 157 | +nowadays it's done via Redis's Pubsub. |
0 commit comments