Skip to content

Commit e77cc7e

Browse files
committed
Support local Kinesis e.g. Kinesalite
And add some tests that run against the local Kinesis. I need this for testing my own app which uses go-kinesis. But I think it’s also useful for testing go-kinesis itself. I have a feature I’d like to add but I don’t think it’d be safe to do so without some functional tests.
1 parent c027bf8 commit e77cc7e

File tree

4 files changed

+180
-9
lines changed

4 files changed

+180
-9
lines changed

README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,15 @@ Example you can find in folder `examples`.
1212
## Command line interface
1313

1414
You can find a tool for interacting with kinesis from the command line in folder `kinesis-cli`.
15+
16+
## Testing
17+
18+
The tests require a local Kinesis server such as [Kinesalite](https://github.com/mhart/kinesalite)
19+
to be running and reachable at `http://127.0.0.1:4567`.
20+
21+
To make the tests complete faster, you might want to have Kinesalite perform stream creation and
22+
deletion faster than the default of 500ms, like so:
23+
24+
kinesalite --createStreamMs 5 --deleteStreamMs 5 &
25+
26+
The `&` runs Kinesalite in the background, which is probably what you want.

client.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,10 @@ func (c *Client) client() *http.Client {
141141

142142
// Do some request, but sign it before sending
143143
func (c *Client) Do(req *http.Request) (resp *http.Response, err error) {
144-
Sign(c.Auth, req)
144+
err = Sign(c.Auth, req)
145+
if err != nil {
146+
return nil, err
147+
}
145148

146149
if !c.Auth.Expiry.IsZero() {
147150
if time.Now().After(c.Auth.Expiry) {

kinesis.go

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,10 @@ var (
3131

3232
// Structure for kinesis client
3333
type Kinesis struct {
34-
client *Client
35-
Region string
36-
Version string
34+
client *Client
35+
endpoint string
36+
Region string
37+
Version string
3738
}
3839

3940
// Interface implemented by Kinesis
@@ -50,9 +51,19 @@ type KinesisClient interface {
5051
SplitShard(args *RequestArgs) error
5152
}
5253

53-
// New returns an initialized AWS Kinesis client
54+
// New returns an initialized AWS Kinesis client using the canonical live “production” endpoint
55+
// for AWS Kinesis, i.e. https://kinesis.{region}.amazonaws.com
5456
func New(auth *Auth, region Region) *Kinesis {
55-
return &Kinesis{client: NewClient(auth), Version: "20131202", Region: GetRegion(region)}
57+
endpoint := fmt.Sprintf("https://kinesis.%s.amazonaws.com", GetRegion(region))
58+
return NewWithEndpoint(auth, region, endpoint)
59+
}
60+
61+
// NewWithEndpoint returns an initialized AWS Kinesis client using the specified endpoint.
62+
// This is generally useful for testing, so a local Kinesis server can be used.
63+
func NewWithEndpoint(auth *Auth, region Region, endpoint string) *Kinesis {
64+
// TODO: remove trailing slash on endpoint if there is one? does it matter?
65+
// TODO: validate endpoint somehow?
66+
return &Kinesis{client: NewClient(auth), Version: "20131202", Region: GetRegion(region), endpoint: endpoint}
5667
}
5768

5869
// Create params object for request
@@ -115,7 +126,7 @@ func buildError(r *http.Response) error {
115126
err.Message = errors.Message
116127
err.StatusCode = r.StatusCode
117128
if err.Message == "" {
118-
err.Message = r.Status
129+
err.Message = fmt.Sprintf("%s\n%s", r.Status, r.Body)
119130
}
120131
return &err
121132
}
@@ -130,7 +141,7 @@ func (kinesis *Kinesis) query(params map[string]string, data interface{}, resp i
130141
// request
131142
request, err := http.NewRequest(
132143
"POST",
133-
fmt.Sprintf("https://kinesis.%s.amazonaws.com", kinesis.Region),
144+
kinesis.endpoint,
134145
bytes.NewReader(jsonData),
135146
)
136147

kinesis_test.go

Lines changed: 146 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
package kinesis
22

3-
import "testing"
3+
import (
4+
"errors"
5+
"fmt"
6+
"strings"
7+
"testing"
8+
"time"
9+
)
10+
11+
const localEndpoint = "http://127.0.0.1:4567"
412

513
// Trivial test to make sure that Kinesis implements KinesisClient.
614
func TestInterfaceIsImplemented(t *testing.T) {
@@ -40,3 +48,140 @@ func TestAddRecord(t *testing.T) {
4048
t.Errorf("%q != %q", len(args.Records), 1)
4149
}
4250
}
51+
52+
func TestListStreams(t *testing.T) {
53+
auth := &Auth{
54+
AccessKey: "BAD_ACCESS_KEY",
55+
SecretKey: "BAD_SECRET_KEY",
56+
}
57+
58+
client := NewWithEndpoint(auth, USEast1, localEndpoint)
59+
resp, err := client.ListStreams(NewArgs())
60+
if resp == nil {
61+
t.Error("resp == nil")
62+
}
63+
if err != nil {
64+
t.Errorf("%q != nil", err)
65+
}
66+
}
67+
68+
func TestCreateStream(t *testing.T) {
69+
auth := &Auth{
70+
AccessKey: "BAD_ACCESS_KEY",
71+
SecretKey: "BAD_SECRET_KEY",
72+
}
73+
74+
client := NewWithEndpoint(auth, USEast1, localEndpoint)
75+
76+
streamName := "test2"
77+
78+
err := client.CreateStream(streamName, 1)
79+
if err != nil {
80+
t.Errorf("%q != nil", err)
81+
}
82+
83+
err = waitForStreamStatus(client, streamName, "ACTIVE")
84+
if err != nil {
85+
t.Errorf("%q != nil", err)
86+
}
87+
88+
client.DeleteStream(streamName)
89+
err = waitForStreamDeletion(client, streamName)
90+
if err != nil {
91+
t.Errorf("%q != nil", err)
92+
}
93+
}
94+
95+
func TestPutRecord(t *testing.T) {
96+
auth := &Auth{
97+
AccessKey: "BAD_ACCESS_KEY",
98+
SecretKey: "BAD_SECRET_KEY",
99+
}
100+
101+
client := NewWithEndpoint(auth, USEast1, localEndpoint)
102+
103+
streamName := "pizza"
104+
err := createStream(client, streamName, 1)
105+
106+
if err != nil {
107+
t.Errorf("%q != nil", err)
108+
}
109+
110+
args := NewArgs()
111+
args.Add("StreamName", streamName)
112+
args.AddData([]byte("The cheese is old and moldy, where is the bathroom?"))
113+
args.Add("PartitionKey", "key-1")
114+
115+
resp, err := client.PutRecord(args)
116+
if resp == nil {
117+
t.Error("resp == nil")
118+
}
119+
if err != nil {
120+
t.Errorf("%q != nil", err)
121+
}
122+
123+
client.DeleteStream(streamName)
124+
err = waitForStreamDeletion(client, streamName)
125+
if err != nil {
126+
t.Errorf("%q != nil", err)
127+
}
128+
}
129+
130+
// waitForStreamStatus will poll for a stream status repeatedly, once every MS, for up to 1000 MS,
131+
// blocking until the stream has the desired status. It will return an error if the stream never
132+
// achieves the desired status. If a stream doesn’t exist then an error will be returned.
133+
func waitForStreamStatus(client *Kinesis, streamName string, statusToAwait string) error {
134+
args := NewArgs()
135+
args.Add("StreamName", streamName)
136+
var resp3 *DescribeStreamResp
137+
var err error
138+
139+
for i := 1; i < 1000; i++ {
140+
resp3, err = client.DescribeStream(args)
141+
if err != nil {
142+
return err
143+
}
144+
145+
if resp3.StreamDescription.StreamStatus == statusToAwait {
146+
break
147+
} else {
148+
time.Sleep(1 * time.Millisecond)
149+
}
150+
}
151+
152+
if resp3 == nil {
153+
return errors.New("Could not get Stream Description")
154+
}
155+
156+
if resp3.StreamDescription.StreamStatus != statusToAwait {
157+
return errors.New(fmt.Sprintf("Timed out waiting for stream to enter status %v; last status was %v.", statusToAwait, resp3.StreamDescription.StreamStatus))
158+
}
159+
160+
return nil
161+
}
162+
163+
// waitForStreamDeletion will poll for a stream status repeatedly, once every MS, for up to 1000 MS,
164+
// blocking until the stream has been deleted. It will return an error if the stream is never deleted
165+
// or some other error occurs. If it succeeds then the return value will be nil.
166+
func waitForStreamDeletion(client *Kinesis, streamName string) error {
167+
err := waitForStreamStatus(client, streamName, "FOO")
168+
if !strings.Contains(err.Error(), "not found") {
169+
return err
170+
}
171+
return nil
172+
}
173+
174+
// helper
175+
func createStream(client *Kinesis, streamName string, partitions int) error {
176+
err := client.CreateStream(streamName, partitions)
177+
if err != nil {
178+
return err
179+
}
180+
181+
err = waitForStreamStatus(client, streamName, "ACTIVE")
182+
if err != nil {
183+
return err
184+
}
185+
186+
return nil
187+
}

0 commit comments

Comments
 (0)