Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store build steps and logs as structured data [breaking change] #1981

Merged
merged 9 commits into from
Apr 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
drone/drone
*.sqlite
*_gen.go
!store/datastore/sql/sqlite/sql_gen.go
!store/datastore/sql/mysql/sql_gen.go
!store/datastore/sql/postgres/sql_gen.go
#*.css
*.txt
*.zip
Expand Down
94 changes: 82 additions & 12 deletions drone/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package agent

import (
"context"
"encoding/json"
"io"
"io/ioutil"
"log"
"math"
"net/url"
"strconv"
"sync"
"time"

Expand Down Expand Up @@ -189,9 +192,9 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error {

state := rpc.State{}
state.Started = time.Now().Unix()
err = client.Update(context.Background(), work.ID, state)
err = client.Init(context.Background(), work.ID, state)
if err != nil {
log.Printf("pipeline: error updating pipeline status: %s: %s", work.ID, err)
log.Printf("pipeline: error signaling pipeline init: %s: %s", work.ID, err)
}

var uploads sync.WaitGroup
Expand All @@ -201,9 +204,31 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error {
return rerr
}
uploads.Add(1)
writer := rpc.NewLineWriter(client, work.ID, proc.Alias)
rlimit := io.LimitReader(part, maxLogsUpload)
io.Copy(writer, rlimit)

var secrets []string
for _, secret := range work.Config.Secrets {
if secret.Mask {
secrets = append(secrets, secret.Value)
}
}

limitedPart := io.LimitReader(part, maxLogsUpload)
logstream := rpc.NewLineWriter(client, work.ID, proc.Alias, secrets...)
io.Copy(logstream, limitedPart)

file := &rpc.File{}
file.Mime = "application/json+logs"
file.Proc = proc.Alias
file.Name = "logs.json"
file.Data, _ = json.Marshal(logstream.Lines())
file.Size = len(file.Data)
file.Time = time.Now().Unix()

if serr := client.Upload(context.Background(), work.ID, file); serr != nil {
log.Printf("pipeline: cannot upload logs: %s: %s: %s", work.ID, file.Mime, serr)
} else {
log.Printf("pipeline: finish uploading logs: %s: step %s: %s", file.Mime, work.ID, proc.Alias)
}

defer func() {
log.Printf("pipeline: finish uploading logs: %s: step %s", work.ID, proc.Alias)
Expand All @@ -214,18 +239,62 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error {
if rerr != nil {
return nil
}
rlimit = io.LimitReader(part, maxFileUpload)
mime := part.Header().Get("Content-Type")
if serr := client.Upload(context.Background(), work.ID, mime, rlimit); serr != nil {
log.Printf("pipeline: cannot upload artifact: %s: %s: %s", work.ID, mime, serr)
// TODO should be configurable
limitedPart = io.LimitReader(part, maxFileUpload)
file = &rpc.File{}
file.Mime = part.Header().Get("Content-Type")
file.Proc = proc.Alias
file.Name = part.FileName()
file.Data, _ = ioutil.ReadAll(limitedPart)
file.Size = len(file.Data)
file.Time = time.Now().Unix()

if serr := client.Upload(context.Background(), work.ID, file); serr != nil {
log.Printf("pipeline: cannot upload artifact: %s: %s: %s", work.ID, file.Mime, serr)
} else {
log.Printf("pipeline: finish uploading artifact: %s: step %s: %s", file.Mime, work.ID, proc.Alias)
}
return nil
})

defaultTracer := pipeline.TraceFunc(func(state *pipeline.State) error {
procState := rpc.State{
Proc: state.Pipeline.Step.Alias,
Exited: state.Process.Exited,
ExitCode: state.Process.ExitCode,
Started: time.Now().Unix(), // TODO do not do this
Finished: time.Now().Unix(),
}
defer func() {
if uerr := client.Update(context.Background(), work.ID, procState); uerr != nil {
log.Printf("Pipeine: error updating pipeline step status: %s: %s: %s", work.ID, procState.Proc, uerr)
}
}()
if state.Process.Exited {
return nil
}
if state.Pipeline.Step.Environment == nil {
state.Pipeline.Step.Environment = map[string]string{}
}
state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "success"
state.Pipeline.Step.Environment["CI_BUILD_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10)
state.Pipeline.Step.Environment["CI_BUILD_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10)

state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "success"
state.Pipeline.Step.Environment["CI_JOB_STARTED"] = strconv.FormatInt(state.Pipeline.Time, 10)
state.Pipeline.Step.Environment["CI_JOB_FINISHED"] = strconv.FormatInt(time.Now().Unix(), 10)

if state.Pipeline.Error != nil {
state.Pipeline.Step.Environment["CI_BUILD_STATUS"] = "failure"
state.Pipeline.Step.Environment["CI_JOB_STATUS"] = "failure"
}
return nil
})

err = pipeline.New(work.Config,
pipeline.WithContext(ctx),
pipeline.WithLogger(defaultLogger),
pipeline.WithTracer(pipeline.DefaultTracer),
pipeline.WithTracer(defaultTracer),
pipeline.WithEngine(engine),
).Run()

Expand All @@ -247,9 +316,10 @@ func run(ctx context.Context, client rpc.Peer, filter rpc.Filter) error {
log.Printf("pipeline: execution complete: %s", work.ID)

uploads.Wait()
err = client.Update(context.Background(), work.ID, state)

err = client.Done(context.Background(), work.ID, state)
if err != nil {
log.Printf("Pipeine: error updating pipeline status: %s: %s", work.ID, err)
log.Printf("Pipeine: error signaling pipeline done: %s: %s", work.ID, err)
}

return nil
Expand Down
3 changes: 2 additions & 1 deletion model/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ type Build struct {
Verified bool `json:"verified" meddler:"build_verified"` // deprecate
Reviewer string `json:"reviewed_by" meddler:"build_reviewer"`
Reviewed int64 `json:"reviewed_at" meddler:"build_reviewed"`
Jobs []*Job `json:"jobs,omitempty" meddler:"-"`
// Jobs []*Job `json:"jobs,omitempty" meddler:"-"`
Procs []*Proc `json:"procs,omitempty" meddler:"-"`
}

type BuildGroup struct {
Expand Down
21 changes: 1 addition & 20 deletions model/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,5 @@ type Event struct {
Type EventType `json:"type"`
Repo Repo `json:"repo"`
Build Build `json:"build"`
Job Job `json:"job"`
}

// NewEvent creates a new Event for the build, using copies of
// the build data to avoid possible mutation or race conditions.
func NewEvent(t EventType, r *Repo, b *Build, j *Job) *Event {
return &Event{
Type: t,
Repo: *r,
Build: *b,
Job: *j,
}
}

func NewBuildEvent(t EventType, r *Repo, b *Build) *Event {
return &Event{
Type: t,
Repo: *r,
Build: *b,
}
Proc Proc `json:"proc"`
}
23 changes: 23 additions & 0 deletions model/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package model

import "io"

// FileStore persists pipeline artifacts to storage.
type FileStore interface {
FileList(*Build) ([]*File, error)
FileFind(*Proc, string) (*File, error)
FileRead(*Proc, string) (io.ReadCloser, error)
FileCreate(*File, io.Reader) error
}

// File represents a pipeline artifact.
type File struct {
ID int64 `json:"id" meddler:"file_id,pk"`
BuildID int64 `json:"build_id" meddler:"file_build_id"`
ProcID int64 `json:"proc_id" meddler:"file_proc_id"`
Name string `json:"name" meddler:"file_name"`
Size int `json:"size" meddler:"file_size"`
Mime string `json:"mime" meddler:"file_mime"`
Time int64 `json:"time" meddler:"file_time"`
// Data []byte `json:"data" meddler:"file_data"`
}
30 changes: 15 additions & 15 deletions model/job.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
package model

// swagger:model job
type Job struct {
ID int64 `json:"id" meddler:"job_id,pk"`
BuildID int64 `json:"-" meddler:"job_build_id"`
NodeID int64 `json:"-" meddler:"job_node_id"`
Number int `json:"number" meddler:"job_number"`
Error string `json:"error" meddler:"job_error"`
Status string `json:"status" meddler:"job_status"`
ExitCode int `json:"exit_code" meddler:"job_exit_code"`
Enqueued int64 `json:"enqueued_at" meddler:"job_enqueued"`
Started int64 `json:"started_at" meddler:"job_started"`
Finished int64 `json:"finished_at" meddler:"job_finished"`

Environment map[string]string `json:"environment" meddler:"job_environment,json"`
}
// // swagger:model job
// type Job struct {
// ID int64 `json:"id" meddler:"job_id,pk"`
// BuildID int64 `json:"-" meddler:"job_build_id"`
// NodeID int64 `json:"-" meddler:"job_node_id"`
// Number int `json:"number" meddler:"job_number"`
// Error string `json:"error" meddler:"job_error"`
// Status string `json:"status" meddler:"job_status"`
// ExitCode int `json:"exit_code" meddler:"job_exit_code"`
// Enqueued int64 `json:"enqueued_at" meddler:"job_enqueued"`
// Started int64 `json:"started_at" meddler:"job_started"`
// Finished int64 `json:"finished_at" meddler:"job_finished"`
//
// Environment map[string]string `json:"environment" meddler:"job_environment,json"`
// }
8 changes: 0 additions & 8 deletions model/key.go

This file was deleted.

7 changes: 0 additions & 7 deletions model/log.go

This file was deleted.

60 changes: 60 additions & 0 deletions model/proc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package model

// ProcStore persists process information to storage.
type ProcStore interface {
ProcLoad(int64) (*Proc, error)
ProcFind(*Build, int) (*Proc, error)
ProcChild(*Build, int, string) (*Proc, error)
ProcList(*Build) ([]*Proc, error)
ProcCreate([]*Proc) error
ProcUpdate(*Proc) error
ProcClear(*Build) error
}

// Proc represents a process in the build pipeline.
// swagger:model proc
type Proc struct {
ID int64 `json:"id" meddler:"proc_id,pk"`
BuildID int64 `json:"build_id" meddler:"proc_build_id"`
PID int `json:"pid" meddler:"proc_pid"`
PPID int `json:"ppid" meddler:"proc_ppid"`
PGID int `json:"pgid" meddler:"proc_pgid"`
Name string `json:"name" meddler:"proc_name"`
State string `json:"state" meddler:"proc_state"`
Error string `json:"error,omitempty" meddler:"proc_error"`
ExitCode int `json:"exit_code" meddler:"proc_exit_code"`
Started int64 `json:"start_time,omitempty" meddler:"proc_started"`
Stopped int64 `json:"end_time,omitempty" meddler:"proc_stopped"`
Machine string `json:"machine,omitempty" meddler:"proc_machine"`
Platform string `json:"platform,omitempty" meddler:"proc_platform"`
Environ map[string]string `json:"environ,omitempty" meddler:"proc_environ,json"`
Children []*Proc `json:"children,omitempty" meddler:"-"`
}

// Running returns true if the process state is pending or running.
func (p *Proc) Running() bool {
return p.State == StatusPending || p.State == StatusRunning
}

// Failing returns true if the process state is failed, killed or error.
func (p *Proc) Failing() bool {
return p.State == StatusError || p.State == StatusKilled || p.State == StatusFailure
}

// Tree creates a process tree from a flat process list.
func Tree(procs []*Proc) []*Proc {
var (
nodes []*Proc
parent *Proc
)
for _, proc := range procs {
if proc.PPID == 0 {
nodes = append(nodes, proc)
parent = proc
continue
} else {
parent.Children = append(parent.Children, proc)
}
}
return nodes
}
19 changes: 0 additions & 19 deletions model/work.go

This file was deleted.

2 changes: 1 addition & 1 deletion router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ func Load(middleware ...gin.HandlerFunc) http.Handler {
repo.GET("", server.GetRepo)
repo.GET("/builds", server.GetBuilds)
repo.GET("/builds/:number", server.GetBuild)
repo.GET("/logs/:number/:job", server.GetBuildLogs)
repo.GET("/logs/:number/:ppid/:proc", server.GetBuildLogs)
repo.POST("/sign", session.MustPush, server.Sign)

repo.GET("/secrets", session.MustPush, server.GetSecrets)
Expand Down
Loading