Skip to content

Commit

Permalink
Merge pull request #22777 from chaodaiG/pubsub-presubmit
Browse files Browse the repository at this point in the history
Prow pubsub: supports presubmit jobs
  • Loading branch information
k8s-ci-robot authored Jul 12, 2021
2 parents 43b031e + a1ce8f6 commit 161e384
Show file tree
Hide file tree
Showing 5 changed files with 94 additions and 15 deletions.
1 change: 1 addition & 0 deletions prow/cmd/sub/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"//prow/crier/reporters/pubsub:go_default_library",
"//prow/flagutil:go_default_library",
"//prow/flagutil/config:go_default_library",
"//prow/git/v2:go_default_library",
"//prow/interrupts:go_default_library",
"//prow/logrusutil:go_default_library",
"//prow/metrics:go_default_library",
Expand Down
24 changes: 17 additions & 7 deletions prow/cmd/sub/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"k8s.io/test-infra/prow/crier/reporters/pubsub"
"k8s.io/test-infra/prow/flagutil"
configflagutil "k8s.io/test-infra/prow/flagutil/config"
"k8s.io/test-infra/prow/git/v2"
"k8s.io/test-infra/prow/interrupts"
"k8s.io/test-infra/prow/logrusutil"
"k8s.io/test-infra/prow/metrics"
Expand All @@ -45,6 +46,7 @@ var (

type options struct {
client flagutil.KubernetesOptions
github flagutil.GitHubOptions
port int
pushSecretFile string

Expand Down Expand Up @@ -80,6 +82,7 @@ func init() {

flagOptions.config.AddFlags(fs)
flagOptions.client.AddFlags(fs)
flagOptions.github.AddFlags(fs)
flagOptions.instrumentationOptions.AddFlags(fs)

fs.Parse(os.Args[1:])
Expand All @@ -93,16 +96,22 @@ func main() {
logrus.WithError(err).Fatal("Error starting config agent.")
}

var tokenGenerator func() []byte
var tokens []string
if flagOptions.pushSecretFile != "" {
var tokens []string
tokens = append(tokens, flagOptions.pushSecretFile)
}
if flagOptions.github.TokenPath != "" {
tokens = append(tokens, flagOptions.github.TokenPath)
}
secretAgent := &secret.Agent{}
if err := secretAgent.Start(tokens); err != nil {
logrus.WithError(err).Fatal("Error starting secrets agent.")
}
tokenGenerator := secretAgent.GetTokenGenerator(flagOptions.pushSecretFile)

secretAgent := &secret.Agent{}
if err := secretAgent.Start(tokens); err != nil {
logrus.WithError(err).Fatal("Error starting secrets agent.")
}
tokenGenerator = secretAgent.GetTokenGenerator(flagOptions.pushSecretFile)
gitClient, err := flagOptions.github.GitClient(secretAgent, flagOptions.dryRun)
if err != nil {
logrus.WithError(err).Fatal("Error getting Git client.")
}

prowjobClient, err := flagOptions.client.ProwJobClient(configAgent.Config().ProwJobNamespace, flagOptions.dryRun)
Expand All @@ -125,6 +134,7 @@ func main() {
ConfigAgent: configAgent,
Metrics: promMetrics,
ProwJobClient: kubeClient,
GitClient: git.ClientFactoryFrom(gitClient),
Reporter: pubsub.NewReporter(configAgent.Config), // reuse crier reporter
}

Expand Down
7 changes: 6 additions & 1 deletion prow/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,12 @@ func (c *Config) GetPresubmits(gc git.ClientFactory, identifier string, baseSHAG
return nil, err
}

return append(c.PresubmitsStatic[identifier], prowYAML.Presubmits...), nil
return append(c.GetPresubmitsStatic(identifier), prowYAML.Presubmits...), nil
}

// GetPresubmitsStatic will return presubmits for the given identifier that are versioned inside the tested repo
func (c *Config) GetPresubmitsStatic(identifier string) []Presubmit {
return c.PresubmitsStatic[identifier]
}

// GetPostsubmits will return all postsubmits for the given identifier. This includes
Expand Down
1 change: 1 addition & 0 deletions prow/pubsub/subscriber/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
deps = [
"//prow/apis/prowjobs/v1:go_default_library",
"//prow/config:go_default_library",
"//prow/git/v2:go_default_library",
"//prow/pjutil:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
Expand Down
76 changes: 69 additions & 7 deletions prow/pubsub/subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
prowapi "k8s.io/test-infra/prow/apis/prowjobs/v1"
v1 "k8s.io/test-infra/prow/apis/prowjobs/v1"
"k8s.io/test-infra/prow/config"
"k8s.io/test-infra/prow/git/v2"
"k8s.io/test-infra/prow/pjutil"
)

Expand All @@ -43,6 +44,16 @@ const (
postsubmitProwJobEvent = "prow.k8s.io/pubsub.PostsubmitProwJobEvent"
)

// Ensure interface is intact
var _ prowCfgClient = (*config.Config)(nil)

// prowCfgClient is for unit test purpose
type prowCfgClient interface {
AllPeriodics() []config.Periodic
GetPresubmits(gc git.ClientFactory, identifier string, baseSHAGetter config.RefGetter, headSHAGetters ...config.RefGetter) ([]config.Presubmit, error)
GetPresubmitsStatic(identifier string) []config.Presubmit
}

// ProwJobEvent contains the minimum information required to start a ProwJob.
type ProwJobEvent struct {
Name string `json:"name"`
Expand Down Expand Up @@ -88,6 +99,7 @@ type Subscriber struct {
ConfigAgent *config.Agent
Metrics *Metrics
ProwJobClient ProwJobClient
GitClient git.ClientFactory
Reporter reportClient
}

Expand Down Expand Up @@ -129,13 +141,13 @@ func (m *pubSubMessage) nack() {

// jobHandler handles job type specific logic
type jobHandler interface {
getProwJobSpec(cfg *config.Config, pe ProwJobEvent) (*v1.ProwJobSpec, map[string]string, error)
getProwJobSpec(cfg prowCfgClient, pe ProwJobEvent) (*v1.ProwJobSpec, map[string]string, error)
}

// periodicJobHandler implements jobHandler
type periodicJobHandler struct{}

func (peh *periodicJobHandler) getProwJobSpec(cfg *config.Config, pe ProwJobEvent) (*v1.ProwJobSpec, map[string]string, error) {
func (peh *periodicJobHandler) getProwJobSpec(cfg prowCfgClient, pe ProwJobEvent) (*v1.ProwJobSpec, map[string]string, error) {
var periodicJob *config.Periodic
// TODO(chaodaiG): do we want to support inrepoconfig when
// https://github.com/kubernetes/test-infra/issues/21729 is done?
Expand All @@ -154,16 +166,66 @@ func (peh *periodicJobHandler) getProwJobSpec(cfg *config.Config, pe ProwJobEven
}

// presubmitJobHandler implements jobHandler
type presubmitJobHandler struct{}
type presubmitJobHandler struct {
GitClient git.ClientFactory
}

func (prh *presubmitJobHandler) getProwJobSpec(cfg prowCfgClient, pe ProwJobEvent) (*v1.ProwJobSpec, map[string]string, error) {
// presubmit jobs require Refs and Refs.Pulls to be set
refs := pe.Refs
if refs == nil {
return nil, nil, errors.New("Refs must be supplied")
}
if len(refs.Pulls) == 0 {
return nil, nil, errors.New("at least 1 Pulls is required")
}

var presubmitJob *config.Presubmit
org, repo, branch := refs.Org, refs.Repo, refs.BaseRef
orgRepo := org + "/" + repo
baseSHAGetter := func() (string, error) {
return refs.BaseSHA, nil
}
var headSHAGetters []func() (string, error)
for _, pull := range refs.Pulls {
pull := pull
headSHAGetters = append(headSHAGetters, func() (string, error) {
return pull.SHA, nil
})
}

// This will work with inrepoconfig
presubmits, err := cfg.GetPresubmits(prh.GitClient, orgRepo, baseSHAGetter, headSHAGetters...)
if err != nil {
// Fall back to static presubmits to avoid deadlocking when a presubmit is used to verify
// inrepoconfig
logrus.WithError(err).Debug("Failed to get presubmits")
presubmits = cfg.GetPresubmitsStatic(orgRepo)
}

for _, job := range presubmits {
if !job.CouldRun(branch) { // filter out jobs that are not branch matching
continue
}
if job.Name == pe.Name {
if presubmitJob != nil {
return nil, nil, fmt.Errorf("%s matches multiple prow jobs", pe.Name)
}
presubmitJob = &job
}
}
if presubmitJob == nil {
return nil, nil, fmt.Errorf("failed to find associated periodic job %q", pe.Name)
}

func (prh *presubmitJobHandler) getProwJobSpec(cfg *config.Config, pe ProwJobEvent) (*v1.ProwJobSpec, map[string]string, error) {
return nil, nil, errors.New("presubmit not supported yet")
prowJobSpec := pjutil.PresubmitSpec(*presubmitJob, *refs)
return &prowJobSpec, presubmitJob.Labels, nil
}

// ppostsubmitJobHandler implements jobHandler
type postsubmitJobHandler struct{}

func (poh *postsubmitJobHandler) getProwJobSpec(cfg *config.Config, pe ProwJobEvent) (*v1.ProwJobSpec, map[string]string, error) {
func (poh *postsubmitJobHandler) getProwJobSpec(cfg prowCfgClient, pe ProwJobEvent) (*v1.ProwJobSpec, map[string]string, error) {
return nil, nil, errors.New("postsubmit not supported yet")
}

Expand Down Expand Up @@ -193,7 +255,7 @@ func (s *Subscriber) handleMessage(msg messageInterface, subscription string) er
case periodicProwJobEvent:
jh = &periodicJobHandler{}
case presubmitProwJobEvent:
jh = &presubmitJobHandler{}
jh = &presubmitJobHandler{GitClient: s.GitClient}
case postsubmitProwJobEvent:
jh = &postsubmitJobHandler{}
default:
Expand Down

0 comments on commit 161e384

Please sign in to comment.