Skip to content

Commit 2b89ab3

Browse files
authored
feat: add deployment_id support for multi-deployment (#512)
* chore: add deployment_id config for multi-tenancy support * test: refactor entity store test suite * fix: catch unhandled err * feat: entity store support deployment id * test: move credentials encryption test out of the suite * test: move max destination out of suite * test: deployment isolation * feat: deployment id support to alert store * feat: namespace rsmq with deployment id * chore: makefile * chore: application integration * docs: add comments to clarify migration tool * test: deployment id e2e suite * fix: remove redis singleton pattern * docs: config generate * style: gofmt * fix: avoid panic * chore: deployment validation * chore: include deployment id on startup log * chore: remove deployment prefix
1 parent 902c016 commit 2b89ab3

File tree

26 files changed

+1207
-872
lines changed

26 files changed

+1207
-872
lines changed

Makefile

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
TEST?=$$(go list ./...)
2+
RUN?=
23

34
# Build targets
45
.PHONY: build
@@ -122,13 +123,17 @@ test/setup:
122123
@echo ""
123124

124125
test:
125-
go test $(TEST) $(TESTARGS)
126+
@if [ "$(RUN)" != "" ]; then \
127+
$(if $(TESTINFRA),TESTINFRA=$(TESTINFRA)) go test $(TEST) $(TESTARGS) -run "$(RUN)"; \
128+
else \
129+
$(if $(TESTINFRA),TESTINFRA=$(TESTINFRA)) go test $(TEST) $(TESTARGS); \
130+
fi
126131

127132
test/unit:
128-
go test $(TEST) $(TESTARGS) -short
133+
$(if $(TESTINFRA),TESTINFRA=$(TESTINFRA)) go test $(TEST) $(TESTARGS) -short
129134

130135
test/integration:
131-
go test $(TEST) $(TESTARGS) -run "Integration"
136+
$(if $(TESTINFRA),TESTINFRA=$(TESTINFRA)) go test $(TEST) $(TESTARGS) -run "Integration"
132137

133138
test/e2e/rediscluster:
134139
@echo "Running Redis cluster e2e tests in Docker container..."

cmd/e2e/configs/basic.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,9 @@ const (
2525
)
2626

2727
type BasicOpts struct {
28-
LogStorage LogStorageType
29-
RedisConfig *redis.RedisConfig // Optional Redis config override
28+
LogStorage LogStorageType
29+
RedisConfig *redis.RedisConfig // Optional Redis config override
30+
DeploymentID string // Optional deployment ID for multi-tenancy testing
3031
}
3132

3233
func Basic(t *testing.T, opts BasicOpts) config.Config {
@@ -83,6 +84,7 @@ func Basic(t *testing.T, opts BasicOpts) config.Config {
8384
c.RetryMaxLimit = 3
8485
c.LogBatchThresholdSeconds = 1
8586
c.LogBatchSize = 100
87+
c.DeploymentID = opts.DeploymentID
8688

8789
// Setup cleanup
8890
t.Cleanup(func() {
@@ -132,7 +134,7 @@ func CreateRedisClusterConfig(t *testing.T) *redis.RedisConfig {
132134
// Test Redis connection before returning
133135
t.Logf("Testing Redis cluster connection to %s:%d", redisHost, redisPort)
134136
testCtx := context.Background()
135-
_, err := redis.NewClient(testCtx, redisConfig)
137+
_, err := redis.New(testCtx, redisConfig)
136138
if err != nil {
137139
t.Fatalf("Failed to create Redis client: %v", err)
138140
}

cmd/e2e/suites_test.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ type basicSuite struct {
126126
e2eSuite
127127
logStorageType configs.LogStorageType
128128
redisConfig *redis.RedisConfig // Optional Redis config override
129+
deploymentID string // Optional deployment ID
129130
alertServer *alert.AlertMockServer
130131
}
131132

@@ -143,8 +144,9 @@ func (suite *basicSuite) SetupSuite() {
143144

144145
// Configure alert callback URL
145146
cfg := configs.Basic(t, configs.BasicOpts{
146-
LogStorage: suite.logStorageType,
147-
RedisConfig: suite.redisConfig,
147+
LogStorage: suite.logStorageType,
148+
RedisConfig: suite.redisConfig,
149+
DeploymentID: suite.deploymentID,
148150
})
149151
cfg.Alert.CallbackURL = alertServer.GetCallbackURL()
150152

@@ -204,3 +206,15 @@ func TestRedisClusterBasicSuite(t *testing.T) {
204206
redisConfig: redisConfig,
205207
})
206208
}
209+
210+
func TestBasicSuiteWithDeploymentID(t *testing.T) {
211+
t.Parallel()
212+
if testing.Short() {
213+
t.Skip("skipping e2e test")
214+
}
215+
216+
suite.Run(t, &basicSuite{
217+
logStorageType: configs.LogStorageTypePostgres,
218+
deploymentID: "dp_e2e_test",
219+
})
220+
}

cmd/outpost-migrate-redis/README.md

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,15 @@ Migrates Redis keys from legacy format to hash-tagged format for Redis Cluster c
109109
**Purpose:** Ensures all keys for a tenant are routed to the same Redis Cluster node by using hash tags.
110110

111111
**Key Transformations:**
112-
- `tenant:<id>:*` → `{tenant:<id>}:*`
113-
- `destination_summary:<tenant>:<dest>` → `{tenant:<tenant>}:destination_summary:<dest>`
114-
- Individual destination keys are properly hash-tagged by tenant
112+
- `tenant:123` → `tenant:{123}:tenant`
113+
- `tenant:123:destinations` → `tenant:{123}:destinations`
114+
- `tenant:123:destination:abc` → `tenant:{123}:destination:abc`
115+
116+
**Deployment Mode Note:** If you are using `DEPLOYMENT_ID` configuration, this migration is **not needed**. Deployment-scoped keys already include hash tags:
117+
- `dp_001:tenant:{123}:tenant` (already has hash tags)
118+
- `dp_001:tenant:{123}:destinations` (already has hash tags)
119+
120+
See [001_hash_tags/README.md](./migration/001_hash_tags/README.md) for details.
115121

116122
**Safety:** This migration preserves original keys. Use the cleanup command after verification to remove old keys.
117123

cmd/outpost-migrate-redis/migration/001_hash_tags/README.md

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,47 @@ After verification, removes all legacy keys:
4444
- Requires confirmation unless `-force` flag is used
4545
- Processes deletions in batches of 100 keys
4646

47+
## Deployment Mode Compatibility
48+
49+
### When This Migration is NOT Needed
50+
51+
If you are using the `DEPLOYMENT_ID` configuration option (or `deployment_id` in YAML), **you can skip this migration entirely**. Deployments using deployment IDs already have keys in the correct format:
52+
53+
```
54+
dp_001:tenant:{123}:tenant
55+
dp_001:tenant:{123}:destinations
56+
dp_001:tenant:{123}:destination:abc
57+
```
58+
59+
These keys already include hash tags `{123}` and are Redis Cluster compatible.
60+
61+
### When This Migration IS Needed
62+
63+
This migration is only required for legacy deployments that:
64+
1. Started before hash tag support was added
65+
2. Are **NOT** using `DEPLOYMENT_ID` configuration
66+
3. Have keys in the old format without curly braces:
67+
```
68+
tenant:123
69+
tenant:123:destinations
70+
tenant:123:destination:abc
71+
```
72+
73+
### Checking If You Need This Migration
74+
75+
Run the migration planner to check:
76+
```bash
77+
outpost-migrate-redis plan
78+
```
79+
80+
If the output shows `0 tenants to migrate`, your deployment either:
81+
- Already has hash tags (you're good!)
82+
- Is using deployment IDs (you're good!)
83+
- Has no data yet (you're good!)
84+
4785
## Notes
4886

4987
- Original keys are preserved during Apply phase for rollback safety
5088
- Migration is idempotent - can be run multiple times safely
51-
- Skips tenants that are already migrated
89+
- Skips tenants that are already migrated
90+
- Does not touch deployment-prefixed keys (e.g., `dp_001:*`)

cmd/outpost-migrate-redis/migration/001_hash_tags/hash_tags.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,17 @@ import (
1010
"github.com/hookdeck/outpost/internal/redis"
1111
)
1212

13-
// HashTagsMigration migrates from legacy format (tenant:*) to hash-tagged format ({tenant}:*)
13+
// HashTagsMigration migrates from legacy format (tenant:*) to hash-tagged format (tenant:{ID}:*)
14+
//
15+
// NOTE: This migration only handles non-deployment-prefixed keys.
16+
// If you are using DEPLOYMENT_ID configuration, your keys already have the correct format
17+
// with deployment prefixes (dp_001:tenant:{TENANT_ID}:*) and hash tags are already
18+
// in place. In that case, this migration can be safely skipped.
19+
//
20+
// This migration is only needed for legacy deployments that:
21+
// - Started before hash tag support was added
22+
// - Are NOT using DEPLOYMENT_ID configuration
23+
// - Have keys in the old format: tenant:ID:* (without curly braces)
1424
type HashTagsMigration struct {
1525
client redis.Client
1626
logger migration.Logger

docs/pages/references/configuration.mdx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ Global configurations are provided through env variables or a YAML file. ConfigM
4343
| `AZURE_SERVICEBUS_TENANT_ID` | Azure Active Directory tenant ID | `nil` | Yes |
4444
| `DELIVERY_MAX_CONCURRENCY` | Maximum number of delivery attempts to process concurrently. | `1` | No |
4545
| `DELIVERY_TIMEOUT_SECONDS` | Timeout in seconds for HTTP requests made during event delivery to webhook destinations. | `5` | No |
46+
| `DEPLOYMENT_ID` | Optional deployment identifier for multi-tenancy. Enables multiple deployments to share the same infrastructure while maintaining data isolation. | `nil` | No |
4647
| `DESTINATIONS_AWS_KINESIS_METADATA_IN_PAYLOAD` | If true, includes Outpost metadata (event ID, topic, etc.) within the Kinesis record payload. | `true` | No |
4748
| `DESTINATIONS_INCLUDE_MILLISECOND_TIMESTAMP` | If true, includes a 'timestamp-ms' field with millisecond precision in destination metadata. Useful for load testing and debugging. | `false` | No |
4849
| `DESTINATIONS_METADATA_PATH` | Path to the directory containing custom destination type definitions. This can be overridden by the root-level 'destination_metadata_path' if also set. | `config/outpost/destinations` | No |
@@ -164,6 +165,9 @@ delivery_max_concurrency: 1
164165
# Timeout in seconds for HTTP requests made during event delivery to webhook destinations.
165166
delivery_timeout_seconds: 5
166167

168+
# Optional deployment identifier for multi-tenancy. Enables multiple deployments to share the same infrastructure while maintaining data isolation.
169+
deployment_id: ""
170+
167171
# Path to the directory containing custom destination type definitions. Overrides 'destinations.metadata_path' if set.
168172
destination_metadata_path: ""
169173

internal/alert/monitor.go

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,13 @@ func WithLogger(logger *logging.Logger) AlertOption {
7373
}
7474
}
7575

76+
// WithDeploymentID sets the deployment ID for the monitor
77+
func WithDeploymentID(deploymentID string) AlertOption {
78+
return func(m *alertMonitor) {
79+
m.deploymentID = deploymentID
80+
}
81+
}
82+
7683
// DeliveryAttempt represents a single delivery attempt
7784
type DeliveryAttempt struct {
7885
Success bool
@@ -83,11 +90,12 @@ type DeliveryAttempt struct {
8390
}
8491

8592
type alertMonitor struct {
86-
logger *logging.Logger
87-
store AlertStore
88-
evaluator AlertEvaluator
89-
notifier AlertNotifier
90-
disabler DestinationDisabler
93+
logger *logging.Logger
94+
store AlertStore
95+
evaluator AlertEvaluator
96+
notifier AlertNotifier
97+
disabler DestinationDisabler
98+
deploymentID string
9199

92100
// autoDisableFailureCount is the number of consecutive failures before auto-disabling
93101
autoDisableFailureCount int
@@ -119,7 +127,7 @@ func NewAlertMonitor(logger *logging.Logger, redisClient redis.Cmdable, opts ...
119127
}
120128

121129
if alertMonitor.store == nil {
122-
alertMonitor.store = NewRedisAlertStore(redisClient)
130+
alertMonitor.store = NewRedisAlertStore(redisClient, alertMonitor.deploymentID)
123131
}
124132

125133
if alertMonitor.evaluator == nil {

internal/alert/store.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,16 @@ type AlertStore interface {
2020
}
2121

2222
type redisAlertStore struct {
23-
client redis.Cmdable
23+
client redis.Cmdable
24+
deploymentID string
2425
}
2526

2627
// NewRedisAlertStore creates a new Redis-backed alert store
27-
func NewRedisAlertStore(client redis.Cmdable) AlertStore {
28-
return &redisAlertStore{client: client}
28+
func NewRedisAlertStore(client redis.Cmdable, deploymentID string) AlertStore {
29+
return &redisAlertStore{
30+
client: client,
31+
deploymentID: deploymentID,
32+
}
2933
}
3034

3135
func (s *redisAlertStore) IncrementConsecutiveFailureCount(ctx context.Context, tenantID, destinationID string) (int, error) {
@@ -57,6 +61,13 @@ func (s *redisAlertStore) ResetConsecutiveFailureCount(ctx context.Context, tena
5761
return s.client.Del(ctx, s.getFailuresKey(destinationID)).Err()
5862
}
5963

64+
func (s *redisAlertStore) deploymentPrefix() string {
65+
if s.deploymentID == "" {
66+
return ""
67+
}
68+
return fmt.Sprintf("%s:", s.deploymentID)
69+
}
70+
6071
func (s *redisAlertStore) getFailuresKey(destinationID string) string {
61-
return fmt.Sprintf("%s:%s:%s", keyPrefixAlert, destinationID, keyFailures)
72+
return fmt.Sprintf("%s%s:%s:%s", s.deploymentPrefix(), keyPrefixAlert, destinationID, keyFailures)
6273
}

internal/alert/store_test.go

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ func TestRedisAlertStore(t *testing.T) {
1616
t.Run("increment consecutive failures", func(t *testing.T) {
1717
t.Parallel()
1818
redisClient := testutil.CreateTestRedisClient(t)
19-
store := alert.NewRedisAlertStore(redisClient)
19+
store := alert.NewRedisAlertStore(redisClient, "")
2020

2121
// First increment
2222
count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1")
@@ -32,7 +32,7 @@ func TestRedisAlertStore(t *testing.T) {
3232
t.Run("reset consecutive failures", func(t *testing.T) {
3333
t.Parallel()
3434
redisClient := testutil.CreateTestRedisClient(t)
35-
store := alert.NewRedisAlertStore(redisClient)
35+
store := alert.NewRedisAlertStore(redisClient, "")
3636

3737
// Set up initial failures
3838
count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_2", "dest_2")
@@ -49,3 +49,76 @@ func TestRedisAlertStore(t *testing.T) {
4949
assert.Equal(t, 1, count)
5050
})
5151
}
52+
53+
func TestRedisAlertStore_WithDeploymentID(t *testing.T) {
54+
t.Parallel()
55+
56+
redisClient := testutil.CreateTestRedisClient(t)
57+
store := alert.NewRedisAlertStore(redisClient, "dp_test_001")
58+
59+
// Test increment with deployment ID
60+
count, err := store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1")
61+
require.NoError(t, err)
62+
assert.Equal(t, 1, count)
63+
64+
// Second increment
65+
count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1")
66+
require.NoError(t, err)
67+
assert.Equal(t, 2, count)
68+
69+
// Test reset with deployment ID
70+
err = store.ResetConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1")
71+
require.NoError(t, err)
72+
73+
// Verify counter is reset
74+
count, err = store.IncrementConsecutiveFailureCount(context.Background(), "tenant_1", "dest_1")
75+
require.NoError(t, err)
76+
assert.Equal(t, 1, count)
77+
}
78+
79+
func TestAlertStoreIsolation(t *testing.T) {
80+
t.Parallel()
81+
82+
redisClient := testutil.CreateTestRedisClient(t)
83+
84+
// Create two stores with different deployment IDs
85+
store1 := alert.NewRedisAlertStore(redisClient, "dp_001")
86+
store2 := alert.NewRedisAlertStore(redisClient, "dp_002")
87+
88+
// Use same tenant/destination IDs for both
89+
tenantID := "tenant_shared"
90+
destinationID := "dest_shared"
91+
92+
// Increment in store1
93+
count1, err := store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID)
94+
require.NoError(t, err)
95+
assert.Equal(t, 1, count1)
96+
97+
count1, err = store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID)
98+
require.NoError(t, err)
99+
assert.Equal(t, 2, count1)
100+
101+
// Increment in store2 - should start at 1 (isolated from store1)
102+
count2, err := store2.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID)
103+
require.NoError(t, err)
104+
assert.Equal(t, 1, count2, "Store 2 should have its own counter")
105+
106+
// Increment store1 again - should continue from 2
107+
count1, err = store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID)
108+
require.NoError(t, err)
109+
assert.Equal(t, 3, count1, "Store 1 counter should be unaffected by store 2")
110+
111+
// Reset store1 - should not affect store2
112+
err = store1.ResetConsecutiveFailureCount(context.Background(), tenantID, destinationID)
113+
require.NoError(t, err)
114+
115+
// Verify store1 is reset
116+
count1, err = store1.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID)
117+
require.NoError(t, err)
118+
assert.Equal(t, 1, count1, "Store 1 should be reset")
119+
120+
// Verify store2 is unaffected
121+
count2, err = store2.IncrementConsecutiveFailureCount(context.Background(), tenantID, destinationID)
122+
require.NoError(t, err)
123+
assert.Equal(t, 2, count2, "Store 2 should be unaffected by store 1 reset")
124+
}

0 commit comments

Comments
 (0)