Skip to content

Commit cb4d567

Browse files
Remove some code, move some code around
Remove code that was just wrapping other functions at this point, and move some code around. We need to get a better idea what is actually still needed in the pool manager, to begin to refactor it into something that can scale out. Signed-off-by: Gabriel Adrian Samfira <gsamfira@cloudbasesolutions.com>
1 parent 9525e01 commit cb4d567

File tree

3 files changed

+96
-119
lines changed

3 files changed

+96
-119
lines changed

runner/pool/locking.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package pool
2+
3+
import "sync"
4+
5+
type keyMutex struct {
6+
muxes sync.Map
7+
}
8+
9+
func (k *keyMutex) TryLock(key string) bool {
10+
mux, _ := k.muxes.LoadOrStore(key, &sync.Mutex{})
11+
keyMux := mux.(*sync.Mutex)
12+
return keyMux.TryLock()
13+
}
14+
15+
func (k *keyMutex) Unlock(key string, remove bool) {
16+
mux, ok := k.muxes.Load(key)
17+
if !ok {
18+
return
19+
}
20+
keyMux := mux.(*sync.Mutex)
21+
if remove {
22+
k.Delete(key)
23+
}
24+
keyMux.Unlock()
25+
}
26+
27+
func (k *keyMutex) Delete(key string) {
28+
k.muxes.Delete(key)
29+
}

runner/pool/pool.go

Lines changed: 24 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -62,32 +62,6 @@ const (
6262
maxCreateAttempts = 5
6363
)
6464

65-
type keyMutex struct {
66-
muxes sync.Map
67-
}
68-
69-
func (k *keyMutex) TryLock(key string) bool {
70-
mux, _ := k.muxes.LoadOrStore(key, &sync.Mutex{})
71-
keyMux := mux.(*sync.Mutex)
72-
return keyMux.TryLock()
73-
}
74-
75-
func (k *keyMutex) Unlock(key string, remove bool) {
76-
mux, ok := k.muxes.Load(key)
77-
if !ok {
78-
return
79-
}
80-
keyMux := mux.(*sync.Mutex)
81-
if remove {
82-
k.Delete(key)
83-
}
84-
keyMux.Unlock()
85-
}
86-
87-
func (k *keyMutex) Delete(key string) {
88-
k.muxes.Delete(key)
89-
}
90-
9165
type urls struct {
9266
callbackURL string
9367
metadataURL string
@@ -288,7 +262,7 @@ func (r *basePoolManager) HandleWorkflowJob(job params.WorkflowJob) error {
288262

289263
// A runner has picked up the job, and is now running it. It may need to be replaced if the pool has
290264
// a minimum number of idle runners configured.
291-
pool, err := r.GetPoolByID(instance.PoolID)
265+
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
292266
if err != nil {
293267
return errors.Wrap(err, "getting pool")
294268
}
@@ -386,37 +360,6 @@ func (r *basePoolManager) updateTools() error {
386360
return err
387361
}
388362

389-
func controllerIDFromLabels(labels []string) string {
390-
for _, lbl := range labels {
391-
if strings.HasPrefix(lbl, controllerLabelPrefix) {
392-
return lbl[len(controllerLabelPrefix):]
393-
}
394-
}
395-
return ""
396-
}
397-
398-
func labelsFromRunner(runner *github.Runner) []string {
399-
if runner == nil || runner.Labels == nil {
400-
return []string{}
401-
}
402-
403-
var labels []string
404-
for _, val := range runner.Labels {
405-
if val == nil {
406-
continue
407-
}
408-
labels = append(labels, val.GetName())
409-
}
410-
return labels
411-
}
412-
413-
// isManagedRunner returns true if labels indicate the runner belongs to a pool
414-
// this manager is responsible for.
415-
func (r *basePoolManager) isManagedRunner(labels []string) bool {
416-
runnerControllerID := controllerIDFromLabels(labels)
417-
return runnerControllerID == r.controllerID
418-
}
419-
420363
// cleanupOrphanedProviderRunners compares runners in github with local runners and removes
421364
// any local runners that are not present in Github. Runners that are "idle" in our
422365
// provider, but do not exist in github, will be removed. This can happen if the
@@ -425,14 +368,14 @@ func (r *basePoolManager) isManagedRunner(labels []string) bool {
425368
// If we were offline and did not process the webhook, the instance will linger.
426369
// We need to remove it from the provider and database.
427370
func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runner) error {
428-
dbInstances, err := r.FetchDbInstances()
371+
dbInstances, err := r.store.ListEntityInstances(r.ctx, r.entity)
429372
if err != nil {
430373
return errors.Wrap(err, "fetching instances from db")
431374
}
432375

433376
runnerNames := map[string]bool{}
434377
for _, run := range runners {
435-
if !r.isManagedRunner(labelsFromRunner(run)) {
378+
if !isManagedRunner(labelsFromRunner(run), r.controllerID) {
436379
slog.DebugContext(
437380
r.ctx, "runner is not managed by a pool we manage",
438381
"runner_name", run.GetName())
@@ -460,7 +403,7 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
460403
continue
461404
}
462405

463-
pool, err := r.GetPoolByID(instance.PoolID)
406+
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
464407
if err != nil {
465408
return errors.Wrap(err, "fetching instance pool info")
466409
}
@@ -501,14 +444,14 @@ func (r *basePoolManager) cleanupOrphanedProviderRunners(runners []*github.Runne
501444
// of "running" in the provider, but that has not registered with Github, and has
502445
// received no new updates in the configured timeout interval.
503446
func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
504-
dbInstances, err := r.FetchDbInstances()
447+
dbInstances, err := r.store.ListEntityInstances(r.ctx, r.entity)
505448
if err != nil {
506449
return errors.Wrap(err, "fetching instances from db")
507450
}
508451

509452
runnersByName := map[string]*github.Runner{}
510453
for _, run := range runners {
511-
if !r.isManagedRunner(labelsFromRunner(run)) {
454+
if !isManagedRunner(labelsFromRunner(run), r.controllerID) {
512455
slog.DebugContext(
513456
r.ctx, "runner is not managed by a pool we manage",
514457
"runner_name", run.GetName())
@@ -530,7 +473,7 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
530473
}
531474
defer r.keyMux.Unlock(instance.Name, false)
532475

533-
pool, err := r.GetPoolByID(instance.PoolID)
476+
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
534477
if err != nil {
535478
return errors.Wrap(err, "fetching instance pool info")
536479
}
@@ -558,15 +501,6 @@ func (r *basePoolManager) reapTimedOutRunners(runners []*github.Runner) error {
558501
return nil
559502
}
560503

561-
func instanceInList(instanceName string, instances []commonParams.ProviderInstance) (commonParams.ProviderInstance, bool) {
562-
for _, val := range instances {
563-
if val.Name == instanceName {
564-
return val, true
565-
}
566-
}
567-
return commonParams.ProviderInstance{}, false
568-
}
569-
570504
// cleanupOrphanedGithubRunners will forcefully remove any github runners that appear
571505
// as offline and for which we no longer have a local instance.
572506
// This may happen if someone manually deletes the instance in the provider. We need to
@@ -575,7 +509,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
575509
poolInstanceCache := map[string][]commonParams.ProviderInstance{}
576510
g, ctx := errgroup.WithContext(r.ctx)
577511
for _, runner := range runners {
578-
if !r.isManagedRunner(labelsFromRunner(runner)) {
512+
if !isManagedRunner(labelsFromRunner(runner), r.controllerID) {
579513
slog.DebugContext(
580514
r.ctx, "runner is not managed by a pool we manage",
581515
"runner_name", runner.GetName())
@@ -598,7 +532,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
598532
slog.InfoContext(
599533
r.ctx, "Runner has no database entry in garm, removing from github",
600534
"runner_name", runner.GetName())
601-
resp, err := r.RemoveGithubRunner(*runner.ID)
535+
resp, err := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID())
602536
if err != nil {
603537
// Removed in the meantime?
604538
if resp != nil && resp.StatusCode == http.StatusNotFound {
@@ -632,7 +566,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
632566
}
633567
}
634568

635-
pool, err := r.GetPoolByID(dbInstance.PoolID)
569+
pool, err := r.store.GetEntityPool(r.ctx, r.entity, dbInstance.PoolID)
636570
if err != nil {
637571
return errors.Wrap(err, "fetching pool")
638572
}
@@ -678,7 +612,7 @@ func (r *basePoolManager) cleanupOrphanedGithubRunners(runners []*github.Runner)
678612
slog.InfoContext(
679613
r.ctx, "Runner instance is no longer on the provider, removing from github",
680614
"runner_name", dbInstance.Name)
681-
resp, err := r.RemoveGithubRunner(*runner.ID)
615+
resp, err := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID())
682616
if err != nil {
683617
// Removed in the meantime?
684618
if resp != nil && resp.StatusCode == http.StatusNotFound {
@@ -749,16 +683,7 @@ func (r *basePoolManager) setInstanceRunnerStatus(runnerName string, status para
749683
updateParams := params.UpdateInstanceParams{
750684
RunnerStatus: status,
751685
}
752-
753-
instance, err := r.updateInstance(runnerName, updateParams)
754-
if err != nil {
755-
return params.Instance{}, errors.Wrap(err, "updating runner state")
756-
}
757-
return instance, nil
758-
}
759-
760-
func (r *basePoolManager) updateInstance(runnerName string, update params.UpdateInstanceParams) (params.Instance, error) {
761-
instance, err := r.store.UpdateInstance(r.ctx, runnerName, update)
686+
instance, err := r.store.UpdateInstance(r.ctx, runnerName, updateParams)
762687
if err != nil {
763688
return params.Instance{}, errors.Wrap(err, "updating runner state")
764689
}
@@ -771,15 +696,15 @@ func (r *basePoolManager) setInstanceStatus(runnerName string, status commonPara
771696
ProviderFault: providerFault,
772697
}
773698

774-
instance, err := r.updateInstance(runnerName, updateParams)
699+
instance, err := r.store.UpdateInstance(r.ctx, runnerName, updateParams)
775700
if err != nil {
776701
return params.Instance{}, errors.Wrap(err, "updating runner state")
777702
}
778703
return instance, nil
779704
}
780705

781706
func (r *basePoolManager) AddRunner(ctx context.Context, poolID string, aditionalLabels []string) (err error) {
782-
pool, err := r.GetPoolByID(poolID)
707+
pool, err := r.store.GetEntityPool(r.ctx, r.entity, poolID)
783708
if err != nil {
784709
return errors.Wrap(err, "fetching pool")
785710
}
@@ -838,7 +763,7 @@ func (r *basePoolManager) AddRunner(ctx context.Context, poolID string, aditiona
838763
}
839764

840765
if runner != nil {
841-
_, runnerCleanupErr := r.RemoveGithubRunner(runner.GetID())
766+
_, runnerCleanupErr := r.ghcli.RemoveEntityRunner(r.ctx, runner.GetID())
842767
if err != nil {
843768
slog.With(slog.Any("error", runnerCleanupErr)).ErrorContext(
844769
ctx, "failed to remove runner",
@@ -888,7 +813,7 @@ func (r *basePoolManager) getLabelsForInstance(pool params.Pool) []string {
888813
}
889814

890815
func (r *basePoolManager) addInstanceToProvider(instance params.Instance) error {
891-
pool, err := r.GetPoolByID(instance.PoolID)
816+
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
892817
if err != nil {
893818
return errors.Wrap(err, "fetching pool")
894819
}
@@ -1332,7 +1257,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po
13321257
ctx, "queueing previously failed instance for retry",
13331258
"runner_name", instance.Name)
13341259
// Set instance to pending create and wait for retry.
1335-
if _, err := r.updateInstance(instance.Name, updateParams); err != nil {
1260+
if _, err := r.store.UpdateInstance(r.ctx, instance.Name, updateParams); err != nil {
13361261
slog.With(slog.Any("error", err)).ErrorContext(
13371262
ctx, "failed to update runner status",
13381263
"runner_name", instance.Name)
@@ -1347,7 +1272,7 @@ func (r *basePoolManager) retryFailedInstancesForOnePool(ctx context.Context, po
13471272
}
13481273

13491274
func (r *basePoolManager) retryFailedInstances() error {
1350-
pools, err := r.ListPools()
1275+
pools, err := r.store.ListEntityPools(r.ctx, r.entity)
13511276
if err != nil {
13521277
return fmt.Errorf("error listing pools: %w", err)
13531278
}
@@ -1370,7 +1295,7 @@ func (r *basePoolManager) retryFailedInstances() error {
13701295
}
13711296

13721297
func (r *basePoolManager) scaleDown() error {
1373-
pools, err := r.ListPools()
1298+
pools, err := r.store.ListEntityPools(r.ctx, r.entity)
13741299
if err != nil {
13751300
return fmt.Errorf("error listing pools: %w", err)
13761301
}
@@ -1391,7 +1316,7 @@ func (r *basePoolManager) scaleDown() error {
13911316
}
13921317

13931318
func (r *basePoolManager) ensureMinIdleRunners() error {
1394-
pools, err := r.ListPools()
1319+
pools, err := r.store.ListEntityPools(r.ctx, r.entity)
13951320
if err != nil {
13961321
return fmt.Errorf("error listing pools: %w", err)
13971322
}
@@ -1411,7 +1336,7 @@ func (r *basePoolManager) ensureMinIdleRunners() error {
14111336
}
14121337

14131338
func (r *basePoolManager) deleteInstanceFromProvider(ctx context.Context, instance params.Instance) error {
1414-
pool, err := r.GetPoolByID(instance.PoolID)
1339+
pool, err := r.store.GetEntityPool(r.ctx, r.entity, instance.PoolID)
14151340
if err != nil {
14161341
return errors.Wrap(err, "fetching pool")
14171342
}
@@ -1441,7 +1366,7 @@ func (r *basePoolManager) deleteInstanceFromProvider(ctx context.Context, instan
14411366
}
14421367

14431368
func (r *basePoolManager) deletePendingInstances() error {
1444-
instances, err := r.FetchDbInstances()
1369+
instances, err := r.store.ListEntityInstances(r.ctx, r.entity)
14451370
if err != nil {
14461371
return fmt.Errorf("failed to fetch instances from store: %w", err)
14471372
}
@@ -1529,7 +1454,7 @@ func (r *basePoolManager) deletePendingInstances() error {
15291454
func (r *basePoolManager) addPendingInstances() error {
15301455
// nolint:golangci-lint,godox
15311456
// TODO: filter instances by status.
1532-
instances, err := r.FetchDbInstances()
1457+
instances, err := r.store.ListEntityInstances(r.ctx, r.entity)
15331458
if err != nil {
15341459
return fmt.Errorf("failed to fetch instances from store: %w", err)
15351460
}
@@ -1717,7 +1642,7 @@ func (r *basePoolManager) DeleteRunner(runner params.Instance, forceRemove, bypa
17171642
return runnerErrors.NewConflictError("pool manager is not running for %s", r.entity.String())
17181643
}
17191644
if runner.AgentID != 0 {
1720-
resp, err := r.RemoveGithubRunner(runner.AgentID)
1645+
resp, err := r.ghcli.RemoveEntityRunner(r.ctx, runner.AgentID)
17211646
if err != nil {
17221647
if resp != nil {
17231648
switch resp.StatusCode {
@@ -2083,14 +2008,6 @@ func (r *basePoolManager) GithubRunnerRegistrationToken() (string, error) {
20832008
return *tk.Token, nil
20842009
}
20852010

2086-
func (r *basePoolManager) RemoveGithubRunner(runnerID int64) (*github.Response, error) {
2087-
ghResp, err := r.ghcli.RemoveEntityRunner(r.ctx, runnerID)
2088-
if err != nil {
2089-
return nil, fmt.Errorf("removing runner: %w", err)
2090-
}
2091-
return ghResp, nil
2092-
}
2093-
20942011
func (r *basePoolManager) FetchTools() ([]commonParams.RunnerApplicationDownload, error) {
20952012
tools, ghResp, err := r.ghcli.ListEntityRunnerApplicationDownloads(r.ctx)
20962013
if err != nil {
@@ -2153,18 +2070,6 @@ func (r *basePoolManager) GithubURL() string {
21532070
return ""
21542071
}
21552072

2156-
func (r *basePoolManager) FetchDbInstances() ([]params.Instance, error) {
2157-
return r.store.ListEntityInstances(r.ctx, r.entity)
2158-
}
2159-
2160-
func (r *basePoolManager) ListPools() ([]params.Pool, error) {
2161-
return r.store.ListEntityPools(r.ctx, r.entity)
2162-
}
2163-
2164-
func (r *basePoolManager) GetPoolByID(poolID string) (params.Pool, error) {
2165-
return r.store.GetEntityPool(r.ctx, r.entity, poolID)
2166-
}
2167-
21682073
func (r *basePoolManager) GetWebhookInfo(ctx context.Context) (params.HookInfo, error) {
21692074
allHooks, err := r.listHooks(ctx)
21702075
if err != nil {

0 commit comments

Comments
 (0)