Skip to content

Commit

Permalink
Merge pull request #22765 from chaodaiG/pubsub-interface
Browse files Browse the repository at this point in the history
Prow sub: interface supports presubmit and postsubmit jobs
  • Loading branch information
k8s-ci-robot authored Jul 1, 2021
2 parents 1597134 + ef20447 commit a7530d2
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 57 deletions.
2 changes: 1 addition & 1 deletion prow/cmd/sub/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func main() {
ConfigAgent: configAgent,
Metrics: promMetrics,
ProwJobClient: kubeClient,
Reporter: pubsub.NewReporter(configAgent.Config),
Reporter: pubsub.NewReporter(configAgent.Config), // reuse crier reporter
}

// Return 200 on / for health checks.
Expand Down
124 changes: 86 additions & 38 deletions prow/pubsub/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package subscriber
import (
"context"
"encoding/json"
"errors"
"fmt"

"cloud.google.com/go/pubsub"
Expand All @@ -30,33 +31,38 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

prowapi "k8s.io/test-infra/prow/apis/prowjobs/v1"
v1 "k8s.io/test-infra/prow/apis/prowjobs/v1"
"k8s.io/test-infra/prow/config"
"k8s.io/test-infra/prow/pjutil"
)

const (
prowEventType = "prow.k8s.io/pubsub.EventType"
periodicProwJobEvent = "prow.k8s.io/pubsub.PeriodicProwJobEvent"
prowEventType = "prow.k8s.io/pubsub.EventType"
periodicProwJobEvent = "prow.k8s.io/pubsub.PeriodicProwJobEvent"
presubmitProwJobEvent = "prow.k8s.io/pubsub.PresubmitProwJobEvent"
postsubmitProwJobEvent = "prow.k8s.io/pubsub.PostsubmitProwJobEvent"
)

// PeriodicProwJobEvent contains the minimum information required to start a ProwJob.
type PeriodicProwJobEvent struct {
Name string `json:"name"`
// ProwJobEvent contains the minimum information required to start a ProwJob.
type ProwJobEvent struct {
Name string `json:"name"`
// Refs are used by presubmit and postsubmit jobs supplying baseSHA and SHA
Refs *v1.Refs `json:"refs,omitempty"`
Envs map[string]string `json:"envs,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
Annotations map[string]string `json:"annotations,omitempty"`
}

// FromPayload set the PeriodicProwJobEvent from the PubSub message payload.
func (pe *PeriodicProwJobEvent) FromPayload(data []byte) error {
// FromPayload set the ProwJobEvent from the PubSub message payload.
func (pe *ProwJobEvent) FromPayload(data []byte) error {
if err := json.Unmarshal(data, pe); err != nil {
return err
}
return nil
}

// ToMessage generates a PubSub Message from a PeriodicProwJobEvent.
func (pe *PeriodicProwJobEvent) ToMessage() (*pubsub.Message, error) {
// ToMessage generates a PubSub Message from a ProwJobEvent.
func (pe *ProwJobEvent) ToMessage() (*pubsub.Message, error) {
data, err := json.Marshal(pe)
if err != nil {
return nil, err
Expand Down Expand Up @@ -121,6 +127,46 @@ func (m *pubSubMessage) nack() {
m.Message.Nack()
}

// jobHandler handles job type specific logic
type jobHandler interface {
getProwJobSpec(cfg *config.Config, pe ProwJobEvent) (*v1.ProwJobSpec, map[string]string, error)
}

// periodicJobHandler implements jobHandler
type periodicJobHandler struct{}

func (peh *periodicJobHandler) getProwJobSpec(cfg *config.Config, pe ProwJobEvent) (*v1.ProwJobSpec, map[string]string, error) {
var periodicJob *config.Periodic
// TODO(chaodaiG): do we want to support inrepoconfig when
// https://github.com/kubernetes/test-infra/issues/21729 is done?
for _, job := range cfg.AllPeriodics() {
if job.Name == pe.Name {
periodicJob = &job
break
}
}
if periodicJob == nil {
return nil, nil, fmt.Errorf("failed to find associated periodic job %q", pe.Name)
}

prowJobSpec := pjutil.PeriodicSpec(*periodicJob)
return &prowJobSpec, periodicJob.Labels, nil
}

// presubmitJobHandler implements jobHandler
type presubmitJobHandler struct{}

func (prh *presubmitJobHandler) getProwJobSpec(cfg *config.Config, pe ProwJobEvent) (*v1.ProwJobSpec, map[string]string, error) {
return nil, nil, errors.New("presubmit not supported yet")
}

// ppostsubmitJobHandler implements jobHandler
type postsubmitJobHandler struct{}

func (poh *postsubmitJobHandler) getProwJobSpec(cfg *config.Config, pe ProwJobEvent) (*v1.ProwJobSpec, map[string]string, error) {
return nil, nil, errors.New("postsubmit not supported yet")
}

func extractFromAttribute(attrs map[string]string, key string) (string, error) {
value, ok := attrs[key]
if !ok {
Expand All @@ -141,62 +187,64 @@ func (s *Subscriber) handleMessage(msg messageInterface, subscription string) er
s.Metrics.ErrorCounter.With(prometheus.Labels{subscriptionLabel: subscription})
return err
}

var jh jobHandler
switch eType {
case periodicProwJobEvent:
err := s.handlePeriodicJob(l, msg, subscription)
if err != nil {
l.WithError(err).Error("failed to create Prow Periodic Job")
s.Metrics.ErrorCounter.With(prometheus.Labels{subscriptionLabel: subscription})
}
return err
jh = &periodicJobHandler{}
case presubmitProwJobEvent:
jh = &presubmitJobHandler{}
case postsubmitProwJobEvent:
jh = &postsubmitJobHandler{}
default:
l.WithField("type", eType).Error("Unsupported event type")
s.Metrics.ErrorCounter.With(prometheus.Labels{subscriptionLabel: subscription})
return fmt.Errorf("unsupported event type: %s", eType)
}
if err = s.handleProwJob(l, jh, msg, subscription); err != nil {
l.WithError(err).Error("failed to create Prow Job")
s.Metrics.ErrorCounter.With(prometheus.Labels{subscriptionLabel: subscription})
}
err = fmt.Errorf("unsupported event type")
l.WithError(err).Error("failed to read message")
s.Metrics.ErrorCounter.With(prometheus.Labels{subscriptionLabel: subscription})
return err
}

func (s *Subscriber) handlePeriodicJob(l *logrus.Entry, msg messageInterface, subscription string) error {
func (s *Subscriber) handleProwJob(l *logrus.Entry, jh jobHandler, msg messageInterface, subscription string) error {

var pe PeriodicProwJobEvent
var pe ProwJobEvent
var prowJob prowapi.ProwJob

if err := pe.FromPayload(msg.getPayload()); err != nil {
return err
}

reportProwJobFailure := func(pj *prowapi.ProwJob, err error) {
pj.Status.State = prowapi.ErrorState
pj.Status.Description = err.Error()
if s.Reporter.ShouldReport(context.TODO(), l, &prowJob) {
if _, _, err := s.Reporter.Report(context.TODO(), l, &prowJob); err != nil {
if s.Reporter.ShouldReport(context.TODO(), l, pj) {
if _, _, err := s.Reporter.Report(context.TODO(), l, pj); err != nil {
l.Warningf("failed to report status. %v", err)
}
}
}

if err := pe.FromPayload(msg.getPayload()); err != nil {
return err
}
var periodicJob *config.Periodic
for _, job := range s.ConfigAgent.Config().AllPeriodics() {
if job.Name == pe.Name {
periodicJob = &job
break
}
}
if periodicJob == nil {
err := fmt.Errorf("failed to find associated periodic job %q", pe.Name)
prowJobSpec, labels, err := jh.getProwJobSpec(s.ConfigAgent.Config(), pe)
if err != nil {
l.WithError(err).Errorf("failed to create job %q", pe.Name)
prowJob = pjutil.NewProwJob(prowapi.ProwJobSpec{}, nil, pe.Annotations)
reportProwJobFailure(&prowJob, err)
return err
}
if prowJobSpec == nil {
return fmt.Errorf("failed getting prowjob spec") // This should not happen
}

prowJobSpec := pjutil.PeriodicSpec(*periodicJob)
// Adds / Updates Labels from prow job event
for k, v := range pe.Labels {
periodicJob.Labels[k] = v
labels[k] = v
}

// Adds annotations
prowJob = pjutil.NewProwJob(prowJobSpec, periodicJob.Labels, pe.Annotations)
prowJob = pjutil.NewProwJob(*prowJobSpec, labels, pe.Annotations)
// Adds / Updates Environments to containers
if prowJob.Spec.PodSpec != nil {
for i, c := range prowJob.Spec.PodSpec.Containers {
Expand All @@ -212,6 +260,6 @@ func (s *Subscriber) handlePeriodicJob(l *logrus.Entry, msg messageInterface, su
reportProwJobFailure(&prowJob, err)
return err
}
l.Infof("periodic job %q created as %q", pe.Name, prowJob.Name)
l.Infof("Job %q created as %q", pe.Name, prowJob.Name)
return nil
}
36 changes: 18 additions & 18 deletions prow/pubsub/subscriber/subscriber_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ func (r *fakeReporter) ShouldReport(_ context.Context, _ *logrus.Entry, pj *prow
return pj.Annotations[reporter.PubSubProjectLabel] != "" && pj.Annotations[reporter.PubSubTopicLabel] != ""
}

func TestPeriodicProwJobEvent_ToFromMessage(t *testing.T) {
pe := PeriodicProwJobEvent{
func TestProwJobEvent_ToFromMessage(t *testing.T) {
pe := ProwJobEvent{
Annotations: map[string]string{
reporter.PubSubProjectLabel: "project",
reporter.PubSubTopicLabel: "topic",
Expand All @@ -131,7 +131,7 @@ func TestPeriodicProwJobEvent_ToFromMessage(t *testing.T) {
if m.Attributes[prowEventType] != periodicProwJobEvent {
t.Errorf("%s should be %s found %s instead", prowEventType, periodicProwJobEvent, m.Attributes[prowEventType])
}
var newPe PeriodicProwJobEvent
var newPe ProwJobEvent
if err = newPe.FromPayload(m.Data); err != nil {
t.Error(err)
}
Expand All @@ -144,15 +144,15 @@ func TestHandleMessage(t *testing.T) {
for _, tc := range []struct {
name string
msg *pubSubMessage
pe *PeriodicProwJobEvent
pe *ProwJobEvent
s string
config *config.Config
err string
labels []string
}{
{
name: "PeriodicJobNoPubsub",
pe: &PeriodicProwJobEvent{
pe: &ProwJobEvent{
Name: "test",
},
config: &config.Config{
Expand All @@ -177,7 +177,7 @@ func TestHandleMessage(t *testing.T) {
},
},
config: &config.Config{},
err: "unsupported event type",
err: "unsupported event type: unsupported",
labels: []string{reporter.PubSubTopicLabel, reporter.PubSubRunIDLabel, reporter.PubSubProjectLabel},
},
{
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestHandleMessage(t *testing.T) {
}
if err := s.handleMessage(tc.msg, tc.s); err != nil {
if err.Error() != tc.err {
t1.Errorf("Expected error %v got %v", tc.err, err.Error())
t1.Errorf("Expected error '%v' got '%v'", tc.err, err.Error())
} else if tc.err == "" {
var created []*prowapi.ProwJob
for _, action := range fakeProwJobClient.Fake.Actions() {
Expand All @@ -235,7 +235,7 @@ func TestHandleMessage(t *testing.T) {
}
}

func CheckProwJob(pe *PeriodicProwJobEvent, pj *prowapi.ProwJob) error {
func CheckProwJob(pe *ProwJobEvent, pj *prowapi.ProwJob) error {
// checking labels
for label, value := range pe.Labels {
if pj.Labels[label] != value {
Expand Down Expand Up @@ -269,7 +269,7 @@ func CheckProwJob(pe *PeriodicProwJobEvent, pj *prowapi.ProwJob) error {
func TestHandlePeriodicJob(t *testing.T) {
for _, tc := range []struct {
name string
pe *PeriodicProwJobEvent
pe *ProwJobEvent
s string
config *config.Config
err string
Expand All @@ -278,7 +278,7 @@ func TestHandlePeriodicJob(t *testing.T) {
}{
{
name: "PeriodicJobNoPubsub",
pe: &PeriodicProwJobEvent{
pe: &ProwJobEvent{
Name: "test",
},
config: &config.Config{
Expand All @@ -295,7 +295,7 @@ func TestHandlePeriodicJob(t *testing.T) {
},
{
name: "PeriodicJobPubsubSet",
pe: &PeriodicProwJobEvent{
pe: &ProwJobEvent{
Name: "test",
Annotations: map[string]string{
reporter.PubSubProjectLabel: "project",
Expand Down Expand Up @@ -334,7 +334,7 @@ func TestHandlePeriodicJob(t *testing.T) {
},
{
name: "PeriodicJobPubsubSetCreationError",
pe: &PeriodicProwJobEvent{
pe: &ProwJobEvent{
Name: "test",
Annotations: map[string]string{
reporter.PubSubProjectLabel: "project",
Expand All @@ -359,15 +359,15 @@ func TestHandlePeriodicJob(t *testing.T) {
},
{
name: "JobNotFound",
pe: &PeriodicProwJobEvent{
pe: &ProwJobEvent{
Name: "test",
},
config: &config.Config{},
err: "failed to find associated periodic job \"test\"",
},
{
name: "JobNotFoundReportNeeded",
pe: &PeriodicProwJobEvent{
pe: &ProwJobEvent{
Name: "test",
Annotations: map[string]string{
reporter.PubSubProjectLabel: "project",
Expand Down Expand Up @@ -402,7 +402,7 @@ func TestHandlePeriodicJob(t *testing.T) {
t.Error(err)
}
m.ID = "id"
err = s.handlePeriodicJob(logrus.NewEntry(logrus.New()), &pubSubMessage{*m}, tc.s)
err = s.handleProwJob(logrus.NewEntry(logrus.New()), &periodicJobHandler{}, &pubSubMessage{*m}, tc.s)
if err != nil {
if err.Error() != tc.err {
t1.Errorf("Expected error %v got %v", tc.err, err.Error())
Expand Down Expand Up @@ -438,7 +438,7 @@ func TestPushServer_ServeHTTP(t *testing.T) {
url string
secret string
pushRequest interface{}
pe *PeriodicProwJobEvent
pe *ProwJobEvent
expectedCode int
}{
{
Expand Down Expand Up @@ -485,7 +485,7 @@ func TestPushServer_ServeHTTP(t *testing.T) {
secret: "secret",
url: "https://prow.k8s.io/push?token=secret",
pushRequest: pushRequest{},
pe: &PeriodicProwJobEvent{
pe: &ProwJobEvent{
Name: "test",
},
expectedCode: http.StatusOK,
Expand All @@ -494,7 +494,7 @@ func TestPushServer_ServeHTTP(t *testing.T) {
name: "SuccessNoToken",
url: "https://prow.k8s.io/push",
pushRequest: pushRequest{},
pe: &PeriodicProwJobEvent{
pe: &ProwJobEvent{
Name: "test",
},
expectedCode: http.StatusOK,
Expand Down

0 comments on commit a7530d2

Please sign in to comment.