Skip to content

Commit

Permalink
Merge pull request #19 from OGKevin/add-no-ops
Browse files Browse the repository at this point in the history
  • Loading branch information
OGKevin-bot authored Jun 8, 2019
2 parents 8caf0c1 + d7318dc commit 2f53106
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 25 deletions.
12 changes: 10 additions & 2 deletions sonic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type Client struct {
address string
password string

IngestService *IngestService
SearchService *SearchService
IngestService IngestService
SearchService SearchService
}

// SetDeadline sets the read and write deadlines associated
Expand Down Expand Up @@ -144,6 +144,14 @@ func NewClientWithPassword(address, password string, ctx context.Context) (*Clie
return &client, nil
}

func NewNoOpsClient(ctx context.Context) *Client {
return &Client{
ctx: ctx,
IngestService: &NoOpsIngestService{},
SearchService: &NoOpsSearchService{},
}
}

func (c *Client) reconnect(ctx context.Context) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "sonic-reconnect")
defer sp.Finish()
Expand Down
66 changes: 55 additions & 11 deletions sonic/ingest_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,24 +12,68 @@ import (
"sync"
)

type IngestService interface {
Push(data *Data) (bool, error)
Pop(data *Data) (int, error)
Count(data *Data) (int, error)
Flushc(data *Data) (int, error)
Flushb(data *Data) (int, error)
Flusho(data *Data) (int, error)

connect(ctx context.Context) error
}

// NoOpsIngestService is an IngestService that does no operations on the methods it implements.
type NoOpsIngestService struct {

}

func (*NoOpsIngestService) Push(data *Data) (bool, error) {
return true, nil
}

func (*NoOpsIngestService) Pop(data *Data) (int, error) {
return 0, nil
}

func (*NoOpsIngestService) Count(data *Data) (int, error) {
return 0, nil
}

func (*NoOpsIngestService) Flushc(data *Data) (int, error) {
return 0, nil
}

func (*NoOpsIngestService) Flushb(data *Data) (int, error) {
return 0, nil
}

func (*NoOpsIngestService) Flusho(data *Data) (int, error) {
return 0, nil
}

func (*NoOpsIngestService) connect(ctx context.Context) error {
return nil
}

// IngestService exposes the ingest mode of sonic
type IngestService struct {
type ingestService struct {
c *Client

l sync.Mutex
s *bufio.Scanner
}

func newIngestService(ctx context.Context, c *Client) (*IngestService, error) {
func newIngestService(ctx context.Context, c *Client) (IngestService, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "sonic-newIngestService")
defer sp.Finish()

i := &IngestService{c: c}
i := &ingestService{c: c}

return i, errors.Wrap(i.connect(ctx), "could not connect to ingest service")
}

func (i *IngestService) connect(ctx context.Context) error {
func (i *ingestService) connect(ctx context.Context) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "sonic-ingest-connect")
defer sp.Finish()

Expand Down Expand Up @@ -61,7 +105,7 @@ parse:
}

// Push search data in the index
func (i *IngestService) Push(data *Data) (bool, error) {
func (i *ingestService) Push(data *Data) (bool, error) {
if data.Collection == "" || data.Bucket == "" || data.Object == "" || data.Text == "" {
return false, errors.New("all ingest data are required for pushing")
}
Expand Down Expand Up @@ -89,7 +133,7 @@ func (i *IngestService) Push(data *Data) (bool, error) {
}

// Pop search data from the index
func (i *IngestService) Pop(data *Data) (int, error) {
func (i *ingestService) Pop(data *Data) (int, error) {
if data.Collection == "" || data.Bucket == "" || data.Object == "" || data.Text == "" {
return 0, errors.New("all ingest data are required for pushing")
}
Expand Down Expand Up @@ -122,7 +166,7 @@ func (i *IngestService) Pop(data *Data) (int, error) {
}

// Count indexed search data
func (i *IngestService) Count(data *Data) (int, error) {
func (i *ingestService) Count(data *Data) (int, error) {
if data.Collection == "" {
return 0, errors.New("collection can not be an empty string")
}
Expand Down Expand Up @@ -173,7 +217,7 @@ func (i *IngestService) Count(data *Data) (int, error) {
return c, nil
}

func (i *IngestService) flush(query string) (int, error) {
func (i *ingestService) flush(query string) (int, error) {
i.l.Lock()
defer i.l.Unlock()
_, err := io.WriteString(i.c.i, query)
Expand Down Expand Up @@ -206,7 +250,7 @@ func (i *IngestService) flush(query string) (int, error) {
}

// Flushc Flush all indexed data from a collection
func (i *IngestService) Flushc(data *Data) (int, error) {
func (i *ingestService) Flushc(data *Data) (int, error) {
if data.Collection == "" {
return 0, errors.New("collection can not be an empty string")
}
Expand All @@ -215,7 +259,7 @@ func (i *IngestService) Flushc(data *Data) (int, error) {
}

// Flushb Flush all indexed data from a bucket in a collection
func (i *IngestService) Flushb(data *Data) (int, error) {
func (i *ingestService) Flushb(data *Data) (int, error) {
if data.Collection == "" || data.Bucket == "" {
return 0, errors.New("collection and bucket can not be an empty strings")
}
Expand All @@ -224,7 +268,7 @@ func (i *IngestService) Flushb(data *Data) (int, error) {
}

// Flusho Flush all indexed data from an object in a bucket in collection
func (i *IngestService) Flusho(data *Data) (int, error) {
func (i *ingestService) Flusho(data *Data) (int, error) {
if data.Collection == "" || data.Bucket == "" || data.Object == "" {
return 0, errors.New("collection, bucket and object can not be an empty strings")
}
Expand Down
59 changes: 47 additions & 12 deletions sonic/search_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,43 @@ type pendingQuery struct {
sp opentracing.Span
}

// SearchService exposes the search mode of sonic
type SearchService struct {
type SearchService interface {
Suggest(ctx context.Context, data *Data, limit int) (chan string, error)
Ping(ctx context.Context) error
Query(ctx context.Context, data *Data, offset, limit int) (chan string, error)

connect(ctx context.Context) error
}

// NoOpsSearchService Is a search service that preforms no operations.
type NoOpsSearchService struct {

}

func (*NoOpsSearchService) connect(ctx context.Context) error {
return nil
}

func (*NoOpsSearchService) Suggest(ctx context.Context, data *Data, limit int) (chan string, error) {
ch := make(chan string)
close(ch)

return ch, nil
}

func (*NoOpsSearchService) Ping(ctx context.Context) error {
return nil
}

func (*NoOpsSearchService) Query(ctx context.Context, data *Data, offset, limit int) (chan string, error) {
ch := make(chan string)
close(ch)

return ch, nil
}

// searchService exposes the search mode of sonic
type searchService struct {
c *Client

sl sync.Mutex
Expand All @@ -38,11 +73,11 @@ type SearchService struct {
ctx context.Context
}

func newSearchService(ctx context.Context, c *Client) (*SearchService, error) {
func newSearchService(ctx context.Context, c *Client) (SearchService, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "sonic-newSearchService")
defer sp.Finish()

ss := &SearchService{c: c, pending: make(map[string]*pendingQuery), ctx: c.ctx}
ss := &searchService{c: c, pending: make(map[string]*pendingQuery), ctx: c.ctx}

err := ss.connect(ctx)
if err != nil {
Expand All @@ -54,7 +89,7 @@ func newSearchService(ctx context.Context, c *Client) (*SearchService, error) {
return ss, nil
}

func (s *SearchService) connect(ctx context.Context) error {
func (s *searchService) connect(ctx context.Context) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "sonic-search-connect")
defer sp.Finish()

Expand Down Expand Up @@ -90,7 +125,7 @@ parse:
return nil
}

func (s *SearchService) keepAlive() {
func (s *searchService) keepAlive() {
go func() {
ticker := time.Tick(time.Second * 5)
for {
Expand Down Expand Up @@ -118,7 +153,7 @@ func (s *SearchService) keepAlive() {
}()
}

func (s *SearchService) pollForEvents() {
func (s *searchService) pollForEvents() {
s.onePoll.Do(func() {
t := time.NewTicker(time.Millisecond * 100)
go func() {
Expand Down Expand Up @@ -183,7 +218,7 @@ func (s *SearchService) pollForEvents() {
}

// Suggest auto-completes word
func (s *SearchService) Suggest(ctx context.Context, data *Data, limit int) (chan string, error) {
func (s *searchService) Suggest(ctx context.Context, data *Data, limit int) (chan string, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "sonic-Suggest")
defer sp.Finish()

Expand Down Expand Up @@ -215,7 +250,7 @@ func (s *SearchService) Suggest(ctx context.Context, data *Data, limit int) (cha
return ch, nil
}

func (s *SearchService) Ping(ctx context.Context) error {
func (s *searchService) Ping(ctx context.Context) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "sonic-Ping")
defer sp.Finish()

Expand Down Expand Up @@ -252,7 +287,7 @@ ping:
}

// Query query database
func (s *SearchService) Query(ctx context.Context, data *Data, offset, limit int) (chan string, error) {
func (s *searchService) Query(ctx context.Context, data *Data, offset, limit int) (chan string, error) {
logrus.Debug("preforming query")
defer logrus.Debug("done performing query")

Expand Down Expand Up @@ -299,7 +334,7 @@ func (s *SearchService) Query(ctx context.Context, data *Data, offset, limit int
return ch, nil
}

func (s *SearchService) parseResponse(ctx context.Context) (chan string, error) {
func (s *searchService) parseResponse(ctx context.Context) (chan string, error) {
logrus.Debug("parsing response")
defer logrus.Debug("done parsing response")

Expand Down Expand Up @@ -345,7 +380,7 @@ scan:
}
}

func (s *SearchService) handleEvent(ctx context.Context, event string) {
func (s *searchService) handleEvent(ctx context.Context, event string) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "sonic-handleEvent")
defer sp.Finish()

Expand Down

0 comments on commit 2f53106

Please sign in to comment.