Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node redirection #2154

Merged
merged 3 commits into from
Apr 4, 2015
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Handle broker and data node endpoints regardless of role
This is a pre-requisite for #1934.  When running separate
broker and data nodes, you currently need to know what role
a host is performing.  This complicates cluster setup in
that you must configure separate broker URLs and data node
URLs.

This change allows a broker only node to redirect data nodes endpoints
to a valid data node and a data only node to redirect broker
endpoints to a valid broker.
  • Loading branch information
jwilder committed Apr 4, 2015
commit 6d4c7e9cd52948bd16cdd328766d5f9daa317093
108 changes: 91 additions & 17 deletions cmd/influxd/handler.go
Original file line number Diff line number Diff line change
@@ -1,42 +1,116 @@
package main

import (
"log"
"net/http"
"net/url"
"strings"

"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/httpd"
"github.com/influxdb/influxdb/messaging"
"github.com/influxdb/influxdb/raft"
)

// Handler represents an HTTP handler for InfluxDB node.
// Depending on its role, it will serve many different endpoints.
type Handler struct {
brokerHandler http.Handler
serverHandler http.Handler
Log *raft.Log
Broker *influxdb.Broker
Server *influxdb.Server
Config *Config
}

// NewHandler returns a new instance of Handler.
func NewHandler(bh, sh http.Handler) *Handler {
return &Handler{
brokerHandler: bh,
serverHandler: sh,
}
func NewHandler() *Handler {
return &Handler{}
}

// ServeHTTP responds to HTTP request to the handler.
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Route raft and messaging paths to the broker.
if strings.HasPrefix(r.URL.Path, "/raft") || strings.HasPrefix(r.URL.Path, "/messaging") {
if h.brokerHandler == nil {
http.NotFound(w, r)
return
if strings.HasPrefix(r.URL.Path, "/raft") {
h.serveRaft(w, r)
return
}
if strings.HasPrefix(r.URL.Path, "/messaging") {
h.serveMessaging(w, r)
return
}

h.serveData(w, r)
}

// serveMessaging responds to broker requests
func (h *Handler) serveMessaging(w http.ResponseWriter, r *http.Request) {
if h.Broker == nil && h.Server == nil {
log.Println("no broker or server configured to handle messaging endpoints")
http.Error(w, "server unavailable", http.StatusServiceUnavailable)
return
}

// If we're running a broker, handle the broker endpoints
if h.Broker != nil {
mh := &messaging.Handler{
Broker: h.Broker.Broker,
RaftHandler: &raft.Handler{Log: h.Log},
}
mh.ServeHTTP(w, r)
return
}

// Redirect to a valid broker to handle the request
h.redirect(h.Server.BrokerURLs(), w, r)
}

h.brokerHandler.ServeHTTP(w, r)
// serveRaft responds to raft requests.
func (h *Handler) serveRaft(w http.ResponseWriter, r *http.Request) {
if h.Log == nil && h.Server == nil {
log.Println("no broker or server configured to handle messaging endpoints")
http.Error(w, "server unavailable", http.StatusServiceUnavailable)
return
}

// Route all other paths to the server.
if h.serverHandler == nil {
http.NotFound(w, r)
if h.Log != nil {
rh := raft.Handler{Log: h.Log}
rh.ServeHTTP(w, r)
return
}
h.serverHandler.ServeHTTP(w, r)

// Redirect to a valid broker to handle the request
h.redirect(h.Server.BrokerURLs(), w, r)
}

// serveData responds to data requests
func (h *Handler) serveData(w http.ResponseWriter, r *http.Request) {
if h.Broker == nil && h.Server == nil {
log.Println("no broker or server configured to handle messaging endpoints")
http.Error(w, "server unavailable", http.StatusServiceUnavailable)
return
}

if h.Server != nil {
sh := httpd.NewHandler(h.Server, h.Config.Authentication.Enabled, version)
sh.WriteTrace = h.Config.Logging.WriteTracing
sh.ServeHTTP(w, r)
return
}

// Redirect to a valid data URL to handle the request
h.redirect(h.Broker.Topic(influxdb.BroadcastTopicID).DataURLs(), w, r)
}

// redirect redirects a request to URL in u. If u is an empty slice,
// a 503 is returned
func (h *Handler) redirect(u []url.URL, w http.ResponseWriter, r *http.Request) {
// No valid URLs, return an error
if len(u) == 0 {
http.Error(w, "service unavailable", http.StatusServiceUnavailable)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an "internal error" right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is possible that a data node could ask a broker a data node question, and that specific broker did not know about any data nodes. In that case, the urls would be empty, and you could return this error. This should only happen with a mis-configured cluster. However, since it is outside of our control, we have to check for it.

return
}

// TODO: Log to internal stats to track how frequently this is happening. If
// this is happening frequently, the clients are using a suboptimal endpoint

// Redirect the client to a valid data node that can handle the request
http.Redirect(w, r, u[0].String()+r.RequestURI, http.StatusTemporaryRedirect)
}
60 changes: 21 additions & 39 deletions cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,34 +48,30 @@ func Run(config *Config, join, version string) (*messaging.Broker, *influxdb.Ser
b, l := openBroker(config.BrokerDir(), config.BrokerURL(), initBroker, joinURLs, config.Logging.RaftTracing)

// Start the broker handler.
var h *Handler
if b != nil {
h = &Handler{
brokerHandler: &messaging.Handler{
Broker: b.Broker,
RaftHandler: &raft.Handler{Log: l},
},
}
h := &Handler{
Config: config,
Broker: b,
Log: l,
}

// We want to make sure we are spun up before we exit this function, so we manually listen and serve
listener, err := net.Listen("tcp", config.BrokerAddr())
// We want to make sure we are spun up before we exit this function, so we manually listen and serve
listener, err := net.Listen("tcp", config.BrokerAddr())
if err != nil {
log.Fatalf("TCP server failed to listen on %s. %s ", config.BrokerAddr(), err)
}
go func() {
err := http.Serve(listener, h)
if err != nil {
log.Fatalf("Broker failed to listen on %s. %s ", config.BrokerAddr(), err)
log.Fatalf("TCP server failed to server on %s: %s", config.BrokerAddr(), err)
}
go func() {
err := http.Serve(listener, h)
if err != nil {
log.Fatalf("Broker failed to server on %s.: %s", config.BrokerAddr(), err)
}
}()
log.Printf("broker listening on %s", config.BrokerAddr())
}()
log.Printf("TCP server listening on %s", config.BrokerAddr())

// have it occasionally tell a data node in the cluster to run continuous queries
if config.ContinuousQuery.Disable {
log.Printf("Not running continuous queries. [continuous_queries].disable is set to true.")
} else {
b.RunContinuousQueryLoop()
}
// have it occasionally tell a data node in the cluster to run continuous queries
if config.ContinuousQuery.Disable {
log.Printf("Not running continuous queries. [continuous_queries].disable is set to true.")
} else {
b.RunContinuousQueryLoop()
}

// Open server, initialize or join as necessary.
Expand All @@ -100,21 +96,7 @@ func Run(config *Config, join, version string) (*messaging.Broker, *influxdb.Ser

// Start the server handler. Attach to broker if listening on the same port.
if s != nil {
sh := httpd.NewHandler(s, config.Authentication.Enabled, version)
sh.WriteTrace = config.Logging.WriteTracing

if h != nil && config.BrokerAddr() == config.DataAddr() {
h.serverHandler = sh
} else {
// We want to make sure we are spun up before we exit this function, so we manually listen and serve
listener, err := net.Listen("tcp", config.DataAddr())
if err != nil {
log.Fatal(err)
}
go func() { log.Fatal(http.Serve(listener, sh)) }()
}
log.Printf("data node #%d listening on %s", s.ID(), config.DataAddr())

h.Server = s
if config.Snapshot.Enabled {
// Start snapshot handler.
go func() {
Expand Down
11 changes: 11 additions & 0 deletions messaging/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,17 @@ func (t *Topic) Index() uint64 {
return t.index
}

// DataURLs returns the data node URLs subscribed to this topic
func (t *Topic) DataURLs() []url.URL {
t.mu.Lock()
defer t.mu.Unlock()
var urls []url.URL
for u, _ := range t.indexByURL {
urls = append(urls, u)
}
return urls
}

// IndexForURL returns the highest index replicated for a given data URL.
func (t *Topic) IndexForURL(u url.URL) uint64 {
t.mu.Lock()
Expand Down
4 changes: 4 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ func NewServer() *Server {
return &s
}

func (s *Server) BrokerURLs() []url.URL {
return s.client.URLs()
}

// SetAuthenticationEnabled turns on or off server authentication
func (s *Server) SetAuthenticationEnabled(enabled bool) {
s.authenticationEnabled = enabled
Expand Down