Refactor query engine for distributed query support #3299

Closed
wants to merge 1 commit into from
Refactor query engine for distributed query support
With this change, the query engine gathers information about shards and
tagsets by working with individual shards, collating the results, and
returning them to the client. It does not assume that any particular
shard is local, and accesses all shards through abstracted Mappers, of
which there are two types: one for Raw queries and one for Aggregate
queries. Each type of Mapper has a corresponding Executor, and both
types of Executor share the same interface.
otoolep committed Jul 13, 2015
commit 2b12a83fb8c48796c7a59e45f0cbe56af1cdfed0
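
For orientation, the Mapper/Executor split described in the commit message might look roughly like the sketch below. All names and signatures here are illustrative assumptions, not the interfaces this commit actually introduces.

// A minimal sketch of the Mapper/Executor split described in the commit
// message. Names and signatures are assumptions for illustration only.
package sketch

// Row stands in for a result row streamed back to the client.
type Row struct {
	Name   string
	Tags   map[string]string
	Values [][]interface{}
}

// Mapper gathers data from a single shard. Nothing in the interface
// assumes the shard is local; a remote implementation would reach the
// shard through the cluster service. Raw and Aggregate queries each get
// their own implementation.
type Mapper interface {
	Open() error                // open the shard, locally or over the network
	NextChunk() ([]byte, error) // next chunk of mapped data; nil when drained
	Close() error
}

// Executor collates the chunks from one Mapper per shard and streams the
// merged result to the client. The Raw and Aggregate executors differ
// internally but share this interface.
type Executor interface {
	Execute() <-chan *Row
}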
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

### Features
- [#3177](https://github.com/influxdb/influxdb/pull/3177): Client supports making HTTPS requests. Thanks @jipperinbham
- [#3299](https://github.com/influxdb/influxdb/pull/3299): Refactor query engine for distributed query support.

### Bugfixes

50 changes: 50 additions & 0 deletions cluster/internal/data.pb.go

(Generated file; diff not rendered.)

9 changes: 9 additions & 0 deletions cluster/internal/data.proto
@@ -34,3 +34,12 @@ message WriteShardResponse {
optional string Message = 2;
}

message MapShardRequest {
required uint64 ShardID = 1;
required string Statement = 2;
}

message MapShardResponse {
required int32 Code = 1;
optional string Message = 2;
}
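
A caller would marshal these messages and frame them for the wire. The sketch below is hypothetical: it assumes the gogo/protobuf runtime used elsewhere in the tree, the generated types in cluster/internal, and a WriteTLV helper mirroring the ReadTLV function in cluster/service.go (a sketch of one appears after the service.go diff below).

// Hypothetical client-side use of MapShardRequest; requestShardMapping
// and WriteTLV are illustrative assumptions, not code from this commit.
package cluster

import (
	"net"

	"github.com/gogo/protobuf/proto"

	"github.com/influxdb/influxdb/cluster/internal"
)

func requestShardMapping(conn net.Conn, shardID uint64, stmt string) error {
	// Build and marshal the protobuf request.
	req := &internal.MapShardRequest{
		ShardID:   proto.Uint64(shardID),
		Statement: proto.String(stmt),
	}
	buf, err := proto.Marshal(req)
	if err != nil {
		return err
	}

	// Frame the payload as type-length-value so the server's ReadTLV
	// loop can dispatch it by message type.
	return WriteTLV(conn, mapShardRequestMessage, buf)
}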
85 changes: 73 additions & 12 deletions cluster/service.go
@@ -17,8 +17,17 @@ import (
// MaxMessageSize defines how large a message can be before we reject it
const MaxMessageSize = 1024 * 1024 * 1024 // 1GB

// MuxHeader is the header byte used in the TCP mux.
const MuxHeader = 2
// MuxWriteHeader is the header byte used in the TCP mux for data writes.
const MuxWriteHeader = 2

// MuxMapperHeader is the header byte used in the TCP mux for shard mapping.
const MuxMapperHeader = 4

const (
writeShardRequestMessage byte = iota + 1
writeShardResponseMessage
mapShardRequestMessage
)

// Service processes data received over raw TCP connections.
type Service struct {
@@ -27,7 +36,8 @@ type Service struct {
wg sync.WaitGroup
closing chan struct{}

Listener net.Listener
WriteListener net.Listener
MapperListener net.Listener

MetaStore interface {
ShardOwner(shardID uint64) (string, string, *meta.ShardGroupInfo)
@@ -53,7 +63,8 @@ func NewService(c Config) *Service {
func (s *Service) Open() error {
// Begin serving connections.
s.wg.Add(1)
go s.serve()
go s.serve(s.WriteListener, s.handleWriteConn)
//go s.serve(s.MapperListener, s.handleMapperConn)
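// Note: the mapper listener appears deliberately left un-served for now,
// presumably because processMapShardRequest below is still a stub.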

return nil
}
@@ -63,8 +74,8 @@ func (s *Service) SetLogger(l *log.Logger) {
s.Logger = l
}

// serve accepts connections from the listener and handles them.
func (s *Service) serve() {
// serve accepts connections from the listener and handles them with the supplied handler.
func (s *Service) serve(listener net.Listener, handler func(conn net.Conn)) {
defer s.wg.Done()

for {
@@ -76,7 +87,7 @@ }
}

// Accept the next connection.
conn, err := s.Listener.Accept()
conn, err := listener.Accept()
if err != nil {
if strings.Contains(err.Error(), "connection closed") {
s.Logger.Printf("cluster service accept error: %s", err)
@@ -90,15 +101,18 @@
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.handleConn(conn)
handler(conn)
}()
}
}

// Close shuts down the listener and waits for all connections to finish.
func (s *Service) Close() error {
if s.Listener != nil {
s.Listener.Close()
if s.WriteListener != nil {
s.WriteListener.Close()
}
if s.MapperListener != nil {
s.MapperListener.Close()
}

// Shut down all handlers.
@@ -108,8 +122,8 @@ func (s *Service) Close() error {
return nil
}

// handleConn services an individual TCP connection.
func (s *Service) handleConn(conn net.Conn) {
// handleWriteConn services an individual write connection.
func (s *Service) handleWriteConn(conn net.Conn) {
// Ensure connection is closed when service is closed.
closing := make(chan struct{})
defer close(closing)
@@ -150,6 +164,46 @@ func (s *Service) handleConn(conn net.Conn) {
}
}

// handleMapperConn services an individual mapper connection.
func (s *Service) handleMapperConn(conn net.Conn) {
// Ensure connection is closed when service is closed.
closing := make(chan struct{})
defer close(closing)
go func() {
select {
case <-closing:
case <-s.closing:
}
conn.Close()
}()

s.Logger.Printf("accept remote shard mapper connection from %v\n", conn.RemoteAddr())
defer func() {
s.Logger.Printf("close remote shard mapper from %v\n", conn.RemoteAddr())
}()
for {
// Read type-length-value.
typ, buf, err := ReadTLV(conn)
if err != nil {
if strings.HasSuffix(err.Error(), "EOF") {
return
}
s.Logger.Printf("unable to read type-length-value %s", err)
return
}

// Delegate message processing by type.
switch typ {
case mapShardRequestMessage:
if err := s.processMapShardRequest(conn, buf); err != nil {
s.Logger.Printf("process map shard request error: %s", err)
}
default:
s.Logger.Printf("cluster service message type not found: %d", typ)
}
}
}

func (s *Service) processWriteShardRequest(buf []byte) error {
// Build request
var req WriteShardRequest
@@ -213,6 +267,13 @@ func (s *Service) writeShardResponse(w io.Writer, e error) {
}
}

func (s *Service) processMapShardRequest(w io.Writer, buf []byte) error {
// Consider adding a timeout on the connection behind w: if query
// processing hangs on the remote node without terminating the
// connection, a timeout ensures its resources are still freed.
return nil
}

// ReadTLV reads a type-length-value record from r.
func ReadTLV(r io.Reader) (byte, []byte, error) {
var typ [1]byte
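
The body of ReadTLV is truncated above, but the framing it implies is a one-byte message type, a length, and the payload. A plausible WriteTLV counterpart, assuming an 8-byte big-endian length field (the exact width is not shown in this diff), would be:

// Sketch of a WriteTLV counterpart to ReadTLV; an assumption, not code
// from this commit.
package cluster

import (
	"encoding/binary"
	"fmt"
	"io"
)

// WriteTLV writes a type-length-value record to w: one type byte, an
// 8-byte big-endian length, then the payload.
func WriteTLV(w io.Writer, typ byte, buf []byte) error {
	if _, err := w.Write([]byte{typ}); err != nil {
		return fmt.Errorf("write message type: %s", err)
	}

	// Write the payload size so the reader knows how many bytes follow.
	if err := binary.Write(w, binary.BigEndian, int64(len(buf))); err != nil {
		return fmt.Errorf("write message size: %s", err)
	}

	if _, err := w.Write(buf); err != nil {
		return fmt.Errorf("write message value: %s", err)
	}
	return nil
}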
2 changes: 1 addition & 1 deletion cluster/service_test.go
@@ -37,7 +37,7 @@ func newTestService(f func(shardID uint64, points []tsdb.Point) error) testService {
}

mux := tcp.NewMux()
muxln := mux.Listen(cluster.MuxHeader)
muxln := mux.Listen(cluster.MuxWriteHeader)
go mux.Serve(ln)

return testService{
7 changes: 1 addition & 6 deletions cluster/shard_writer.go
@@ -10,11 +10,6 @@ import (
"gopkg.in/fatih/pool.v2"
)

const (
writeShardRequestMessage byte = iota + 1
writeShardResponseMessage
)

// ShardWriter writes a set of points to a shard.
type ShardWriter struct {
pool *clientPool
@@ -151,7 +146,7 @@ func (c *connFactory) dial() (net.Conn, error) {
}

// Write a marker byte for cluster messages.
_, err = conn.Write([]byte{MuxHeader})
_, err = conn.Write([]byte{MuxWriteHeader})
if err != nil {
conn.Close()
return nil, err
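
For the mapper path, a client would presumably perform the same handshake with the other header byte. A hypothetical sketch; dialMapper is not part of this commit:

// Hypothetical dial for the shard-mapping path, mirroring
// connFactory.dial above but announcing MuxMapperHeader so the TCP mux
// routes the connection to MapperListener.
package cluster

import "net"

func dialMapper(addr string) (net.Conn, error) {
	conn, err := net.Dial("tcp", addr)
	if err != nil {
		return nil, err
	}

	// Write the marker byte so the mux hands this connection to the
	// shard-mapping handler rather than the write handler.
	if _, err := conn.Write([]byte{MuxMapperHeader}); err != nil {
		conn.Close()
		return nil, err
	}
	return conn, nil
}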
8 changes: 4 additions & 4 deletions cluster/shard_writer_test.go
@@ -14,7 +14,7 @@ import (
func TestShardWriter_WriteShard_Success(t *testing.T) {
ts := newTestService(writeShardSuccess)
s := cluster.NewService(cluster.Config{})
s.Listener = ts.muxln
s.WriteListener = ts.muxln
s.TSDBStore = ts
if err := s.Open(); err != nil {
t.Fatal(err)
@@ -61,7 +61,7 @@ func TestShardWriter_WriteShard_Success(t *testing.T) {
func TestShardWriter_WriteShard_Multiple(t *testing.T) {
ts := newTestService(writeShardSuccess)
s := cluster.NewService(cluster.Config{})
s.Listener = ts.muxln
s.WriteListener = ts.muxln
s.TSDBStore = ts
if err := s.Open(); err != nil {
t.Fatal(err)
@@ -110,7 +110,7 @@ func TestShardWriter_WriteShard_Multiple(t *testing.T) {
func TestShardWriter_WriteShard_Error(t *testing.T) {
ts := newTestService(writeShardFail)
s := cluster.NewService(cluster.Config{})
s.Listener = ts.muxln
s.WriteListener = ts.muxln
s.TSDBStore = ts
if err := s.Open(); err != nil {
t.Fatal(err)
@@ -138,7 +138,7 @@ func TestShardWriter_WriteShard_Error(t *testing.T) {
func TestShardWriter_Write_ErrDialTimeout(t *testing.T) {
ts := newTestService(writeShardSuccess)
s := cluster.NewService(cluster.Config{})
s.Listener = ts.muxln
s.WriteListener = ts.muxln
s.TSDBStore = ts
if err := s.Open(); err != nil {
t.Fatal(err)
3 changes: 2 additions & 1 deletion cmd/influxd/run/server.go
@@ -286,7 +286,8 @@ func (s *Server) Open() error {
mux := tcp.NewMux()
s.MetaStore.RaftListener = mux.Listen(meta.MuxRaftHeader)
s.MetaStore.ExecListener = mux.Listen(meta.MuxExecHeader)
s.ClusterService.Listener = mux.Listen(cluster.MuxHeader)
s.ClusterService.WriteListener = mux.Listen(cluster.MuxWriteHeader)
s.ClusterService.MapperListener = mux.Listen(cluster.MuxMapperHeader)
s.SnapshotterService.Listener = mux.Listen(snapshotter.MuxHeader)
go mux.Serve(ln)

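
The mux registrations above work because tcp.Mux demultiplexes each incoming connection on its first byte. The sketch below conveys the idea only; the real implementation lives in the tcp package, and none of its internals are shown in this diff.

// Assumption-laden sketch of how a byte-keyed TCP mux routes
// connections; not the actual tcp.Mux implementation.
package tcpsketch

import (
	"io"
	"net"
)

// Mux hands each accepted connection to the listener registered for the
// connection's first byte.
type Mux struct {
	handlers map[byte]chan net.Conn
}

// Serve accepts connections and routes each one by its header byte.
func (m *Mux) Serve(ln net.Listener) error {
	for {
		conn, err := ln.Accept()
		if err != nil {
			return err
		}
		go m.route(conn)
	}
}

func (m *Mux) route(conn net.Conn) {
	// Read the single header byte the client wrote on dial, e.g.
	// MuxWriteHeader (2) or MuxMapperHeader (4).
	var hdr [1]byte
	if _, err := io.ReadFull(conn, hdr[:]); err != nil {
		conn.Close()
		return
	}
	ch, ok := m.handlers[hdr[0]]
	if !ok {
		conn.Close() // no service registered for this header byte
		return
	}
	ch <- conn // deliver to the registered mux listener
}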