Skip to content

Commit

Permalink
feat(server): server logs to be structured and add more log error
Browse files Browse the repository at this point in the history
Signed-off-by: Lam Vu <lamvh2812@gmail.com>
Signed-off-by: Lam Vu <lvu@axon.com>
  • Loading branch information
Lam Vu committed Oct 9, 2022
1 parent a8e37e9 commit defc663
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 16 deletions.
15 changes: 8 additions & 7 deletions server/apiserver/argoserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func init() {
var err error
MaxGRPCMessageSize, err = env.GetInt("GRPC_MESSAGE_SIZE", 100*1024*1024)
if err != nil {
log.Fatalf("GRPC_MESSAGE_SIZE environment variable must be set as an integer: %v", err)
log.WithError(err).Fatal("GRPC_MESSAGE_SIZE environment variable must be set as an integer")
}
}

Expand Down Expand Up @@ -212,7 +212,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
// like and the controller won't offload newly created workflows, but you can still read them
offloadRepo, err = sqldb.NewOffloadNodeStatusRepo(session, persistence.GetClusterName(), tableName)
if err != nil {
log.Fatal(err)
log.WithError(err).Fatal(err.Error())
}
// we always enable the archive for the Argo Server, as the Argo Server does not write records, so you can
// disable the archiving - and still read old records
Expand All @@ -232,7 +232,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
err = wait.ExponentialBackoff(backoff, func() (bool, error) {
conn, listerErr = net.Listen("tcp", address)
if listerErr != nil {
log.Warnf("failed to listen: %v", listerErr)
log.WithError(err).Warn("failed to listen")
return false, nil
}
return true, nil
Expand Down Expand Up @@ -262,7 +262,7 @@ func (as *argoServer) Run(ctx context.Context, port int, browserOpenFunc func(st
log.WithFields(log.Fields{
"GRPC_MESSAGE_SIZE": MaxGRPCMessageSize,
}).Info("GRPC Server Max Message Size, MaxGRPCMessageSize, is set")
log.Infof("Argo Server started successfully on %s", url)
log.WithFields(log.Fields{"url": url}).Infof("Argo Server started successfully on %s", url)
browserOpenFunc(url)

<-as.stopCh
Expand Down Expand Up @@ -413,14 +413,15 @@ func mustRegisterGWHandler(register registerFunc, ctx context.Context, mux *runt

// checkServeErr checks the error from a .Serve() call to decide if it was a graceful shutdown
func (as *argoServer) checkServeErr(name string, err error) {
nameField := log.Fields{"name": name}
if err != nil {
if as.stopCh == nil {
// a nil stopCh indicates a graceful shutdown
log.Infof("graceful shutdown %s: %v", name, err)
log.WithFields(nameField).WithError(err).Info("graceful shutdown with error")
} else {
log.Fatalf("%s: %v", name, err)
log.WithFields(nameField).WithError(err).Fatalf("%s: %v", name, err)
}
} else {
log.Infof("graceful shutdown %s", name)
log.WithFields(nameField).Info("graceful shutdown")
}
}
2 changes: 1 addition & 1 deletion server/artifacts/artifact_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func (a *ArtifactServer) returnArtifact(w http.ResponseWriter, art *wfv1.Artifac

defer func() {
if err := stream.Close(); err != nil {
log.Warningf("Error closing stream[%s]: %v", stream, err)
log.WithFields(log.Fields{"stream": stream}).WithError(err).Warning("Error closing stream")
}
}()

Expand Down
18 changes: 16 additions & 2 deletions server/clusterworkflowtemplate/cluster_workflow_template_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package clusterworkflowtemplate
import (
"context"
"fmt"
log "github.com/sirupsen/logrus"
"sort"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -36,12 +37,18 @@ func (cwts *ClusterWorkflowTemplateServer) CreateClusterWorkflowTemplate(ctx con
if err != nil {
return nil, err
}
return wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates().Create(ctx, req.Template, v1.CreateOptions{})
wfTemplate, err := wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates().Create(ctx, req.Template, v1.CreateOptions{})
if err != nil {
log.WithError(err).Error("Create Cluster workflow template error")
return nil, err
}
return wfTemplate, nil
}

func (cwts *ClusterWorkflowTemplateServer) GetClusterWorkflowTemplate(ctx context.Context, req *clusterwftmplpkg.ClusterWorkflowTemplateGetRequest) (*v1alpha1.ClusterWorkflowTemplate, error) {
wfTmpl, err := cwts.getTemplateAndValidate(ctx, req.Name)
if err != nil {
log.WithField("name", req.Name).WithError(err).Error("Get Cluster workflow template error")
return nil, err
}
return wfTmpl, nil
Expand Down Expand Up @@ -69,6 +76,7 @@ func (cwts *ClusterWorkflowTemplateServer) ListClusterWorkflowTemplates(ctx cont
cwts.instanceIDService.With(options)
cwfList, err := wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates().List(ctx, *options)
if err != nil {
log.WithField("options", options).WithError(err).Error("List Cluster workflow template error")
return nil, err
}

Expand All @@ -85,6 +93,7 @@ func (cwts *ClusterWorkflowTemplateServer) DeleteClusterWorkflowTemplate(ctx con
}
err = wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates().Delete(ctx, req.Name, v1.DeleteOptions{})
if err != nil {
log.WithFields(log.Fields{"name": req.Name}).WithError(err).Error("Delete Cluster workflow template error")
return nil, err
}

Expand All @@ -99,6 +108,7 @@ func (cwts *ClusterWorkflowTemplateServer) LintClusterWorkflowTemplate(ctx conte

err := validate.ValidateClusterWorkflowTemplate(nil, cwftmplGetter, req.Template, validate.ValidateOpts{Lint: true})
if err != nil {
log.WithError(err).Error("Lint Cluster workflow template error")
return nil, err
}

Expand All @@ -122,5 +132,9 @@ func (cwts *ClusterWorkflowTemplateServer) UpdateClusterWorkflowTemplate(ctx con
}

res, err := wfClient.ArgoprojV1alpha1().ClusterWorkflowTemplates().Update(ctx, req.Template, v1.UpdateOptions{})
return res, err
if err != nil {
log.WithError(err).Error("Update Cluster workflow template error")
return nil, err
}
return res, nil
}
26 changes: 23 additions & 3 deletions server/cronworkflow/cron_workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
log "github.com/sirupsen/logrus"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -34,6 +35,7 @@ func (c *cronWorkflowServiceServer) LintCronWorkflow(ctx context.Context, req *c
creator.Label(ctx, req.CronWorkflow)
err := validate.ValidateCronWorkflow(wftmplGetter, cwftmplGetter, req.CronWorkflow)
if err != nil {
log.WithError(err).Error("List cron workflow err")
return nil, err
}
return req.CronWorkflow, nil
Expand All @@ -45,7 +47,12 @@ func (c *cronWorkflowServiceServer) ListCronWorkflows(ctx context.Context, req *
options = req.ListOptions
}
c.instanceIDService.With(options)
return auth.GetWfClient(ctx).ArgoprojV1alpha1().CronWorkflows(req.Namespace).List(ctx, *options)
cronWorkflows, err := auth.GetWfClient(ctx).ArgoprojV1alpha1().CronWorkflows(req.Namespace).List(ctx, *options)
if err != nil {
log.WithField("namespace", req.Namespace).WithError(err).Error("List cron workflows error")
return nil, err
}
return cronWorkflows, nil
}

func (c *cronWorkflowServiceServer) CreateCronWorkflow(ctx context.Context, req *cronworkflowpkg.CreateCronWorkflowRequest) (*v1alpha1.CronWorkflow, error) {
Expand All @@ -61,7 +68,12 @@ func (c *cronWorkflowServiceServer) CreateCronWorkflow(ctx context.Context, req
if err != nil {
return nil, err
}
return wfClient.ArgoprojV1alpha1().CronWorkflows(req.Namespace).Create(ctx, req.CronWorkflow, metav1.CreateOptions{})
cronWf, err := wfClient.ArgoprojV1alpha1().CronWorkflows(req.Namespace).Create(ctx, req.CronWorkflow, metav1.CreateOptions{})
if err != nil {
log.WithField("namespace", req.Namespace).WithError(err).Error("Create cron workflow error")
return nil, err
}
return cronWf, nil
}

func (c *cronWorkflowServiceServer) GetCronWorkflow(ctx context.Context, req *cronworkflowpkg.GetCronWorkflowRequest) (*v1alpha1.CronWorkflow, error) {
Expand All @@ -83,7 +95,12 @@ func (c *cronWorkflowServiceServer) UpdateCronWorkflow(ctx context.Context, req
if err := validate.ValidateCronWorkflow(wftmplGetter, cwftmplGetter, req.CronWorkflow); err != nil {
return nil, err
}
return auth.GetWfClient(ctx).ArgoprojV1alpha1().CronWorkflows(req.Namespace).Update(ctx, req.CronWorkflow, metav1.UpdateOptions{})
cronWf, err := auth.GetWfClient(ctx).ArgoprojV1alpha1().CronWorkflows(req.Namespace).Update(ctx, req.CronWorkflow, metav1.UpdateOptions{})
if err != nil {
log.WithFields(log.Fields{"namespace": req.Namespace, "name": req.CronWorkflow.Name}).WithError(err).Error("Update Cron workflow error")
return nil, err
}
return cronWf, nil
}

func (c *cronWorkflowServiceServer) DeleteCronWorkflow(ctx context.Context, req *cronworkflowpkg.DeleteCronWorkflowRequest) (*cronworkflowpkg.CronWorkflowDeletedResponse, error) {
Expand All @@ -99,6 +116,7 @@ func (c *cronWorkflowServiceServer) DeleteCronWorkflow(ctx context.Context, req

err = auth.GetWfClient(ctx).ArgoprojV1alpha1().CronWorkflows(req.Namespace).Delete(ctx, req.Name, opts)
if err != nil {
log.WithFields(log.Fields{"namespace": req.Namespace, "name": req.Name}).WithError(err).Error("Delete Cron workflow error")
return nil, err
}
return &cronworkflowpkg.CronWorkflowDeletedResponse{}, nil
Expand All @@ -124,10 +142,12 @@ func (c *cronWorkflowServiceServer) getCronWorkflowAndValidate(ctx context.Conte
wfClient := auth.GetWfClient(ctx)
cronWf, err := wfClient.ArgoprojV1alpha1().CronWorkflows(namespace).Get(ctx, name, options)
if err != nil {
log.WithFields(log.Fields{"namespace": namespace, "name": name}).WithError(err).Error("Get Cron workflow error")
return nil, err
}
err = c.instanceIDService.Validate(cronWf)
if err != nil {
log.WithFields(log.Fields{"namespace": namespace, "name": name}).WithError(err).Error("Validate Cron workflow error")
return nil, err
}
return cronWf, nil
Expand Down
6 changes: 3 additions & 3 deletions server/workflow/workflow_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,10 @@ func (s *workflowServer) CreateWorkflow(ctx context.Context, req *workflowpkg.Wo
if err != nil {
if apierr.IsServerTimeout(err) && req.Workflow.GenerateName != "" && req.Workflow.Name != "" {
errWithHint := fmt.Errorf(`create request failed due to timeout, but it's possible that workflow "%s" already exists. Original error: %w`, req.Workflow.Name, err)
log.Error(errWithHint)
log.WithError(errWithHint).Error(errWithHint.Error())
return nil, errWithHint
}
log.Errorf("Create request failed: %s", err)
log.WithError(err).Error("Create request failed")
return nil, err
}

Expand Down Expand Up @@ -395,7 +395,7 @@ func (s *workflowServer) ResumeWorkflow(ctx context.Context, req *workflowpkg.Wo

err = util.ResumeWorkflow(ctx, wfClient.ArgoprojV1alpha1().Workflows(req.Namespace), s.hydrator, wf.Name, req.NodeFieldSelector)
if err != nil {
log.Warnf("Failed to resume %s: %+v", wf.Name, err)
log.WithFields(log.Fields{"name": wf.Name}).WithError(err).Warn("Failed to resume")
return nil, err
}

Expand Down
1 change: 1 addition & 0 deletions v3

0 comments on commit defc663

Please sign in to comment.