Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to timestamp lines #430

Merged
merged 3 commits into from
Mar 7, 2017
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions agent/header_times_streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,16 @@ func (h *HeaderTimesStreamer) Scan(line string) {
h.scanWaitGroup.Add(1)
defer h.scanWaitGroup.Done()

if h.lineIsHeader(line) {
logger.Debug("[HeaderTimesStreamer] Found header %q", line)
logger.Debug("[HeaderTimesStreamer] Found header %q", line)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm probably missing something, but I think this should still be there? Otherwise each line is probably considered a ---header right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to move that check out of header_times_streamer.go, it introduces some more coupling but I didn't want the check to happen twice. Unfortunately I have to know if something is a header before I can timestamp it, so it's done here now:

https://github.com/buildkite/agent/pull/430/files#diff-14ee62401fee1966227df2f4ab955c3cR192


// Aquire a lock on the times and then add the current time to
// our times slice.
h.timesMutex.Lock()
h.times = append(h.times, time.Now().UTC().Format(time.RFC3339Nano))
h.timesMutex.Unlock()
// Aquire a lock on the times and then add the current time to
// our times slice.
h.timesMutex.Lock()
h.times = append(h.times, time.Now().UTC().Format(time.RFC3339Nano))
h.timesMutex.Unlock()

// Add the time to the wait group
h.uploadWaitGroup.Add(1)
}
// Add the time to the wait group
h.uploadWaitGroup.Add(1)
}

func (h *HeaderTimesStreamer) Upload() {
Expand Down Expand Up @@ -136,7 +134,7 @@ func (h *HeaderTimesStreamer) Stop() {
h.streamingMutex.Unlock()
}

func (h *HeaderTimesStreamer) lineIsHeader(line string) bool {
func (h *HeaderTimesStreamer) LineIsHeader(line string) bool {
// Make sure all ANSI colors are removed from the string before we
// check to see if it's a header (sometimes a color escape sequence may
// be the first thing on the line, which will cause the regex to ignore
Expand Down
18 changes: 13 additions & 5 deletions agent/job_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -59,13 +60,20 @@ func (r JobRunner) Create() (runner *JobRunner, err error) {
// the Buildkite Agent API
runner.logStreamer = LogStreamer{MaxChunkSizeBytes: r.Job.ChunksMaxSizeBytes, Callback: r.onUploadChunk}.New()

timestamp, err := strconv.ParseBool(os.Getenv("BUILDKITE_TIMESTAMP_LINES"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we should check this in AgentConfiguration? Or are you thinking that BUILDKITE_TIMESTAMP_LINES could be turned on/off from the BK UI at runtime?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I can move it there!

if err != nil {
timestamp = false
}

// The process that will run the bootstrap script
runner.process = process.Process{
Script: r.AgentConfiguration.BootstrapScript,
Env: r.createEnvironment(),
PTY: r.AgentConfiguration.RunInPty,
StartCallback: r.onProcessStartCallback,
LineCallback: runner.headerTimesStreamer.Scan,
Script: r.AgentConfiguration.BootstrapScript,
Env: r.createEnvironment(),
PTY: r.AgentConfiguration.RunInPty,
Timestamp: timestamp,
StartCallback: r.onProcessStartCallback,
LineCallback: runner.headerTimesStreamer.Scan,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohhh right right, I see how this works now. Took a while for it to click :)

So essentially, LineCallback will only be called if the LineCallbackFilter function returns true for a particular line, is that right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's exactly the idea! The ugly bit of coupling is that lines are not timestamped if there is any callback handler. However, the only callback handler that exists right now is used for the header timestamps.

LineCallbackFilter: runner.headerTimesStreamer.LineIsHeader,
}.Create()

return
Expand Down
49 changes: 41 additions & 8 deletions process/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
type Process struct {
Pid int
PTY bool
Timestamp bool
Script string
Env []string
ExitStatus string
Expand All @@ -35,8 +36,9 @@ type Process struct {
StartCallback func()

// For every line in the process output, this callback will be called
// with the contents of the line
LineCallback func(string)
// with the contents of the line if its filter returns true.
LineCallback func(string)
LineCallbackFilter func(string) bool

// Running is stored as an int32 so we can use atomic operations to
// set/get it (it's accessed by multiple goroutines)
Expand Down Expand Up @@ -67,7 +69,12 @@ func (p *Process) Start() error {

lineReaderPipe, lineWriterPipe := io.Pipe()

multiWriter := io.MultiWriter(&p.buffer, lineWriterPipe)
var multiWriter io.Writer
if p.Timestamp {
multiWriter = io.MultiWriter(lineWriterPipe)
} else {
multiWriter = io.MultiWriter(&p.buffer, lineWriterPipe)
}

logger.Info("Starting to run script: %s", p.command.Path)

Expand Down Expand Up @@ -171,11 +178,37 @@ func (p *Process) Start() error {
}
}

lineCallbackWaitGroup.Add(1)
go func(line string) {
defer lineCallbackWaitGroup.Done()
p.LineCallback(line)
}(string(line))
// If we're timestamping this main thread will take
// the hit of running the regex so we can build up
// the timestamped buffer without breaking headers,
// otherwise we let the goroutines take the perf hit.

checkedForCallback := false
lineHasCallback := false
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This bit here has me a little confused. lineHasCallback always seems to be false unless timestamping has been turned on?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that variable is not strictly needed but I left it for clarity.

The idea was to use a short circuit evaluation to prevent doing the check twice in case the timestamping option is enabled: https://github.com/buildkite/agent/pull/430/files#diff-14ee62401fee1966227df2f4ab955c3cR207

I admit it's not super elegant.

lineString := string(line)

// Create the prefixed buffer
if p.Timestamp {
lineHasCallback = p.LineCallbackFilter(lineString)
checkedForCallback = true
if lineHasCallback {
// Don't timestamp special lines (e.g. header)
p.buffer.WriteString(fmt.Sprintf("%s\n", line))
} else {
currentTime := time.Now().UTC().Format(time.RFC3339)
p.buffer.WriteString(fmt.Sprintf("[%s] %s\n", currentTime, line))
}
}

if lineHasCallback || !checkedForCallback {
lineCallbackWaitGroup.Add(1)
go func(line string) {
defer lineCallbackWaitGroup.Done()
if (checkedForCallback && lineHasCallback) || p.LineCallbackFilter(lineString) {
p.LineCallback(line)
}
}(lineString)
}
}

// We need to make sure all the line callbacks have finish before
Expand Down