
Commit f223d92

Reformat lines; Bump Uploader InitialInterval backoff to 5 seconds
1 parent ec2a964 · commit f223d92

3 files changed: +72 -33 lines changed


main.go

Lines changed: 1 addition & 1 deletion
@@ -11,7 +11,7 @@ import (
 
 var (
 	// Version indicates the current version of the Kafka Achiever.
-	Version = "0.100"
+	Version = "0.101"
 )
 
 func init() {

uploader.go

Lines changed: 1 addition & 1 deletion
@@ -93,7 +93,7 @@ func (w *Worker) upload(f *buffer.Flush) error {
 	// Create a new exponential backoff provider.
 	b := backoff.NewExponentialBackOff()
 	b.MaxElapsedTime = 5 * time.Minute
-	b.InitialInterval = 2 * time.Second
+	b.InitialInterval = 5 * time.Second
 
 	// S3 Uploader is wrapped in a backoff function with an exponential provider.
 	var s3Response *s3manager.UploadOutput
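This change only tunes the delay before the first retry; the retry loop itself is unchanged. For context, here is a minimal sketch of the wrapping pattern the in-code comment describes, using cenkalti/backoff's Retry with an aws-sdk-go s3manager uploader. The function name, bucket/key parameters, and uploader construction are illustrative assumptions, not the repository's actual code.

package uploadsketch

import (
	"bytes"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
	"github.com/cenkalti/backoff"
)

// uploadWithRetry is a hypothetical helper illustrating the pattern, not the
// repository's code.
func uploadWithRetry(data []byte, bucket, key string) (*s3manager.UploadOutput, error) {
	uploader := s3manager.NewUploader(session.Must(session.NewSession()))

	// Mirror the tuned settings: give up after 5 minutes overall, and wait
	// 5 seconds (up from 2) before the first retry.
	b := backoff.NewExponentialBackOff()
	b.MaxElapsedTime = 5 * time.Minute
	b.InitialInterval = 5 * time.Second

	var out *s3manager.UploadOutput
	err := backoff.Retry(func() error {
		var uerr error
		out, uerr = uploader.Upload(&s3manager.UploadInput{
			Bucket: aws.String(bucket),
			Key:    aws.String(key),
			Body:   bytes.NewReader(data),
		})
		// A non-nil error schedules the next, exponentially longer-spaced attempt.
		return uerr
	}, b)
	return out, err
}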

worker.go

Lines changed: 70 additions & 31 deletions
@@ -85,41 +85,69 @@ type Worker struct {
 func (w *Worker) Init() {
 
 	// Define application flags using the `flag` package.
-	flag.Var(&w.brokers, "brokers", "Kafka brokers to connect to, as a comma separated list. (required)")
-	flag.Var(&w.topics, "topics", "Kafka topics to subscribe to.")
-	flag.StringVar(&w.topicWhitelist, "topic-whitelist", "", "An additional whitelist of topics to subscribe to.")
-	flag.StringVar(&w.topicBlacklist, "topic-blacklist", "", "An additional blacklist of topics, precedes the whitelist.")
-	flag.StringVar(&w.consumerGroup, "consumer-group", "", "Name of your kafka consumer group. (required)")
-	flag.StringVar(&w.consumerAutoOffsetReset, "consumer-auto-offset-reset", "oldest", "Kafka consumer group topic default offset.")
-	flag.Var(&w.consumerDelayStart, "consumer-delay-start", "Number of seconds to wait before starting consuming. (default \"0s\")")
+	flag.Var(&w.brokers, "brokers",
+		"Kafka brokers to connect to, as a comma separated list. (required)")
+	flag.Var(&w.topics, "topics",
+		"Kafka topics to subscribe to.")
+	flag.StringVar(&w.topicWhitelist, "topic-whitelist", "",
+		"An additional whitelist of topics to subscribe to.")
+	flag.StringVar(&w.topicBlacklist, "topic-blacklist", "",
+		"An additional blacklist of topics, precedes the whitelist.")
+	flag.StringVar(&w.consumerGroup, "consumer-group", "",
+		"Name of your kafka consumer group. (required)")
+	flag.StringVar(&w.consumerAutoOffsetReset, "consumer-auto-offset-reset", "oldest",
+		"Kafka consumer group topic default offset.")
+	flag.Var(&w.consumerDelayStart, "consumer-delay-start",
+		"Number of seconds to wait before starting consuming. (default \"0s\")")
 
 	// Define Buffer flags.
-	flag.Var(&w.bufferFlushInterval, "buffer-interval", "The duration when the buffer should close the file. (default \"15m\")")
-	flag.StringVar(&w.bufferFlushSize, "buffer-size", "100MiB", "The size in bytes when the buffer should close the file.")
-	flag.IntVar(&w.bufferQueueLength, "buffer-queue-length", 32, "The size of the queue to hold the files to be uploaded.")
-	flag.StringVar(&w.bufferLocation, "buffer-location", os.TempDir(), "The base folder where to store temporary files.")
-	flag.StringVar(&w.bufferMem, "buffer-mem", "8KB", "Amount of memory a buffer can use.")
+	flag.Var(&w.bufferFlushInterval, "buffer-interval",
+		"The duration when the buffer should close the file. (default \"15m\")")
+	flag.StringVar(&w.bufferFlushSize, "buffer-size", "100MiB",
+		"The size in bytes when the buffer should close the file.")
+	flag.IntVar(&w.bufferQueueLength, "buffer-queue-length", 32,
+		"The size of the queue to hold the files to be uploaded.")
+	flag.StringVar(&w.bufferLocation, "buffer-location", os.TempDir(),
+		"The base folder where to store temporary files.")
+	flag.StringVar(&w.bufferMem, "buffer-mem", "8KB",
+		"Amount of memory a buffer can use.")
 
 	// Define Partitioner flags.
-	flag.StringVar(&w.partitionerClass, "partitioner", "DefaultPartitioner", "The name of the partitioner to use.")
-	flag.StringVar(&w.partitionerFieldName, "partitioner-key", "", "Name of the JSON field to parse.")
-	flag.StringVar(&w.partitionerPathBaseFolder, "partitioner-path-folder", "backup", "The top level folder to prepend to the path used when partitioning files.")
-	flag.StringVar(&w.partitionerPathTopicNamePrefix, "partitioner-path-topic-prefix", "topic=", "A prefix to prepend to the path used when partitioning files.")
+	flag.StringVar(&w.partitionerClass, "partitioner", "DefaultPartitioner",
+		"The name of the partitioner to use.")
+	flag.StringVar(&w.partitionerFieldName, "partitioner-key", "",
+		"Name of the JSON field to parse.")
+	flag.StringVar(&w.partitionerPathBaseFolder,
+		"partitioner-path-folder", "backup",
+		"The top level folder to prepend to the path used when partitioning files.")
+	flag.StringVar(&w.partitionerPathTopicNamePrefix,
+		"partitioner-path-topic-prefix", "topic=",
+		"A prefix to prepend to the path used when partitioning files.")
 
 	// Define S3 flags.
 	flag.BoolVar(&w.s3enabled, "s3", false, "Enable S3 uploader.")
 	flag.StringVar(&w.s3region, "s3-region", "", "S3 Bucket Region.")
-	flag.StringVar(&w.s3bucket, "s3-bucket", "", "S3 Bucket where to upload files.")
-	flag.StringVar(&w.s3endpoint, "s3-endpoint", "", "S3 Bucket Endpoint to use for the client.")
-	flag.StringVar(&w.s3PartSize, "s3-part-size", "5MiB", "S3 Uploader part size.")
-	flag.IntVar(&w.s3Concurrency, "s3-concurrency", 5, "S3 Uploader part size.")
-	flag.BoolVar(&w.s3ForcePathStyle, "s3-force-path-style", false, "Enable to force the request to use path-style addressing on S3.")
-	flag.BoolVar(&w.s3ClientDebug, "s3-client-debug", false, "Enable to enable debug logging on S3 client.")
-	flag.StringVar(&w.s3NotificationTopic, "s3-notification-topic", "", "Kafka topic used to store uploaded S3 files.")
+	flag.StringVar(&w.s3bucket, "s3-bucket", "",
+		"S3 Bucket where to upload files.")
+	flag.StringVar(&w.s3endpoint, "s3-endpoint", "",
+		"S3 Bucket Endpoint to use for the client.")
+	flag.StringVar(&w.s3PartSize, "s3-part-size", "5MiB",
+		"S3 Uploader part size.")
+	flag.IntVar(&w.s3Concurrency, "s3-concurrency", 5,
+		"S3 Uploader part size.")
+	flag.BoolVar(&w.s3ForcePathStyle, "s3-force-path-style", false,
+		"Enable to force the request to use path-style addressing on S3.")
+	flag.BoolVar(&w.s3ClientDebug, "s3-client-debug", false,
+		"Enable to enable debug logging on S3 client.")
+	flag.StringVar(&w.s3NotificationTopic, "s3-notification-topic", "",
+		"Kafka topic used to store uploaded S3 files.")
 
 	// Define other flags.
-	flag.StringVar(&w.datadogHost, "datadog-host", flagutil.EnvOrDefault("DATADOG_HOST", "localhost:2585"), "The host where the datadog agents listens to.")
-	flag.StringVar(&w.statsdPrefix, "statsd-prefix", "kafka-archiver", "The name prefix for statsd metrics.")
+	flag.StringVar(&w.datadogHost, "datadog-host",
+		flagutil.EnvOrDefault("DATADOG_HOST", "localhost:2585"),
+		"The host where the datadog agents listens to.")
+	flag.StringVar(&w.statsdPrefix, "statsd-prefix", "kafka-archiver",
+		"The name prefix for statsd metrics.")
 
 	// Parse the flags.
 	flag.Parse()
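A side note on the flag definitions above: flag.Var accepts any type implementing the standard flag.Value interface, which is how -brokers can take a comma separated list. The repository's actual type behind w.brokers is not shown in this diff; the following is a sketch of one plausible implementation. (Also worth noting: the -s3-concurrency help text, "S3 Uploader part size.", looks like a copy-paste slip carried through the reformat unchanged.)

package main

import (
	"flag"
	"fmt"
	"strings"
)

// commaList is a hypothetical flag.Value that accumulates comma separated
// values, sketched to fit the -brokers usage above.
type commaList []string

// String renders the current value; flag uses it for defaults and help text.
func (c *commaList) String() string { return strings.Join(*c, ",") }

// Set is called once per occurrence of the flag on the command line.
func (c *commaList) Set(v string) error {
	*c = append(*c, strings.Split(v, ",")...)
	return nil
}

func main() {
	var brokers commaList
	flag.Var(&brokers, "brokers",
		"Kafka brokers to connect to, as a comma separated list. (required)")
	flag.Parse()
	fmt.Println(brokers) // e.g. -brokers=a:9092,b:9092 prints [a:9092 b:9092]
}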
@@ -199,8 +227,12 @@ func (w *Worker) Init() {
 		BufferSize: int(bufferMemBytes),
 	}
 
-	glog.Infof("created buffer configuration flush-bytes=%d flush-interval=%v buffer-mem=%d",
-		int64(bufferFlushSize), w.bufferFlushInterval.Duration, int(bufferMemBytes))
+	glog.Infof(
+		"created buffer configuration flush-bytes=%v flush-interval=%v buffer-mem=%d",
+		int64(bufferFlushSize),
+		w.bufferFlushInterval.Duration,
+		int(bufferMemBytes),
+	)
 
 	// Create partitioner.
 	w.partitioner = partitioner.New(&partitioner.Config{
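Beyond the line wrapping, this hunk also switches the flush-bytes verb from %d to %v; for an int64 argument fmt's %v formats in base 10 exactly like %d, so the logged output is unchanged.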
@@ -285,7 +317,12 @@ func (w *Worker) Init() {
 	}
 
 	// Create KAFKA consumer.
-	glog.Infof("creating consumer brokers=%+v topics=%+v topic-whitelist=%+v topic-blacklist=%+v", w.brokers, w.topics, w.topicWhitelist, w.topicBlacklist)
+	glog.Infof(
+		`creating consumer brokers=%+v topics=%+v
+		topic-whitelist=%+v topic-blacklist=%+v`,
+		w.brokers, w.topics, w.topicWhitelist, w.topicBlacklist,
+	)
+
 	w.consumer, err = cluster.NewConsumer(w.brokers, w.consumerGroup, w.topics, config)
 	if err != nil {
 		glog.Fatal(err)
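One behavioral nuance in this hunk: unlike the purely cosmetic rewraps elsewhere, the format string here became a backquoted raw string, and Go raw string literals preserve embedded newlines and tabs, so this log message now spans two lines and carries the source indentation. A standalone demonstration with made-up values:

package main

import "fmt"

func main() {
	// The raw string keeps the newline and the tab that follows it.
	fmt.Printf(`creating consumer brokers=%+v topics=%+v
	topic-whitelist=%+v topic-blacklist=%+v`,
		[]string{"a:9092"}, []string{"t1"}, "", "")
}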
@@ -303,8 +340,9 @@ func (w *Worker) Init() {
 	w.wg.Wait()
 }
 
-// bufferListener listens on the channel that declares a file as flushed. Calls the upload function
-// to perform a backup to a cloud provider. Removes the file and marks the partition offset.
+// bufferListener listens on the channel that declares a file as flushed.
+// Calls the upload function to perform a backup to a cloud provider.
+// Removes the file and marks the partition offset.
 func (w *Worker) bufferListener() {
 	defer w.wg.Done()
 
@@ -318,7 +356,8 @@ func (w *Worker) bufferListener() {
 		}
 
 		w.consumer.MarkPartitionOffset(fl.Topic, fl.Partition, fl.LastOffset, "")
-		glog.V(2).Infof("marked offset: topic=%s partition=%d offset=%d", fl.Topic, fl.Partition, fl.LastOffset)
+		glog.V(2).Infof("marked offset: topic=%s partition=%d offset=%d",
+			fl.Topic, fl.Partition, fl.LastOffset)
 	}
 }
 
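The rewrapped doc comment and the MarkPartitionOffset call above describe the worker's delivery guarantee: the offset advances only after the upload succeeds, giving at-least-once semantics for archived data. A compilable sketch of that flow, with a hypothetical Flush struct and channel standing in for the real buffer and sarama-cluster types:

package sketch

import (
	"log"
	"os"
)

// Flush is a stand-in for the buffer package's flush notification; its fields
// are assumptions for illustration.
type Flush struct {
	Topic      string
	Partition  int32
	LastOffset int64
	Path       string
}

// offsetMarker abstracts the one consumer method this loop needs.
type offsetMarker interface {
	MarkPartitionOffset(topic string, partition int32, offset int64, metadata string)
}

func bufferListener(flushes <-chan *Flush, consumer offsetMarker, upload func(*Flush) error) {
	for fl := range flushes {
		// Upload first; if it fails, skip marking so the data is re-consumed.
		if err := upload(fl); err != nil {
			log.Printf("upload failed, not marking offset: %v", err)
			continue
		}
		os.Remove(fl.Path)
		consumer.MarkPartitionOffset(fl.Topic, fl.Partition, fl.LastOffset, "")
	}
}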
