Skip to content

Commit

Permalink
move observer crons into dedicated agents for parallelism
Browse files Browse the repository at this point in the history
  • Loading branch information
michaeljguarino committed Sep 16, 2024
1 parent f097886 commit ab3c7b8
Show file tree
Hide file tree
Showing 12 changed files with 105 additions and 36 deletions.
4 changes: 2 additions & 2 deletions charts/controller/crds/deployments.plural.sh_observers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -553,15 +553,15 @@ spec:
- SEMVER
- LATEST
type: string
target:
type:
enum:
- OCI
- HELM
- GIT
type: string
required:
- order
- target
- type
type: object
required:
- crontab
Expand Down
2 changes: 1 addition & 1 deletion go/controller/api/v1alpha1/observer_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type ObserverPipelineAction struct {
type ObserverTarget struct {
// +kubebuilder:validation:Type:=string
// +kubebuilder:validation:Enum:=OCI;HELM;GIT
Type console.ObserverTargetType `json:"target"`
Type console.ObserverTargetType `json:"type"`

// +kubebuilder:validation:Optional
Format *string `json:"format,omitempty"`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -553,15 +553,15 @@ spec:
- SEMVER
- LATEST
type: string
target:
type:
enum:
- OCI
- HELM
- GIT
type: string
required:
- order
- target
- type
type: object
required:
- crontab
Expand Down
2 changes: 1 addition & 1 deletion go/controller/docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -1403,7 +1403,7 @@ _Appears in:_

| Field | Description | Default | Validation |
| --- | --- | --- | --- |
| `target` _[ObserverTargetType](#observertargettype)_ | | | Enum: [OCI HELM GIT] <br />Type: string <br /> |
| `type` _[ObserverTargetType](#observertargettype)_ | | | Enum: [OCI HELM GIT] <br />Type: string <br /> |
| `format` _string_ | | | Optional: {} <br /> |
| `order` _[ObserverTargetOrder](#observertargetorder)_ | | | Enum: [SEMVER LATEST] <br />Type: string <br /> |
| `helm` _[ObserverHelm](#observerhelm)_ | | | Optional: {} <br /> |
Expand Down
2 changes: 2 additions & 0 deletions lib/console/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ defmodule Console.Application do
{Registry, [keys: :unique, name: Console.Deployments.Pipelines.Supervisor.registry()]},
{Registry, [keys: :unique, name: Console.Deployments.Stacks.Worker.registry()]},
{Registry, [keys: :unique, name: Console.Deployments.Helm.Agent.registry()]},
{Registry, [keys: :unique, name: Console.Deployments.Observer.Worker.registry()]},
{Cluster.Supervisor, [topologies, [name: Console.ClusterSupervisor]]},
Console.Deployments.Git.Supervisor,
Console.Deployments.Stacks.Supervisor,
Console.Deployments.Helm.Server,
Console.Deployments.Pipelines.Supervisor,
Console.Deployments.Helm.Supervisor,
Console.Deployments.Observer.Supervisor,
Console.Deployments.Git.Kick,
Console.Deployments.Deprecations.Table,
Console.Deployments.Compatibilities.Table,
Expand Down
5 changes: 1 addition & 4 deletions lib/console/deployments/cron.ex
Original file line number Diff line number Diff line change
Expand Up @@ -294,10 +294,7 @@ defmodule Console.Deployments.Cron do
Observer.runnable()
|> Observer.ordered(asc: :id)
|> Repo.stream(method: :keyset)
|> Console.throttle()
|> Flow.from_enumerable()
|> Flow.map(&Console.Deployments.Observer.Runner.run/1)
|> Flow.run()
|> Enum.each(&Console.Deployments.Observer.Discovery.runner/1)
end

defp log({:ok, %{id: id}}, msg), do: "Successfully #{msg} for #{id}"
Expand Down
18 changes: 18 additions & 0 deletions lib/console/deployments/observer/discovery.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
defmodule Console.Deployments.Observer.Discovery do
alias Console.Deployments.Observer.Supervisor
alias Console.Schema.Observer

def runner(%Observer{} = observer), do: maybe_rpc(observer.id, Supervisor, :start_child, [observer])

defp maybe_rpc(id, module, func, args) do
me = node()
case worker_node(id) do
^me -> apply(module, func, args)
node -> :rpc.call(node, module, func, args)
end
end

def worker_node(id), do: HashRing.Managed.key_to_node(:cluster, id)

def local?(id), do: worker_node(id) == node()
end
17 changes: 17 additions & 0 deletions lib/console/deployments/observer/supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
defmodule Console.Deployments.Observer.Supervisor do
use DynamicSupervisor
alias Console.Deployments.Observer.Worker

def start_link(init_arg \\ :ok) do
DynamicSupervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
end

def start_child(run) do
DynamicSupervisor.start_child(__MODULE__, {Worker, run})
end

@impl true
def init(_init_arg) do
DynamicSupervisor.init(strategy: :one_for_one)
end
end
56 changes: 56 additions & 0 deletions lib/console/deployments/observer/worker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
defmodule Console.Deployments.Observer.Worker do
use GenServer, restart: :transient
alias Console.Schema.Observer
alias Console.Deployments.Observer.{Runner, Discovery}

require Logger

@poll :timer.minutes(1)

defmodule State, do: defstruct [:observer]

def registry(), do: __MODULE__

def start_link([%Observer{} = obs]), do: start_link(obs)
def start_link(%Observer{} = obs) do
GenServer.start_link(__MODULE__, obs, name: via(obs))
end

def init(observer) do
:timer.send_interval(@poll, :poll)
:timer.send_interval(@poll, :move)
send self(), :poll
{:ok, %State{observer: observer}}
end

def ping(pid), do: GenServer.call(pid, :ping)

defp via(%Observer{id: id}), do: {:via, Registry, {registry(), {:observer, id}}}

def handle_call(:ping, _, state), do: {:reply, :pong, state}

def handle_info(:poll, %State{observer: %Observer{next_run_at: at} = observer} = state) do
Logger.info "running observer #{observer.name}"
with {:at, true} <- {:at, Timex.after?(Timex.now(), at)},
{:ok, observer} <- Runner.run(refetch(observer)) do
Logger.info "ran observer #{observer.name}"
{:noreply, %{state | observer: observer}}
else
{:at, _} ->
Logger.info "cannot run observer #{observer.name} yet, next run at #{inspect(observer.next_run_at)}"
{:noreply, state}
{:error, err} ->
Logger.warn "failed to run observer #{observer.name}, error: #{inspect(err)}"
{:noreply, state}
end
end

def handle_info(:move, %State{observer: observer} = state) do
case Discovery.local?(observer) do
true -> {:noreply, state}
false -> {:stop, {:shutdown, :moved}, state}
end
end

defp refetch(%Observer{id: id}), do: Console.Repo.get(Observer, id)
end
8 changes: 3 additions & 5 deletions lib/console/deployments/stacks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,11 @@ defmodule Console.Deployments.Stacks do
def update_stack(attrs, id, %User{} = user) do
start_transaction()
|> add_operation(:stack, fn _ ->
stack = get_stack!(id)
|> preloaded()

stack
get_stack!(id)
|> preloaded()
|> allow(user, :write)
|> when_ok(fn s ->
Stack.changeset(s, Stability.stabilize(attrs, stack))
Stack.changeset(s, Stability.stabilize(attrs, s))
|> Stack.update_changeset()
end)
|> when_ok(:update)
Expand Down
4 changes: 2 additions & 2 deletions plural/helm/console/crds/deployments.plural.sh_observers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -553,15 +553,15 @@ spec:
- SEMVER
- LATEST
type: string
target:
type:
enum:
- OCI
- HELM
- GIT
type: string
required:
- order
- target
- type
type: object
required:
- crontab
Expand Down
19 changes: 0 additions & 19 deletions test/console/deployments/cron_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -284,22 +284,3 @@ defmodule Console.Deployments.CronTest do
end
end
end

defmodule Console.Deployments.AsyncCronTest do
use Console.DataCase, async: false
use Mimic
alias Console.Deployments.{Cron}

setup :set_mimic_global

describe "#run_observers/0" do
test "it can execute pending observers" do
insert_list(2, :observer, next_run_at: Timex.now() |> Timex.shift(minutes: -1))
insert(:observer, next_run_at: Timex.now() |> Timex.shift(minutes: 5))

expect(Console.Deployments.Observer.Runner, :run, 2, fn _ -> :ok end)

:ok = Cron.run_observers()
end
end
end

0 comments on commit ab3c7b8

Please sign in to comment.