Skip to content

Commit

Permalink
feat(http): add write path (influxdata#682)
Browse files Browse the repository at this point in the history
feat(http): Add write path with embedded NATS streaming server
  • Loading branch information
imogenkinsman authored Aug 29, 2018
1 parent bc97ac4 commit b39ba88
Show file tree
Hide file tree
Showing 11 changed files with 582 additions and 3 deletions.
117 changes: 116 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

82 changes: 81 additions & 1 deletion cmd/idpd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ package main
import (
"context"
"fmt"
"io"
nethttp "net/http"
_ "net/http/pprof"
"os"
"os/signal"
"os/user"
"path/filepath"
"runtime"
"syscall"
"time"
Expand All @@ -19,6 +22,7 @@ import (
"github.com/influxdata/platform/chronograf/server"
"github.com/influxdata/platform/http"
"github.com/influxdata/platform/kit/prom"
"github.com/influxdata/platform/nats"
"github.com/influxdata/platform/query"
_ "github.com/influxdata/platform/query/builtin"
"github.com/influxdata/platform/query/control"
Expand All @@ -38,12 +42,40 @@ func main() {
Execute()
}

const (
// NatsSubject is the subject that subscribers and publishers use for writing and consuming line protocol
NatsSubject = "ingress"
// IngressGroup is the Nats Streaming Subscriber group, allowing multiple subscribers to distribute work
IngressGroup = "ingress"
)

var (
httpBindAddress string
authorizationPath string
boltPath string
walPath string
)

func influxDir() (string, error) {
var dir string
// By default, store meta and data files in current users home directory
u, err := user.Current()
if err == nil {
dir = u.HomeDir
} else if os.Getenv("HOME") != "" {
dir = os.Getenv("HOME")
} else {
wd, err := os.Getwd()
if err != nil {
return "", err
}
dir = wd
}
dir = filepath.Join(dir, ".influxdbv2")

return dir, nil
}

func init() {
viper.SetEnvPrefix("INFLUX")

Expand All @@ -64,6 +96,18 @@ func init() {
if h := viper.GetString("BOLT_PATH"); h != "" {
boltPath = h
}

dir, err := influxDir()
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to determine influx directory: %v", err)
os.Exit(1)
}

platformCmd.Flags().StringVar(&walPath, "wal-path", filepath.Join(dir, "wal"), "path to persistent WAL files")
viper.BindEnv("WAL_PATH")
if h := viper.GetString("WAL_PATH"); h != "" {
walPath = h
}
}

var platformCmd = &cobra.Command{
Expand Down Expand Up @@ -166,7 +210,32 @@ func platformF(cmd *cobra.Command, args []string) {
errc := make(chan error)

sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGTERM)
signal.Notify(sigs, syscall.SIGTERM, os.Interrupt)

// NATS streaming server
natsServer := nats.NewServer(nats.Config{FilestoreDir: walPath})
if err := natsServer.Open(); err != nil {
logger.Error("failed to start nats streaming server", zap.Error(err))
os.Exit(1)
}

publisher := nats.NewAsyncPublisher("nats-publisher")
if err := publisher.Open(); err != nil {
logger.Error("failed to connect to streaming server", zap.Error(err))
os.Exit(1)
}

// TODO(jm): this is an example of using a subscriber to consume from the channel. It should be removed.
subscriber := nats.NewQueueSubscriber("nats-subscriber")
if err := subscriber.Open(); err != nil {
logger.Error("failed to connect to streaming server", zap.Error(err))
os.Exit(1)
}

if err := subscriber.Subscribe(NatsSubject, IngressGroup, &nats.LogHandler{Logger: logger}); err != nil {
logger.Error("failed to create nats subscriber", zap.Error(err))
os.Exit(1)
}

httpServer := &nethttp.Server{
Addr: httpBindAddress,
Expand Down Expand Up @@ -204,6 +273,16 @@ func platformF(cmd *cobra.Command, args []string) {
taskHandler := http.NewTaskHandler(logger)
taskHandler.TaskService = taskSvc

publishFn := func(r io.Reader) error {
return publisher.Publish(NatsSubject, r)
}

writeHandler := http.NewWriteHandler(publishFn)
writeHandler.AuthorizationService = authSvc
writeHandler.OrganizationService = orgSvc
writeHandler.BucketService = bucketSvc
writeHandler.Logger = logger.With(zap.String("handler", "write"))

// TODO(desa): what to do about idpe.
chronografHandler := http.NewChronografHandler(chronografSvc)

Expand All @@ -219,6 +298,7 @@ func platformF(cmd *cobra.Command, args []string) {
SourceHandler: sourceHandler,
TaskHandler: taskHandler,
ViewHandler: cellHandler,
WriteHandler: writeHandler,
}
reg.MustRegister(platformHandler.PrometheusCollectors()...)

Expand Down
8 changes: 7 additions & 1 deletion http/platform_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type PlatformHandler struct {
SourceHandler *SourceHandler
TaskHandler *TaskHandler
FluxLangHandler *FluxLangHandler
WriteHandler *WriteHandler
}

func setCORSResponseHeaders(w nethttp.ResponseWriter, r *nethttp.Request) {
Expand Down Expand Up @@ -55,7 +56,6 @@ func (h *PlatformHandler) serveLinks(w nethttp.ResponseWriter, r *nethttp.Reques

// ServeHTTP delegates a request to the appropriate subhandler.
func (h *PlatformHandler) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request) {

setCORSResponseHeaders(w, r)
if r.Method == "OPTIONS" {
return
Expand Down Expand Up @@ -126,6 +126,12 @@ func (h *PlatformHandler) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request

if strings.HasPrefix(r.URL.Path, "/v1/tasks") {
h.TaskHandler.ServeHTTP(w, r)
return
}

if strings.HasSuffix(r.URL.Path, "/write") {
h.WriteHandler.ServeHTTP(w, r)
return
}

if strings.HasPrefix(r.URL.Path, "/v2/views") {
Expand Down
Loading

0 comments on commit b39ba88

Please sign in to comment.