# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from synapse.util.async import ObservableDeferred


class SnapshotCache(object):
    """Cache for snapshots like the response of /initialSync.
    The response of initialSync only has to be a recent snapshot of the
    server state. It shouldn't matter to clients if it is a few minutes out
    of date.

    This caches a deferred response. Until the deferred completes it will be
    returned from the cache. This means that if the client retries the request
    while the response is still being computed, that original response will be
    used rather than trying to compute a new response.

    Once the deferred completes it will be removed from the cache after 5
    minutes. We delay removing it from the cache because a client retrying its
    request could race with us finishing computing the response.

    Rather than tracking precisely how long something has been in the cache we
    keep two generations of completed responses. Every 5 minutes we discard the
    old generation, move the new generation to the old generation, and set the
    new generation to be empty. This means that a result will stay in the cache
    for somewhere between 5 and 10 minutes.
    """
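    # Illustrative rotation timeline, added for clarity (an assumption, not
    # part of the original commit): a response that completes just after a
    # rotation sits in next_result_cache for almost 5 minutes, is moved to
    # prev_result_cache by the next rotation, and is discarded by the one
    # after that, surviving nearly 10 minutes in total. A response that
    # completes just before a rotation is moved almost immediately, so it
    # survives for only a little over 5 minutes.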

    DURATION_MS = 5 * 60 * 1000  # Cache results for 5 minutes.

    def __init__(self):
        self.pending_result_cache = {}  # Requests that haven't finished yet.
        self.prev_result_cache = {}  # The older requests that have finished.
        self.next_result_cache = {}  # The newer requests that have finished.
        self.time_last_rotated_ms = 0

    def rotate(self, time_now_ms):
        # Rotate once if the cache duration has passed since the last rotation.
        if time_now_ms - self.time_last_rotated_ms >= self.DURATION_MS:
            self.prev_result_cache = self.next_result_cache
            self.next_result_cache = {}
            self.time_last_rotated_ms += self.DURATION_MS

        # Rotate again if the cache duration has passed twice since the last
        # rotation. Two rotations empty both generations, so at this point it
        # is safe to clamp the rotation time to the current time rather than
        # catching up one duration at a time.
        if time_now_ms - self.time_last_rotated_ms >= self.DURATION_MS:
            self.prev_result_cache = self.next_result_cache
            self.next_result_cache = {}
            self.time_last_rotated_ms = time_now_ms

    def get(self, time_now_ms, key):
        self.rotate(time_now_ms)
        # This cache is intended to deduplicate requests, so we expect it to
        # be missed most of the time. We therefore just look up the key in
        # all of the dictionaries rather than trying to short-circuit the
        # lookup if the key is found. Later lookups take precedence, so a
        # pending result wins over a completed one, and a newer completed
        # result wins over an older one.
        result = self.prev_result_cache.get(key)
        result = self.next_result_cache.get(key, result)
        result = self.pending_result_cache.get(key, result)
        if result is not None:
            return result.observe()
        else:
            return None

    def set(self, time_now_ms, key, deferred):
        self.rotate(time_now_ms)

        result = ObservableDeferred(deferred)

        self.pending_result_cache[key] = result

        def shuffle_along(r):
            # When the deferred completes we shuffle it along to the first
            # generation of the result cache so that it will eventually
            # expire from the rotation of that cache.
            self.next_result_cache[key] = result
            self.pending_result_cache.pop(key, None)

        result.observe().addBoth(shuffle_along)

        return result.observe()
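

# A minimal usage sketch, added for illustration and not part of the
# original commit. It assumes Twisted is importable and uses a
# pre-completed deferred; in Synapse the deferred would come from the
# actual /initialSync computation and `now_ms` from the homeserver clock.
if __name__ == "__main__":
    from twisted.internet import defer

    cache = SnapshotCache()
    now_ms = 0
    key = "@user:example.com"

    # The first request is a miss, so the caller starts the computation
    # and caches the pending deferred.
    assert cache.get(now_ms, key) is None
    cache.set(now_ms, key, defer.succeed({"rooms": []}))

    # A retry within DURATION_MS observes the cached result rather than
    # recomputing the snapshot.
    assert cache.get(now_ms + 1000, key) is not None

    # After two full rotations the completed result has expired.
    assert cache.get(now_ms + 2 * SnapshotCache.DURATION_MS + 1, key) is None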