Skip to content

Commit 1fd5669

Browse files
authored
Conn timeout as flag in topicctl (#200)
* Conn timeout as flag in topicctl * Fixing test cases topicctl
1 parent aedd232 commit 1fd5669

File tree

8 files changed

+31
-13
lines changed

8 files changed

+31
-13
lines changed

cmd/topicctl/subcmd/shared.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"os"
7+
"time"
78

89
"github.com/aws/aws-sdk-go/aws/session"
910
"github.com/hashicorp/go-multierror"
@@ -29,6 +30,7 @@ type sharedOptions struct {
2930
tlsServerName string
3031
zkAddr string
3132
zkPrefix string
33+
connTimeout time.Duration
3234
}
3335

3436
func (s sharedOptions) validate() error {
@@ -164,6 +166,7 @@ func (s sharedOptions) getAdminClient(
164166
Username: s.saslUsername,
165167
SecretsManagerArn: s.saslSecretsManagerArn,
166168
},
169+
ConnTimeout: s.connTimeout,
167170
},
168171
ReadOnly: readOnly,
169172
},
@@ -172,10 +175,11 @@ func (s sharedOptions) getAdminClient(
172175
return admin.NewZKAdminClient(
173176
ctx,
174177
admin.ZKAdminClientConfig{
175-
ZKAddrs: []string{s.zkAddr},
176-
ZKPrefix: s.zkPrefix,
177-
Sess: sess,
178-
ReadOnly: readOnly,
178+
ZKAddrs: []string{s.zkAddr},
179+
ZKPrefix: s.zkPrefix,
180+
Sess: sess,
181+
ReadOnly: readOnly,
182+
KafkaConnTimeout: s.connTimeout,
179183
},
180184
)
181185
}
@@ -275,6 +279,12 @@ func addSharedFlags(cmd *cobra.Command, options *sharedOptions) {
275279
"",
276280
"Prefix for cluster-related nodes in zk",
277281
)
282+
cmd.PersistentFlags().DurationVar(
283+
&options.connTimeout,
284+
"conn-timeout",
285+
10*time.Second,
286+
"Kafka connection timeout",
287+
)
278288
}
279289

280290
func addSharedConfigOnlyFlags(cmd *cobra.Command, options *sharedOptions) {

pkg/admin/connector.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,10 @@ const (
3535

3636
// ConnectorConfig contains the configuration used to contruct a connector.
3737
type ConnectorConfig struct {
38-
BrokerAddr string
39-
TLS TLSConfig
40-
SASL SASLConfig
38+
BrokerAddr string
39+
TLS TLSConfig
40+
SASL SASLConfig
41+
ConnTimeout time.Duration
4142
}
4243

4344
// TLSConfig stores the TLS-related configuration for a connection.

pkg/admin/zkclient.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ type ZKAdminClientConfig struct {
6363
ExpectedClusterID string
6464
Sess *session.Session
6565
ReadOnly bool
66+
KafkaConnTimeout time.Duration
6667
}
6768

6869
// NewZKAdminClient creates and returns a new Client instance.
@@ -136,7 +137,8 @@ func NewZKAdminClient(
136137
client.bootstrapAddrs = bootstrapAddrs
137138
client.Connector, err = NewConnector(
138139
ConnectorConfig{
139-
BrokerAddr: bootstrapAddrs[0],
140+
BrokerAddr: bootstrapAddrs[0],
141+
ConnTimeout: config.KafkaConnTimeout,
140142
},
141143
)
142144

pkg/groups/groups_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ func TestGetGroups(t *testing.T) {
1818
ctx := context.Background()
1919
connector, err := admin.NewConnector(admin.ConnectorConfig{
2020
BrokerAddr: util.TestKafkaAddr(),
21+
ConnTimeout: 10 * time.Second,
2122
})
2223
require.NoError(t, err)
2324

@@ -83,6 +84,7 @@ func TestGetGroupsMultiMember(t *testing.T) {
8384
ctx := context.Background()
8485
connector, err := admin.NewConnector(admin.ConnectorConfig{
8586
BrokerAddr: util.TestKafkaAddr(),
87+
ConnTimeout: 10 * time.Second,
8688
})
8789
require.NoError(t, err)
8890

@@ -164,6 +166,7 @@ func TestGetGroupsMultiMemberMultiTopic(t *testing.T) {
164166
ctx := context.Background()
165167
connector, err := admin.NewConnector(admin.ConnectorConfig{
166168
BrokerAddr: util.TestKafkaAddr(),
169+
ConnTimeout: 10 * time.Second,
167170
})
168171
require.NoError(t, err)
169172

@@ -260,6 +263,7 @@ func TestGetLags(t *testing.T) {
260263
ctx := context.Background()
261264
connector, err := admin.NewConnector(admin.ConnectorConfig{
262265
BrokerAddr: util.TestKafkaAddr(),
266+
ConnTimeout: 10 * time.Second,
263267
})
264268
require.NoError(t, err)
265269

@@ -303,6 +307,7 @@ func TestGetEarliestOrLatestOffset(t *testing.T) {
303307
ctx := context.Background()
304308
connector, err := admin.NewConnector(admin.ConnectorConfig{
305309
BrokerAddr: util.TestKafkaAddr(),
310+
ConnTimeout: 10 * time.Second,
306311
})
307312
require.NoError(t, err)
308313

@@ -350,6 +355,7 @@ func TestResetOffsets(t *testing.T) {
350355
ctx := context.Background()
351356
connector, err := admin.NewConnector(admin.ConnectorConfig{
352357
BrokerAddr: util.TestKafkaAddr(),
358+
ConnTimeout: 10 * time.Second,
353359
})
354360
require.NoError(t, err)
355361

pkg/messages/bounds.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,6 @@ const (
2626
// Parameters for backoff when there are connection errors
2727
maxRetries = 4
2828
backoffInitSleepDuration = 200 * time.Millisecond
29-
30-
// Connection timeout
31-
connTimeout = 10 * time.Second
3229
)
3330

3431
// Bounds represents the start and end "bounds" of the messages in
@@ -284,6 +281,6 @@ func dialLeaderRetries(
284281
return nil, fmt.Errorf("Error dialing partition %d: %+v", partition, err)
285282
}
286283

287-
conn.SetDeadline(time.Now().Add(connTimeout))
284+
conn.SetDeadline(time.Now().Add(connector.Config.ConnTimeout))
288285
return conn, nil
289286
}

pkg/messages/bounds_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ func TestGetAllPartitionBounds(t *testing.T) {
1717
ctx := context.Background()
1818
connector, err := admin.NewConnector(admin.ConnectorConfig{
1919
BrokerAddr: util.TestKafkaAddr(),
20+
ConnTimeout: 10 * time.Second,
2021
})
2122
require.NoError(t, err)
2223

pkg/messages/tail_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ func TestTailerGetMessages(t *testing.T) {
2020

2121
connector, err := admin.NewConnector(admin.ConnectorConfig{
2222
BrokerAddr: util.TestKafkaAddr(),
23+
ConnTimeout: 10 * time.Second,
2324
})
2425
require.NoError(t, err)
2526

pkg/version/version.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
package version
22

33
// Version is the current topicctl version.
4-
const Version = "1.17.0"
4+
const Version = "1.18.0"

0 commit comments

Comments
 (0)