Skip to content

Commit

Permalink
Checkpoint -- add build event handler.
Browse files Browse the repository at this point in the history
  • Loading branch information
tylerwilliams committed Feb 18, 2020
1 parent bd4edd9 commit 1d72d20
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 21 deletions.
13 changes: 13 additions & 0 deletions server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
srcs = ["build_event_server.go"],
importpath = "github.com/tryflame/buildbuddy/server/build_event_server",
deps = [
":build_event_handler",
"//proto:build_event_stream_go_proto",
"//proto:build_go_proto",
"@com_github_golang_protobuf//proto:go_default_library",
Expand Down Expand Up @@ -43,6 +44,17 @@ go_library(
],
)

go_library(
name = "build_event_handler",
srcs = ["build_event_handler.go"],
importpath = "github.com/tryflame/buildbuddy/server/build_event_handler",
deps = [
":blobstore",
"//proto:build_event_stream_go_proto",
"//proto:build_go_proto",
],
)

go_library(
name = "static",
srcs = ["static.go"],
Expand All @@ -63,6 +75,7 @@ go_binary(
],
deps = [
":blobstore",
":build_event_handler",
":build_event_server",
":buildbuddy_server",
":config",
Expand Down
7 changes: 7 additions & 0 deletions server/blobstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,10 @@ func (d *DiskBlobStore) GetBlob(blobName string) (Blob, error) {
}
return bufio.NewReadWriter(bufio.NewReader(f), bufio.NewWriter(f)), nil
}

func (d *DiskBlobStore) DeleteBlob(blobName string) error {
if filepath.Base(blobName) != blobName {
return fmt.Errorf("blobName (%s) must not contain dirs.", blobName)
}
return os.Remove(filepath.Join(d.rootDir, blobName))
}
78 changes: 78 additions & 0 deletions server/build_event_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package build_event_handler

import (
"fmt"
"sync"

"github.com/tryflame/buildbuddy/server/blobstore"
"proto/build_event_stream"
)

const (
eventChannelBufferSize = 25
)

type EventChannel struct {
done chan struct{}
in chan *build_event_stream.BuildEvent
}

func (c *EventChannel) WriteEvent(e *build_event_stream.BuildEvent, lastEvent bool) {
if e != nil {
c.in <- e
}
if lastEvent {
close(c.done)
}
}

type BuildEventHandler struct {
bs blobstore.Blobstore
activeChannels map[string]*EventChannel
activeChannelsLock sync.RWMutex
}

func NewBuildEventHandler(bs blobstore.Blobstore) *BuildEventHandler {
return &BuildEventHandler{
bs: bs,
activeChannels: make(map[string]*EventChannel),
activeChannelsLock: sync.RWMutex{},
}
}

func (b *BuildEventHandler) startChannelProcessor(ec *EventChannel) {
go func() {
events := make([]*build_event_stream.BuildEvent, 0)
receivedDoneSignal := false
for event := range ec.in {
if receivedDoneSignal {
break
}
events = append(events, event)
select {
case <-ec.done:
fmt.Printf("got done signal!\n")
receivedDoneSignal = true
}
}
fmt.Printf("received %d events in total\n", len(events))
// sort events
// write to storage and elsewhere
}()
}

func (b *BuildEventHandler) GetEventChannel(eventID string) *EventChannel {
b.activeChannelsLock.RLock()
defer b.activeChannelsLock.RUnlock()

if _, ok := b.activeChannels[eventID]; ok {
return b.activeChannels[eventID]
}
ec := EventChannel{
done: make(chan struct{}),
in: make(chan *build_event_stream.BuildEvent, eventChannelBufferSize),
}
b.activeChannels[eventID] = &ec
b.startChannelProcessor(&ec)
return &ec
}
32 changes: 19 additions & 13 deletions server/build_event_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,38 +5,38 @@ import (
"io"
"log"

_ "github.com/golang/protobuf/proto"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
"github.com/tryflame/buildbuddy/server/build_event_handler"

bpb "proto"
"proto/build_event_stream"
)

type BuildEventProtocolServer struct {
eventHandler *build_event_handler.BuildEventHandler
}

func NewBuildEventProtocolServer() (*BuildEventProtocolServer, error) {
return &BuildEventProtocolServer{}, nil
func NewBuildEventProtocolServer(h *build_event_handler.BuildEventHandler) (*BuildEventProtocolServer, error) {
return &BuildEventProtocolServer{
eventHandler: h,
}, nil
}

func (s *BuildEventProtocolServer) chompBuildEvent(obe *bpb.OrderedBuildEvent) error {
func (s *BuildEventProtocolServer) chompBuildEvent(obe *bpb.OrderedBuildEvent) (*build_event_stream.BuildEvent, bool, error) {
switch buildEvent := obe.Event.Event.(type) {
case *bpb.BuildEvent_ComponentStreamFinished:
log.Print("BuildTool: ComponentStreamFinished: ", buildEvent.ComponentStreamFinished)
return nil, true, nil
case *bpb.BuildEvent_BazelEvent:
var bazelBuildEvent build_event_stream.BuildEvent
if err := ptypes.UnmarshalAny(buildEvent.BazelEvent, &bazelBuildEvent); err != nil {
return err
}
switch bazelBuildEvent.Payload.(type) {
default:
log.Printf("Payload: %+v", bazelBuildEvent.Payload)
return nil, false, err
}
default:
log.Printf("Unknown event: %+v", buildEvent)
return &bazelBuildEvent, false, nil
}
return nil
return nil, false, nil
}

func (s *BuildEventProtocolServer) PublishLifecycleEvent(ctx context.Context, req *bpb.PublishLifecycleEventRequest) (*empty.Empty, error) {
Expand Down Expand Up @@ -71,9 +71,15 @@ func (s *BuildEventProtocolServer) PublishBuildToolEventStream(stream bpb.Publis
if err != nil {
return err
}
if err := s.chompBuildEvent(in.OrderedBuildEvent); err != nil {
key := proto.MarshalTextString(in.OrderedBuildEvent.StreamId)
eventChannel := s.eventHandler.GetEventChannel(key)
bazelEvent, last, err := s.chompBuildEvent(in.OrderedBuildEvent)
if err != nil {
return err
}
log.Printf("About to write event")
eventChannel.WriteEvent(bazelEvent, last)
log.Printf("Write event")

if lastReceived == -1 {
lastReceived = in.OrderedBuildEvent.SequenceNumber
Expand Down
20 changes: 12 additions & 8 deletions server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,20 @@ package main
import (
"flag"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"

"github.com/tryflame/buildbuddy/server/blobstore"
"github.com/tryflame/buildbuddy/server/build_event_handler"
"github.com/tryflame/buildbuddy/server/build_event_server"
"github.com/tryflame/buildbuddy/server/buildbuddy_server"
"github.com/tryflame/buildbuddy/server/config"
"github.com/tryflame/buildbuddy/server/static"
"google.golang.org/grpc"

bpb "proto"
bbspb "proto/buildbuddy_service"
)

var (
Expand Down Expand Up @@ -49,17 +50,20 @@ func main() {
}

fmt.Printf("Loaded configurator: %s", configurator)
sfs, err := static.NewStaticFileServer(*staticDirectory, false)
staticFileServer, err := static.NewStaticFileServer(*staticDirectory, false)
if err != nil {
log.Fatalf("Error initializing static file server: %v", err)
log.Fatalf("Error initializing static file server: %s", err)
}

diskBlobStore := blobstore.NewDiskBlobStore(configurator.GetStorageDiskRootDir())
eventHandler := build_event_handler.NewBuildEventHandler(diskBlobStore)

afs, err := static.NewStaticFileServer(*appDirectory, true)
if err != nil {
log.Fatalf("Error initializing app server: %v", err)
log.Fatalf("Error initializing app server: %s", err)
}

http.Handle("/", redirectHTTPS(sfs))
http.Handle("/", redirectHTTPS(staticFileServer))
http.Handle("/app/", redirectHTTPS(afs))

hostAndPort := fmt.Sprintf("%s:%d", *listen, *port)
Expand All @@ -71,12 +75,12 @@ func main() {

lis, err := net.Listen("tcp", gRPCHostAndPort)
if err != nil {
log.Fatalf("Failed to listen: %v", err)
log.Fatalf("Failed to listen: %s", err)
}
grpcServer := grpc.NewServer()
buildEventServer, err := build_event_server.NewBuildEventProtocolServer()
buildEventServer, err := build_event_server.NewBuildEventProtocolServer(eventHandler)
if err != nil {
log.Fatalf("Error initializing BuildEventProtocolServer: %v", err)
log.Fatalf("Error initializing BuildEventProtocolServer: %s", err)
}
bpb.RegisterPublishBuildEventServer(grpcServer, buildEventServer)
buildBuddyServer, err := buildbuddy_server.NewBuildBuddyServer()
Expand Down

0 comments on commit 1d72d20

Please sign in to comment.