Skip to content

Commit

Permalink
Merge pull request #14815 from amwat/gerr
Browse files Browse the repository at this point in the history
[Gerrit] Modify lastUpdate to be per project instead of global.
  • Loading branch information
k8s-ci-robot authored Oct 23, 2019
2 parents 7abdd3f + 5c5464f commit 4a144ea
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 87 deletions.
79 changes: 54 additions & 25 deletions prow/cmd/gerrit/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ package main

import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"io/ioutil"
"os"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -90,84 +90,113 @@ func gatherOptions(fs *flag.FlagSet, args ...string) options {
}

type syncTime struct {
val time.Time
val client.LastSyncState
lock sync.RWMutex
path string
opener io.Opener
ctx context.Context
}

func (st *syncTime) init() error {
fmt.Println(st.val)
func (st *syncTime) init(hostProjects client.ProjectsFlag) error {
logrus.WithField("projects", hostProjects).Debug(st.val)
st.lock.RLock()
zero := st.val.IsZero()
zero := st.val == nil
st.lock.RUnlock()
if !zero {
return nil
}
st.lock.Lock()
defer st.lock.Unlock()
if !st.val.IsZero() {
if st.val != nil {
return nil // Someone else set it while we waited for the write lock
}
unix, err := st.currentInt()
state, err := st.currentState()
if err != nil {
return err
}
if unix == 0 {
st.val = time.Now()
if state != nil {
st.val = state
logrus.Warnf("Reset lastSyncFallback to %v", st.val)
} else {
st.val = time.Unix(unix, 0)
currentTime := time.Now()
targetState := client.LastSyncState{}
for host, projects := range hostProjects {
targetState[host] = map[string]time.Time{}
for _, project := range projects {
targetState[host][project] = currentTime
}
}
st.val = targetState
}
return nil
}

func (st *syncTime) currentInt() (int64, error) {
func (st *syncTime) currentState() (client.LastSyncState, error) {
r, err := st.opener.Reader(st.ctx, st.path)
if io.IsNotExist(err) {
logrus.Warnf("lastSyncFallback not found at %q", st.path)
return 0, nil
return nil, nil
} else if err != nil {
return 0, fmt.Errorf("open: %v", err)
return nil, fmt.Errorf("open: %v", err)
}
defer io.LogClose(r)
buf, err := ioutil.ReadAll(r)
if err != nil {
return 0, fmt.Errorf("read: %v", err)
return nil, fmt.Errorf("read: %v", err)
}
unix, err := strconv.ParseInt(string(buf), 10, 64)
if err != nil {
return 0, fmt.Errorf("parse int: %v", err)
var state client.LastSyncState
if err := json.Unmarshal(buf, &state); err != nil {
return nil, fmt.Errorf("unmarshall state: %v", err)
}
return unix, nil
return state, nil
}

func (st *syncTime) Current() time.Time {
func (st *syncTime) Current() client.LastSyncState {
st.lock.RLock()
defer st.lock.RUnlock()
return st.val
}

func (st *syncTime) Update(t time.Time) error {
func (st *syncTime) Update(newState client.LastSyncState) error {
st.lock.Lock()
defer st.lock.Unlock()
if !t.After(st.val) {

targetState := st.val.DeepCopy()

var changed bool
for host, newLastSyncs := range newState {
if _, ok := targetState[host]; !ok {
targetState[host] = map[string]time.Time{}
}
for project, newLastSync := range newLastSyncs {
currentLastSync, ok := targetState[host][project]
if !ok || currentLastSync.Before(newLastSync) {
targetState[host][project] = newLastSync
changed = true
}
}
}

if !changed {
return nil
}

w, err := st.opener.Writer(st.ctx, st.path)
if err != nil {
return fmt.Errorf("open for write %q: %v", st.path, err)
}
lastSyncUnix := strconv.FormatInt(t.Unix(), 10)
if _, err := fmt.Fprint(w, lastSyncUnix); err != nil {
stateBytes, err := json.Marshal(targetState)
if err != nil {
return fmt.Errorf("marshall state: %v", err)
}
if _, err := fmt.Fprint(w, string(stateBytes)); err != nil {
io.LogClose(w)
return fmt.Errorf("write %q: %v", st.path, err)
}
if err := w.Close(); err != nil {
return fmt.Errorf("close %q: %v", st.path, err)
}
st.val = t
st.val = targetState
return nil
}

Expand Down Expand Up @@ -204,7 +233,7 @@ func main() {
ctx: ctx,
opener: op,
}
if err := st.init(); err != nil {
if err := st.init(o.projects); err != nil {
logrus.WithError(err).Fatal("Error initializing lastSyncFallback.")
}
c, err := adapter.NewController(&st, o.cookiefilePath, o.projects, prowJobClient, cfg)
Expand Down
23 changes: 12 additions & 11 deletions prow/cmd/gerrit/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,42 +144,43 @@ func TestSyncTime(t *testing.T) {
opener: open,
ctx: ctx,
}
testProjectsFlag := client.ProjectsFlag{"foo": []string{"bar"}}
now := time.Now()
if err := st.init(); err != nil {
if err := st.init(testProjectsFlag); err != nil {
t.Fatalf("Failed init: %v", err)
}
cur := st.Current()
cur := st.Current()["foo"]["bar"]
if now.After(cur) {
t.Fatalf("%v should be >= time before init was called: %v", cur, now)
}

earlier := now.Add(-time.Hour)
later := now.Add(time.Hour)

if err := st.Update(earlier); err != nil {
if err := st.Update(client.LastSyncState{"foo": {"bar": earlier}}); err != nil {
t.Fatalf("Failed update: %v", err)
}
if actual := st.Current(); !actual.Equal(cur) {
if actual := st.Current()["foo"]["bar"]; !actual.Equal(cur) {
t.Errorf("Update(%v) should not have reduced value from %v, got %v", earlier, cur, actual)
}

if err := st.Update(later); err != nil {
if err := st.Update(client.LastSyncState{"foo": {"bar": later}}); err != nil {
t.Fatalf("Failed update: %v", err)
}
if actual := st.Current(); !actual.After(cur) {
if actual := st.Current()["foo"]["bar"]; !actual.After(cur) {
t.Errorf("Update(%v) did not move current value to after %v, got %v", later, cur, actual)
}

expected := later.Truncate(time.Second)
expected := later
st = syncTime{
path: path,
opener: open,
ctx: ctx,
}
if err := st.init(); err != nil {
if err := st.init(testProjectsFlag); err != nil {
t.Fatalf("Failed init: %v", err)
}
if actual := st.Current(); !actual.Equal(expected) {
if actual := st.Current()["foo"]["bar"]; !actual.Equal(expected) {
t.Errorf("init() failed to reload %v, got %v", expected, actual)
}

Expand All @@ -188,10 +189,10 @@ func TestSyncTime(t *testing.T) {
opener: fakeOpener{}, // return storage.ErrObjectNotExist on open
ctx: ctx,
}
if err := st.init(); err != nil {
if err := st.init(testProjectsFlag); err != nil {
t.Fatalf("Failed init: %v", err)
}
if actual := st.Current(); now.After(actual) || actual.After(later) {
if actual := st.Current()["foo"]["bar"]; now.After(actual) || actual.After(later) {
t.Fatalf("should initialize to start %v <= actual <= later %v, but got %v", now, later, actual)
}
}
22 changes: 15 additions & 7 deletions prow/gerrit/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type prowJobClient interface {
}

type gerritClient interface {
QueryChanges(lastUpdate time.Time, rateLimit int) map[string][]client.ChangeInfo
QueryChanges(lastState client.LastSyncState, rateLimit int) map[string][]client.ChangeInfo
GetBranchRevision(instance, project, branch string) (string, error)
SetReview(instance, id, revision, message string, labels map[string]string) error
Account(instance string) *gerrit.AccountInfo
Expand All @@ -59,8 +59,8 @@ type Controller struct {
}

type LastSyncTracker interface {
Current() time.Time
Update(time.Time) error
Current() client.LastSyncState
Update(client.LastSyncState) error
}

// NewController returns a new gerrit controller client
Expand All @@ -87,15 +87,17 @@ func NewController(lastSyncTracker LastSyncTracker, cookiefilePath string, proje
// and creates prowjobs according to specs
func (c *Controller) Sync() error {
syncTime := c.tracker.Current()
latest := syncTime
latest := syncTime.DeepCopy()

for instance, changes := range c.gc.QueryChanges(syncTime, c.config().Gerrit.RateLimit) {
for _, change := range changes {
if err := c.ProcessChange(instance, change); err != nil {
logrus.WithError(err).Errorf("Failed process change %v", change.CurrentRevision)
}
if latest.Before(change.Updated.Time) {
latest = change.Updated.Time
lastTime, ok := latest[instance][change.Project]
if !ok || lastTime.Before(change.Updated.Time) {
lastTime = change.Updated.Time
latest[instance][change.Project] = lastTime
}
}

Expand Down Expand Up @@ -241,7 +243,13 @@ func (c *Controller) ProcessChange(instance string, change client.ChangeInfo) er
if latestReport != nil {
logger.Infof("Found latest report: %s", latestReport)
}
lastUpdate := c.tracker.Current()

lastUpdate, ok := c.tracker.Current()[instance][change.Project]
if !ok {
logrus.Warnf("could not find lastTime for project %q, probably something went wrong with initTracker?", change.Project)
lastUpdate = time.Now()
}

filter, err := messageFilter(lastUpdate, change, presubmits, latestReport, logger)
if err != nil {
logger.WithError(err).Warn("failed to create filter on messages for presubmits")
Expand Down
18 changes: 12 additions & 6 deletions prow/gerrit/adapter/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (f *fca) Config() *config.Config {

type fgc struct{}

func (f *fgc) QueryChanges(lastUpdate time.Time, rateLimit int) map[string][]client.ChangeInfo {
func (f *fgc) QueryChanges(lastUpdate client.LastSyncState, rateLimit int) map[string][]client.ChangeInfo {
return nil
}

Expand Down Expand Up @@ -123,17 +123,17 @@ func TestMakeCloneURI(t *testing.T) {
}

type fakeSync struct {
val time.Time
val client.LastSyncState
lock sync.Mutex
}

func (s *fakeSync) Current() time.Time {
func (s *fakeSync) Current() client.LastSyncState {
s.lock.Lock()
defer s.lock.Unlock()
return s.val
}

func (s *fakeSync) Update(t time.Time) error {
func (s *fakeSync) Update(t client.LastSyncState) error {
s.lock.Lock()
defer s.lock.Unlock()
s.val = t
Expand Down Expand Up @@ -704,16 +704,22 @@ func TestProcessChange(t *testing.T) {
},
}

testInstance := "https://gerrit"
fakeProwJobClient := prowfake.NewSimpleClientset()
fakeLastSync := client.LastSyncState{testInstance: map[string]time.Time{}}

for _, tc := range testcases {
fakeLastSync[testInstance][tc.change.Project] = timeNow.Add(-time.Minute)
}

c := &Controller{
config: fca.Config,
prowJobClient: fakeProwJobClient.ProwV1().ProwJobs("prowjobs"),
gc: &fgc{},
tracker: &fakeSync{val: timeNow.Add(-time.Minute)},
tracker: &fakeSync{val: fakeLastSync},
}

err := c.ProcessChange("https://gerrit", tc.change)
err := c.ProcessChange(testInstance, tc.change)
if err != nil && !tc.shouldError {
t.Errorf("tc %s, expect no error, but got %v", tc.name, err)
continue
Expand Down
27 changes: 24 additions & 3 deletions prow/gerrit/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,20 @@ type RevisionInfo = gerrit.RevisionInfo
// FileInfo is a gerrit.FileInfo
type FileInfo = gerrit.FileInfo

// Map from instance name to repos to lastsync time for that repo
type LastSyncState map[string]map[string]time.Time

func (l LastSyncState) DeepCopy() LastSyncState {
result := LastSyncState{}
for host, lastSyncs := range l {
result[host] = map[string]time.Time{}
for projects, lastSync := range lastSyncs {
result[host][projects] = lastSync
}
}
return result
}

// NewClient returns a new gerrit client
func NewClient(instances map[string][]string) (*Client, error) {
c := &Client{
Expand Down Expand Up @@ -193,10 +207,11 @@ func (c *Client) Start(cookiefilePath string) {

// QueryChanges queries for all changes from all projects after lastUpdate time
// returns an instance:changes map
func (c *Client) QueryChanges(lastUpdate time.Time, rateLimit int) map[string][]ChangeInfo {
func (c *Client) QueryChanges(lastState LastSyncState, rateLimit int) map[string][]ChangeInfo {
result := map[string][]ChangeInfo{}
for _, h := range c.handlers {
changes := h.queryAllChanges(lastUpdate, rateLimit)
lastStateForInstance := lastState[h.instance]
changes := h.queryAllChanges(lastStateForInstance, rateLimit)
if len(changes) > 0 {
result[h.instance] = []ChangeInfo{}
for _, change := range changes {
Expand Down Expand Up @@ -246,9 +261,15 @@ func (c *Client) Account(instance string) *gerrit.AccountInfo {

// private handler implementation details

func (h *gerritInstanceHandler) queryAllChanges(lastUpdate time.Time, rateLimit int) []gerrit.ChangeInfo {
func (h *gerritInstanceHandler) queryAllChanges(lastState map[string]time.Time, rateLimit int) []gerrit.ChangeInfo {
result := []gerrit.ChangeInfo{}
timeNow := time.Now()
for _, project := range h.projects {
lastUpdate, ok := lastState[project]
if !ok {
logrus.WithField("project", project).Warnf("could not find lastTime for project %q, probably something went wrong with initTracker?", project)
lastUpdate = timeNow
}
changes, err := h.queryChangesForProject(project, lastUpdate, rateLimit)
if err != nil {
// don't halt on error from one project, log & continue
Expand Down
Loading

0 comments on commit 4a144ea

Please sign in to comment.