Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: service logs workflow v2 #6684

Merged
merged 24 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cli/cdsctl/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ func loadConfig(cmd *cobra.Command) (string, *cdsclient.Config, error) {

if contextName != "" {
if cdsctx, err = internal.GetContext(f, contextName); err != nil {
return "", nil, cli.NewError("unable to load the current context from %s", contextName)
return "", nil, cli.NewError("unable to load the current context name %s", contextName)
}
} else if cdsctx, err = internal.GetCurrentContext(f); err != nil {
return "", nil, cli.NewError("unable to load the current context from %s", configFile)
return "", nil, cli.NewError("unable to load the current context file %s err:%v", configFile, err)
}

if verbose {
Expand Down
2 changes: 1 addition & 1 deletion cli/cdsctl/experimental_workflow_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func workflowRunJobLogsDownloadFunc(v cli.Values) error {

for _, link := range links.Data {
fileName := getFileName(rj, rj.Job.Steps[link.StepOrder].ID, link.StepOrder)
data, err := client.WorkflowLogDownload(context.Background(), sdk.CDNLogLink{APIRef: link.APIRef, ItemType: links.ItemType})
data, err := client.WorkflowLogDownload(context.Background(), sdk.CDNLogLink{APIRef: link.APIRef, ItemType: link.ItemType})
if err != nil {
if strings.Contains(err.Error(), "resource not found") {
continue
Expand Down
5 changes: 4 additions & 1 deletion engine/api/auth_hatchery.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"time"

"github.com/pkg/errors"
"github.com/rockbears/log"

"github.com/ovh/cds/engine/api/authentication"
Expand Down Expand Up @@ -144,7 +145,9 @@ func (api *API) postAuthHatcherySigninHandler() ([]service.RbacChecker, service.

// This has to be called by the signin handler
func (api *API) hatcheryRegister(ctx context.Context, tx gorpmapper.SqlExecutorWithTx, consumer sdk.AuthHatcheryConsumer, sessionID string, h *sdk.Hatchery, signInRequest sdk.AuthConsumerHatcherySigninRequest) error {
h.Name = signInRequest.Name
if signInRequest.Name != h.Name {
return sdk.NewError(sdk.ErrForbidden, errors.Errorf("wrong token. name (from hatchery configuration):%s != name (from token):%s", h.Name, signInRequest.Name))
}
h.HTTPURL = signInRequest.HTTPURL
h.Config = signInRequest.Config
h.PublicKey = signInRequest.PublicKey
Expand Down
17 changes: 12 additions & 5 deletions engine/api/v2_hatchery.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,24 @@ func (api *API) deleteHatcheryHandler() ([]service.RbacChecker, service.Handler)
return err
}

hatcheryPermission, err := rbac.LoadRBACByHatcheryID(ctx, api.mustDB(), hatch.ID)
if err != nil {
return err
}

tx, err := api.mustDB().Begin()
if err != nil {
return sdk.WithStack(err)
}
defer tx.Rollback() // nolint

hatcheryPermission, err := rbac.LoadRBACByHatcheryID(ctx, tx, hatch.ID)
if err != nil {
if !sdk.ErrorIs(err, sdk.ErrNotFound) {
return err
}
// here, no rbac found, just delete the hatchery
if err := hatchery.Delete(tx, hatch.ID); err != nil {
return err
}
return sdk.WithStack(tx.Commit())
}

// Remove all permissions on this hatchery
rbacHatcheries := make([]sdk.RBACHatchery, 0)
for _, h := range hatcheryPermission.Hatcheries {
Expand Down
23 changes: 16 additions & 7 deletions engine/api/v2_workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (api *API) getWorkflowRunJobLogsLinksV2Handler() ([]service.RbacChecker, se
}
repositoryIdentifier, err := url.PathUnescape(vars["repositoryIdentifier"])
if err != nil {
return sdk.WithStack(err)
return sdk.NewError(sdk.ErrWrongRequest, err)
}
workflowName := vars["workflow"]
runNumberS := vars["runNumber"]
Expand Down Expand Up @@ -311,6 +311,13 @@ func (api *API) getWorkflowRunJobLogsLinksV2Handler() ([]service.RbacChecker, se
RunAttempt: runJob.RunAttempt,
}

for serviceName := range runJob.Job.Services {
ref := apiRef
ref.ServiceName = serviceName
ref.ItemType = sdk.CDNTypeItemServiceLogV2
refs = append(refs, ref)
}

for k := range runJob.StepsStatus {
stepOrder := -1
for i := range runJob.Job.Steps {
Expand All @@ -327,6 +334,7 @@ func (api *API) getWorkflowRunJobLogsLinksV2Handler() ([]service.RbacChecker, se
ref := apiRef
ref.StepName = sdk.GetJobStepName(k, stepOrder)
ref.StepOrder = int64(stepOrder)
ref.ItemType = sdk.CDNTypeItemJobStepLog
refs = append(refs, ref)
}
datas := make([]sdk.CDNLogLinkData, 0, len(refs))
Expand All @@ -337,9 +345,11 @@ func (api *API) getWorkflowRunJobLogsLinksV2Handler() ([]service.RbacChecker, se
}
apiRefHash := strconv.FormatUint(apiRefHashU, 10)
datas = append(datas, sdk.CDNLogLinkData{
APIRef: apiRefHash,
StepOrder: r.StepOrder,
StepName: r.StepName,
APIRef: apiRefHash,
StepOrder: r.StepOrder,
StepName: r.StepName,
ServiceName: r.ServiceName,
ItemType: r.ItemType,
})
}

Expand All @@ -349,9 +359,8 @@ func (api *API) getWorkflowRunJobLogsLinksV2Handler() ([]service.RbacChecker, se
}

return service.WriteJSON(w, sdk.CDNLogLinks{
CDNURL: httpURL,
ItemType: sdk.CDNTypeItemJobStepLog,
Data: datas,
CDNURL: httpURL,
Data: datas,
}, http.StatusOK)
}
}
Expand Down
2 changes: 1 addition & 1 deletion engine/api/v2_workflow_run_job_routines.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (api *API) stopDeadJob(ctx context.Context, store cache.Store, db *gorp.DbM
info := sdk.V2WorkflowRunJobInfo{
Level: sdk.WorkflowRunInfoLevelError,
WorkflowRunJobID: runJob.ID,
Message: fmt.Sprintf("worker %s don't respond anymore.", runJob.WorkerName),
Message: fmt.Sprintf("worker %q doesn't respond anymore.", runJob.WorkerName),
IssuedAt: time.Now(),
WorkflowRunID: runJob.WorkflowRunID,
}
Expand Down
9 changes: 5 additions & 4 deletions engine/api/v2_workflow_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ import (
"context"
"encoding/json"
"fmt"
"net/http/httptest"
"strconv"
"testing"
"time"

"github.com/ovh/cds/engine/api/authentication"
"github.com/ovh/cds/engine/api/entity"
"github.com/ovh/cds/engine/api/hatchery"
Expand All @@ -16,10 +21,6 @@ import (
"github.com/ovh/cds/sdk"
"github.com/rockbears/yaml"
"github.com/stretchr/testify/require"
"net/http/httptest"
"strconv"
"testing"
"time"
)

func TestGetWorkflowRunInfoV2Handler(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions engine/cdn/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (s *Service) itemAccessCheck(ctx context.Context, req *http.Request, item s
logRef, _ := item.GetCDNLogApiRef()
projectKey = logRef.ProjectKey
workflowID = logRef.WorkflowID
case sdk.CDNTypeItemJobStepLog:
case sdk.CDNTypeItemJobStepLog, sdk.CDNTypeItemServiceLogV2:
logRef, _ := item.GetCDNLogApiRefV2()
projectKey = logRef.ProjectKey
case sdk.CDNTypeItemRunResult:
Expand Down Expand Up @@ -145,7 +145,7 @@ func (s *Service) itemAccessCheck(ctx context.Context, req *http.Request, item s
if err := s.Client.ProjectAccess(ctx, projectKey, sessionID, item.Type); err != nil {
return sdk.NewErrorWithStack(err, sdk.ErrNotFound)
}
case sdk.CDNTypeItemJobStepLog:
case sdk.CDNTypeItemJobStepLog, sdk.CDNTypeItemServiceLogV2:
if err := s.Client.HasProjectRole(ctx, projectKey, sessionID, sdk.ProjectRoleRead); err != nil {
return sdk.NewErrorWithStack(err, sdk.ErrNotFound)
}
Expand Down
3 changes: 1 addition & 2 deletions engine/cdn/cdn_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *Service) downloadItem(ctx context.Context, t sdk.CDNItemType, apiRefHas
ctx = context.WithValue(ctx, storage.FieldAPIRef, apiRefHash)

switch t {
case sdk.CDNTypeItemServiceLog, sdk.CDNTypeItemStepLog, sdk.CDNTypeItemJobStepLog:
case sdk.CDNTypeItemServiceLog, sdk.CDNTypeItemStepLog, sdk.CDNTypeItemJobStepLog, sdk.CDNTypeItemServiceLogV2:
if err := s.downloadLog(ctx, t, apiRefHash, w, opts); err != nil {
return err
}
Expand Down Expand Up @@ -156,7 +156,6 @@ type getItemLogOptions struct {
}

type getItemFileOptions struct {
cacheClean bool
cacheSource string
}

Expand Down
5 changes: 2 additions & 3 deletions engine/cdn/cdn_log_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ func (s *Service) sendToBufferWithRetry(ctx context.Context, hms []handledMessag
itemType = sdk.CDNTypeItemStepLog
}
} else {
// FIXME manage new service log
if hm.Signature.Service != nil {
itemType = sdk.CDNTypeItemServiceLog
if hm.Signature.HatcheryService != nil {
itemType = sdk.CDNTypeItemServiceLogV2
} else {
itemType = sdk.CDNTypeItemJobStepLog
}
Expand Down
82 changes: 53 additions & 29 deletions engine/cdn/cdn_log_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ func (s *Service) handleConnection(ctx context.Context, conn net.Conn) {

// Handle Message: Worker/Hatchery
func (s *Service) handleLogMessage(ctx context.Context, messageReceived []byte) error {
m := hook.Message{}
if err := m.UnmarshalJSON(messageReceived); err != nil {
msg := hook.Message{}
if err := msg.UnmarshalJSON(messageReceived); err != nil {
return sdk.WrapError(err, "unable to unmarshall gelf message: %s", string(messageReceived))
}

// Extract Signature
sig, ok := m.Extra["_"+cdslog.ExtraFieldSignature]
sig, ok := msg.Extra["_"+cdslog.ExtraFieldSignature]
if !ok || sig == "" {
return sdk.WithStack(fmt.Errorf("signature not found on log message: %+v", m))
return sdk.WithStack(fmt.Errorf("signature not found on log message: %+v", msg))
}

// Unsafe parse of the signature to extract its data
Expand All @@ -141,17 +141,20 @@ func (s *Service) handleLogMessage(ctx context.Context, messageReceived []byte)
switch {
case signature.Worker != nil:
telemetry.Record(ctx, s.Metrics.tcpServerStepLogCount, 1)
return s.handleWorkerLog(ctx, signature, sig, m)
return s.handleWorkerLog(ctx, signature, sig, msg)
case signature.Service != nil:
telemetry.Record(ctx, s.Metrics.tcpServerServiceLogCount, 1)
return s.handleServiceLog(ctx, signature.Service.HatcheryID, signature.Service.HatcheryName, signature.Service.WorkerName, sig, m)
return s.handleServiceLog(ctx, signature, sig, msg)
case signature.HatcheryService != nil:
telemetry.Record(ctx, s.Metrics.tcpServerServiceLogCount, 1)
return s.handleServiceLog(ctx, signature, sig, msg)
default:
return sdk.WithStack(sdk.ErrWrongRequest)
}
}

// Handle Message from worker (job logs). Enqueue in Redis
func (s *Service) handleWorkerLog(ctx context.Context, unsafeSign cdn.Signature, sig interface{}, m hook.Message) error {
func (s *Service) handleWorkerLog(ctx context.Context, unsafeSign cdn.Signature, sig interface{}, msg hook.Message) error {
var signature cdn.Signature

switch {
Expand Down Expand Up @@ -183,12 +186,12 @@ func (s *Service) handleWorkerLog(ctx context.Context, unsafeSign cdn.Signature,
}
}

terminatedI := m.Extra["_"+cdslog.ExtraFieldTerminated]
terminatedI := msg.Extra["_"+cdslog.ExtraFieldTerminated]
terminated := cast.ToBool(terminatedI)

hm := handledMessage{
Signature: signature,
Msg: m,
Msg: msg,
IsTerminated: terminated,
}

Expand Down Expand Up @@ -235,23 +238,36 @@ func buildMessage(hm handledMessage) string {
if !strings.HasSuffix(val, "\n") {
val += "\n"
}
return fmt.Sprintf("%s", val)
return val
}

func (s *Service) handleServiceLog(ctx context.Context, hatcheryID int64, hatcheryName string, workerName string, sig interface{}, m hook.Message) error {
func (s *Service) handleServiceLog(ctx context.Context, unsafeSign cdn.Signature, sig interface{}, msg hook.Message) error {
var signature cdn.Signature
var pk *rsa.PublicKey

var hatcheryID, hatcheryName, serviceName string
if unsafeSign.JobID != 0 {
hatcheryID = strconv.FormatInt(unsafeSign.Service.HatcheryID, 10)
hatcheryName = unsafeSign.Service.HatcheryName
serviceName = unsafeSign.Service.RequirementName
} else if unsafeSign.RunJobID != "" {
hatcheryID = unsafeSign.HatcheryService.HatcheryID
hatcheryName = unsafeSign.HatcheryService.HatcheryName
serviceName = unsafeSign.HatcheryService.ServiceName
} else {
return sdk.WrapError(sdk.ErrForbidden, "invalid signature %v", unsafeSign)
}

// Get hatchery public key from cache
cacheData, ok := runCache.Get(fmt.Sprintf("hatchery-key-%d", hatcheryID))
cacheData, ok := runCache.Get(fmt.Sprintf("hatchery-key-%s", hatcheryID))
if !ok {
// Refresh hatcheries cache
if err := s.refreshHatcheriesPK(ctx); err != nil {
return err
}
cacheData, ok = runCache.Get(fmt.Sprintf("hatchery-key-%d", hatcheryID))
cacheData, ok = runCache.Get(fmt.Sprintf("hatchery-key-%s", hatcheryID))
if !ok {
return sdk.WrapError(sdk.ErrForbidden, "unable to find hatchery %d/%s", hatcheryID, hatcheryName)
return sdk.WrapError(sdk.ErrForbidden, "unable to find hatchery %s/%s", hatcheryID, hatcheryName)
}
}
pk = cacheData.(*rsa.PublicKey)
Expand All @@ -261,30 +277,38 @@ func (s *Service) handleServiceLog(ctx context.Context, hatcheryID int64, hatche
return err
}

// Get worker + check hatchery ID
w, err := s.getWorker(ctx, workerName, GetWorkerOptions{NeedPrivateKey: false})
if err != nil {
return err
}
if w.HatcheryID == nil {
return sdk.WrapError(sdk.ErrWrongRequest, "hatchery %d cannot send service log for worker %s started by %s that is no more linked to an hatchery", signature.Service.HatcheryID, w.ID, w.HatcheryName)
}
if *w.HatcheryID != signature.Service.HatcheryID {
return sdk.WrapError(sdk.ErrWrongRequest, "cannot send service log for worker %s from hatchery (expected: %d/actual: %d)", w.ID, *w.HatcheryID, signature.Service.HatcheryID)
var key string
switch {
case signature.JobID != 0:
// Get worker + check hatchery ID
w, err := s.getWorker(ctx, signature.Service.WorkerName, GetWorkerOptions{NeedPrivateKey: false})
if err != nil {
return err
}
if w.HatcheryID == nil {
return sdk.WrapError(sdk.ErrWrongRequest, "hatchery %d cannot send service log for worker %s started by %s that is no more linked to an hatchery", signature.Service.HatcheryID, w.ID, w.HatcheryName)
}
if *w.HatcheryID != signature.Service.HatcheryID {
return sdk.WrapError(sdk.ErrWrongRequest, "cannot send service log (%s) for worker %s from hatchery (expected: %d / actual: %d)", serviceName, w.ID, *w.HatcheryID, signature.Service.HatcheryID)
}

key = fmt.Sprintf("%d-%d", signature.JobID, signature.Service.RequirementID)

case signature.RunJobID != "":
key = fmt.Sprintf("%s-%s", signature.RunJobID, signature.HatcheryService.ServiceName)
}

terminatedI := m.Extra["_"+cdslog.ExtraFieldTerminated]
terminatedI := msg.Extra["_"+cdslog.ExtraFieldTerminated]
terminated := cast.ToBool(terminatedI)

hm := handledMessage{
Signature: signature,
Msg: m,
Msg: msg,
IsTerminated: terminated,
}

reqKey := fmt.Sprintf("%d-%d", signature.JobID, signature.Service.RequirementID)
sizeQueueKey := cache.Key(keyJobLogSize, reqKey)
jobQueue := cache.Key(keyJobLogQueue, reqKey)
sizeQueueKey := cache.Key(keyJobLogSize, key)
jobQueue := cache.Key(keyJobLogQueue, key)

if err := s.sendIntoIncomingQueue(hm, jobQueue, sizeQueueKey); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion engine/cdn/cdn_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (s *Service) ComputeMetrics(ctx context.Context) {
}

func (s *Service) countItemsForUnit(ctx context.Context, storageUnit storage.Interface) []storage.Stat {
types := []sdk.CDNItemType{sdk.CDNTypeItemStepLog, sdk.CDNTypeItemServiceLog, sdk.CDNTypeItemRunResult, sdk.CDNTypeItemJobStepLog}
types := []sdk.CDNItemType{sdk.CDNTypeItemStepLog, sdk.CDNTypeItemServiceLog, sdk.CDNTypeItemRunResult, sdk.CDNTypeItemJobStepLog, sdk.CDNTypeItemServiceLogV2}
var storageStats []storage.Stat
for _, typ := range types {
suStats, err := storage.CountItemsForUnitByType(s.mustDBWithCtx(ctx), storageUnit.ID(), string(typ))
Expand Down
1 change: 0 additions & 1 deletion engine/cdn/cdn_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ func (s *Service) initRouter(ctx context.Context) {
r.SetHeaderFunc = service.DefaultHeaders
r.Middlewares = append(r.Middlewares, service.TracingMiddlewareFunc(s), s.jwtMiddleware)
r.DefaultAuthMiddleware = service.CheckRequestSignatureMiddleware(s.ParsedAPIPublicKey)
r.PostAuthMiddlewares = append(r.PostAuthMiddlewares)
r.PostMiddlewares = append(r.PostMiddlewares, service.TracingPostMiddleware)

r.Handle("/mon/version", nil, r.GET(service.VersionHandler, service.OverrideAuth(service.NoAuthMiddleware)))
Expand Down
2 changes: 1 addition & 1 deletion engine/cdn/item/gorp_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (c cdnItemDB) ToCDSItem() (sdk.CDNItem, error) {
return item, sdk.WithStack(err)
}
item.APIRef = &apiRef
case sdk.CDNTypeItemJobStepLog:
case sdk.CDNTypeItemJobStepLog, sdk.CDNTypeItemServiceLogV2:
var apiRef sdk.CDNLogAPIRefV2
if err := sdk.JSONUnmarshal(c.APIRefDB, &apiRef); err != nil {
return item, sdk.WithStack(err)
Expand Down
Loading