Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions rmap/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,15 @@ map should later be closed to free up resources.
The `Join` function creates a new replicated map or joins an existing one. The
`Close` function closes the subscription channels and the subscription to Redis.

### Retention (TTL)

By default, replicated maps do not expire. Pulse can optionally set a TTL on the
Redis hash backing a map:

- `rmap.WithTTL(ttl)` sets an **absolute TTL** (set once when the hash is first
created and never extended).
- `rmap.WithSlidingTTL(ttl)` sets a **sliding TTL** (refreshed on every write).

### Writing to the Map

* The `Set` method sets the value for a given key and returns the previous value.
Expand Down
21 changes: 21 additions & 0 deletions rmap/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type (
Name string
chankey string // Redis pubsub channel name
hashkey string // Redis hash key
ttl time.Duration // TTL applied to hashkey (optional)
ttlSliding bool // refresh TTL on every write when true
msgch <-chan *redis.Message // channel to receive map updates
chans []chan EventKind // channels to send notifications
done chan struct{} // channel to signal shutdown
Expand Down Expand Up @@ -112,11 +114,16 @@ func Join(ctx context.Context, name string, rdb *redis.Client, opts ...MapOption
if o.Logger == nil {
o.Logger = pulse.NoopLogger()
}
if o.TTL < 0 {
return nil, fmt.Errorf("pulse map: %s ttl must be >= 0", name)
}
closectx, closer := context.WithCancel(context.Background())
sm := &Map{
Name: name,
chankey: fmt.Sprintf("map:%s:updates", name),
hashkey: fmt.Sprintf("map:%s:content", name),
ttl: o.TTL,
ttlSliding: o.TTLSliding,
done: make(chan struct{}),
closectx: closectx,
closer: closer,
Expand Down Expand Up @@ -870,10 +877,24 @@ func (sm *Map) runLuaScript(ctx context.Context, name string, script *redis.Scri
if err != nil && err != redis.Nil {
return nil, fmt.Errorf("pulse map: %s failed to run %q for key %s: %w", sm.Name, name, key, err)
}
if err := sm.applyTTL(ctx); err != nil {
return nil, fmt.Errorf("pulse map: %s failed to apply TTL: %w", sm.Name, err)
}

return res, nil
}

func (sm *Map) applyTTL(ctx context.Context) error {
if sm.ttl <= 0 {
return nil
}
if sm.ttlSliding {
return sm.rdb.Expire(ctx, sm.hashkey, sm.ttl).Err()
}
_, err := sm.rdb.ExpireNX(ctx, sm.hashkey, sm.ttl).Result()
return err
}

// reconnect attempts to reconnect to the Redis server forever.
func (sm *Map) reconnect() {
var count int
Expand Down
68 changes: 68 additions & 0 deletions rmap/map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,74 @@ func TestMapLocal(t *testing.T) {
cleanup(t, m)
}

func TestMapTTLAbsolute(t *testing.T) {
rdb := redis.NewClient(&redis.Options{
Addr: redisAddr,
Password: redisPwd,
})
ctx := context.Background()

m, err := Join(ctx, "ttl-absolute", rdb, WithTTL(2*time.Second))
if err != nil {
if strings.Contains(err.Error(), "WRONGPASS") {
t.Fatal("Unexpected Redis password error (did you set REDIS_PASSWORD?)")
} else if strings.Contains(err.Error(), "connection refused") {
t.Fatal("Unexpected Redis connection error (is Redis running?)")
}
}
require.NoError(t, m.Reset(ctx))

_, err = m.Set(ctx, "k", "v")
require.NoError(t, err)

time.Sleep(200 * time.Millisecond)
before, err := rdb.PTTL(ctx, "map:ttl-absolute:content").Result()
require.NoError(t, err)
require.Greater(t, before, time.Duration(0))

_, err = m.Set(ctx, "k2", "v2")
require.NoError(t, err)
after, err := rdb.PTTL(ctx, "map:ttl-absolute:content").Result()
require.NoError(t, err)

// Absolute TTL: writes must not refresh the expiry.
require.LessOrEqual(t, after, before)
}

func TestMapTTLSliding(t *testing.T) {
rdb := redis.NewClient(&redis.Options{
Addr: redisAddr,
Password: redisPwd,
})
ctx := context.Background()

m, err := Join(ctx, "ttl-sliding", rdb, WithSlidingTTL(2*time.Second))
if err != nil {
if strings.Contains(err.Error(), "WRONGPASS") {
t.Fatal("Unexpected Redis password error (did you set REDIS_PASSWORD?)")
} else if strings.Contains(err.Error(), "connection refused") {
t.Fatal("Unexpected Redis connection error (is Redis running?)")
}
}
require.NoError(t, m.Reset(ctx))

_, err = m.Set(ctx, "k", "v")
require.NoError(t, err)

time.Sleep(200 * time.Millisecond)
before, err := rdb.PTTL(ctx, "map:ttl-sliding:content").Result()
require.NoError(t, err)
require.Greater(t, before, time.Duration(0))

_, err = m.Set(ctx, "k2", "v2")
require.NoError(t, err)
after, err := rdb.PTTL(ctx, "map:ttl-sliding:content").Result()
require.NoError(t, err)

// Sliding TTL: writes should refresh expiry back toward ttl.
require.Greater(t, after, before)
}

func TestSetAndWait(t *testing.T) {
rdb := redis.NewClient(&redis.Options{
Addr: redisAddr,
Expand Down
32 changes: 30 additions & 2 deletions rmap/options.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
package rmap

import "goa.design/pulse/pulse"
import (
"time"

"goa.design/pulse/pulse"
)

type (
// MapOption is a Map creation option.
MapOption func(*options)

options struct {
// Channel name
// Logger
Logger pulse.Logger

// TTL configures a retention window for the Redis hash backing the map.
// When zero, no TTL is applied.
TTL time.Duration
// TTLSliding controls whether the TTL is refreshed on every write.
// When false, the TTL is applied once (absolute TTL) and never extended.
TTLSliding bool
}
)

Expand All @@ -20,6 +30,24 @@ func WithLogger(logger pulse.Logger) MapOption {
}
}

// WithTTL sets an absolute TTL on the Redis hash backing the map.
// The TTL is set once (when the hash is created) and never extended.
func WithTTL(ttl time.Duration) MapOption {
return func(o *options) {
o.TTL = ttl
o.TTLSliding = false
}
}

// WithSlidingTTL sets a sliding TTL on the Redis hash backing the map.
// The TTL is refreshed on every write operation.
func WithSlidingTTL(ttl time.Duration) MapOption {
return func(o *options) {
o.TTL = ttl
o.TTLSliding = true
}
}

// parseOptions parses the given options and returns the corresponding
// options.
func parseOptions(opts ...MapOption) *options {
Expand Down
18 changes: 16 additions & 2 deletions streaming/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func newSink(ctx context.Context, name string, stream *Stream, opts ...options.S
}

logger := stream.rootLogger.WithPrefix("sink", name)
cm, err := rmap.Join(ctx, consumersMapName(stream), stream.rdb, rmap.WithLogger(logger))
cm, err := rmap.Join(ctx, consumersMapName(stream), stream.rdb, consumersMapOptions(stream, logger)...)
if err != nil {
return nil, fmt.Errorf("failed to join replicated map for sink %s: %w", name, err)
}
Expand Down Expand Up @@ -242,7 +242,7 @@ func (s *Sink) AddStream(ctx context.Context, stream *Stream, opts ...options.Ad
startID = options.LastEventID
}

cm, err := rmap.Join(ctx, consumersMapName(stream), stream.rdb, rmap.WithLogger(stream.logger))
cm, err := rmap.Join(ctx, consumersMapName(stream), stream.rdb, consumersMapOptions(stream, stream.logger)...)
if err != nil {
return fmt.Errorf("failed to join consumer replicated map for stream %s: %w", stream.Name, err)
}
Expand Down Expand Up @@ -598,6 +598,20 @@ func consumersMapName(stream *Stream) string {
return fmt.Sprintf("stream:%s:sinks", stream.Name)
}

func consumersMapOptions(stream *Stream, logger pulse.Logger) []rmap.MapOption {
opts := []rmap.MapOption{
rmap.WithLogger(logger),
}
if stream.ttl > 0 {
if stream.ttlSliding {
opts = append(opts, rmap.WithSlidingTTL(stream.ttl))
} else {
opts = append(opts, rmap.WithTTL(stream.ttl))
}
}
return opts
}

// sinkKeepAliveMapName is the name of the replicated map that backs a sink keep-alives.
func sinkKeepAliveMapName(sink string) string {
return fmt.Sprintf("sink:%s:keepalive", sink)
Expand Down
28 changes: 28 additions & 0 deletions streaming/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,34 @@ func (s *Stream) Destroy(ctx context.Context) error {
s.logger.Error(err)
return err
}
// Destroy per-stream sink metadata map used by Pulse sinks to track consumers.
//
// Pulse sinks use an rmap keyed by "stream:<streamName>:sinks". The rmap stores
// its state in:
// - map:<name>:content (hash)
// - map:<name>:updates (pubsub channel)
//
// When a stream is explicitly destroyed, any sink metadata for that stream is
// also garbage. Cleaning it up here prevents leaks for short-lived streams
// (e.g., per-call result streams) that are destroyed explicitly.
//
// Note: we intentionally do not delete per-sink keepalive maps
// (sink:<sinkName>:keepalive) since those are shared across streams.
mapName := fmt.Sprintf("stream:%s:sinks", s.Name)
mapHashKey := fmt.Sprintf("map:%s:content", mapName)
mapChanKey := fmt.Sprintf("map:%s:updates", mapName)
if err := s.rdb.Del(ctx, mapHashKey).Err(); err != nil {
err := fmt.Errorf("failed to destroy stream sink map: %w", err)
s.logger.Error(err)
return err
}
// Mirror rmap's destroy semantics so any in-flight subscribers can notice the
// map is gone.
if err := s.rdb.Publish(ctx, mapChanKey, "reset:*").Err(); err != nil {
err := fmt.Errorf("failed to publish sink map reset: %w", err)
s.logger.Error(err)
return err
}
s.logger.Info("stream deleted")
return nil
}
Expand Down
23 changes: 23 additions & 0 deletions streaming/streams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,29 @@ func TestStreamTTLSliding(t *testing.T) {
assert.NoError(t, s.Destroy(ctx))
}

func TestStreamDestroyDeletesSinkMap(t *testing.T) {
rdb := ptesting.NewRedisClient(t)
defer ptesting.CleanupRedis(t, rdb, false, "")
ctx := ptesting.NewTestContext(t)

s, err := NewStream("testStreamDestroyDeletesSinkMap", rdb)
assert.NoError(t, err)

sink, err := s.NewSink(ctx, "gateway")
assert.NoError(t, err)
sink.Close(ctx)

mapKey := "map:stream:testStreamDestroyDeletesSinkMap:sinks:content"
exists, err := rdb.Exists(ctx, mapKey).Result()
assert.NoError(t, err)
assert.EqualValues(t, 1, exists)

assert.NoError(t, s.Destroy(ctx))
exists, err = rdb.Exists(ctx, mapKey).Result()
assert.NoError(t, err)
assert.EqualValues(t, 0, exists)
}

func TestRemove(t *testing.T) {
rdb := ptesting.NewRedisClient(t)
defer ptesting.CleanupRedis(t, rdb, false, "")
Expand Down