Skip to content

Commit f88dd4d

Browse files
committed
Add uploader_test.go
1 parent c34c83d commit f88dd4d

File tree

1 file changed

+148
-0
lines changed

1 file changed

+148
-0
lines changed

uploader_test.go

Lines changed: 148 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,148 @@
1+
package main
2+
3+
import (
	"context"
	"io/ioutil"
	"os"
	"testing"
	"time"

	"github.com/UnityTech/kafka-archiver/buffer"
	"github.com/UnityTech/kafka-archiver/partitioner"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/stretchr/testify/require"
)
17+
18+
func TestUploadWithDefaultPartitioner(t *testing.T) {
19+
w := NewWorker()
20+
w.s3enabled = true
21+
w.s3Concurrency = 1
22+
w.partitioner = partitioner.New(&partitioner.Config{Type: "DefaultPartitioner"})
23+
w.s3bucket = "test-s3-bucket"
24+
25+
s3ClientMock := s3.New(session.New())
26+
s3ClientMock.Handlers.Clear()
27+
s3ClientMock.Handlers.Send.PushBack(func(r *request.Request) {
28+
switch x := r.Params.(type) {
29+
case *s3.PutObjectInput:
30+
bt, err := ioutil.ReadAll(x.Body)
31+
require.NoError(t, err)
32+
require.Equal(t, "{\"message\": 1}{\"message\": 2}{\"message\": 3}", string(bt))
33+
require.Contains(t, *x.Key, "/testTopic1/partition=10/0010.5555.")
34+
require.Equal(t, *x.Bucket, w.s3bucket)
35+
}
36+
})
37+
38+
w.uploader = s3manager.NewUploaderWithClient(s3ClientMock)
39+
40+
f, err := ioutil.TempFile("", "kafka-archiver-upload-test")
41+
require.NoError(t, err)
42+
43+
f.WriteString(`{"message": 1}`)
44+
f.WriteString(`{"message": 2}`)
45+
f.WriteString(`{"message": 3}`)
46+
47+
require.NoError(t, f.Close())
48+
49+
w.upload(&buffer.Flush{
50+
Topic: "testTopic1",
51+
Path: f.Name(),
52+
Partition: 10,
53+
LastOffset: 5555,
54+
})
55+
}
56+
57+
func TestUploadWithTimeFieldPartitioner(t *testing.T) {
58+
w := NewWorker()
59+
w.s3enabled = true
60+
w.s3Concurrency = 1
61+
w.partitioner = partitioner.New(&partitioner.Config{
62+
Type: "TimeFieldPartitioner",
63+
FieldKeyName: "ts",
64+
TopicNamePrefix: "topic=",
65+
BaseFolder: "backup",
66+
})
67+
w.s3bucket = "test-s3-bucket-timefield"
68+
69+
s3ClientMock := s3.New(session.New())
70+
s3ClientMock.Handlers.Clear()
71+
s3ClientMock.Handlers.Send.PushBack(func(r *request.Request) {
72+
switch x := r.Params.(type) {
73+
case *s3.PutObjectInput:
74+
bt, err := ioutil.ReadAll(x.Body)
75+
require.NoError(t, err)
76+
require.Equal(t, "{\"message\": 1, \"ts\": 1444262400}{\"message\": 2, \"ts\": 1444262401}{\"message\": 3, \"ts\": 1444262402}", string(bt))
77+
require.Contains(t, *x.Key, "backup/topic=testTopic2/year=2015/month=10/day=08/hour=00/0010.808.")
78+
require.Equal(t, *x.Bucket, w.s3bucket)
79+
}
80+
})
81+
82+
w.uploader = s3manager.NewUploaderWithClient(s3ClientMock)
83+
84+
f, err := ioutil.TempFile("", "kafka-archiver-upload-test")
85+
require.NoError(t, err)
86+
87+
f.WriteString(`{"message": 1, "ts": 1444262400}`)
88+
f.WriteString(`{"message": 2, "ts": 1444262401}`)
89+
f.WriteString(`{"message": 3, "ts": 1444262402}`)
90+
91+
require.NoError(t, f.Close())
92+
93+
w.upload(&buffer.Flush{
94+
Topic: "testTopic2",
95+
Path: f.Name(),
96+
Partition: 10,
97+
LastOffset: 808,
98+
Ctx: context.WithValue(context.Background(), "hourBucket", time.Unix(1444262400, 0).UTC()),
99+
})
100+
}
101+
102+
func TestUploadWithIsoDateFieldPartitioner(t *testing.T) {
103+
w := NewWorker()
104+
w.s3enabled = true
105+
w.s3Concurrency = 1
106+
w.partitioner = partitioner.New(&partitioner.Config{
107+
Type: "IsoDateFieldPartitioner",
108+
FieldKeyName: "ts",
109+
TopicNamePrefix: "topic=",
110+
BaseFolder: "backup",
111+
})
112+
w.s3bucket = "test-s3-bucket-timefield"
113+
114+
s3ClientMock := s3.New(session.New())
115+
s3ClientMock.Handlers.Clear()
116+
s3ClientMock.Handlers.Send.PushBack(func(r *request.Request) {
117+
switch x := r.Params.(type) {
118+
case *s3.PutObjectInput:
119+
bt, err := ioutil.ReadAll(x.Body)
120+
require.NoError(t, err)
121+
require.Equal(t, "{\"message\": 1, \"ts\": \"2015-10-08T00:00:00Z\"}{\"message\": 2, \"ts\": \"2015-10-08T00:00:01Z\"}{\"message\": 3, \"ts\": \"2015-10-08T00:00:02Z\"}", string(bt))
122+
require.Contains(t, *x.Key, "backup/topic=testTopic2/date=2015-10-08/hour=00/0010.808.")
123+
require.Equal(t, *x.Bucket, w.s3bucket)
124+
}
125+
})
126+
127+
w.uploader = s3manager.NewUploaderWithClient(s3ClientMock)
128+
129+
f, err := ioutil.TempFile("", "kafka-archiver-upload-test")
130+
require.NoError(t, err)
131+
132+
f.WriteString(`{"message": 1, "ts": "2015-10-08T00:00:00Z"}`)
133+
f.WriteString(`{"message": 2, "ts": "2015-10-08T00:00:01Z"}`)
134+
f.WriteString(`{"message": 3, "ts": "2015-10-08T00:00:02Z"}`)
135+
136+
require.NoError(t, f.Close())
137+
138+
hourBucket, err := time.Parse(time.RFC3339Nano, "2015-10-08T00:00:00Z")
139+
require.NoError(t, err)
140+
141+
w.upload(&buffer.Flush{
142+
Topic: "testTopic2",
143+
Path: f.Name(),
144+
Partition: 10,
145+
LastOffset: 808,
146+
Ctx: context.WithValue(context.Background(), "hourBucket", hourBucket.UTC()),
147+
})
148+
}

0 commit comments

Comments
 (0)