From b39ba889e46e03e1563ec985b71de01715d8a3a6 Mon Sep 17 00:00:00 2001 From: Jade McGough Date: Wed, 29 Aug 2018 16:15:39 -0700 Subject: [PATCH] feat(http): add write path (#682) feat(http): Add write path with embedded NATS streaming server --- Gopkg.lock | 117 ++++++++++++++++++++++++++++++- cmd/idpd/main.go | 82 +++++++++++++++++++++- http/platform_handler.go | 8 ++- http/write_handler.go | 146 +++++++++++++++++++++++++++++++++++++++ kit/errors/errors.go | 9 +++ nats/handler.go | 17 +++++ nats/message.go | 22 ++++++ nats/publisher.go | 54 +++++++++++++++ nats/server.go | 47 +++++++++++++ nats/subscriber.go | 52 ++++++++++++++ nats/subscription.go | 31 +++++++++ 11 files changed, 582 insertions(+), 3 deletions(-) create mode 100644 http/write_handler.go create mode 100644 nats/handler.go create mode 100644 nats/message.go create mode 100644 nats/publisher.go create mode 100644 nats/server.go create mode 100644 nats/subscriber.go create mode 100644 nats/subscription.go diff --git a/Gopkg.lock b/Gopkg.lock index dd29207f290..b625d532ec0 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -71,6 +71,14 @@ pruneopts = "UT" revision = "941dea75d3ebfbdd905a5d8b7b232965c5e5c684" +[[projects]] + branch = "master" + digest = "1:ef5b0622d834c139454148b8fd0c92bb314828900532b267ae62da9fec109866" + name = "github.com/armon/go-metrics" + packages = ["."] + pruneopts = "UT" + revision = "3c58d8115a78a6879e5df75ae900846768d36895" + [[projects]] digest = "1:b42949704aad6dc86a9fa457ce3c80267d8875ba727bc3eb0eaa966e64e960b2" name = "github.com/aws/aws-sdk-go" @@ -127,6 +135,14 @@ pruneopts = "UT" revision = "8bd4349a67f2533b078dbc524689d15dba0f4659" +[[projects]] + digest = "1:0f98f59e9a2f4070d66f0c9c39561f68fcd1dc837b22a852d28d0003aebd1b1e" + name = "github.com/boltdb/bolt" + packages = ["."] + pruneopts = "UT" + revision = "2f1ce7a837dcb8da3ec595b1dac9d0632f0f99e8" + version = "v1.3.1" + [[projects]] digest = "1:e8859ab9dac9bc8cf02728fefcf7da80221a4fc6573374eb3a02d2d3eb66604e" name = "github.com/bouk/httprouter" @@ -485,6 +501,30 @@ pruneopts = "UT" revision = "8e809c8a86450a29b90dcc9efbf062d0fe6d9746" +[[projects]] + branch = "master" + digest = "1:2394f5a25132b3868eff44599cc28d44bdd0330806e34c495d754dd052df612b" + name = "github.com/hashicorp/go-immutable-radix" + packages = ["."] + pruneopts = "UT" + revision = "7f3cd4390caab3250a57f30efdb2a65dd7649ecf" + +[[projects]] + branch = "master" + digest = "1:b8ba23b0b493e601d5ebaf21079bd3433eb304a5d73c0fb40b8aea526e94f46b" + name = "github.com/hashicorp/go-msgpack" + packages = ["codec"] + pruneopts = "UT" + revision = "fa3f63826f7c23912c15263591e65d54d080b458" + +[[projects]] + branch = "master" + digest = "1:e0e3eb7886110c57a7c103fbc5fc419b1651d3014655b47b71f26809c2daeb1c" + name = "github.com/hashicorp/golang-lru" + packages = ["simplelru"] + pruneopts = "UT" + revision = "0fb14efe8c47ae851c0034ed7a448854d3d34cf3" + [[projects]] branch = "master" digest = "1:a361611b8c8c75a1091f00027767f7779b29cb37c456a71b8f2604c88057ab40" @@ -504,6 +544,14 @@ pruneopts = "UT" revision = "ef8a98b0bbce4a65b5aa4c368430a80ddc533168" +[[projects]] + digest = "1:56f26733346010e935b1ddb0e1ffa00bb054133b3ddb3f481261d27a7ae914df" + name = "github.com/hashicorp/raft" + packages = ["."] + pruneopts = "UT" + revision = "6d14f0c70869faabd9e60ba7ed88a6cbbd6a661f" + version = "v1.0.0" + [[projects]] digest = "1:3e260afa138eab6492b531a3b3d10ab4cb70512d423faa78b8949dec76e66a21" name = "github.com/imdario/mergo" @@ -744,6 +792,65 @@ pruneopts = "UT" revision = "90eadee771aeab36e8bf796039b8c261bebebe4f" +[[projects]] + digest = "1:a840bc0035e011ad1062d5f6de8671ac9fe553f12f3ae2040a60e44942432ead" + name = "github.com/nats-io/gnatsd" + packages = [ + "conf", + "logger", + "server", + "server/pse", + "util", + ] + pruneopts = "UT" + revision = "6608e9ac3be979dcb0614b772cc86a87b71acaa3" + version = "v1.2.0" + +[[projects]] + digest = "1:6ec5ca70ff99467ff3b134ca05ae1d247e42a0377a7c27a756ba932f4dfc3a88" + name = "github.com/nats-io/go-nats" + packages = [ + ".", + "encoders/builtin", + "util", + ] + pruneopts = "UT" + revision = "062418ea1c2181f52dc0f954f6204370519a868b" + version = "v1.5.0" + +[[projects]] + digest = "1:cf80370e775e26aa54b8d87304244ad56e19886b422f4ede3bb4aec7ff3f83cc" + name = "github.com/nats-io/go-nats-streaming" + packages = [ + ".", + "pb", + ] + pruneopts = "UT" + revision = "e15a53f85e4932540600a16b56f6c4f65f58176f" + version = "v0.4.0" + +[[projects]] + digest = "1:3ff1cb61cfdc8a7cf5fcda5fbccd683fbf3925b1a43279f50021cbca035935e4" + name = "github.com/nats-io/nats-streaming-server" + packages = [ + "logger", + "server", + "spb", + "stores", + "util", + ] + pruneopts = "UT" + revision = "63e2c334b66dba3edade0047625a25c0d8f18f80" + version = "v0.10.2" + +[[projects]] + digest = "1:c3cd663f2f30b92536b9f290ac85c6310dae36a14cb8961553ae9ccf0d85ae41" + name = "github.com/nats-io/nuid" + packages = ["."] + pruneopts = "UT" + revision = "289cccf02c178dc782430d534e3c1f5b72af807f" + version = "v1.0.0" + [[projects]] branch = "master" digest = "1:f97482f69f8d8e853db82a02f036dfad1ba6250d29d870fdebca4683deaa117c" @@ -1062,11 +1169,16 @@ [[projects]] branch = "master" - digest = "1:105645ed366a17f9ed1007059f0d3506e73e2098f2a1e793ffa92917d6461060" + digest = "1:58e8313f68ec7a8214ada3eee49eb29826775a7247cff938c7cf2a1ca421e1b0" name = "golang.org/x/sys" packages = [ "unix", "windows", + "windows/registry", + "windows/svc", + "windows/svc/debug", + "windows/svc/eventlog", + "windows/svc/mgr", ] pruneopts = "UT" revision = "7c87d13f8e835d2fb3a70a2912c811ed0c1d241b" @@ -1234,6 +1346,9 @@ "github.com/julienschmidt/httprouter", "github.com/kevinburke/go-bindata", "github.com/mna/pigeon", + "github.com/nats-io/go-nats-streaming", + "github.com/nats-io/nats-streaming-server/server", + "github.com/nats-io/nats-streaming-server/stores", "github.com/opentracing/opentracing-go", "github.com/opentracing/opentracing-go/ext", "github.com/opentracing/opentracing-go/log", diff --git a/cmd/idpd/main.go b/cmd/idpd/main.go index 89b8f126cb7..5a93234b0a3 100644 --- a/cmd/idpd/main.go +++ b/cmd/idpd/main.go @@ -3,10 +3,13 @@ package main import ( "context" "fmt" + "io" nethttp "net/http" _ "net/http/pprof" "os" "os/signal" + "os/user" + "path/filepath" "runtime" "syscall" "time" @@ -19,6 +22,7 @@ import ( "github.com/influxdata/platform/chronograf/server" "github.com/influxdata/platform/http" "github.com/influxdata/platform/kit/prom" + "github.com/influxdata/platform/nats" "github.com/influxdata/platform/query" _ "github.com/influxdata/platform/query/builtin" "github.com/influxdata/platform/query/control" @@ -38,12 +42,40 @@ func main() { Execute() } +const ( + // NatsSubject is the subject that subscribers and publishers use for writing and consuming line protocol + NatsSubject = "ingress" + // IngressGroup is the Nats Streaming Subscriber group, allowing multiple subscribers to distribute work + IngressGroup = "ingress" +) + var ( httpBindAddress string authorizationPath string boltPath string + walPath string ) +func influxDir() (string, error) { + var dir string + // By default, store meta and data files in current users home directory + u, err := user.Current() + if err == nil { + dir = u.HomeDir + } else if os.Getenv("HOME") != "" { + dir = os.Getenv("HOME") + } else { + wd, err := os.Getwd() + if err != nil { + return "", err + } + dir = wd + } + dir = filepath.Join(dir, ".influxdbv2") + + return dir, nil +} + func init() { viper.SetEnvPrefix("INFLUX") @@ -64,6 +96,18 @@ func init() { if h := viper.GetString("BOLT_PATH"); h != "" { boltPath = h } + + dir, err := influxDir() + if err != nil { + fmt.Fprintf(os.Stderr, "Failed to determine influx directory: %v", err) + os.Exit(1) + } + + platformCmd.Flags().StringVar(&walPath, "wal-path", filepath.Join(dir, "wal"), "path to persistent WAL files") + viper.BindEnv("WAL_PATH") + if h := viper.GetString("WAL_PATH"); h != "" { + walPath = h + } } var platformCmd = &cobra.Command{ @@ -166,7 +210,32 @@ func platformF(cmd *cobra.Command, args []string) { errc := make(chan error) sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGTERM) + signal.Notify(sigs, syscall.SIGTERM, os.Interrupt) + + // NATS streaming server + natsServer := nats.NewServer(nats.Config{FilestoreDir: walPath}) + if err := natsServer.Open(); err != nil { + logger.Error("failed to start nats streaming server", zap.Error(err)) + os.Exit(1) + } + + publisher := nats.NewAsyncPublisher("nats-publisher") + if err := publisher.Open(); err != nil { + logger.Error("failed to connect to streaming server", zap.Error(err)) + os.Exit(1) + } + + // TODO(jm): this is an example of using a subscriber to consume from the channel. It should be removed. + subscriber := nats.NewQueueSubscriber("nats-subscriber") + if err := subscriber.Open(); err != nil { + logger.Error("failed to connect to streaming server", zap.Error(err)) + os.Exit(1) + } + + if err := subscriber.Subscribe(NatsSubject, IngressGroup, &nats.LogHandler{Logger: logger}); err != nil { + logger.Error("failed to create nats subscriber", zap.Error(err)) + os.Exit(1) + } httpServer := &nethttp.Server{ Addr: httpBindAddress, @@ -204,6 +273,16 @@ func platformF(cmd *cobra.Command, args []string) { taskHandler := http.NewTaskHandler(logger) taskHandler.TaskService = taskSvc + publishFn := func(r io.Reader) error { + return publisher.Publish(NatsSubject, r) + } + + writeHandler := http.NewWriteHandler(publishFn) + writeHandler.AuthorizationService = authSvc + writeHandler.OrganizationService = orgSvc + writeHandler.BucketService = bucketSvc + writeHandler.Logger = logger.With(zap.String("handler", "write")) + // TODO(desa): what to do about idpe. chronografHandler := http.NewChronografHandler(chronografSvc) @@ -219,6 +298,7 @@ func platformF(cmd *cobra.Command, args []string) { SourceHandler: sourceHandler, TaskHandler: taskHandler, ViewHandler: cellHandler, + WriteHandler: writeHandler, } reg.MustRegister(platformHandler.PrometheusCollectors()...) diff --git a/http/platform_handler.go b/http/platform_handler.go index 2df3f775f88..c0a17352602 100644 --- a/http/platform_handler.go +++ b/http/platform_handler.go @@ -22,6 +22,7 @@ type PlatformHandler struct { SourceHandler *SourceHandler TaskHandler *TaskHandler FluxLangHandler *FluxLangHandler + WriteHandler *WriteHandler } func setCORSResponseHeaders(w nethttp.ResponseWriter, r *nethttp.Request) { @@ -55,7 +56,6 @@ func (h *PlatformHandler) serveLinks(w nethttp.ResponseWriter, r *nethttp.Reques // ServeHTTP delegates a request to the appropriate subhandler. func (h *PlatformHandler) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request) { - setCORSResponseHeaders(w, r) if r.Method == "OPTIONS" { return @@ -126,6 +126,12 @@ func (h *PlatformHandler) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request if strings.HasPrefix(r.URL.Path, "/v1/tasks") { h.TaskHandler.ServeHTTP(w, r) + return + } + + if strings.HasSuffix(r.URL.Path, "/write") { + h.WriteHandler.ServeHTTP(w, r) + return } if strings.HasPrefix(r.URL.Path, "/v2/views") { diff --git a/http/write_handler.go b/http/write_handler.go new file mode 100644 index 00000000000..a90e4eda494 --- /dev/null +++ b/http/write_handler.go @@ -0,0 +1,146 @@ +package http + +import ( + "compress/gzip" + "context" + "fmt" + "io" + "net/http" + + "github.com/influxdata/platform" + pcontext "github.com/influxdata/platform/context" + "github.com/influxdata/platform/kit/errors" + "github.com/julienschmidt/httprouter" + "go.uber.org/zap" +) + +const NatsServerID = "nats" +const NatsClientID = "nats-client" + +type WriteHandler struct { + *httprouter.Router + + Logger *zap.Logger + + AuthorizationService platform.AuthorizationService + BucketService platform.BucketService + OrganizationService platform.OrganizationService + + Publish func(io.Reader) error +} + +func NewWriteHandler(publishFn func(io.Reader) error) *WriteHandler { + h := &WriteHandler{ + Router: httprouter.New(), + Logger: zap.NewNop(), + Publish: publishFn, + } + + h.HandlerFunc("POST", "/v2/write", h.handleWrite) + return h +} + +func (h *WriteHandler) handleWrite(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + defer r.Body.Close() + + in := r.Body + if r.Header.Get("Content-Encoding") == "gzip" { + var err error + in, err = gzip.NewReader(r.Body) + if err != nil { + EncodeError(ctx, errors.Wrap(err, "invalid gzip", errors.InvalidData), w) + return + } + defer in.Close() + } + + tok, err := pcontext.GetToken(ctx) + if err != nil { + EncodeError(ctx, err, w) + return + } + + auth, err := h.AuthorizationService.FindAuthorizationByToken(ctx, tok) + if err != nil { + EncodeError(ctx, errors.Wrap(err, "invalid token", errors.InvalidData), w) + return + } + + req := decodeWriteRequest(ctx, r) + if err != nil { + EncodeError(ctx, err, w) + return + } + + logger := h.Logger.With(zap.String("org", req.Org), zap.String("bucket", req.Bucket)) + + var org *platform.Organization + if id, err := platform.IDFromString(req.Org); err == nil { + // Decoded ID successfully. Make sure it's a real org. + if o, err := h.OrganizationService.FindOrganizationByID(ctx, *id); err == nil { + org = o + } + } + if org == nil { + o, err := h.OrganizationService.FindOrganization(ctx, platform.OrganizationFilter{Name: &req.Org}) + if err != nil { + logger.Info("Failed to find organization", zap.Error(err)) + EncodeError(ctx, fmt.Errorf("organization %q not found", req.Org), w) + return + } + + org = o + } + + var bucket *platform.Bucket + if id, err := platform.IDFromString(req.Bucket); err == nil { + // Decoded ID successfully. Make sure it's a real bucket. + if b, err := h.BucketService.FindBucket(ctx, platform.BucketFilter{ + OrganizationID: &org.ID, + ID: id, + }); err == nil { + bucket = b + } + } + + if bucket == nil { + b, err := h.BucketService.FindBucket(ctx, platform.BucketFilter{ + OrganizationID: &org.ID, + Name: &req.Bucket, + }) + if err != nil { + logger.Info("Failed to find bucket", zap.Stringer("org_id", org.ID), zap.Error(err)) + EncodeError(ctx, fmt.Errorf("bucket %q not found", req.Bucket), w) + return + } + + bucket = b + } + + if !platform.Allowed(platform.WriteBucketPermission(bucket.ID), auth.Permissions) { + EncodeError(ctx, errors.Forbiddenf("insufficient permissions for write"), w) + return + } + + if err := h.Publish(in); err != nil { + EncodeError(ctx, errors.BadRequestError(err.Error()), w) + return + } + + w.WriteHeader(http.StatusNoContent) +} + +func decodeWriteRequest(ctx context.Context, r *http.Request) *postWriteRequest { + qp := r.URL.Query() + + return &postWriteRequest{ + Bucket: qp.Get("bucket"), + Org: qp.Get("org"), + } +} + +type postWriteRequest struct { + Org string + Bucket string +} diff --git a/kit/errors/errors.go b/kit/errors/errors.go index 340ba921da6..775239ae337 100644 --- a/kit/errors/errors.go +++ b/kit/errors/errors.go @@ -2,6 +2,7 @@ package errors import ( "fmt" + "net/http" ) // TODO: move to base directory @@ -95,3 +96,11 @@ func InvalidDataf(format string, i ...interface{}) error { func Forbiddenf(format string, i ...interface{}) error { return Errorf(Forbidden, format, i...) } + +func BadRequestError(msg string) error { + return Error{ + Reference: InvalidData, + Code: http.StatusBadRequest, + Err: msg, + } +} diff --git a/nats/handler.go b/nats/handler.go new file mode 100644 index 00000000000..f75a9e752e0 --- /dev/null +++ b/nats/handler.go @@ -0,0 +1,17 @@ +package nats + +import "go.uber.org/zap" + +type Handler interface { + // Process does something with a received subscription message, then acks it. + Process(s Subscription, m Message) +} + +type LogHandler struct { + Logger *zap.Logger +} + +func (lh *LogHandler) Process(s Subscription, m Message) { + lh.Logger.Info(string(m.Data())) + m.Ack() +} diff --git a/nats/message.go b/nats/message.go new file mode 100644 index 00000000000..132b62ca4a8 --- /dev/null +++ b/nats/message.go @@ -0,0 +1,22 @@ +package nats + +import ( + stan "github.com/nats-io/go-nats-streaming" +) + +type Message interface { + Data() []byte + Ack() error +} + +type message struct { + m *stan.Msg +} + +func (m *message) Data() []byte { + return m.m.Data +} + +func (m *message) Ack() error { + return m.m.Ack() +} diff --git a/nats/publisher.go b/nats/publisher.go new file mode 100644 index 00000000000..62ba8c79259 --- /dev/null +++ b/nats/publisher.go @@ -0,0 +1,54 @@ +package nats + +import ( + "io" + "io/ioutil" + + stan "github.com/nats-io/go-nats-streaming" + "go.uber.org/zap" +) + +type Publisher interface { + // Publish a new message to channel + Publish(subject string, r io.Reader) error +} + +type AsyncPublisher struct { + ClientID string + Connection stan.Conn + Logger *zap.Logger +} + +func NewAsyncPublisher(clientID string) *AsyncPublisher { + return &AsyncPublisher{ClientID: clientID} +} + +// Open creates and maintains a connection to NATS server +func (p *AsyncPublisher) Open() error { + sc, err := stan.Connect(ServerName, p.ClientID) + if err != nil { + return err + } + p.Connection = sc + return nil +} + +func (p *AsyncPublisher) Publish(subject string, r io.Reader) error { + if p.Connection == nil { + return ErrNoNatsConnection + } + + ah := func(guid string, err error) { + if err != nil { + p.Logger.Info(err.Error()) + } + } + + data, err := ioutil.ReadAll(r) + if err != nil { + return err + } + + _, err = p.Connection.PublishAsync(subject, data, ah) + return err +} diff --git a/nats/server.go b/nats/server.go new file mode 100644 index 00000000000..0e917b11ceb --- /dev/null +++ b/nats/server.go @@ -0,0 +1,47 @@ +package nats + +import ( + "errors" + + stand "github.com/nats-io/nats-streaming-server/server" + "github.com/nats-io/nats-streaming-server/stores" +) + +const ServerName = "platform" + +var ErrNoNatsConnection = errors.New("Nats connection has not been established. Call Open() first.") + +// Server wraps a connection to a NATS streaming server +type Server struct { + Server *stand.StanServer + config Config +} + +// Open starts a NATS streaming server +func (s *Server) Open() error { + opts := stand.GetDefaultOptions() + opts.StoreType = stores.TypeFile + opts.ID = ServerName + opts.FilestoreDir = s.config.FilestoreDir + server, err := stand.RunServerWithOpts(opts, nil) + if err != nil { + return err + } + + s.Server = server + + return nil +} + +// Config is the configuration for the NATS streaming server +type Config struct { + // The directory where nats persists message information + FilestoreDir string +} + +// NewServer creates and returns a new server struct from the provided config +func NewServer(c Config) *Server { + return &Server{ + config: c, + } +} diff --git a/nats/subscriber.go b/nats/subscriber.go new file mode 100644 index 00000000000..bd174ed54c9 --- /dev/null +++ b/nats/subscriber.go @@ -0,0 +1,52 @@ +package nats + +import ( + stan "github.com/nats-io/go-nats-streaming" +) + +type Subscriber interface { + // Subscribe listens to a channel, handling messages with Handler + Subscribe(subject, group string, handler Handler) error +} + +type QueueSubscriber struct { + ClientID string + Connection stan.Conn +} + +func NewQueueSubscriber(clientID string) *QueueSubscriber { + return &QueueSubscriber{ClientID: clientID} +} + +// Open creates and maintains a connection to NATS server +func (s *QueueSubscriber) Open() error { + sc, err := stan.Connect(ServerName, s.ClientID) + if err != nil { + return err + } + s.Connection = sc + return nil +} + +type messageHandler struct { + handler Handler + sub subscription +} + +func (mh *messageHandler) handle(m *stan.Msg) { + mh.handler.Process(mh.sub, &message{m: m}) +} + +func (s *QueueSubscriber) Subscribe(subject, group string, handler Handler) error { + if s.Connection == nil { + return ErrNoNatsConnection + } + + mh := messageHandler{handler: handler} + sub, err := s.Connection.QueueSubscribe(subject, group, mh.handle, stan.DurableName(group), stan.SetManualAckMode(), stan.MaxInflight(25)) + if err != nil { + return err + } + mh.sub = subscription{sub: sub} + return nil +} diff --git a/nats/subscription.go b/nats/subscription.go new file mode 100644 index 00000000000..0b1d04e2ec7 --- /dev/null +++ b/nats/subscription.go @@ -0,0 +1,31 @@ +package nats + +import stan "github.com/nats-io/go-nats-streaming" + +type Subscription interface { + // Pending returns the number of queued messages and queued bytes for this subscription. + Pending() (int64, int64, error) + + // Delivered returns the number of delivered messages for this subscription. + Delivered() (int64, error) + + // Close removes this subscriber + Close() error +} + +type subscription struct { + sub stan.Subscription +} + +func (s subscription) Pending() (int64, int64, error) { + messages, bytes, err := s.sub.Pending() + return int64(messages), int64(bytes), err +} + +func (s subscription) Delivered() (int64, error) { + return s.sub.Delivered() +} + +func (s subscription) Close() error { + return s.sub.Close() +}