Skip to content

Commit

Permalink
feat: service logs workflow v2 (#6684)
Browse files Browse the repository at this point in the history
* feat(hatchery): services on job v2

Signed-off-by: Yvonnick Esnault <yvonnick.esnault@corp.ovh.com>
  • Loading branch information
yesnault authored Nov 29, 2023
1 parent f800cd5 commit 4496b6a
Show file tree
Hide file tree
Showing 64 changed files with 1,618 additions and 831 deletions.
4 changes: 2 additions & 2 deletions cli/cdsctl/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,10 @@ func loadConfig(cmd *cobra.Command) (string, *cdsclient.Config, error) {

if contextName != "" {
if cdsctx, err = internal.GetContext(f, contextName); err != nil {
return "", nil, cli.NewError("unable to load the current context from %s", contextName)
return "", nil, cli.NewError("unable to load the current context name %s", contextName)
}
} else if cdsctx, err = internal.GetCurrentContext(f); err != nil {
return "", nil, cli.NewError("unable to load the current context from %s", configFile)
return "", nil, cli.NewError("unable to load the current context file %s err:%v", configFile, err)
}

if verbose {
Expand Down
2 changes: 1 addition & 1 deletion cli/cdsctl/experimental_workflow_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func workflowRunJobLogsDownloadFunc(v cli.Values) error {

for _, link := range links.Data {
fileName := getFileName(rj, rj.Job.Steps[link.StepOrder].ID, link.StepOrder)
data, err := client.WorkflowLogDownload(context.Background(), sdk.CDNLogLink{APIRef: link.APIRef, ItemType: links.ItemType})
data, err := client.WorkflowLogDownload(context.Background(), sdk.CDNLogLink{APIRef: link.APIRef, ItemType: link.ItemType})
if err != nil {
if strings.Contains(err.Error(), "resource not found") {
continue
Expand Down
5 changes: 4 additions & 1 deletion engine/api/auth_hatchery.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net/http"
"time"

"github.com/pkg/errors"
"github.com/rockbears/log"

"github.com/ovh/cds/engine/api/authentication"
Expand Down Expand Up @@ -144,7 +145,9 @@ func (api *API) postAuthHatcherySigninHandler() ([]service.RbacChecker, service.

// This has to be called by the signin handler
func (api *API) hatcheryRegister(ctx context.Context, tx gorpmapper.SqlExecutorWithTx, consumer sdk.AuthHatcheryConsumer, sessionID string, h *sdk.Hatchery, signInRequest sdk.AuthConsumerHatcherySigninRequest) error {
h.Name = signInRequest.Name
if signInRequest.Name != h.Name {
return sdk.NewError(sdk.ErrForbidden, errors.Errorf("wrong token. name (from hatchery configuration):%s != name (from token):%s", h.Name, signInRequest.Name))
}
h.HTTPURL = signInRequest.HTTPURL
h.Config = signInRequest.Config
h.PublicKey = signInRequest.PublicKey
Expand Down
17 changes: 12 additions & 5 deletions engine/api/v2_hatchery.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,17 +156,24 @@ func (api *API) deleteHatcheryHandler() ([]service.RbacChecker, service.Handler)
return err
}

hatcheryPermission, err := rbac.LoadRBACByHatcheryID(ctx, api.mustDB(), hatch.ID)
if err != nil {
return err
}

tx, err := api.mustDB().Begin()
if err != nil {
return sdk.WithStack(err)
}
defer tx.Rollback() // nolint

hatcheryPermission, err := rbac.LoadRBACByHatcheryID(ctx, tx, hatch.ID)
if err != nil {
if !sdk.ErrorIs(err, sdk.ErrNotFound) {
return err
}
// No RBAC entry references this hatchery: just delete the hatchery itself.
if err := hatchery.Delete(tx, hatch.ID); err != nil {
return err
}
return sdk.WithStack(tx.Commit())
}

// Remove all permissions on this hatchery
rbacHatcheries := make([]sdk.RBACHatchery, 0)
for _, h := range hatcheryPermission.Hatcheries {
Expand Down
23 changes: 16 additions & 7 deletions engine/api/v2_workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (api *API) getWorkflowRunJobLogsLinksV2Handler() ([]service.RbacChecker, se
}
repositoryIdentifier, err := url.PathUnescape(vars["repositoryIdentifier"])
if err != nil {
return sdk.WithStack(err)
return sdk.NewError(sdk.ErrWrongRequest, err)
}
workflowName := vars["workflow"]
runNumberS := vars["runNumber"]
Expand Down Expand Up @@ -311,6 +311,13 @@ func (api *API) getWorkflowRunJobLogsLinksV2Handler() ([]service.RbacChecker, se
RunAttempt: runJob.RunAttempt,
}

for serviceName := range runJob.Job.Services {
ref := apiRef
ref.ServiceName = serviceName
ref.ItemType = sdk.CDNTypeItemServiceLogV2
refs = append(refs, ref)
}

for k := range runJob.StepsStatus {
stepOrder := -1
for i := range runJob.Job.Steps {
Expand All @@ -327,6 +334,7 @@ func (api *API) getWorkflowRunJobLogsLinksV2Handler() ([]service.RbacChecker, se
ref := apiRef
ref.StepName = sdk.GetJobStepName(k, stepOrder)
ref.StepOrder = int64(stepOrder)
ref.ItemType = sdk.CDNTypeItemJobStepLog
refs = append(refs, ref)
}
datas := make([]sdk.CDNLogLinkData, 0, len(refs))
Expand All @@ -337,9 +345,11 @@ func (api *API) getWorkflowRunJobLogsLinksV2Handler() ([]service.RbacChecker, se
}
apiRefHash := strconv.FormatUint(apiRefHashU, 10)
datas = append(datas, sdk.CDNLogLinkData{
APIRef: apiRefHash,
StepOrder: r.StepOrder,
StepName: r.StepName,
APIRef: apiRefHash,
StepOrder: r.StepOrder,
StepName: r.StepName,
ServiceName: r.ServiceName,
ItemType: r.ItemType,
})
}

Expand All @@ -349,9 +359,8 @@ func (api *API) getWorkflowRunJobLogsLinksV2Handler() ([]service.RbacChecker, se
}

return service.WriteJSON(w, sdk.CDNLogLinks{
CDNURL: httpURL,
ItemType: sdk.CDNTypeItemJobStepLog,
Data: datas,
CDNURL: httpURL,
Data: datas,
}, http.StatusOK)
}
}
Expand Down
2 changes: 1 addition & 1 deletion engine/api/v2_workflow_run_job_routines.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (api *API) stopDeadJob(ctx context.Context, store cache.Store, db *gorp.DbM
info := sdk.V2WorkflowRunJobInfo{
Level: sdk.WorkflowRunInfoLevelError,
WorkflowRunJobID: runJob.ID,
Message: fmt.Sprintf("worker %s don't respond anymore.", runJob.WorkerName),
Message: fmt.Sprintf("worker %q doesn't respond anymore.", runJob.WorkerName),
IssuedAt: time.Now(),
WorkflowRunID: runJob.WorkflowRunID,
}
Expand Down
9 changes: 5 additions & 4 deletions engine/api/v2_workflow_run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@ import (
"context"
"encoding/json"
"fmt"
"net/http/httptest"
"strconv"
"testing"
"time"

"github.com/ovh/cds/engine/api/authentication"
"github.com/ovh/cds/engine/api/entity"
"github.com/ovh/cds/engine/api/hatchery"
Expand All @@ -16,10 +21,6 @@ import (
"github.com/ovh/cds/sdk"
"github.com/rockbears/yaml"
"github.com/stretchr/testify/require"
"net/http/httptest"
"strconv"
"testing"
"time"
)

func TestGetWorkflowRunInfoV2Handler(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions engine/cdn/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func (s *Service) itemAccessCheck(ctx context.Context, req *http.Request, item s
logRef, _ := item.GetCDNLogApiRef()
projectKey = logRef.ProjectKey
workflowID = logRef.WorkflowID
case sdk.CDNTypeItemJobStepLog:
case sdk.CDNTypeItemJobStepLog, sdk.CDNTypeItemServiceLogV2:
logRef, _ := item.GetCDNLogApiRefV2()
projectKey = logRef.ProjectKey
case sdk.CDNTypeItemRunResult:
Expand Down Expand Up @@ -145,7 +145,7 @@ func (s *Service) itemAccessCheck(ctx context.Context, req *http.Request, item s
if err := s.Client.ProjectAccess(ctx, projectKey, sessionID, item.Type); err != nil {
return sdk.NewErrorWithStack(err, sdk.ErrNotFound)
}
case sdk.CDNTypeItemJobStepLog:
case sdk.CDNTypeItemJobStepLog, sdk.CDNTypeItemServiceLogV2:
if err := s.Client.HasProjectRole(ctx, projectKey, sessionID, sdk.ProjectRoleRead); err != nil {
return sdk.NewErrorWithStack(err, sdk.ErrNotFound)
}
Expand Down
3 changes: 1 addition & 2 deletions engine/cdn/cdn_item.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func (s *Service) downloadItem(ctx context.Context, t sdk.CDNItemType, apiRefHas
ctx = context.WithValue(ctx, storage.FieldAPIRef, apiRefHash)

switch t {
case sdk.CDNTypeItemServiceLog, sdk.CDNTypeItemStepLog, sdk.CDNTypeItemJobStepLog:
case sdk.CDNTypeItemServiceLog, sdk.CDNTypeItemStepLog, sdk.CDNTypeItemJobStepLog, sdk.CDNTypeItemServiceLogV2:
if err := s.downloadLog(ctx, t, apiRefHash, w, opts); err != nil {
return err
}
Expand Down Expand Up @@ -156,7 +156,6 @@ type getItemLogOptions struct {
}

type getItemFileOptions struct {
cacheClean bool
cacheSource string
}

Expand Down
5 changes: 2 additions & 3 deletions engine/cdn/cdn_log_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@ func (s *Service) sendToBufferWithRetry(ctx context.Context, hms []handledMessag
itemType = sdk.CDNTypeItemStepLog
}
} else {
// FIXME manage new service log
if hm.Signature.Service != nil {
itemType = sdk.CDNTypeItemServiceLog
if hm.Signature.HatcheryService != nil {
itemType = sdk.CDNTypeItemServiceLogV2
} else {
itemType = sdk.CDNTypeItemJobStepLog
}
Expand Down
82 changes: 53 additions & 29 deletions engine/cdn/cdn_log_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,15 @@ func (s *Service) handleConnection(ctx context.Context, conn net.Conn) {

// Handle Message: Worker/Hatchery
func (s *Service) handleLogMessage(ctx context.Context, messageReceived []byte) error {
m := hook.Message{}
if err := m.UnmarshalJSON(messageReceived); err != nil {
msg := hook.Message{}
if err := msg.UnmarshalJSON(messageReceived); err != nil {
return sdk.WrapError(err, "unable to unmarshall gelf message: %s", string(messageReceived))
}

// Extract Signature
sig, ok := m.Extra["_"+cdslog.ExtraFieldSignature]
sig, ok := msg.Extra["_"+cdslog.ExtraFieldSignature]
if !ok || sig == "" {
return sdk.WithStack(fmt.Errorf("signature not found on log message: %+v", m))
return sdk.WithStack(fmt.Errorf("signature not found on log message: %+v", msg))
}

// Unsafe (unverified) parse of the signature to extract its data
Expand All @@ -141,17 +141,20 @@ func (s *Service) handleLogMessage(ctx context.Context, messageReceived []byte)
switch {
case signature.Worker != nil:
telemetry.Record(ctx, s.Metrics.tcpServerStepLogCount, 1)
return s.handleWorkerLog(ctx, signature, sig, m)
return s.handleWorkerLog(ctx, signature, sig, msg)
case signature.Service != nil:
telemetry.Record(ctx, s.Metrics.tcpServerServiceLogCount, 1)
return s.handleServiceLog(ctx, signature.Service.HatcheryID, signature.Service.HatcheryName, signature.Service.WorkerName, sig, m)
return s.handleServiceLog(ctx, signature, sig, msg)
case signature.HatcheryService != nil:
telemetry.Record(ctx, s.Metrics.tcpServerServiceLogCount, 1)
return s.handleServiceLog(ctx, signature, sig, msg)
default:
return sdk.WithStack(sdk.ErrWrongRequest)
}
}

// Handle Message from worker (job logs). Enqueue in Redis
func (s *Service) handleWorkerLog(ctx context.Context, unsafeSign cdn.Signature, sig interface{}, m hook.Message) error {
func (s *Service) handleWorkerLog(ctx context.Context, unsafeSign cdn.Signature, sig interface{}, msg hook.Message) error {
var signature cdn.Signature

switch {
Expand Down Expand Up @@ -183,12 +186,12 @@ func (s *Service) handleWorkerLog(ctx context.Context, unsafeSign cdn.Signature,
}
}

terminatedI := m.Extra["_"+cdslog.ExtraFieldTerminated]
terminatedI := msg.Extra["_"+cdslog.ExtraFieldTerminated]
terminated := cast.ToBool(terminatedI)

hm := handledMessage{
Signature: signature,
Msg: m,
Msg: msg,
IsTerminated: terminated,
}

Expand Down Expand Up @@ -235,23 +238,36 @@ func buildMessage(hm handledMessage) string {
if !strings.HasSuffix(val, "\n") {
val += "\n"
}
return fmt.Sprintf("%s", val)
return val
}

func (s *Service) handleServiceLog(ctx context.Context, hatcheryID int64, hatcheryName string, workerName string, sig interface{}, m hook.Message) error {
func (s *Service) handleServiceLog(ctx context.Context, unsafeSign cdn.Signature, sig interface{}, msg hook.Message) error {
var signature cdn.Signature
var pk *rsa.PublicKey

var hatcheryID, hatcheryName, serviceName string
if unsafeSign.JobID != 0 {
hatcheryID = strconv.FormatInt(unsafeSign.Service.HatcheryID, 10)
hatcheryName = unsafeSign.Service.HatcheryName
serviceName = unsafeSign.Service.RequirementName
} else if unsafeSign.RunJobID != "" {
hatcheryID = unsafeSign.HatcheryService.HatcheryID
hatcheryName = unsafeSign.HatcheryService.HatcheryName
serviceName = unsafeSign.HatcheryService.ServiceName
} else {
return sdk.WrapError(sdk.ErrForbidden, "invalid signature %v", unsafeSign)
}

// Get hatchery public key from cache
cacheData, ok := runCache.Get(fmt.Sprintf("hatchery-key-%d", hatcheryID))
cacheData, ok := runCache.Get(fmt.Sprintf("hatchery-key-%s", hatcheryID))
if !ok {
// Refresh hatcheries cache
if err := s.refreshHatcheriesPK(ctx); err != nil {
return err
}
cacheData, ok = runCache.Get(fmt.Sprintf("hatchery-key-%d", hatcheryID))
cacheData, ok = runCache.Get(fmt.Sprintf("hatchery-key-%s", hatcheryID))
if !ok {
return sdk.WrapError(sdk.ErrForbidden, "unable to find hatchery %d/%s", hatcheryID, hatcheryName)
return sdk.WrapError(sdk.ErrForbidden, "unable to find hatchery %s/%s", hatcheryID, hatcheryName)
}
}
pk = cacheData.(*rsa.PublicKey)
Expand All @@ -261,30 +277,38 @@ func (s *Service) handleServiceLog(ctx context.Context, hatcheryID int64, hatche
return err
}

// Get worker + check hatchery ID
w, err := s.getWorker(ctx, workerName, GetWorkerOptions{NeedPrivateKey: false})
if err != nil {
return err
}
if w.HatcheryID == nil {
return sdk.WrapError(sdk.ErrWrongRequest, "hatchery %d cannot send service log for worker %s started by %s that is no more linked to an hatchery", signature.Service.HatcheryID, w.ID, w.HatcheryName)
}
if *w.HatcheryID != signature.Service.HatcheryID {
return sdk.WrapError(sdk.ErrWrongRequest, "cannot send service log for worker %s from hatchery (expected: %d/actual: %d)", w.ID, *w.HatcheryID, signature.Service.HatcheryID)
var key string
switch {
case signature.JobID != 0:
// Get worker + check hatchery ID
w, err := s.getWorker(ctx, signature.Service.WorkerName, GetWorkerOptions{NeedPrivateKey: false})
if err != nil {
return err
}
if w.HatcheryID == nil {
return sdk.WrapError(sdk.ErrWrongRequest, "hatchery %d cannot send service log for worker %s started by %s that is no more linked to an hatchery", signature.Service.HatcheryID, w.ID, w.HatcheryName)
}
if *w.HatcheryID != signature.Service.HatcheryID {
return sdk.WrapError(sdk.ErrWrongRequest, "cannot send service log (%s) for worker %s from hatchery (expected: %d / actual: %d)", serviceName, w.ID, *w.HatcheryID, signature.Service.HatcheryID)
}

key = fmt.Sprintf("%d-%d", signature.JobID, signature.Service.RequirementID)

case signature.RunJobID != "":
key = fmt.Sprintf("%s-%s", signature.RunJobID, signature.HatcheryService.ServiceName)
}

terminatedI := m.Extra["_"+cdslog.ExtraFieldTerminated]
terminatedI := msg.Extra["_"+cdslog.ExtraFieldTerminated]
terminated := cast.ToBool(terminatedI)

hm := handledMessage{
Signature: signature,
Msg: m,
Msg: msg,
IsTerminated: terminated,
}

reqKey := fmt.Sprintf("%d-%d", signature.JobID, signature.Service.RequirementID)
sizeQueueKey := cache.Key(keyJobLogSize, reqKey)
jobQueue := cache.Key(keyJobLogQueue, reqKey)
sizeQueueKey := cache.Key(keyJobLogSize, key)
jobQueue := cache.Key(keyJobLogQueue, key)

if err := s.sendIntoIncomingQueue(hm, jobQueue, sizeQueueKey); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion engine/cdn/cdn_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func (s *Service) ComputeMetrics(ctx context.Context) {
}

func (s *Service) countItemsForUnit(ctx context.Context, storageUnit storage.Interface) []storage.Stat {
types := []sdk.CDNItemType{sdk.CDNTypeItemStepLog, sdk.CDNTypeItemServiceLog, sdk.CDNTypeItemRunResult, sdk.CDNTypeItemJobStepLog}
types := []sdk.CDNItemType{sdk.CDNTypeItemStepLog, sdk.CDNTypeItemServiceLog, sdk.CDNTypeItemRunResult, sdk.CDNTypeItemJobStepLog, sdk.CDNTypeItemServiceLogV2}
var storageStats []storage.Stat
for _, typ := range types {
suStats, err := storage.CountItemsForUnitByType(s.mustDBWithCtx(ctx), storageUnit.ID(), string(typ))
Expand Down
1 change: 0 additions & 1 deletion engine/cdn/cdn_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ func (s *Service) initRouter(ctx context.Context) {
r.SetHeaderFunc = service.DefaultHeaders
r.Middlewares = append(r.Middlewares, service.TracingMiddlewareFunc(s), s.jwtMiddleware)
r.DefaultAuthMiddleware = service.CheckRequestSignatureMiddleware(s.ParsedAPIPublicKey)
r.PostAuthMiddlewares = append(r.PostAuthMiddlewares)
r.PostMiddlewares = append(r.PostMiddlewares, service.TracingPostMiddleware)

r.Handle("/mon/version", nil, r.GET(service.VersionHandler, service.OverrideAuth(service.NoAuthMiddleware)))
Expand Down
2 changes: 1 addition & 1 deletion engine/cdn/item/gorp_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (c cdnItemDB) ToCDSItem() (sdk.CDNItem, error) {
return item, sdk.WithStack(err)
}
item.APIRef = &apiRef
case sdk.CDNTypeItemJobStepLog:
case sdk.CDNTypeItemJobStepLog, sdk.CDNTypeItemServiceLogV2:
var apiRef sdk.CDNLogAPIRefV2
if err := sdk.JSONUnmarshal(c.APIRefDB, &apiRef); err != nil {
return item, sdk.WithStack(err)
Expand Down
Loading

0 comments on commit 4496b6a

Please sign in to comment.