Skip to content

Commit 0cec171

Browse files
committed
#12 fix formating
1 parent a5e47ee commit 0cec171

File tree

7 files changed

+696
-696
lines changed

7 files changed

+696
-696
lines changed

client.go

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,56 @@
11
package kinesis
22

33
import (
4-
"net/http"
5-
"os"
4+
"net/http"
5+
"os"
66
)
77

88
const (
9-
ACCESS_ENV_KEY = "AWS_ACCESS_KEY"
10-
SECRET_ENV_KEY = "AWS_SECRET_KEY"
11-
REGION_ENV_NAME = "AWS_REGION_NAME"
9+
ACCESS_ENV_KEY = "AWS_ACCESS_KEY"
10+
SECRET_ENV_KEY = "AWS_SECRET_KEY"
11+
REGION_ENV_NAME = "AWS_REGION_NAME"
1212
)
1313

1414
// Auth store information about AWS Credentials
1515
type Auth struct {
16-
AccessKey, SecretKey, Token string
16+
AccessKey, SecretKey, Token string
1717
}
1818

1919
// Client is like http.Client, but signs all requests using Auth.
2020
type Client struct {
21-
Auth *Auth
22-
// The http client to make requests with. If nil, http.DefaultClient is used.
23-
Client *http.Client
21+
Auth *Auth
22+
// The http client to make requests with. If nil, http.DefaultClient is used.
23+
Client *http.Client
2424
}
2525

2626
// New creates a new Client.
2727
func NewClient(auth *Auth) *Client {
28-
if auth.AccessKey == "" {
29-
auth.AccessKey = os.Getenv(ACCESS_ENV_KEY)
30-
}
31-
if auth.SecretKey == "" {
32-
auth.SecretKey = os.Getenv(SECRET_ENV_KEY)
33-
}
34-
return &Client{Auth: auth}
28+
if auth.AccessKey == "" {
29+
auth.AccessKey = os.Getenv(ACCESS_ENV_KEY)
30+
}
31+
if auth.SecretKey == "" {
32+
auth.SecretKey = os.Getenv(SECRET_ENV_KEY)
33+
}
34+
return &Client{Auth: auth}
3535
}
3636

3737
func GetRegion(region Region) string {
38-
if region.Name == "" {
39-
return os.Getenv(REGION_ENV_NAME)
40-
}
41-
return region.Name
38+
if region.Name == "" {
39+
return os.Getenv(REGION_ENV_NAME)
40+
}
41+
return region.Name
4242
}
4343

4444
// get client
4545
func (c *Client) client() *http.Client {
46-
if c.Client == nil {
47-
return http.DefaultClient
48-
}
49-
return c.Client
46+
if c.Client == nil {
47+
return http.DefaultClient
48+
}
49+
return c.Client
5050
}
5151

5252
// do some request, but sign it before sending
5353
func (c *Client) Do(req *http.Request) (resp *http.Response, err error) {
54-
Sign(c.Auth, req)
55-
return c.client().Do(req)
54+
Sign(c.Auth, req)
55+
return c.client().Do(req)
5656
}

examples/example.go

Lines changed: 90 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,101 +1,101 @@
11
package main
22

33
import (
4-
"fmt"
5-
"time"
4+
"fmt"
5+
"time"
66

7-
kinesis "github.com/sendgridlabs/go-kinesis"
7+
kinesis "github.com/sendgridlabs/go-kinesis"
88
)
99

1010
func getRecords(ksis *kinesis.Kinesis, streamName, ShardId string) {
11-
args := kinesis.NewArgs()
12-
args.Add("StreamName", streamName)
13-
args.Add("ShardId", ShardId)
14-
args.Add("ShardIteratorType", "TRIM_HORIZON")
15-
resp10, _ := ksis.GetShardIterator(args)
16-
17-
shardIterator := resp10.ShardIterator
18-
19-
for {
20-
args = kinesis.NewArgs()
21-
args.Add("ShardIterator", shardIterator)
22-
resp11, err := ksis.GetRecords(args)
23-
24-
if len(resp11.Records) > 0 {
25-
fmt.Printf("GetRecords Data BEGIN\n")
26-
for _, d := range resp11.Records {
27-
res, err := d.GetData()
28-
fmt.Printf("GetRecords Data: %v, err: %v\n", string(res), err)
29-
}
30-
fmt.Printf("GetRecords Data END\n")
31-
} else if resp11.NextShardIterator == "" || shardIterator == resp11.NextShardIterator || err != nil {
32-
fmt.Printf("GetRecords ERROR: %v\n", err)
33-
break
34-
}
35-
36-
shardIterator = resp11.NextShardIterator
37-
}
11+
args := kinesis.NewArgs()
12+
args.Add("StreamName", streamName)
13+
args.Add("ShardId", ShardId)
14+
args.Add("ShardIteratorType", "TRIM_HORIZON")
15+
resp10, _ := ksis.GetShardIterator(args)
16+
17+
shardIterator := resp10.ShardIterator
18+
19+
for {
20+
args = kinesis.NewArgs()
21+
args.Add("ShardIterator", shardIterator)
22+
resp11, err := ksis.GetRecords(args)
23+
24+
if len(resp11.Records) > 0 {
25+
fmt.Printf("GetRecords Data BEGIN\n")
26+
for _, d := range resp11.Records {
27+
res, err := d.GetData()
28+
fmt.Printf("GetRecords Data: %v, err: %v\n", string(res), err)
29+
}
30+
fmt.Printf("GetRecords Data END\n")
31+
} else if resp11.NextShardIterator == "" || shardIterator == resp11.NextShardIterator || err != nil {
32+
fmt.Printf("GetRecords ERROR: %v\n", err)
33+
break
34+
}
35+
36+
shardIterator = resp11.NextShardIterator
37+
}
3838
}
3939

4040
func main() {
41-
fmt.Println("Begin")
42-
43-
streamName := "test"
44-
// set env variables AWS_ACCESS_KEY and AWS_SECRET_KEY AWS_REGION_NAME
45-
ksis := kinesis.New("", "", kinesis.Region{})
46-
47-
err := ksis.CreateStream(streamName, 2)
48-
if err != nil {
49-
fmt.Printf("CreateStream ERROR: %v\n", err)
50-
}
51-
52-
args := kinesis.NewArgs()
53-
resp2, _ := ksis.ListStreams(args)
54-
fmt.Printf("ListStreams: %v\n", resp2)
55-
56-
resp3 := &kinesis.DescribeStreamResp{}
57-
58-
timeout := make(chan bool, 30)
59-
for {
60-
61-
args = kinesis.NewArgs()
62-
args.Add("StreamName", streamName)
63-
resp3, _ = ksis.DescribeStream(args)
64-
fmt.Printf("DescribeStream: %v\n", resp3)
65-
66-
if resp3.StreamDescription.StreamStatus != "ACTIVE" {
67-
time.Sleep(4 * time.Second)
68-
timeout <- true
69-
} else {
70-
break
71-
}
72-
73-
}
74-
75-
for i := 0; i < 10; i++ {
76-
args = kinesis.NewArgs()
77-
args.Add("StreamName", streamName)
78-
args.AddData([]byte(fmt.Sprintf("Hello AWS Kinesis %d", i)))
79-
args.Add("PartitionKey", fmt.Sprintf("partitionKey-%d", i))
80-
resp4, err := ksis.PutRecord(args)
81-
if err != nil {
82-
fmt.Printf("PutRecord err: %v\n", err)
83-
} else {
84-
fmt.Printf("PutRecord: %v\n", resp4)
85-
}
86-
}
87-
88-
for _, shard := range resp3.StreamDescription.Shards {
89-
go getRecords(ksis, streamName, shard.ShardId)
90-
}
91-
92-
var inputGuess string
93-
fmt.Scanf("%s\n", &inputGuess)
94-
95-
err1 := ksis.DeleteStream("test")
96-
if err1 != nil {
97-
fmt.Printf("DeleteStream ERROR: %v\n", err1)
98-
}
99-
100-
fmt.Println("End")
41+
fmt.Println("Begin")
42+
43+
streamName := "test"
44+
// set env variables AWS_ACCESS_KEY and AWS_SECRET_KEY AWS_REGION_NAME
45+
ksis := kinesis.New("", "", kinesis.Region{})
46+
47+
err := ksis.CreateStream(streamName, 2)
48+
if err != nil {
49+
fmt.Printf("CreateStream ERROR: %v\n", err)
50+
}
51+
52+
args := kinesis.NewArgs()
53+
resp2, _ := ksis.ListStreams(args)
54+
fmt.Printf("ListStreams: %v\n", resp2)
55+
56+
resp3 := &kinesis.DescribeStreamResp{}
57+
58+
timeout := make(chan bool, 30)
59+
for {
60+
61+
args = kinesis.NewArgs()
62+
args.Add("StreamName", streamName)
63+
resp3, _ = ksis.DescribeStream(args)
64+
fmt.Printf("DescribeStream: %v\n", resp3)
65+
66+
if resp3.StreamDescription.StreamStatus != "ACTIVE" {
67+
time.Sleep(4 * time.Second)
68+
timeout <- true
69+
} else {
70+
break
71+
}
72+
73+
}
74+
75+
for i := 0; i < 10; i++ {
76+
args = kinesis.NewArgs()
77+
args.Add("StreamName", streamName)
78+
args.AddData([]byte(fmt.Sprintf("Hello AWS Kinesis %d", i)))
79+
args.Add("PartitionKey", fmt.Sprintf("partitionKey-%d", i))
80+
resp4, err := ksis.PutRecord(args)
81+
if err != nil {
82+
fmt.Printf("PutRecord err: %v\n", err)
83+
} else {
84+
fmt.Printf("PutRecord: %v\n", resp4)
85+
}
86+
}
87+
88+
for _, shard := range resp3.StreamDescription.Shards {
89+
go getRecords(ksis, streamName, shard.ShardId)
90+
}
91+
92+
var inputGuess string
93+
fmt.Scanf("%s\n", &inputGuess)
94+
95+
err1 := ksis.DeleteStream("test")
96+
if err1 != nil {
97+
fmt.Printf("DeleteStream ERROR: %v\n", err1)
98+
}
99+
100+
fmt.Println("End")
101101
}

0 commit comments

Comments
 (0)