Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stateless broker #1935

Merged
merged 27 commits into from
Mar 15, 2015
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
39ae8e6
Add topic segmentation.
benbjohnson Feb 25, 2015
16dbe8b
Add Broker.Truncate().
benbjohnson Mar 1, 2015
85be4e1
Merge branch 'master' of https://github.com/influxdb/influxdb into br…
benbjohnson Mar 1, 2015
1bbf154
Removing replicas and subscriptions from broker.
benbjohnson Mar 2, 2015
b937f06
Implementing stateless broker.
benbjohnson Mar 6, 2015
ef8658e
Continuing stateless broker refactor.
benbjohnson Mar 8, 2015
9b5aeb1
Refactor messaging client/conn.
benbjohnson Mar 8, 2015
713ca4b
Merge branch 'master' into stateless-broker
benbjohnson Mar 9, 2015
5f5c6ca
Integrate stateless messaging into influxdb package.
benbjohnson Mar 9, 2015
4160d0b
Add continuously streaming topic readers.
benbjohnson Mar 10, 2015
27e9132
Integrate stateless broker into remaining packages.
benbjohnson Mar 10, 2015
66115f9
Merge branch 'master' of https://github.com/influxdb/influxdb into st…
benbjohnson Mar 10, 2015
5f6bcf5
Fix broker integration bugs.
benbjohnson Mar 11, 2015
7ab19b9
Merge branch 'master' of https://github.com/influxdb/influxdb into st…
benbjohnson Mar 12, 2015
7880bc2
Add zero length data checks.
benbjohnson Mar 12, 2015
c7d4920
Update urlgen to end at current time.
benbjohnson Mar 12, 2015
12e8939
Fix messaging client redirection.
benbjohnson Mar 12, 2015
4b9a93d
Merge branch 'master' of https://github.com/influxdb/influxdb into st…
benbjohnson Mar 12, 2015
fc189cd
Remove /test from .gitignore
benbjohnson Mar 12, 2015
8e813ec
Update CHANGELOG.md for v0.9.0-rc11
toddboom Mar 13, 2015
53dbec8
Add config notifications and increased test coverage.
benbjohnson Mar 14, 2015
8cb7be4
Merge branch 'stateless-broker' of https://github.com/influxdb/influx…
benbjohnson Mar 14, 2015
96748cb
Update file permissions.
benbjohnson Mar 14, 2015
b045ad5
Wrap open logic in anonymous functions.
benbjohnson Mar 14, 2015
41d357a
Fixes based on code review comments.
benbjohnson Mar 14, 2015
06d8392
Integration test delay.
benbjohnson Mar 14, 2015
7dc465b
Fix shard close race condition.
benbjohnson Mar 14, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix broker integration bugs.
  • Loading branch information
benbjohnson committed Mar 11, 2015
commit 5f6bcf523f952b64c8c7f279a483fe9eb250d0d3
68 changes: 20 additions & 48 deletions cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,19 @@ func joinLog(l *raft.Log, joinURLs []url.URL) {

// creates and initializes a server.
func openServer(config *Config, b *influxdb.Broker, initServer, initBroker, configExists bool, joinURLs []url.URL, w io.Writer) *influxdb.Server {
// Use the broker URL if there is no config or no join URLs were passed.
clientJoinURLs := joinURLs
if !configExists || len(joinURLs) == 0 {
clientJoinURLs = []url.URL{b.URL()}
}

// Create messaging client to the brokers.
c := influxdb.NewMessagingClient()
c.SetLogOutput(w)
if err := c.Open(filepath.Join(config.Data.Dir, messagingClientFile), clientJoinURLs); err != nil {
log.Fatalf("messaging client error: %s", err)
}

// Create and open the server.
s := influxdb.NewServer()
s.SetLogOutput(w)
Expand All @@ -298,56 +311,27 @@ func openServer(config *Config, b *influxdb.Broker, initServer, initBroker, conf
s.ComputeRunsPerInterval = config.ContinuousQuery.ComputeRunsPerInterval
s.ComputeNoMoreThan = time.Duration(config.ContinuousQuery.ComputeNoMoreThan)

if err := s.Open(config.Data.Dir); err != nil {
// Open server with data directory and broker client.
if err := s.Open(config.Data.Dir, c); err != nil {
log.Fatalf("failed to open data server: %v", err.Error())
}

// If the server is uninitialized then initialize or join it.
if initServer {
if len(joinURLs) == 0 {
initializeServer(config.DataURL(), s, b, w, initBroker)
if initBroker {
if err := s.Initialize(b.URL()); err != nil {
log.Fatalf("server initialization error: %s", err)
}
}
} else {
joinServer(s, config.DataURL(), joinURLs)
}
}

if !configExists {
// We are spinning up a server that has no config,
// but already has an initialized data directory
joinURLs = []url.URL{b.URL()}
openServerClient(s, joinURLs, w)
} else {
if len(joinURLs) == 0 {
// If a config exists, but no joinUrls are specified, fall back to the broker URL
// TODO: Make sure we have a leader, and then spin up the server
joinURLs = []url.URL{b.URL()}
}
openServerClient(s, joinURLs, w)
}

return s
}

// initializeServer attaches a freshly opened messaging client to a server
// that does not yet have an ID and, when initBroker is true, initializes the
// server against the local broker's URL.
//
// The client is opened against the local broker only. The u parameter is not
// referenced by the body. Any failure terminates the process via log.Fatalf.
func initializeServer(u url.URL, s *influxdb.Server, b *influxdb.Broker, w io.Writer, initBroker bool) {
	// Build the messaging client and point it at the local broker.
	client := influxdb.NewMessagingClient()
	client.SetLogOutput(w)

	clientPath := filepath.Join(s.Path(), messagingClientFile)
	brokerURLs := []url.URL{b.URL()}
	if err := client.Open(clientPath, brokerURLs); err != nil {
		log.Fatalf("messaging client error: %s", err)
	}

	// Hand the opened client to the server.
	if err := s.SetClient(client); err != nil {
		log.Fatalf("set client error: %s", err)
	}

	// Nothing more to do unless the broker is being initialized as well.
	if !initBroker {
		return
	}
	if err := s.Initialize(b.URL()); err != nil {
		log.Fatalf("server initialization error: %s", err)
	}
}

// joins a server to an existing cluster.
func joinServer(s *influxdb.Server, u url.URL, joinURLs []url.URL) {
// TODO: Use separate broker and data join urls.
Expand All @@ -364,18 +348,6 @@ func joinServer(s *influxdb.Server, u url.URL, joinURLs []url.URL) {
log.Fatalf("join: failed to connect data node to any specified server")
}

// openServerClient opens a messaging client against joinURLs and attaches it
// to the server s. The client's state file lives under the server's path and
// its log output is sent to w. Any failure terminates the process via
// log.Fatalf.
func openServerClient(s *influxdb.Server, joinURLs []url.URL, w io.Writer) {
	client := influxdb.NewMessagingClient()
	client.SetLogOutput(w)

	// Open the client using the state file stored alongside the server data.
	clientPath := filepath.Join(s.Path(), messagingClientFile)
	if err := client.Open(clientPath, joinURLs); err != nil {
		log.Fatalf("messaging client error: %s", err)
	}

	// Attach the opened client to the server.
	if err := s.SetClient(client); err != nil {
		log.Fatalf("set client error: %s", err)
	}
}

// parses a comma-delimited list of URLs.
func parseURLs(s string) (a []url.URL) {
if s == "" {
Expand Down
10 changes: 2 additions & 8 deletions httpd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1559,21 +1559,15 @@ func (s *Server) Restart() {
}

// Open and reset the client.
if err := s.Server.Open(path); err != nil {
if err := s.Server.Open(path, client); err != nil {
panic("open: " + err.Error())
}
if err := s.Server.SetClient(client); err != nil {
panic("client: " + err.Error())
}
}

// OpenUninitializedServer returns a new, uninitialized, open test server instance.
func OpenUninitializedServer(client influxdb.MessagingClient) *Server {
s := NewServer()
if err := s.Open(tempfile()); err != nil {
panic(err.Error())
}
if err := s.SetClient(client); err != nil {
if err := s.Open(tempfile(), client); err != nil {
panic(err.Error())
}
return s
Expand Down
53 changes: 48 additions & 5 deletions messaging/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,23 @@ func (b *Broker) closeTopics() {
// SetMaxIndex sets the highest index applied by the broker.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"seen" is not a Raft term. Is there a specific Raft term that you actually mean here? "Committed", "applied"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. It should have been "applied".

// This is only used for internal log messages and topics may have a higher index.
func (b *Broker) SetMaxIndex(index uint64) error {
return b.meta.Update(func(tx *bolt.Tx) error {
b.mu.Lock()
defer b.mu.Unlock()
return b.setMaxIndex(index)
}

func (b *Broker) setMaxIndex(index uint64) error {
// Update index in meta database.
if err := b.meta.Update(func(tx *bolt.Tx) error {
return tx.Bucket([]byte("meta")).Put([]byte("index"), u64tob(index))
})
}); err != nil {
return err
}

// Set in-memory index.
b.index = index

return nil
}

// Snapshot streams the current state of the broker and returns the index.
Expand Down Expand Up @@ -335,8 +349,8 @@ func (b *Broker) Restore(r io.Reader) error {
defer b.mu.Unlock()

// Remove and recreate broker path.
if err := os.RemoveAll(b.path); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("remove all: %s", err)
if err := b.reset(); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("reset: %s", err)
} else if err = os.MkdirAll(b.path, 0700); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0755 I think @rothrock

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to 0777 as mentioned above.

return fmt.Errorf("mkdir: %s", err)
}
Expand Down Expand Up @@ -400,14 +414,43 @@ func (b *Broker) Restore(r io.Reader) error {
}

// Set the highest applied index.
if err := b.SetMaxIndex(sh.Index); err != nil {
if err := b.setMaxIndex(sh.Index); err != nil {
return fmt.Errorf("set max index: %s", err)
}
b.index = sh.Index

return nil
}

// reset removes all files in the broker directory besides the raft directory.
func (b *Broker) reset() error {
// Open handle to directory.
f, err := os.Open(b.path)
if err != nil {
return err
}
defer func() { _ = f.Close() }()

// Read directory items.
fis, err := f.Readdir(0)
if err != nil {
return err
}

// Remove all files & directories besides raft.
for _, fi := range fis {
if fi.Name() == "raft" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We better never change the name of this directory. :-)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I started moving it out of the broker directory but I didn't want to expose raft through the configuration. I'm open to moving it but this worked for now.

continue
}

if err := os.RemoveAll(fi.Name()); err != nil {
return fmt.Errorf("remove: %s", fi.Name())
}
}

return nil
}

// Publish writes a message.
// Returns the index of the message. Otherwise returns an error.
func (b *Broker) Publish(m *Message) (uint64, error) {
Expand Down
14 changes: 6 additions & 8 deletions messaging/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,8 +524,6 @@ func TestTopicReader_streaming(t *testing.T) {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()

time.Sleep(2 * time.Millisecond)
MustWriteFile(filepath.Join(path, "6"),
MustMarshalMessages([]*messaging.Message{
Expand All @@ -549,10 +547,6 @@ func TestTopicReader_streaming(t *testing.T) {
{Index: 14},
}),
)

// Close reader.
time.Sleep(5 * time.Millisecond)
r.Close()
}()

// Slurp all message ids from the reader.
Expand All @@ -561,20 +555,24 @@ func TestTopicReader_streaming(t *testing.T) {
for {
m := &messaging.Message{}
if err := dec.Decode(m); err == io.EOF {
break
t.Fatalf("unexpected EOF")
} else if err != nil {
t.Fatalf("decode error: %s", err)
} else {
indices = append(indices, m.Index)
}

if m.Index == 14 {
break
}
}

// Verify we received the correct indices.
if !reflect.DeepEqual(indices, []uint64{6, 7, 10, 12, 13, 14}) {
t.Fatalf("unexpected indices: %#v", indices)
}

wg.Wait()
r.Close()
}

// Ensure multiple topic readers can read from the same topic directory.
Expand Down
70 changes: 61 additions & 9 deletions messaging/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import (

// DefaultReconnectTimeout is the default time to wait between when a broker
// stream disconnects and another connection is retried.
const DefaultReconnectTimeout = 100 * time.Millisecond
const DefaultReconnectTimeout = 1000 * time.Millisecond

// DefaultPingInterval is the default time to wait between checks to the broker.
const DefaultPingInterval = 1000 * time.Millisecond

// Client represents a client for the broker's HTTP API.
type Client struct {
Expand All @@ -29,11 +32,16 @@ type Client struct {
urls []url.URL // list of available broker URLs

opened bool
done chan chan struct{} // disconnection notification

wg sync.WaitGroup
closing chan struct{}

// The amount of time to wait before reconnecting to a broker stream.
ReconnectTimeout time.Duration

// The amount of time between pings to verify the broker is alive.
PingInterval time.Duration

// The logging interface used by the client for out-of-band errors.
Logger *log.Logger
}
Expand All @@ -42,6 +50,7 @@ type Client struct {
func NewClient() *Client {
c := &Client{
ReconnectTimeout: DefaultReconnectTimeout,
PingInterval: DefaultPingInterval,
}
c.SetLogOutput(os.Stderr)
return c
Expand Down Expand Up @@ -130,6 +139,11 @@ func (c *Client) Open(path string, urls []url.URL) error {
// Set open flag.
c.opened = true

// Start background ping.
c.closing = make(chan struct{}, 0)
c.wg.Add(1)
go c.pinger(c.closing)

return nil
}

Expand All @@ -149,6 +163,17 @@ func (c *Client) Close() error {
}
c.conns = nil

// Close goroutines.
if c.closing != nil {
close(c.closing)
c.closing = nil
}

// Wait for goroutines to finish.
c.mu.Unlock()
c.wg.Wait()
c.mu.Lock()

// Unset open flag.
c.opened = false

Expand All @@ -168,6 +193,14 @@ func (c *Client) Publish(m *Message) (uint64, error) {
}
defer func() { _ = resp.Body.Close() }()

// Check response code.
if resp.StatusCode != http.StatusOK {
if errstr := resp.Header.Get("X-Broker-Error"); errstr != "" {
return 0, errors.New(errstr)
}
return 0, fmt.Errorf("cannot publish(%d)", resp.StatusCode)
}

// Parse broker index.
index, err := strconv.ParseUint(resp.Header.Get("X-Broker-Index"), 10, 64)
if err != nil {
Expand All @@ -177,6 +210,18 @@ func (c *Client) Publish(m *Message) (uint64, error) {
return index, nil
}

// Ping sends a POST to the broker's /messaging/ping endpoint to check that it
// is reachable. The request goes through c.do, so it is issued against the
// client's current URL. The response body is closed without being read and
// the status code is not inspected; only a transport-level failure from c.do
// is reported to the caller.
func (c *Client) Ping() error {
	resp, err := c.do("POST", "/messaging/ping", nil, "application/octet-stream", nil)
	if err == nil {
		// Only the round trip matters; release the response immediately.
		resp.Body.Close()
	}
	return err
}

// do sends an HTTP request to the given path with the current leader URL.
// This will automatically retry the request if it is redirected.
func (c *Client) do(method, path string, values url.Values, contentType string, body io.Reader) (*http.Response, error) {
Expand Down Expand Up @@ -210,12 +255,6 @@ func (c *Client) do(method, path string, values url.Values, contentType string,
}
c.SetURL(*redirectURL)
continue
} else if resp.StatusCode != http.StatusOK {
resp.Body.Close()
if errstr := resp.Header.Get("X-Broker-Error"); errstr != "" {
return nil, errors.New(errstr)
}
return nil, fmt.Errorf("cannot publish(%d)", resp.StatusCode)
}

return resp, nil
Expand All @@ -238,6 +277,20 @@ func (c *Client) Conn(topicID uint64) *Conn {
return conn
}

// pinger runs in its own goroutine and pings the broker once every
// c.PingInterval until the closing channel is closed. The result of each
// ping is discarded. The client's wait group is released on exit.
func (c *Client) pinger(closing chan struct{}) {
	defer c.wg.Done()

	for {
		// Arm a fresh timer for this round so the interval is re-read each time.
		tick := time.After(c.PingInterval)

		select {
		case <-tick:
			_ = c.Ping()
		case <-closing:
			return
		}
	}
}

// ClientConfig represents the configuration that must be persisted across restarts.
type ClientConfig struct {
Brokers []url.URL `json:"brokers"`
Expand Down Expand Up @@ -490,7 +543,6 @@ func (c *Conn) stream(req *http.Request, closing <-chan struct{}) error {
// Decode message from the stream.
m := &Message{}
if err := dec.Decode(m); err == io.EOF {
warn("EOF!!!")
return nil
} else if err != nil {
return fmt.Errorf("decode: %s", err)
Expand Down
Loading