Skip to content

Commit ad7c280

Browse files
committed
merge
2 parents 8f0ac1d + 82af5bf commit ad7c280

File tree

3 files changed

+96
-8
lines changed

3 files changed

+96
-8
lines changed

examples/example.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ func main() {
7171

7272
}
7373

74+
// Put records individually
7475
for i := 0; i < 10; i++ {
7576
args = kinesis.NewArgs()
7677
args.Add("StreamName", streamName)
@@ -88,9 +89,29 @@ func main() {
8889
go getRecords(ksis, streamName, shard.ShardId)
8990
}
9091

92+
// Put records in batch
93+
args = kinesis.NewArgs()
94+
args.Add("StreamName", streamName)
95+
96+
for i := 0; i < 10; i++ {
97+
args.AddRecord(
98+
[]byte(fmt.Sprintf("Hello AWS Kinesis %d", i)),
99+
fmt.Sprintf("partitionKey-%d", i),
100+
)
101+
}
102+
103+
resp4, err := ksis.PutRecords(args)
104+
if err != nil {
105+
fmt.Printf("PutRecords err: %v\n", err)
106+
} else {
107+
fmt.Printf("PutRecords: %v\n", resp4)
108+
}
109+
110+
// Wait for user input
91111
var inputGuess string
92112
fmt.Scanf("%s\n", &inputGuess)
93113

114+
// Delete the stream
94115
err1 := ksis.DeleteStream("test")
95116
if err1 != nil {
96117
fmt.Printf("DeleteStream ERROR: %v\n", err1)

kinesis.go

Lines changed: 62 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,14 @@ type Kinesis struct {
3636
type KinesisClient interface {
3737
CreateStream(StreamName string, ShardCount int) error
3838
DeleteStream(StreamName string) error
39-
MergeShards(args *RequestArgs) error
40-
SplitShard(args *RequestArgs) error
41-
ListStreams(args *RequestArgs) (resp *ListStreamsResp, err error)
4239
DescribeStream(args *RequestArgs) (resp *DescribeStreamResp, err error)
43-
GetShardIterator(args *RequestArgs) (resp *GetShardIteratorResp, err error)
4440
GetRecords(args *RequestArgs) (resp *GetRecordsResp, err error)
41+
GetShardIterator(args *RequestArgs) (resp *GetShardIteratorResp, err error)
42+
ListStreams(args *RequestArgs) (resp *ListStreamsResp, err error)
43+
MergeShards(args *RequestArgs) error
4544
PutRecord(args *RequestArgs) (resp *PutRecordResp, err error)
45+
PutRecords(args *RequestArgs) (resp *PutRecordsResp, err error)
46+
SplitShard(args *RequestArgs) error
4647
}
4748

4849
// Initialize new client for AWS Kinesis
@@ -63,12 +64,15 @@ func makeParams(action string) map[string]string {
6364

6465
// RequestArgs store params for request
6566
type RequestArgs struct {
66-
params map[string]interface{}
67+
params map[string]interface{}
68+
Records []PutRecordsRecord
6769
}
6870

6971
// NewFilter creates a new Filter.
7072
func NewArgs() *RequestArgs {
71-
return &RequestArgs{make(map[string]interface{})}
73+
return &RequestArgs{
74+
params: make(map[string]interface{}),
75+
}
7276
}
7377

7478
// Add appends a filtering parameter with the given name and value(s).
@@ -124,14 +128,21 @@ func (kinesis *Kinesis) query(params map[string]string, data interface{}, resp i
124128
}
125129

126130
// request
127-
request, err := http.NewRequest("POST", fmt.Sprintf("https://kinesis.%s.amazonaws.com", kinesis.Region), bytes.NewReader(jsonData))
131+
request, err := http.NewRequest(
132+
"POST",
133+
fmt.Sprintf("https://kinesis.%s.amazonaws.com", kinesis.Region),
134+
bytes.NewReader(jsonData),
135+
)
136+
128137
if err != nil {
129138
return err
130139
}
140+
131141
// headers
132142
request.Header.Set("Content-Type", "application/x-amz-json-1.1")
133143
request.Header.Set("X-Amz-Target", fmt.Sprintf("Kinesis_%s.%s", kinesis.Version, params[ACTION_KEY]))
134144
request.Header.Set("User-Agent", "Golang Kinesis")
145+
135146
// response
136147
response, err := kinesis.client.Do(request)
137148
if err != nil {
@@ -292,7 +303,7 @@ type GetRecordsRecords struct {
292303
SequenceNumber string
293304
}
294305

295-
func (r GetRecordsRecords) GetData() ([]byte) {
306+
func (r GetRecordsRecords) GetData() []byte {
296307
return r.Data
297308
}
298309

@@ -331,3 +342,46 @@ func (kinesis *Kinesis) PutRecord(args *RequestArgs) (resp *PutRecordResp, err e
331342
}
332343
return
333344
}
345+
346+
// PutRecords puts multiple data records into an Amazon Kinesis stream from a producer
347+
// more info http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html
348+
func (kinesis *Kinesis) PutRecords(args *RequestArgs) (resp *PutRecordsResp, err error) {
349+
params := makeParams("PutRecords")
350+
resp = &PutRecordsResp{}
351+
args.Add("Records", args.Records)
352+
err = kinesis.query(params, args.params, resp)
353+
354+
if err != nil {
355+
return nil, err
356+
}
357+
return
358+
}
359+
360+
// PutRecordsResp stores the information that provides by PutRecord API call
361+
type PutRecordsResp struct {
362+
FailedRecordCount int
363+
Records []PutRecordsRespRecord
364+
}
365+
366+
// RecordResp stores individual Record information provided by PutRecords API call
367+
type PutRecordsRespRecord struct {
368+
ErrorCode string
369+
ErrorMessage string
370+
SequenceNumber string
371+
ShardId string
372+
}
373+
374+
// Add data and partition for sending multiple Records to Kinesis in one API call
375+
func (f *RequestArgs) AddRecord(value []byte, partitionKey string) {
376+
r := PutRecordsRecord{
377+
Data: value,
378+
PartitionKey: partitionKey,
379+
}
380+
f.Records = append(f.Records, r)
381+
}
382+
383+
// PutRecordsRecord stores the Data and PartitionKey for batch calls to Kinesis API
384+
type PutRecordsRecord struct {
385+
Data []byte
386+
PartitionKey string
387+
}

kinesis_test.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,3 +22,16 @@ func TestRegions(t *testing.T) {
2222
t.Errorf("%q != %q", USEast.Name, "us-east-1")
2323
}
2424
}
25+
26+
func TestAddRecord(t *testing.T) {
27+
args := NewArgs()
28+
29+
args.AddRecord(
30+
[]byte("data"),
31+
"partition_key",
32+
)
33+
34+
if len(args.Records) != 1 {
35+
t.Errorf("%q != %q", len(args.Records), 1)
36+
}
37+
}

0 commit comments

Comments
 (0)