Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions runner/pool/locking.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package pool

import "sync"

type keyMutex struct {
muxes sync.Map
}

func (k *keyMutex) TryLock(key string) bool {
mux, _ := k.muxes.LoadOrStore(key, &sync.Mutex{})
keyMux := mux.(*sync.Mutex)
return keyMux.TryLock()
}

func (k *keyMutex) Unlock(key string, remove bool) {
mux, ok := k.muxes.Load(key)
if !ok {
return
}
keyMux := mux.(*sync.Mutex)
if remove {
k.Delete(key)
}
keyMux.Unlock()
}

func (k *keyMutex) Delete(key string) {
k.muxes.Delete(key)
}
143 changes: 24 additions & 119 deletions runner/pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,32 +62,6 @@ const (
maxCreateAttempts = 5
)

type keyMutex struct {
muxes sync.Map
}

func (k *keyMutex) TryLock(key string) bool {
mux, _ := k.muxes.LoadOrStore(key, &sync.Mutex{})
keyMux := mux.(*sync.Mutex)
return keyMux.TryLock()
}

func (k *keyMutex) Unlock(key string, remove bool) {
mux, ok := k.muxes.Load(key)
if !ok {
return
}
keyMux := mux.(*sync.Mutex)
if remove {
k.Delete(key)
}
keyMux.Unlock()
}

func (k *keyMutex) Delete(key string) {
k.muxes.Delete(key)
}

type urls struct {
callbackURL string
metadataURL string
Expand Down Expand Up @@ -288,7 +262,7 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {

// A runner has picked up the job, and is now running it. It may need to be replaced if the pool has
// a minimum number of idle runners configured.
pool, err := r.GetPoolByID(instance.PoolID)
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
if err != nil {
return errors.Wrap(err, "getting pool")
}
Expand Down Expand Up @@ -386,37 +360,6 @@ func (r *basePoolManager) updateTools() error {
return err
}

func controllerIDFromLabels(labels []string) string {
for _, lbl := range labels {
if strings.HasPrefix(lbl, controllerLabelPrefix) {
return lbl[len(controllerLabelPrefix):]
}
}
return ""
}

func labelsFromRunner(runner *github.Runner) []string {
if runner == nil || runner.Labels == nil {
return []string{}
}

var labels []string
for _, val := range runner.Labels {
if val == nil {
continue
}
labels = append(labels, val.GetName())
}
return labels
}

// isManagedRunner returns true if labels indicate the runner belongs to a pool
// this manager is responsible for.
func (r *basePoolManager) isManagedRunner(labels []string) bool {
runnerControllerID := controllerIDFromLabels(labels)
return runnerControllerID == r.controllerID
}

// cleanupOrphanedProviderRunners compares runners in github with local runners and removes
// any local runners that are not present in Github. Runners that are "idle" in our
// provider, but do not exist in github, will be removed. This can happen if the
Expand All @@ -425,14 +368,14 @@ func (r *basePoolManager) isManagedRunner(labels []string) bool {
// If we were offline and did not process the webhook, the instance will linger.
// We need to remove it from the provider and database.
func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runner) error {
dbInstances, err := r.FetchDbInstances()
dbInstances, err := r.store.ListEntityInstances(r.ctx, r.entity)
if err != nil {
return errors.Wrap(err, "fetching instances from db")
}

runnerNames := map[string]bool{}
for _, run := range runners {
if !r.isManagedRunner(labelsFromRunner(run)) {
if !isManagedRunner(labelsFromRunner(run), r.controllerID) {
slog.DebugContext(
r.ctx, "runner is not managed by a pool we manage",
"runner_name", run.GetName())
Expand Down Expand Up @@ -460,7 +403,7 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
continue
}

pool, err := r.GetPoolByID(instance.PoolID)
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching instance pool info")
}
Expand Down Expand Up @@ -501,14 +444,14 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
// of "running" in the provider, but that has not registered with Github, and has
// received no new updates in the configured timeout interval.
func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
dbInstances, err := r.FetchDbInstances()
dbInstances, err := r.store.ListEntityInstances(r.ctx, r.entity)
if err != nil {
return errors.Wrap(err, "fetching instances from db")
}

runnersByName := map[string]*github.Runner{}
for _, run := range runners {
if !r.isManagedRunner(labelsFromRunner(run)) {
if !isManagedRunner(labelsFromRunner(run), r.controllerID) {
slog.DebugContext(
r.ctx, "runner is not managed by a pool we manage",
"runner_name", run.GetName())
Expand All @@ -530,7 +473,7 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
}
defer r.keyMux.Unlock(instance.Name, false)

pool, err := r.GetPoolByID(instance.PoolID)
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching instance pool info")
}
Expand Down Expand Up @@ -558,15 +501,6 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
return nil
}

func instanceInList(instanceName string, instances []commonParams.ProviderInstance) (commonParams.ProviderInstance, bool) {
for _, val := range instances {
if val.Name == instanceName {
return val, true
}
}
return commonParams.ProviderInstance{}, false
}

// cleanupOrphanedGithubRunners will forcefully remove any github runners that appear
// as offline and for which we no longer have a local instance.
// This may happen if someone manually deletes the instance in the provider. We need to
Expand All @@ -575,7 +509,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
poolInstanceCache := map[string][]commonParams.ProviderInstance{}
g, ctx := errgroup.WithContext(r.ctx)
for _, runner := range runners {
if !r.isManagedRunner(labelsFromRunner(runner)) {
if !isManagedRunner(labelsFromRunner(runner), r.controllerID) {
slog.DebugContext(
r.ctx, "runner is not managed by a pool we manage",
"runner_name", runner.GetName())
Expand All @@ -598,7 +532,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
slog.InfoContext(
r.ctx, "Runner has no database entry in garm, removing from github",
"runner_name", runner.GetName())
resp, err := r.RemoveGithubRunner(*runner.ID)
resp, err := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID())
if err != nil {
// Removed in the meantime?
if resp != nil && resp.StatusCode == http.StatusNotFound {
Expand Down Expand Up @@ -632,7 +566,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
}
}

pool, err := r.GetPoolByID(dbInstance.PoolID)
pool, err := r.store.GetEntityPool(r.ctx, r.entity, dbInstance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
}
Expand Down Expand Up @@ -678,7 +612,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
slog.InfoContext(
r.ctx, "Runner instance is no longer on the provider, removing from github",
"runner_name", dbInstance.Name)
resp, err := r.RemoveGithubRunner(*runner.ID)
resp, err := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID())
if err != nil {
// Removed in the meantime?
if resp != nil && resp.StatusCode == http.StatusNotFound {
Expand Down Expand Up @@ -749,16 +683,7 @@ func (r *basePoolManager) setInstanceRunnerStatus(runnerName string, status para
updateParams := params.UpdateInstanceParams{
RunnerStatus: status,
}

instance, err := r.updateInstance(runnerName, updateParams)
if err != nil {
return params.Instance{}, errors.Wrap(err, "updating runner state")
}
return instance, nil
}

func (r *basePoolManager) updateInstance(runnerName string, update params.UpdateInstanceParams) (params.Instance, error) {
instance, err := r.store.UpdateInstance(r.ctx, runnerName, update)
instance, err := r.store.UpdateInstance(r.ctx, runnerName, updateParams)
if err != nil {
return params.Instance{}, errors.Wrap(err, "updating runner state")
}
Expand All @@ -771,15 +696,15 @@ func (r *basePoolManager) setInstanceStatus(runnerName string, status commonPara
ProviderFault: providerFault,
}

instance, err := r.updateInstance(runnerName, updateParams)
instance, err := r.store.UpdateInstance(r.ctx, runnerName, updateParams)
if err != nil {
return params.Instance{}, errors.Wrap(err, "updating runner state")
}
return instance, nil
}

func (r *basePoolManager) AddRunner(ctx context.Context, poolID string, aditionalLabels []string) (err error) {
pool, err := r.GetPoolByID(poolID)
pool, err := r.store.GetEntityPool(r.ctx, r.entity, poolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
}
Expand Down Expand Up @@ -838,7 +763,7 @@ func (r *basePoolManager) AddRunner(ctx context.Context, poolID string, aditiona
}

if runner != nil {
_, runnerCleanupErr := r.RemoveGithubRunner(runner.GetID())
_, runnerCleanupErr := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID())
if err != nil {
slog.With(slog.Any("error", runnerCleanupErr)).ErrorContext(
ctx, "failed to remove runner",
Expand Down Expand Up @@ -888,7 +813,7 @@ func (r *basePoolManager) getLabelsForInstance(pool params.Pool) []string {
}

func (r *basePoolManager) addInstanceToProvider(instance params.Instance) error {
pool, err := r.GetPoolByID(instance.PoolID)
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
}
Expand Down Expand Up @@ -1332,7 +1257,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po
ctx, "queueing previously failed instance for retry",
"runner_name", instance.Name)
// Set instance to pending create and wait for retry.
if _, err := r.updateInstance(instance.Name, updateParams); err != nil {
if _, err := r.store.UpdateInstance(r.ctx, instance.Name, updateParams); err != nil {
slog.With(slog.Any("error", err)).ErrorContext(
ctx, "failed to update runner status",
"runner_name", instance.Name)
Expand All @@ -1347,7 +1272,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po
}

func (r *basePoolManager) retryFailedInstances() error {
pools, err := r.ListPools()
pools, err := r.store.ListEntityPools(r.ctx, r.entity)
if err != nil {
return fmt.Errorf("error listing pools: %w", err)
}
Expand All @@ -1370,7 +1295,7 @@ func (r *basePoolManager) retryFailedInstances() error {
}

func (r *basePoolManager) scaleDown() error {
pools, err := r.ListPools()
pools, err := r.store.ListEntityPools(r.ctx, r.entity)
if err != nil {
return fmt.Errorf("error listing pools: %w", err)
}
Expand All @@ -1391,7 +1316,7 @@ func (r *basePoolManager) scaleDown() error {
}

func (r *basePoolManager) ensureMinIdleRunners() error {
pools, err := r.ListPools()
pools, err := r.store.ListEntityPools(r.ctx, r.entity)
if err != nil {
return fmt.Errorf("error listing pools: %w", err)
}
Expand All @@ -1411,7 +1336,7 @@ func (r *basePoolManager) ensureMinIdleRunners() error {
}

func (r *basePoolManager) deleteInstanceFromProvider(ctx context.Context, instance params.Instance) error {
pool, err := r.GetPoolByID(instance.PoolID)
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
if err != nil {
return errors.Wrap(err, "fetching pool")
}
Expand Down Expand Up @@ -1441,7 +1366,7 @@ func (r *basePoolManager) deleteInstanceFromProvider(ctx context.Context, instan
}

func (r *basePoolManager) deletePendingInstances() error {
instances, err := r.FetchDbInstances()
instances, err := r.store.ListEntityInstances(r.ctx, r.entity)
if err != nil {
return fmt.Errorf("failed to fetch instances from store: %w", err)
}
Expand Down Expand Up @@ -1529,7 +1454,7 @@ func (r *basePoolManager) deletePendingInstances() error {
func (r *basePoolManager) addPendingInstances() error {
// nolint:golangci-lint,godox
// TODO: filter instances by status.
instances, err := r.FetchDbInstances()
instances, err := r.store.ListEntityInstances(r.ctx, r.entity)
if err != nil {
return fmt.Errorf("failed to fetch instances from store: %w", err)
}
Expand Down Expand Up @@ -1717,7 +1642,7 @@ func (r *basePoolManager) DeleteRunner(runner params.Instance, forceRemove, bypa
return runnerErrors.NewConflictError("pool manager is not running for %s", r.entity.String())
}
if runner.AgentID != 0 {
resp, err := r.RemoveGithubRunner(runner.AgentID)
resp, err := r.ghcli.RemoveEntityRunner(r.ctx, runner.AgentID)
if err != nil {
if resp != nil {
switch resp.StatusCode {
Expand Down Expand Up @@ -2083,14 +2008,6 @@ func (r *basePoolManager) GithubRunnerRegistrationToken() (string, error) {
return *tk.Token, nil
}

func (r *basePoolManager) RemoveGithubRunner(runnerID int64) (*github.Response, error) {
ghResp, err := r.ghcli.RemoveEntityRunner(r.ctx, runnerID)
if err != nil {
return nil, fmt.Errorf("removing runner: %w", err)
}
return ghResp, nil
}

func (r *basePoolManager) FetchTools() ([]commonParams.RunnerApplicationDownload, error) {
tools, ghResp, err := r.ghcli.ListEntityRunnerApplicationDownloads(r.ctx)
if err != nil {
Expand Down Expand Up @@ -2153,18 +2070,6 @@ func (r *basePoolManager) GithubURL() string {
return ""
}

func (r *basePoolManager) FetchDbInstances() ([]params.Instance, error) {
return r.store.ListEntityInstances(r.ctx, r.entity)
}

func (r *basePoolManager) ListPools() ([]params.Pool, error) {
return r.store.ListEntityPools(r.ctx, r.entity)
}

func (r *basePoolManager) GetPoolByID(poolID string) (params.Pool, error) {
return r.store.GetEntityPool(r.ctx, r.entity, poolID)
}

func (r *basePoolManager) GetWebhookInfo(ctx context.Context) (params.HookInfo, error) {
allHooks, err := r.listHooks(ctx)
if err != nil {
Expand Down
Loading