Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 177 additions & 0 deletions clients/ember-go/ember.go
Original file line number Diff line number Diff line change
Expand Up @@ -455,3 +455,180 @@ func (c *Client) DBSize(ctx context.Context) (int64, error) {
}
return resp.Value, nil
}

// Echo sends a message and returns it back.
func (c *Client) Echo(ctx context.Context, message string) (string, error) {
resp, err := c.rpc.Echo(c.ctx(ctx), &pb.EchoRequest{Message: message})
if err != nil {
return "", err
}
return resp.Message, nil
}

// Decr decrements a key by 1 and returns the new value.
func (c *Client) Decr(ctx context.Context, key string) (int64, error) {
resp, err := c.rpc.Decr(c.ctx(ctx), &pb.DecrRequest{Key: key})
if err != nil {
return 0, err
}
return resp.Value, nil
}

// Unlink removes keys asynchronously (background deallocation). Returns the
// number of keys removed.
func (c *Client) Unlink(ctx context.Context, keys ...string) (int64, error) {
resp, err := c.rpc.Unlink(c.ctx(ctx), &pb.UnlinkRequest{Keys: keys})
if err != nil {
return 0, err
}
return resp.Deleted, nil
}

// BgSave triggers a background snapshot.
func (c *Client) BgSave(ctx context.Context) (string, error) {
resp, err := c.rpc.BgSave(c.ctx(ctx), &pb.BgSaveRequest{})
if err != nil {
return "", err
}
return resp.Status, nil
}

// BgRewriteAof triggers a background AOF rewrite.
func (c *Client) BgRewriteAof(ctx context.Context) (string, error) {
resp, err := c.rpc.BgRewriteAof(c.ctx(ctx), &pb.BgRewriteAofRequest{})
if err != nil {
return "", err
}
return resp.Status, nil
}

// SlowLogEntry represents a single entry in the slow log.
type SlowLogEntry struct {
ID uint64
TimestampUnix uint64
DurationMicro uint64
Command string
}

// SlowLogGet returns slow log entries.
func (c *Client) SlowLogGet(ctx context.Context, count *uint32) ([]SlowLogEntry, error) {
req := &pb.SlowLogGetRequest{}
if count != nil {
req.Count = count
}
resp, err := c.rpc.SlowLogGet(c.ctx(ctx), req)
if err != nil {
return nil, err
}
entries := make([]SlowLogEntry, len(resp.Entries))
for i, e := range resp.Entries {
entries[i] = SlowLogEntry{
ID: e.Id,
TimestampUnix: e.TimestampUnix,
DurationMicro: e.DurationMicros,
Command: e.Command,
}
}
return entries, nil
}

// SlowLogLen returns the number of slow log entries.
func (c *Client) SlowLogLen(ctx context.Context) (int64, error) {
resp, err := c.rpc.SlowLogLen(c.ctx(ctx), &pb.SlowLogLenRequest{})
if err != nil {
return 0, err
}
return resp.Value, nil
}

// SlowLogReset clears the slow log.
func (c *Client) SlowLogReset(ctx context.Context) error {
_, err := c.rpc.SlowLogReset(c.ctx(ctx), &pb.SlowLogResetRequest{})
return err
}

// Publish sends a message to a channel. Returns the number of subscribers
// that received the message.
func (c *Client) Publish(ctx context.Context, channel string, message []byte) (int64, error) {
resp, err := c.rpc.Publish(c.ctx(ctx), &pb.PublishRequest{
Channel: channel,
Message: message,
})
if err != nil {
return 0, err
}
return resp.Value, nil
}

// SubscribeEvent represents a message received on a subscription.
type SubscribeEvent struct {
Kind string // "message" or "pmessage"
Channel string
Data []byte
Pattern string // only set for pmessage
}

// Subscribe opens a server-streaming subscription for the given channels
// and/or patterns. Returns a channel that yields events until the context
// is cancelled or the stream ends.
func (c *Client) Subscribe(ctx context.Context, channels []string, patterns []string) (<-chan SubscribeEvent, error) {
stream, err := c.rpc.Subscribe(c.ctx(ctx), &pb.SubscribeRequest{
Channels: channels,
Patterns: patterns,
})
if err != nil {
return nil, err
}

ch := make(chan SubscribeEvent, 64)
go func() {
defer close(ch)
for {
evt, err := stream.Recv()
if err != nil {
return
}
se := SubscribeEvent{
Kind: evt.Kind,
Channel: evt.Channel,
Data: evt.Data,
}
if evt.Pattern != nil {
se.Pattern = *evt.Pattern
}
select {
case ch <- se:
case <-ctx.Done():
return
}
}
}()

return ch, nil
}

// PubSubChannels returns active channel names, optionally filtered by pattern.
func (c *Client) PubSubChannels(ctx context.Context, pattern *string) ([]string, error) {
req := &pb.PubSubChannelsRequest{}
if pattern != nil {
req.Pattern = pattern
}
resp, err := c.rpc.PubSubChannels(c.ctx(ctx), req)
if err != nil {
return nil, err
}
return resp.Keys, nil
}

// PubSubNumSub returns subscriber counts for the given channels.
func (c *Client) PubSubNumSub(ctx context.Context, channels ...string) (map[string]int64, error) {
resp, err := c.rpc.PubSubNumSub(c.ctx(ctx), &pb.PubSubNumSubRequest{Channels: channels})
if err != nil {
return nil, err
}
result := make(map[string]int64, len(resp.Counts))
for _, c := range resp.Counts {
result[c.Channel] = c.Count
}
return result, nil
}
Loading