Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: display download link for run results #7185

Merged
merged 2 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions contrib/grpcplugins/action/addRunResult/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func (p *addRunResultPlugin) Stream(q *actionplugin.ActionQuery, stream actionpl
Status: sdk.StatusSuccess,
}

resultType := q.GetOptions()["type"]
resultType := sdk.V2WorkflowRunResultType(q.GetOptions()["type"])
path := q.GetOptions()["path"]
payload := q.GetOptions()["payload"]

Expand Down Expand Up @@ -71,7 +71,7 @@ func (p *addRunResultPlugin) Stream(q *actionplugin.ActionQuery, stream actionpl

}

func (p *addRunResultPlugin) perform(ctx context.Context, resultType, artifactPath string, detail sdk.V2WorkflowRunResultDetail) (bool, error) {
func (p *addRunResultPlugin) perform(ctx context.Context, resultType sdk.V2WorkflowRunResultType, artifactPath string, detail sdk.V2WorkflowRunResultDetail) (bool, error) {
jobCtx, err := grpcplugins.GetJobContext(ctx, &p.Common)
if err != nil {
return true, err
Expand Down
14 changes: 7 additions & 7 deletions contrib/grpcplugins/action/downloadArtifact/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ func (actPlugin *runActionDownloadArtifactlugin) perform(ctx context.Context, na
return errors.New("unable to retrieve job context")
}

workerConfig, err := grpcplugins.GetWorkerConfig(ctx, &actPlugin.Common)
if err != nil {
grpcplugins.Errorf(&actPlugin.Common, err.Error())
return errors.New("unable to retrieve worker config")
}

grpcplugins.Logf(&actPlugin.Common, "Total number of files that will be downloaded: %d", len(response.RunResults))

for _, r := range filteredRunResults {
Expand All @@ -113,20 +119,14 @@ func (actPlugin *runActionDownloadArtifactlugin) perform(ctx context.Context, na
return sdk.Errorf("unable to download artifact %q (caused by: missing cdn_type property", r.Name())
}

cdnAddr, has := (*r.ArtifactManagerMetadata)["cdn_http_url"]
if !has {
return sdk.Errorf("unable to download artifact %q (caused by: missing cdn_http_url property", r.Name())
}

destinationFile, n, err := grpcplugins.DownloadFromCDN(ctx, &actPlugin.Common, response.CDNSignature, *workDirs, cdnApirefhash, cdnType, cdnAddr, path, x.Name, x.Mode)
destinationFile, n, err := grpcplugins.DownloadFromCDN(ctx, &actPlugin.Common, response.CDNSignature, *workDirs, cdnApirefhash, cdnType, workerConfig.CDNEndpoint, path, x.Name, x.Mode)
if err != nil {
grpcplugins.Errorf(&actPlugin.Common, err.Error())
hasError = true
continue
}
grpcplugins.Logf(&actPlugin.Common, "Artifact %q was downloaded to %s (%d bytes downloaded in %.3f seconds).", x.Name, destinationFile, n, time.Since(t0).Seconds())
case r.ArtifactManagerIntegrationName != nil: // download from artifactory

// Get integration from the local cache, or from the worker
if jobCtx.Integrations == nil || jobCtx.Integrations.ArtifactManager.Name == "" {
grpcplugins.Errorf(&actPlugin.Common, "unable to retrieve artifactory integration")
Expand Down
2 changes: 1 addition & 1 deletion contrib/grpcplugins/action/uploadArtifact/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (p *runActionUploadArtifactPlugin) Stream(q *actionplugin.ActionQuery, stre
ifNoFilesFound := q.GetOptions()["if-no-files-found"]

runResultType := sdk.V2WorkflowRunResultType(sdk.V2WorkflowRunResultTypeGeneric)
if q.GetOptions()["type"] == sdk.V2WorkflowRunResultTypeCoverage {
if sdk.V2WorkflowRunResultType(q.GetOptions()["type"]) == sdk.V2WorkflowRunResultTypeCoverage {
runResultType = sdk.V2WorkflowRunResultType(sdk.V2WorkflowRunResultTypeCoverage)
}

Expand Down
18 changes: 15 additions & 3 deletions contrib/grpcplugins/action/uploadArtifact/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,19 @@ func Test_perform(t *testing.T) {
},
)

mockHTTPClient.EXPECT().Do(sdk.ReqMatcher{Method: "GET", URLPath: "/v2/workerConfig"}).DoAndReturn(
func(req *http.Request) (*http.Response, error) {
h := workerruntime.V2_contextHandler(context.TODO(), mockWorker)
rec := httptest.NewRecorder()
apiReq := http.Request{
Method: "GET",
URL: &url.URL{},
}
h(rec, &apiReq)
return rec.Result(), nil
},
)

mockHTTPClient.EXPECT().Do(reqMatcher{method: "POST", urlPath: "/v2/result"}).DoAndReturn(
func(req *http.Request) (*http.Response, error) {
var rrRequest workerruntime.V2RunResultRequest
Expand Down Expand Up @@ -112,8 +125,7 @@ func Test_perform(t *testing.T) {
log.Debug(ctx, "V2AddRunResult")
require.Equal(t, "main.go", req.RunResult.Detail.Data.(*sdk.V2WorkflowRunResultGenericDetail).Name)
return &workerruntime.V2AddResultResponse{
RunResult: req.RunResult,
CDNAddress: "cdn-address",
RunResult: req.RunResult,
}, nil
},
)
Expand Down Expand Up @@ -156,7 +168,7 @@ func TestUnMarshalRunResulRequest(t *testing.T) {
log.Factory = log.NewTestingWrapper(t)
log.UnregisterField(log.FieldCaller, log.FieldSourceFile, log.FieldSourceLine, log.FieldStackTrace)

btes := []byte(`{"RunResult":{"id":"","workflow_run_id":"","workflow_run_job_id":"","run_attempt":0,"issued_at":"2024-09-24T07:13:30.606970365Z","type":"generic","artifact_manager_integration_name":null,"artifact_manager_metadata":null,"detail":{"data":{"name":"main.go","size":3868,"mode":420,"md5":"dcdcb00178f065cd3d728091578b92db","sha1":"cf2c4760aae1fc78f5fc16f301389e3966c85403","sha256":"79d1855a45e7dd816b46364fd2da931af33ebcca756d5c145db89af80d47121b"},"type":"V2WorkflowRunResultGenericDetail"},"sync":null,"status":"PENDING"},"CDNItemLink":{"cdn_http_url":"","item":{"id":"","created":"0001-01-01T00:00:00Z","last_modified":"0001-01-01T00:00:00Z","hash":"","api_ref":null,"api_ref_hash":"","status":"","type":"","size":0,"md5":"","to_delete":false}}}`)
btes := []byte(`{"RunResult":{"id":"","workflow_run_id":"","workflow_run_job_id":"","run_attempt":0,"issued_at":"2024-09-24T07:13:30.606970365Z","type":"generic","artifact_manager_integration_name":null,"artifact_manager_metadata":null,"detail":{"data":{"name":"main.go","size":3868,"mode":420,"md5":"dcdcb00178f065cd3d728091578b92db","sha1":"cf2c4760aae1fc78f5fc16f301389e3966c85403","sha256":"79d1855a45e7dd816b46364fd2da931af33ebcca756d5c145db89af80d47121b"},"type":"V2WorkflowRunResultGenericDetail"},"sync":null,"status":"PENDING"},"CDNItemLink":{"item":{"id":"","created":"0001-01-01T00:00:00Z","last_modified":"0001-01-01T00:00:00Z","hash":"","api_ref":null,"api_ref_hash":"","status":"","type":"","size":0,"md5":"","to_delete":false}}}`)
var rrRequest workerruntime.V2RunResultRequest
require.NoError(t, sdk.JSONUnmarshal(btes, &rrRequest))

Expand Down
52 changes: 39 additions & 13 deletions contrib/grpcplugins/grpcplugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,30 @@ func GetJobContext(ctx context.Context, c *actionplugin.Common) (*sdk.WorkflowRu
return &context, nil
}

func GetWorkerConfig(ctx context.Context, c *actionplugin.Common) (*workerruntime.V2WorkerConfig, error) {
r, err := c.NewRequest(ctx, "GET", "/v2/workerConfig", nil)
if err != nil {
return nil, sdk.WrapError(err, "unable to prepare request")
}

resp, err := c.DoRequest(r)
if err != nil {
return nil, sdk.WrapError(err, "unable to get job context")
}
btes, err := io.ReadAll(resp.Body)
if err != nil {
return nil, sdk.WrapError(err, "unable to read response")
}

defer resp.Body.Close()

var config workerruntime.V2WorkerConfig
if err := sdk.JSONUnmarshal(btes, &config); err != nil {
return nil, sdk.WrapError(err, "unable to read response")
}
return &config, nil
}

type ArtifactoryConfig struct {
URL string
Token string
Expand Down Expand Up @@ -643,12 +667,12 @@ func GetArtifactoryRunResults(ctx context.Context, c *actionplugin.Common, patte
}, nil
}

func ExtractFileInfoIntoRunResult(runResult *sdk.V2WorkflowRunResult, fi ArtifactoryFileInfo, name, resultType, localRepository, repository, maturity string) {
func ExtractFileInfoIntoRunResult(runResult *sdk.V2WorkflowRunResult, fi ArtifactoryFileInfo, name string, resultType sdk.V2WorkflowRunResultType, localRepository, repository, maturity string) {
runResult.ArtifactManagerMetadata = &sdk.V2WorkflowRunResultArtifactManagerMetadata{}
runResult.ArtifactManagerMetadata.Set("repository", repository) // This is the virtual repository
runResult.ArtifactManagerMetadata.Set("maturity", maturity)
runResult.ArtifactManagerMetadata.Set("name", name)
runResult.ArtifactManagerMetadata.Set("type", resultType)
runResult.ArtifactManagerMetadata.Set("type", string(resultType))
runResult.ArtifactManagerMetadata.Set("path", fi.Path)
runResult.ArtifactManagerMetadata.Set("md5", fi.Checksums.Md5)
runResult.ArtifactManagerMetadata.Set("sha1", fi.Checksums.Sha1)
Expand All @@ -665,6 +689,12 @@ func ExtractFileInfoIntoRunResult(runResult *sdk.V2WorkflowRunResult, fi Artifac
}

func UploadRunResult(ctx context.Context, actplugin *actionplugin.Common, jobContext sdk.WorkflowRunJobsContext, runresultReq *workerruntime.V2RunResultRequest, fileName string, f fs.File, size int64, fileChecksum ChecksumResult) (*workerruntime.V2UpdateResultResponse, error) {
workerConfig, err := GetWorkerConfig(ctx, actplugin)
if err != nil {
Error(actplugin, err.Error())
return nil, err
}

response, err := CreateRunResult(ctx, actplugin, runresultReq)
if err != nil {
Error(actplugin, err.Error())
Expand All @@ -675,12 +705,12 @@ func UploadRunResult(ctx context.Context, actplugin *actionplugin.Common, jobCon
var d time.Duration
var runResultRequest workerruntime.V2RunResultRequest
switch {
case response.CDNAddress != "":
case response.CDNSignature != "":
reader, ok := f.(io.ReadSeeker)
var item *sdk.CDNItem
var err error
if ok {
item, d, err = CDNItemUpload(ctx, actplugin, response.CDNAddress, response.CDNSignature, reader)
item, d, err = CDNItemUpload(ctx, actplugin, workerConfig.CDNEndpoint, response.CDNSignature, reader)
if err != nil {
Error(actplugin, "An error occurred during file upload upload: "+err.Error())
return nil, err
Expand All @@ -692,14 +722,12 @@ func UploadRunResult(ctx context.Context, actplugin *actionplugin.Common, jobCon

// Update the run result status
runResultRequest = workerruntime.V2RunResultRequest{RunResult: response.RunResult}
i := sdk.CDNItemLink{CDNHttpURL: response.CDNAddress, Item: *item}
i := sdk.CDNItemLink{CDNHttpURL: workerConfig.CDNEndpoint, Item: *item}
runResultRequest.RunResult.ArtifactManagerMetadata = &sdk.V2WorkflowRunResultArtifactManagerMetadata{
"uri": i.CDNHttpURL,
"cdn_http_url": i.CDNHttpURL,
"cdn_id": i.Item.ID,
"cdn_type": string(i.Item.Type),
"cdn_api_ref_hash": i.Item.APIRefHash,
"downloadURI": fmt.Sprintf("%s/item/%s/%s/download", i.CDNHttpURL, string(i.Item.Type), i.Item.APIRefHash),
"cdn_id": i.Item.ID,
"cdn_type": string(i.Item.Type),
"cdn_api_ref_hash": i.Item.APIRefHash,
"cdn_download_path": fmt.Sprintf("/item/%s/%s/download", string(i.Item.Type), i.Item.APIRefHash),
}
Logf(actplugin, " CDN API Ref Hash: %s", i.Item.APIRefHash)
Logf(actplugin, " CDN HTTP URL: %s", i.CDNHttpURL)
Expand Down Expand Up @@ -1085,7 +1113,6 @@ func DownloadFromCDN(ctx context.Context, c *actionplugin.Common, CDNSignature s
}

func BodyToFile(resp *http.Response, workDirs sdk.WorkerDirectories, path string, name string, mode fs.FileMode) (string, int64, error) {

var destinationDir string
if path != "" && filepath.IsAbs(path) {
destinationDir = path
Expand All @@ -1111,7 +1138,6 @@ func BodyToFile(resp *http.Response, workDirs sdk.WorkerDirectories, path string
}
_ = resp.Body.Close()
return destinationFile, n, nil

}

func BuildCacheURL(integ sdk.JobIntegrationsContext, projKey string, cacheKey string) string {
Expand Down
1 change: 1 addition & 0 deletions engine/worker/internal/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func (w *CurrentWorker) Serve(c context.Context) error {
r.HandleFunc("/run-result/add/static-file", LogMiddleware(addRunResultStaticFileHandler(c, w)))
r.HandleFunc("/version", LogMiddleware(setVersionHandler(c, w)))

r.HandleFunc("/v2/workerConfig", LogMiddleware(workerruntime.V2_workerConfig(c, w)))
r.HandleFunc("/v2/cache/signature/{cacheKey}", LogMiddleware(workerruntime.V2_cacheSignatureHandler(c, w)))
r.HandleFunc("/v2/cache/signature/{cacheKey}/link", LogMiddleware(workerruntime.V2_cacheLinkHandler(c, w)))
r.HandleFunc("/v2/output", LogMiddleware(workerruntime.V2_outputHandler(c, w)))
Expand Down
7 changes: 3 additions & 4 deletions engine/worker/internal/runtime_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ func (wk *CurrentWorker) V2AddRunResult(ctx context.Context, req workerruntime.V
}
// Returns the signature and CDN info
response.CDNSignature = signature
response.CDNAddress = wk.CDNHttpURL()
} else {
if response.RunResult.Type != sdk.V2WorkflowRunResultTypeArsenalDeployment &&
response.RunResult.Type != sdk.V2WorkflowRunResultTypeRelease && response.RunResult.Type != sdk.V2WorkflowRunResultTypeVariable {
Expand Down Expand Up @@ -244,9 +243,9 @@ func (wk *CurrentWorker) V2GetRunResult(ctx context.Context, filter workerruntim
case "V2WorkflowRunResultGenericDetail":
var res *glob.Result
if r.Type == sdk.V2WorkflowRunResultTypeCoverage || r.Type == sdk.V2WorkflowRunResultTypeGeneric { // If the filter is set to "V2WorkflowRunResultGenericDetail" we can directly check the artifact name. This is the usecase of plugin "downloadArtifact"
x, err := sdk.GetConcreteDetail[*sdk.V2WorkflowRunResultGenericDetail](&r)
if err != nil {
log.ErrorWithStackTrace(ctx, err)
x, errGetDetail := sdk.GetConcreteDetail[*sdk.V2WorkflowRunResultGenericDetail](&r)
if errGetDetail != nil {
log.ErrorWithStackTrace(ctx, errGetDetail)
}
res, err = pattern.MatchString(x.Name)
} else {
Expand Down
18 changes: 14 additions & 4 deletions engine/worker/pkg/workerruntime/handlers_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,20 @@ import (
"github.com/rockbears/log"
)

func V2_workerConfig(ctx context.Context, wk Runtime) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
writeJSON(w, V2WorkerConfig{
CDNEndpoint: wk.CDNHttpURL(),
}, http.StatusOK)
default:
writeError(w, r, sdk.ErrMethodNotAllowed)
return
}
}
}

func V2_cacheLinkHandler(ctx context.Context, wk Runtime) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
Expand All @@ -40,7 +54,6 @@ func V2_cacheLinkHandler(ctx context.Context, wk Runtime) http.HandlerFunc {
return
}
}

}

func V2_cacheSignatureHandler(ctx context.Context, wk Runtime) http.HandlerFunc {
Expand All @@ -64,12 +77,10 @@ func V2_cacheSignatureHandler(ctx context.Context, wk Runtime) http.HandlerFunc
return
}
}

}

func V2_outputHandler(ctx context.Context, wk Runtime) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {

btes, err := io.ReadAll(r.Body)
if err != nil {
writeError(w, r, sdk.NewError(sdk.ErrWrongRequest, err))
Expand Down Expand Up @@ -114,7 +125,6 @@ func V2_projectKeyHandler(ctx context.Context, wk Runtime) http.HandlerFunc {
}
writeJSON(w, k, http.StatusOK)
default:

return
}
}
Expand Down
5 changes: 4 additions & 1 deletion engine/worker/pkg/workerruntime/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type V2RunResultRequest struct {
type V2AddResultResponse struct {
RunResult *sdk.V2WorkflowRunResult
CDNSignature string
CDNAddress string
}

type V2GetResultResponse struct {
Expand All @@ -41,6 +40,10 @@ type V2FilterRunResult struct {
Type []sdk.V2WorkflowRunResultType
}

type V2WorkerConfig struct {
CDNEndpoint string `json:"cdn_endpoint"`
}

type WorkerConfig struct {
Name string `json:"name"`
Basedir string `json:"basedir"`
Expand Down
25 changes: 0 additions & 25 deletions sdk/v2_workflow_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,6 @@ type V2WorkflowRunResult struct {
Label string `json:"label,omitempty" db:"-"`
Identifier string `json:"identifier,omitempty" db:"-"`
Metadata map[string]V2WorkflowRunResultDetailMetadata `json:"metadata,omitempty" db:"-"`
URL string `json:"url,omitempty" db:"-"`
}

func (r *V2WorkflowRunResult) ComputedFields() {
Expand All @@ -572,7 +571,6 @@ func (r *V2WorkflowRunResult) ComputedFields() {
r.Identifier = r.Name()
r.Label = r.GetLabel()
r.Metadata = r.GetMetadata()
r.URL = r.GetURL()
}

func (r *V2WorkflowRunResult) GetLabel() string {
Expand All @@ -593,29 +591,6 @@ func (r *V2WorkflowRunResult) GetMetadata() map[string]V2WorkflowRunResultDetail
return detail.GetMetadata()
}

func (r *V2WorkflowRunResult) GetURL() string {
var u string
metadata := r.GetMetadata()
urlData, has := metadata["URL"]
if !has {
urlData, has = metadata["url"]
}
if has && urlData.Type == V2WorkflowRunResultDetailMetadataTypeURL {
u = urlData.Value
}
if u == "" && r.ArtifactManagerMetadata != nil {
u = r.ArtifactManagerMetadata.Get("downloadURI")
if u == "" {
u = r.ArtifactManagerMetadata.Get("uri")
}
if u == "" {
u = r.ArtifactManagerMetadata.Get("cdn_http_url")
}
}

return u
}

func (r *V2WorkflowRunResult) GetDetail() (V2WorkflowRunResultDetailInterface, error) {
if err := r.CastDetail(); err != nil {
return nil, err
Expand Down
Loading