Skip to content

Commit 6e5e5aa

Browse files
committed
encapsulate the kinesis struct
1 parent 53ffb74 commit 6e5e5aa

File tree

2 files changed

+20
-20
lines changed

2 files changed

+20
-20
lines changed

kinesis.go

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func NewRegionFromEnv() string {
3333
}
3434

3535
// Structure for kinesis client
36-
type Kinesis struct {
36+
type kinesis struct {
3737
client *Client
3838
endpoint string
3939
region string
@@ -56,25 +56,25 @@ type KinesisClient interface {
5656

5757
// New returns an initialized AWS Kinesis client using the canonical live “production” endpoint
5858
// for AWS Kinesis, i.e. https://kinesis.{region}.amazonaws.com
59-
func New(auth Auth, region string) *Kinesis {
59+
func New(auth Auth, region string) KinesisClient {
6060
endpoint := fmt.Sprintf(kinesisURL, region)
6161
return NewWithEndpoint(auth, region, endpoint)
6262
}
6363

6464
// NewWithClient returns an initialized AWS Kinesis client using the canonical live “production” endpoint
6565
// for AWS Kinesis, i.e. https://kinesis.{region}.amazonaws.com but with the ability to create a custom client
6666
// with specific configurations like a timeout
67-
func NewWithClient(auth Auth, region string, client *Client) *Kinesis {
67+
func NewWithClient(auth Auth, region string, client *Client) KinesisClient {
6868
endpoint := fmt.Sprintf(kinesisURL, region)
69-
return &Kinesis{client: client, version: "20131202", region: region, endpoint: endpoint}
69+
return &kinesis{client: client, version: "20131202", region: region, endpoint: endpoint}
7070
}
7171

7272
// NewWithEndpoint returns an initialized AWS Kinesis client using the specified endpoint.
7373
// This is generally useful for testing, so a local Kinesis server can be used.
74-
func NewWithEndpoint(auth Auth, region string, endpoint string) *Kinesis {
74+
func NewWithEndpoint(auth Auth, region string, endpoint string) KinesisClient {
7575
// TODO: remove trailing slash on endpoint if there is one? does it matter?
7676
// TODO: validate endpoint somehow?
77-
return &Kinesis{client: NewClient(auth), version: "20131202", region: region, endpoint: endpoint}
77+
return &kinesis{client: NewClient(auth), version: "20131202", region: region, endpoint: endpoint}
7878
}
7979

8080
// Create params object for request
@@ -152,7 +152,7 @@ func buildError(r *http.Response) error {
152152
}
153153

154154
// Query by AWS API
155-
func (kinesis *Kinesis) query(params map[string]string, data interface{}, resp interface{}) error {
155+
func (kinesis *kinesis) query(params map[string]string, data interface{}, resp interface{}) error {
156156
jsonData, err := json.Marshal(data)
157157
if err != nil {
158158
return err
@@ -195,7 +195,7 @@ func (kinesis *Kinesis) query(params map[string]string, data interface{}, resp i
195195
// CreateStream adds a new Amazon Kinesis stream to your AWS account
196196
// StreamName is a name of stream, ShardCount is number of shards
197197
// more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html
198-
func (kinesis *Kinesis) CreateStream(StreamName string, ShardCount int) error {
198+
func (kinesis *kinesis) CreateStream(StreamName string, ShardCount int) error {
199199
params := makeParams("CreateStream")
200200
requestParams := struct {
201201
StreamName string
@@ -214,7 +214,7 @@ func (kinesis *Kinesis) CreateStream(StreamName string, ShardCount int) error {
214214
// DeleteStream deletes a stream and all of its shards and data from your AWS account
215215
// StreamName is a name of stream
216216
// more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeleteStream.html
217-
func (kinesis *Kinesis) DeleteStream(StreamName string) error {
217+
func (kinesis *kinesis) DeleteStream(StreamName string) error {
218218
params := makeParams("DeleteStream")
219219
requestParams := struct {
220220
StreamName string
@@ -230,7 +230,7 @@ func (kinesis *Kinesis) DeleteStream(StreamName string) error {
230230

231231
// MergeShards merges two adjacent shards in a stream and combines them into a single shard to reduce the stream's capacity to ingest and transport data
232232
// more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_MergeShards.html
233-
func (kinesis *Kinesis) MergeShards(args *RequestArgs) error {
233+
func (kinesis *kinesis) MergeShards(args *RequestArgs) error {
234234
params := makeParams("MergeShards")
235235
err := kinesis.query(params, args.params, nil)
236236
if err != nil {
@@ -241,7 +241,7 @@ func (kinesis *Kinesis) MergeShards(args *RequestArgs) error {
241241

242242
// SplitShard splits a shard into two new shards in the stream, to increase the stream's capacity to ingest and transport data
243243
// more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_SplitShard.html
244-
func (kinesis *Kinesis) SplitShard(args *RequestArgs) error {
244+
func (kinesis *kinesis) SplitShard(args *RequestArgs) error {
245245
params := makeParams("SplitShard")
246246
err := kinesis.query(params, args.params, nil)
247247
if err != nil {
@@ -258,7 +258,7 @@ type ListStreamsResp struct {
258258

259259
// ListStreams returns an array of the names of all the streams that are associated with the AWS account making the ListStreams request
260260
// more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListStreams.html
261-
func (kinesis *Kinesis) ListStreams(args *RequestArgs) (resp *ListStreamsResp, err error) {
261+
func (kinesis *kinesis) ListStreams(args *RequestArgs) (resp *ListStreamsResp, err error) {
262262
params := makeParams("ListStreams")
263263
resp = &ListStreamsResp{}
264264
err = kinesis.query(params, args.params, resp)
@@ -300,7 +300,7 @@ type DescribeStreamResp struct {
300300
// the shard spans, and the IDs of any earlier shards that played in a role in a MergeShards or
301301
// SplitShard operation that created the shard
302302
// more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html
303-
func (kinesis *Kinesis) DescribeStream(args *RequestArgs) (resp *DescribeStreamResp, err error) {
303+
func (kinesis *kinesis) DescribeStream(args *RequestArgs) (resp *DescribeStreamResp, err error) {
304304
params := makeParams("DescribeStream")
305305
resp = &DescribeStreamResp{}
306306
err = kinesis.query(params, args.params, resp)
@@ -317,7 +317,7 @@ type GetShardIteratorResp struct {
317317

318318
// GetShardIterator returns a shard iterator
319319
// more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html
320-
func (kinesis *Kinesis) GetShardIterator(args *RequestArgs) (resp *GetShardIteratorResp, err error) {
320+
func (kinesis *kinesis) GetShardIterator(args *RequestArgs) (resp *GetShardIteratorResp, err error) {
321321
params := makeParams("GetShardIterator")
322322
resp = &GetShardIteratorResp{}
323323
err = kinesis.query(params, args.params, resp)
@@ -346,7 +346,7 @@ type GetRecordsResp struct {
346346

347347
// GetRecords returns one or more data records from a shard
348348
// more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html
349-
func (kinesis *Kinesis) GetRecords(args *RequestArgs) (resp *GetRecordsResp, err error) {
349+
func (kinesis *kinesis) GetRecords(args *RequestArgs) (resp *GetRecordsResp, err error) {
350350
params := makeParams("GetRecords")
351351
resp = &GetRecordsResp{}
352352
err = kinesis.query(params, args.params, resp)
@@ -365,7 +365,7 @@ type PutRecordResp struct {
365365
// PutRecord puts a data record into an Amazon Kinesis stream from a producer.
366366
// args must contain a single record added with AddRecord.
367367
// More info: http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html
368-
func (kinesis *Kinesis) PutRecord(args *RequestArgs) (resp *PutRecordResp, err error) {
368+
func (kinesis *kinesis) PutRecord(args *RequestArgs) (resp *PutRecordResp, err error) {
369369
params := makeParams("PutRecord")
370370

371371
if _, ok := args.params["Data"]; !ok && len(args.Records) == 0 {
@@ -391,7 +391,7 @@ func (kinesis *Kinesis) PutRecord(args *RequestArgs) (resp *PutRecordResp, err e
391391

392392
// PutRecords puts multiple data records into an Amazon Kinesis stream from a producer
393393
// more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html
394-
func (kinesis *Kinesis) PutRecords(args *RequestArgs) (resp *PutRecordsResp, err error) {
394+
func (kinesis *kinesis) PutRecords(args *RequestArgs) (resp *PutRecordsResp, err error) {
395395
params := makeParams("PutRecords")
396396
resp = &PutRecordsResp{}
397397
args.Add("Records", args.Records)

kinesis_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ func TestPutRecordWithAddRecord(t *testing.T) {
144144
// waitForStreamStatus will poll for a stream status repeatedly, once every MS, for up to 1000 MS,
145145
// blocking until the stream has the desired status. It will return an error if the stream never
146146
// achieves the desired status. If a stream doesn’t exist then an error will be returned.
147-
func waitForStreamStatus(client *Kinesis, streamName string, statusToAwait string) error {
147+
func waitForStreamStatus(client KinesisClient, streamName string, statusToAwait string) error {
148148
args := NewArgs()
149149
args.Add("StreamName", streamName)
150150
var resp3 *DescribeStreamResp
@@ -177,7 +177,7 @@ func waitForStreamStatus(client *Kinesis, streamName string, statusToAwait strin
177177
// waitForStreamDeletion will poll for a stream status repeatedly, once every MS, for up to 1000 MS,
178178
// blocking until the stream has been deleted. It will return an error if the stream is never deleted
179179
// or some other error occurs. If it succeeds then the return value will be nil.
180-
func waitForStreamDeletion(client *Kinesis, streamName string) error {
180+
func waitForStreamDeletion(client KinesisClient, streamName string) error {
181181
err := waitForStreamStatus(client, streamName, "FOO")
182182
if !strings.Contains(err.Error(), "not found") {
183183
return err
@@ -186,7 +186,7 @@ func waitForStreamDeletion(client *Kinesis, streamName string) error {
186186
}
187187

188188
// helper
189-
func createStream(client *Kinesis, streamName string, partitions int) error {
189+
func createStream(client KinesisClient, streamName string, partitions int) error {
190190
err := client.CreateStream(streamName, partitions)
191191
if err != nil {
192192
return err

0 commit comments

Comments
 (0)