Skip to content

Commit d02d139

Browse files
committed
kingpin to kong
1 parent 1f99c7f commit d02d139

File tree

345 files changed

+61652
-11944
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

345 files changed

+61652
-11944
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
## Changelog
22

3+
### [v0.1.1](https://github.com/grepplabs/mqtt-proxy/releases/tag/v0.1.1) - 6 November 2022
4+
* Update dependencies
5+
* Migrate from [kingpin](https://github.com/alecthomas/kingpin) to [kong](https://github.com/alecthomas/kong)
6+
37
### [v0.1.0](https://github.com/grepplabs/mqtt-proxy/releases/tag/v0.1.0) - 1 November 2022
48
* Add server certificates rotation
59
* Add client certificate revocation list

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
.PHONY: clean build fmt test
44

5-
TAG ?= v0.1.0
5+
TAG ?= v0.1.1
66

77
BUILD_FLAGS ?=
88
BINARY ?= mqtt-proxy

cmd/root.go

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,47 +5,52 @@ import (
55
"fmt"
66
"os"
77
"os/signal"
8-
"path/filepath"
98
"syscall"
109

10+
"github.com/alecthomas/kong"
11+
kongyaml "github.com/alecthomas/kong-yaml"
12+
"github.com/grepplabs/mqtt-proxy/pkg/config"
1113
"github.com/grepplabs/mqtt-proxy/pkg/log"
1214
"github.com/oklog/run"
1315
"github.com/prometheus/client_golang/prometheus"
1416
"github.com/prometheus/client_golang/prometheus/collectors"
1517
"github.com/prometheus/common/version"
1618
"go.uber.org/automaxprocs/maxprocs"
17-
"gopkg.in/alecthomas/kingpin.v2"
1819
)
1920

2021
type setupFunc func(*run.Group, log.Logger, *prometheus.Registry) error
2122

22-
func Execute() {
23-
app := kingpin.New(filepath.Base(os.Args[0]), "MQTT Proxy")
24-
25-
app.Version(version.Print("mqtt-proxy"))
26-
app.HelpFlag.Short('h')
27-
28-
logConfig := log.LogConfig{}
29-
app.Flag("log.level", "Log filtering One of: [fatal, error, warn, info, debug]").Default(log.Info).EnumVar(&logConfig.LogLevel, log.Fatal, log.Error, log.Warn, log.Info, log.Debug)
30-
app.Flag("log.format", "Log format to use. One of: [logfmt, json, plain]").Default(log.LogFormatLogfmt).EnumVar(&logConfig.LogFormat, log.LogFormatLogfmt, log.LogFormatJson, log.LogFormatPlain)
31-
app.Flag("log.field-name.time", "Log time field name").Default(log.TimeKey).StringVar(&logConfig.LogFieldNames.Time)
32-
app.Flag("log.field-name.message", "Log message field name").Default(log.MessageKey).StringVar(&logConfig.LogFieldNames.Message)
33-
app.Flag("log.field-name.error", "Log error field name").Default(log.ErrorKey).StringVar(&logConfig.LogFieldNames.Error)
34-
app.Flag("log.field-name.caller", "Log caller field name").Default(log.CallerKey).StringVar(&logConfig.LogFieldNames.Caller)
35-
app.Flag("log.field-name.level", "Log time field name").Default(log.LevelKey).StringVar(&logConfig.LogFieldNames.Level)
23+
type CLI struct {
24+
LogConfig log.Config `embed:"" prefix:"log."`
25+
Server config.Server `name:"server" cmd:"" help:"MQTT Proxy"`
26+
Version struct{} `name:"version" cmd:"" help:"Version information"`
27+
}
3628

29+
func Execute() {
3730
cmds := map[string]setupFunc{}
3831

39-
registerServer(cmds, app)
40-
41-
cmd, err := app.Parse(os.Args[1:])
42-
if err != nil {
43-
_, _ = fmt.Fprintln(os.Stderr, fmt.Errorf("error parsing commandline arguments: %w", err))
44-
app.Usage(os.Args[1:])
45-
os.Exit(2)
32+
var cli CLI
33+
ctx := kong.Parse(&cli,
34+
kong.Name(os.Args[0]),
35+
kong.Description("MQTT Proxy"),
36+
kong.Configuration(kong.JSON, "/etc/mqtt-proxy/config.json", "~/.mqtt-proxy.json"),
37+
kong.Configuration(kongyaml.Loader, "/etc/mqtt-proxy/config.yaml", "~/.mqtt-proxy.yaml"),
38+
kong.UsageOnError(),
39+
log.Vars(), config.ServerVars())
40+
switch ctx.Command() {
41+
case "server":
42+
cmds[ctx.Command()] = func(group *run.Group, logger log.Logger, registry *prometheus.Registry) error {
43+
return runServer(group, logger, registry, &cli.Server)
44+
}
45+
case "version":
46+
fmt.Println(version.Print("mqtt-proxy"))
47+
os.Exit(0)
48+
default:
49+
fmt.Println(ctx.Command())
50+
os.Exit(1)
4651
}
4752

48-
logger := log.NewLogger(logConfig)
53+
logger := log.NewLogger(cli.LogConfig)
4954
log.InitInstance(logger)
5055

5156
undo, err := maxprocs.Set(maxprocs.Logger(func(template string, args ...interface{}) {
@@ -68,9 +73,9 @@ func Execute() {
6873
)
6974
var g run.Group
7075

71-
if err := cmds[cmd](&g, logger, metrics); err != nil {
76+
if err := cmds[ctx.Command()](&g, logger, metrics); err != nil {
7277
// Use %+v for github.com/pkg/errors error to print with stack.
73-
logger.WithError(err).Fatalf("preparing %s command failed", cmd)
78+
logger.WithError(err).Fatalf("preparing %s command failed", ctx.Command())
7479
}
7580
{
7681
cancel := make(chan struct{})
@@ -81,7 +86,7 @@ func Execute() {
8186
})
8287
}
8388
if err := g.Run(); err != nil {
84-
logger.WithError(err).Fatalf("%s command failed", cmd)
89+
logger.WithError(err).Fatalf("%s command failed", ctx.Command())
8590
}
8691
logger.Infof("exiting")
8792
}

cmd/root_test.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
package cmd
2+
3+
import (
4+
"github.com/alecthomas/kong"
5+
"github.com/confluentinc/confluent-kafka-go/kafka"
6+
"github.com/grepplabs/mqtt-proxy/pkg/config"
7+
"github.com/grepplabs/mqtt-proxy/pkg/log"
8+
"github.com/stretchr/testify/require"
9+
"testing"
10+
)
11+
12+
func TestDefaultServerConfig(t *testing.T) {
13+
testCLI, command, err := parseTestCLI([]string{"server"})
14+
require.NoError(t, err)
15+
require.Equal(t, "server", command)
16+
require.Nil(t, testCLI.Server.MQTT.Handler.Authenticator.Plain.Credentials)
17+
require.Nil(t, testCLI.Server.MQTT.Publisher.Kafka.ConfArgs.ConfigMap())
18+
require.Nil(t, testCLI.Server.MQTT.Publisher.Kafka.TopicMappings.Mappings)
19+
require.Equal(t, 1, testCLI.Server.MQTT.Publisher.Kafka.Workers)
20+
}
21+
22+
func TestKafkaConfigOneParam(t *testing.T) {
23+
testCLI, _, err := parseTestCLI([]string{
24+
"server",
25+
"--mqtt.publisher.kafka.config", "producer.sasl.mechanisms=PLAIN,producer.security.protocol=SASL_SSL,producer.sasl.username=myuser,producer.sasl.password=mypasswd",
26+
})
27+
require.NoError(t, err)
28+
require.EqualValues(t, map[string]kafka.ConfigValue{
29+
"producer.sasl.mechanisms": "PLAIN",
30+
"producer.security.protocol": "SASL_SSL",
31+
"producer.sasl.username": "myuser",
32+
"producer.sasl.password": "mypasswd",
33+
}, testCLI.Server.MQTT.Publisher.Kafka.ConfArgs.ConfigMap())
34+
}
35+
36+
func TestKafkaConfigMultipleParams(t *testing.T) {
37+
testCLI, _, err := parseTestCLI([]string{
38+
"server",
39+
"--mqtt.publisher.kafka.config", "producer.sasl.mechanisms=PLAIN,producer.security.protocol=SASL_SSL",
40+
"--mqtt.publisher.kafka.config", "producer.sasl.username=myuser,producer.sasl.password=mypasswd",
41+
})
42+
require.NoError(t, err)
43+
require.EqualValues(t, map[string]kafka.ConfigValue{
44+
"producer.sasl.mechanisms": "PLAIN",
45+
"producer.security.protocol": "SASL_SSL",
46+
"producer.sasl.username": "myuser",
47+
"producer.sasl.password": "mypasswd",
48+
}, testCLI.Server.MQTT.Publisher.Kafka.ConfArgs.ConfigMap())
49+
}
50+
51+
func TestTopicMappingConfig(t *testing.T) {
52+
testCLI, _, err := parseTestCLI([]string{
53+
"server",
54+
"--mqtt.publisher.kafka.topic-mappings", "temperature=temperature, humidity=.*/humidity,brightness=.*brightness, temperature=^cool$",
55+
})
56+
require.NoError(t, err)
57+
require.Equal(t, 4, len(testCLI.Server.MQTT.Publisher.Kafka.TopicMappings.Mappings))
58+
}
59+
60+
func TestPlainCredentialsConfig(t *testing.T) {
61+
testCLI, _, err := parseTestCLI([]string{
62+
"server",
63+
"--mqtt.handler.auth.plain.credentials", "alice=test1",
64+
"--mqtt.handler.auth.plain.credentials", "bob=test2",
65+
})
66+
require.NoError(t, err)
67+
require.EqualValues(t, map[string]string{
68+
"alice": "test1",
69+
"bob": "test2",
70+
}, testCLI.Server.MQTT.Handler.Authenticator.Plain.Credentials)
71+
}
72+
73+
func parseTestCLI(args []string) (*CLI, string, error) {
74+
testCLI := &CLI{}
75+
parser, err := kong.New(testCLI,
76+
kong.Name("mqtt-proxy"),
77+
kong.Description("MQTT Proxy"),
78+
log.Vars(), config.ServerVars())
79+
if err != nil {
80+
return nil, "", err
81+
}
82+
command, err := parser.Parse(args)
83+
if err != nil {
84+
return nil, "", err
85+
}
86+
return testCLI, command.Command(), nil
87+
}

cmd/server.go

Lines changed: 0 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -23,62 +23,8 @@ import (
2323
"github.com/oklog/run"
2424
"github.com/prometheus/client_golang/prometheus"
2525
"github.com/prometheus/client_golang/prometheus/promauto"
26-
"gopkg.in/alecthomas/kingpin.v2"
2726
)
2827

29-
func registerServer(m map[string]setupFunc, app *kingpin.Application) {
30-
command := "server"
31-
32-
cmd := app.Command(command, "mqtt-proxy server")
33-
34-
cfg := new(config.Server)
35-
cfg.Init()
36-
37-
cmd.Flag("http.listen-address", "Listen host:port for HTTP endpoints.").Default("0.0.0.0:9090").StringVar(&cfg.HTTP.ListenAddress)
38-
cmd.Flag("http.grace-period", "Time to wait after an interrupt received for HTTP Server.").Default("10s").DurationVar(&cfg.HTTP.GracePeriod)
39-
40-
cmd.Flag("mqtt.listen-address", "Listen host:port for MQTT endpoints.").Default("0.0.0.0:1883").StringVar(&cfg.MQTT.ListenAddress)
41-
cmd.Flag("mqtt.grace-period", "Time to wait after an interrupt received for MQTT Server.").Default("10s").DurationVar(&cfg.MQTT.GracePeriod)
42-
cmd.Flag("mqtt.read-timeout", "Maximum duration for reading the entire request.").Default("5s").DurationVar(&cfg.MQTT.ReadTimeout)
43-
cmd.Flag("mqtt.write-timeout", "Maximum duration before timing out writes of the response.").Default("5s").DurationVar(&cfg.MQTT.WriteTimeout)
44-
cmd.Flag("mqtt.idle-timeout", "Maximum duration before timing out writes of the response.").Default("0s").DurationVar(&cfg.MQTT.IdleTimeout)
45-
46-
cmd.Flag("mqtt.reader-buffer-size", "Read buffer size pro tcp connection.").Default("1024").IntVar(&cfg.MQTT.ReaderBufferSize)
47-
cmd.Flag("mqtt.writer-buffer-size", "Write buffer size pro tcp connection.").Default("1024").IntVar(&cfg.MQTT.WriterBufferSize)
48-
49-
cmd.Flag("mqtt.server-tls.enable", "Enable server side TLS").Default("false").BoolVar(&cfg.MQTT.TLSSrv.Enable)
50-
cmd.Flag("mqtt.server-tls.cert-source", "TLS certificate source").Default(config.CertSourceFile).EnumVar(&cfg.MQTT.TLSSrv.CertSource, config.CertSourceFile)
51-
cmd.Flag("mqtt.server-tls.refresh", "Option to specify the refresh interval for the TLS certificates.").Default("0s").DurationVar(&cfg.MQTT.TLSSrv.Refresh)
52-
53-
cmd.Flag("mqtt.server-tls.file.cert", "TLS Certificate for MQTT server, leave blank to disable TLS").Default("").StringVar(&cfg.MQTT.TLSSrv.File.Cert)
54-
cmd.Flag("mqtt.server-tls.file.key", "TLS Key for the MQTT server, leave blank to disable TLS").Default("").StringVar(&cfg.MQTT.TLSSrv.File.Key)
55-
cmd.Flag("mqtt.server-tls.file.client-ca", "TLS CA to verify clients against. If no client CA is specified, there is no client verification on server side.").Default("").StringVar(&cfg.MQTT.TLSSrv.File.ClientCA)
56-
cmd.Flag("mqtt.server-tls.file.client-clr", "TLS X509 CLR signed be the client CA. If no revocation list is specified, only client CA is verified").Default("").StringVar(&cfg.MQTT.TLSSrv.File.ClientCLR)
57-
58-
cmd.Flag("mqtt.handler.ignore-unsupported", "List of unsupported messages which are ignored. One of: [SUBSCRIBE, UNSUBSCRIBE]").PlaceHolder("MSG").EnumsVar(&cfg.MQTT.Handler.IgnoreUnsupported, "SUBSCRIBE", "UNSUBSCRIBE")
59-
cmd.Flag("mqtt.handler.allow-unauthenticated", "List of messages for which connection is not disconnected if unauthenticated request is received. One of: [PUBLISH, PUBREL, PINGREQ]").PlaceHolder("MSG").EnumsVar(&cfg.MQTT.Handler.AllowUnauthenticated, "PUBLISH", "PUBREL", "PINGREQ")
60-
cmd.Flag("mqtt.handler.publish.timeout", "Maximum duration of sending publish request to broker.").Default("0s").DurationVar(&cfg.MQTT.Handler.Publish.Timeout)
61-
cmd.Flag("mqtt.handler.publish.async.at-most-once", "Async publish for AT_MOST_ONCE QoS.").Default("false").BoolVar(&cfg.MQTT.Handler.Publish.Async.AtMostOnce)
62-
cmd.Flag("mqtt.handler.publish.async.at-least-once", "Async publish for AT_LEAST_ONCE QoS.").Default("false").BoolVar(&cfg.MQTT.Handler.Publish.Async.AtLeastOnce)
63-
cmd.Flag("mqtt.handler.publish.async.exactly-once", "Async publish for EXACTLY_ONCE QoS.").Default("false").BoolVar(&cfg.MQTT.Handler.Publish.Async.ExactlyOnce)
64-
65-
cmd.Flag("mqtt.handler.auth.name", "Authenticator name. One of: [noop, plain]").Default(config.AuthNoop).EnumVar(&cfg.MQTT.Handler.Authenticator.Name, config.AuthNoop, config.AuthPlain)
66-
cmd.Flag("mqtt.handler.auth.plain.credentials", "List of username and password fields.").Default("USERNAME=PASSWORD").StringMapVar(&cfg.MQTT.Handler.Authenticator.Plain.Credentials)
67-
cmd.Flag("mqtt.handler.auth.plain.credentials-file", "Location of a headerless CSV file containing `usernanme,password` records").Default("").StringVar(&cfg.MQTT.Handler.Authenticator.Plain.CredentialsFile)
68-
69-
cmd.Flag("mqtt.publisher.name", "Publisher name. One of: [noop, kafka]").Default(config.PublisherNoop).EnumVar(&cfg.MQTT.Publisher.Name, config.PublisherNoop, config.PublisherKafka)
70-
cmd.Flag("mqtt.publisher.kafka.config", "Comma separated list of properties").PlaceHolder("PROP=VAL").SetValue(&cfg.MQTT.Publisher.Kafka.ConfArgs)
71-
cmd.Flag("mqtt.publisher.kafka.bootstrap-servers", "Kafka bootstrap servers").Default("localhost:9092").StringVar(&cfg.MQTT.Publisher.Kafka.BootstrapServers)
72-
cmd.Flag("mqtt.publisher.kafka.grace-period", "Time to wait after an interrupt received for Kafka publisher.").Default("10s").DurationVar(&cfg.MQTT.Publisher.Kafka.GracePeriod)
73-
cmd.Flag("mqtt.publisher.kafka.default-topic", "Default Kafka topic for MQTT publish messages").Default("").StringVar(&cfg.MQTT.Publisher.Kafka.DefaultTopic)
74-
cmd.Flag("mqtt.publisher.kafka.topic-mappings", "Comma separated list of Kafka topic to MQTT topic mappings").PlaceHolder("TOPIC=REGEX").SetValue(&cfg.MQTT.Publisher.Kafka.TopicMappings)
75-
cmd.Flag("mqtt.publisher.kafka.workers", "Number of kafka publisher workers").Default("1").IntVar(&cfg.MQTT.Publisher.Kafka.Workers)
76-
77-
m[command] = func(group *run.Group, logger log.Logger, registry *prometheus.Registry) error {
78-
return runServer(group, logger, registry, cfg)
79-
}
80-
}
81-
8228
func runServer(
8329
group *run.Group,
8430
logger log.Logger,

go.mod

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ module github.com/grepplabs/mqtt-proxy
33
go 1.19
44

55
require (
6+
github.com/alecthomas/kong v0.7.0
7+
github.com/alecthomas/kong-yaml v0.1.1
68
github.com/confluentinc/confluent-kafka-go v1.9.2
9+
github.com/go-playground/validator/v10 v10.11.1
710
github.com/oklog/run v1.1.0
811
github.com/prometheus/client_golang v1.13.1
912
github.com/prometheus/common v0.37.0
@@ -12,23 +15,26 @@ require (
1215
go.uber.org/atomic v1.10.0
1316
go.uber.org/automaxprocs v1.5.1
1417
go.uber.org/zap v1.23.0
15-
gopkg.in/alecthomas/kingpin.v2 v2.2.6
1618
)
1719

1820
require (
19-
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
20-
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
2121
github.com/beorn7/perks v1.0.1 // indirect
2222
github.com/cespare/xxhash/v2 v2.1.2 // indirect
2323
github.com/davecgh/go-spew v1.1.1 // indirect
24+
github.com/go-playground/locales v0.14.0 // indirect
25+
github.com/go-playground/universal-translator v0.18.0 // indirect
2426
github.com/golang/protobuf v1.5.2 // indirect
25-
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
27+
github.com/leodido/go-urn v1.2.1 // indirect
28+
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
2629
github.com/pmezard/go-difflib v1.0.0 // indirect
27-
github.com/prometheus/client_model v0.2.0 // indirect
30+
github.com/prometheus/client_model v0.3.0 // indirect
2831
github.com/prometheus/procfs v0.8.0 // indirect
2932
go.uber.org/multierr v1.8.0 // indirect
30-
golang.org/x/sys v0.0.0-20220808155132-1c4a2a72c664 // indirect
33+
golang.org/x/crypto v0.1.0 // indirect
34+
golang.org/x/sys v0.1.0 // indirect
35+
golang.org/x/text v0.4.0 // indirect
3136
google.golang.org/protobuf v1.28.1 // indirect
37+
gopkg.in/yaml.v2 v2.4.0 // indirect
3238
gopkg.in/yaml.v3 v3.0.1 // indirect
3339
)
3440

0 commit comments

Comments
 (0)