
Commit

Address PR feedback; add table truncation, in-memory map initialization, and rate-limiting of events
rjobanp committed Nov 8, 2023
1 parent bbe50a4 commit ccc67bf
Showing 1 changed file with 48 additions and 31 deletions.
79 changes: 48 additions & 31 deletions doc/developer/design/20231103_privatelink_status_table.md
@@ -43,51 +43,62 @@ Add a new table to `mz_internal`:
| Field | Type | Meaning |
|-------------------|----------------------------|------------------------------------------------------------|
| `connection_id` | `text` | The unique identifier of the AWS PrivateLink connection. Corresponds to `mz_catalog.mz_connections.id` |
| `status` | `text` | The status: one of `pending-service-discovery`, `creating-endpoint`, `recreating-endpoint`, `updating-endpoint`, `available`, `deleted`, `deleting`, `expired`, `failed`, `pending`, `pending-acceptance`, `rejected`, `unknown` |
| `occurred_at` | `timestamp with time zone` | The timestamp at which the state change occurred. |

The events in this table will be persisted via storage-managed collections,
rather than in system tables, so they won't be refreshed and cleared on
startup. The table columns are modeled after `mz_source_status_history`.

The table will be truncated to keep only a small number of status history
events per `connection_id`, so that it does not grow without bound. The
truncation will happen on Storage Controller start by leveraging the
`partially_truncate_status_history` method currently used for truncating
the source/sink status history tables.
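
As a sketch of that pruning policy, assuming the history is available in memory
as `(connection_id, occurred_at, row)` tuples; the function name and the
`keep_n` parameter are illustrative, not the real
`partially_truncate_status_history` signature:

``` rust
use std::collections::BTreeMap;

use chrono::{DateTime, Utc};

/// Keep only the `keep_n` most recent status rows per connection id and
/// return the older rows, which should be retracted from the table.
fn rows_to_truncate<R: Clone>(
    history: &[(String, DateTime<Utc>, R)],
    keep_n: usize,
) -> Vec<R> {
    // Group the history by connection id.
    let mut by_connection: BTreeMap<&str, Vec<&(String, DateTime<Utc>, R)>> = BTreeMap::new();
    for entry in history {
        by_connection.entry(entry.0.as_str()).or_default().push(entry);
    }

    // For each connection, sort newest-first and retract everything past `keep_n`.
    let mut retractions = Vec::new();
    for (_connection_id, mut entries) in by_connection {
        entries.sort_by_key(|e| std::cmp::Reverse(e.1));
        retractions.extend(entries.into_iter().skip(keep_n).map(|e| e.2.clone()));
    }
    retractions
}
```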

The `CloudResourceController` will expose a `watch_vpc_endpoints` method
that will establish a Kubernetes `watch` on all `VpcEndpoint`s in the
namespace and translate changes to them into `VpcEndpointEvent`s (modeled
after the `watch_services` method on the `NamespacedKubernetesOrchestrator`).

- where `VpcEndpointEvent` is defined as follows:
``` rust
struct VpcEndpointEvent {
    connection_id: GlobalId,
    status: VpcEndpointState,
    time: DateTime<Utc>,
}
```
- The `time` field will be determined by inspecting the `Available`
"condition" on the `VpcEndpointStatus` which contains a `last_transition_time`
field populated by the VpcEndpoint Controller in the cloud repository.
- The `status` field will be populated using the `VpcEndpointStatus.state`
field (a translation sketch follows this list).
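
A rough sketch of the field mapping described in these bullets, using
simplified stand-ins for the `VpcEndpoint` status types; the field and
condition names mirror the description above but are illustrative, and the
real definitions differ in detail:

``` rust
use chrono::{DateTime, Utc};

// Simplified stand-ins for the real CRD types.
struct Condition {
    type_: String,                       // e.g. "Available"
    last_transition_time: DateTime<Utc>, // set by the cloud VpcEndpoint Controller
}

struct VpcEndpointStatus {
    state: Option<String>, // stands in for `VpcEndpointState`
    conditions: Vec<Condition>,
}

struct VpcEndpointEvent {
    connection_id: String, // `GlobalId` in the real code
    status: String,        // `VpcEndpointState` in the real code
    time: DateTime<Utc>,
}

/// Translate one observed `VpcEndpoint` status into an event, if the status
/// carries enough information to do so.
fn to_event(connection_id: String, status: &VpcEndpointStatus) -> Option<VpcEndpointEvent> {
    // `status` comes from `VpcEndpointStatus.state`.
    let state = status.state.clone()?;
    // `time` comes from `last_transition_time` on the `Available` condition.
    let time = status
        .conditions
        .iter()
        .find(|c| c.type_ == "Available")
        .map(|c| c.last_transition_time)?;
    Some(VpcEndpointEvent {
        connection_id,
        status: state,
        time,
    })
}
```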


The Adapter `Coordinator` (which has a handle to `cloud_resource_controller`)
will spawn a task on `serve` (similar to where it calls
`spawn_statement_logging_task`) that calls `watch_vpc_endpoints` to
receive a stream of `VpcEndpointEvent`s. This single stream will include events
for all `VpcEndpoint`s in the namespace, including newly-created ones.
- This task will maintain an in-memory map of the last known state for each
connection, compare it against each received `VpcEndpointEvent`, and filter
out redundant events (see the task sketch after this list).
- The in-memory map will be initialized based on the last state written to the
table for each connection. These rows are already read from the table on
startup in the Storage Controller `partially_truncate_status_history` call,
which will be refactored to store the `last_n_entries_per_id` it constructs as
a field on the Storage Controller state, to be consumed by this task.
- The task will rate-limit received events using the
[governor](https://docs.rs/governor/latest/governor/index.html) crate with some
burst capacity to avoid overloading the coordinator if any endpoint gets stuck
in a hot fail loop.
- For each rate-limited batch of events, the task will emit a Coordinator
message `Message::VpcEndpointEvents(BTreeMap<GlobalId, VpcEndpointEvent>)`.
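
A condensed sketch of that task loop, again with simplified stand-in types;
the quota values, the channel type standing in for `internal_cmd_tx`, and the
single-entry batches are illustrative choices, not the real implementation:

``` rust
use std::collections::BTreeMap;
use std::num::NonZeroU32;

use chrono::{DateTime, Utc};
use futures::{Stream, StreamExt};
use governor::{Quota, RateLimiter};
use tokio::sync::mpsc::UnboundedSender;

// Simplified stand-in for the event type (see the earlier sketch).
struct VpcEndpointEvent {
    connection_id: String, // `GlobalId` in the real code
    status: String,        // `VpcEndpointState` in the real code
    time: DateTime<Utc>,
}

// Stand-in for the relevant coordinator message variant.
enum Message {
    VpcEndpointEvents(BTreeMap<String, VpcEndpointEvent>),
}

async fn vpc_endpoint_events_task(
    mut events: impl Stream<Item = VpcEndpointEvent> + Unpin,
    // Initialized from the last row per connection read back from the table.
    mut last_known_status: BTreeMap<String, String>,
    internal_cmd_tx: UnboundedSender<Message>,
) {
    // Allow short bursts but cap the sustained rate, so an endpoint stuck in a
    // hot fail loop cannot flood the coordinator. The numbers are placeholders.
    let limiter = RateLimiter::direct(
        Quota::per_second(NonZeroU32::new(1).unwrap())
            .allow_burst(NonZeroU32::new(10).unwrap()),
    );

    while let Some(event) = events.next().await {
        // Filter out events that do not change the last known status.
        if last_known_status.get(&event.connection_id) == Some(&event.status) {
            continue;
        }
        last_known_status.insert(event.connection_id.clone(), event.status.clone());

        // Wait until the rate limiter admits another coordinator message.
        limiter.until_ready().await;

        // In the real task, events observed while waiting would be coalesced
        // into this batch; a single-entry batch keeps the sketch short.
        let mut batch = BTreeMap::new();
        batch.insert(event.connection_id.clone(), event);
        let _ = internal_cmd_tx.send(Message::VpcEndpointEvents(batch));
    }
}
```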

The Coordinator will receive the message and translate the events into writes
to the table's storage-managed collection via the `StorageController`'s
`record_introspection_updates` method.
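
And a hedged sketch of that handler, with stand-ins for `Row`, the storage
controller, and the event type; the `IntrospectionType` variant name and the
row encoding are hypothetical, chosen only to mirror the table columns above:

``` rust
use std::collections::BTreeMap;

use chrono::{DateTime, Utc};

// Stand-ins for `mz_repr::Row`, the storage controller, and the event type.
struct Row(Vec<String>);
enum IntrospectionType {
    PrivatelinkConnectionStatusHistory, // hypothetical variant name
}
struct VpcEndpointEvent {
    connection_id: String,
    status: String,
    time: DateTime<Utc>,
}

trait StorageController {
    /// Mirrors the existing `record_introspection_updates` entry point, which
    /// appends `(row, diff)` updates to a storage-managed collection.
    fn record_introspection_updates(&mut self, ty: IntrospectionType, updates: Vec<(Row, i64)>);
}

fn handle_vpc_endpoint_events(
    storage: &mut dyn StorageController,
    events: BTreeMap<String, VpcEndpointEvent>,
) {
    // One row per event, matching the (connection_id, status, occurred_at) columns.
    let updates = events
        .into_values()
        .map(|e| (Row(vec![e.connection_id, e.status, e.time.to_rfc3339()]), 1))
        .collect();
    storage.record_introspection_updates(
        IntrospectionType::PrivatelinkConnectionStatusHistory,
        updates,
    );
}
```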


## Alternatives
@@ -109,7 +120,10 @@

## Open questions

1. *UPDATE 11/6: Resolved -> We will read in the table on startup and use it to
initialize the in-memory current state for each VPC endpoint.*

We are likely to record duplicate events on startup, since the
`watch_vpc_endpoints` method won't know the 'last known state' of each
`VpcEndpoint` recorded into the table.

@@ -127,7 +141,10 @@
`last_transition_time` field on their `Available` condition. This seems odd
so we should confirm that this field is being updated appropriately.

3. *UPDATE 11/6: Resolved -> We will use a governor Quota for rate-limiting
rather than buffering events on a timer.*

Do we need to buffer events? Instead we could write to storage on each event
received. Since we don't expect to receive a high frequency of events, it's
unclear whether buffering is as necessary here as it is with statement logging.
Without the buffering we are less likely to drop a new event received right
