From 55873d32ce605bee41e1fc86d302c038dae5bf68 Mon Sep 17 00:00:00 2001
From: Isaac Whitfield
Date: Fri, 20 Sep 2024 06:55:54 -0700
Subject: [PATCH] Optimize Janitor and enable provisioning

---
 lib/cachex/services/janitor.ex | 85 ++++++++++++++++++++++++++++------
 1 file changed, 72 insertions(+), 13 deletions(-)

diff --git a/lib/cachex/services/janitor.ex b/lib/cachex/services/janitor.ex
index 0122c48..f836d7f 100644
--- a/lib/cachex/services/janitor.ex
+++ b/lib/cachex/services/janitor.ex
@@ -12,15 +12,16 @@ defmodule Cachex.Services.Janitor do
   scans, so it should be expected that this can take a while to execute.
   """
   use GenServer
+  use Cachex.Provision
 
   # import parent macros
   import Cachex.Errors
   import Cachex.Spec
 
   # add some aliases
+  alias Cachex.Query
   alias Cachex.Services
   alias Services.Informant
-  alias Services.Overseer
 
   ##############
   # Public API #
@@ -88,14 +89,19 @@ defmodule Cachex.Services.Janitor do
   #
   # This will create the structure used to store metadata about
   # the run cycles of the Janitor, and schedule the first run.
-  def init(cache),
-    do: {:ok, {schedule_check(cache), %{}}}
+  def init(_),
+    do: {:ok, {nil, false, nil}}
+
+  @doc false
+  # Defines provisions required by this service.
+  def provisions,
+    do: [:cache]
 
   @doc false
   # Returns metadata about the last run of this Janitor.
   #
   # The returned information should be treated as non-guaranteed.
-  def handle_call(:last, _ctx, {_cache, last} = state),
+  def handle_call(:last, _ctx, {_cache, _active, last} = state),
     do: {:reply, {:ok, last}, state}
 
   @doc false
@@ -103,36 +109,89 @@ defmodule Cachex.Services.Janitor do
   #
   # This will drop to the ETS level and use a select to match documents which
   # need to be removed; they are then deleted by ETS at very high speeds.
-  def handle_info(:purge, {cache(name: name), _last}) do
-    start_time = now()
-    new_caches = Overseer.retrieve(name)
+  def handle_info(:purge, {cache, false, _last}) do
+    started = now()
+
+    {duration, active} =
+      :timer.tc(fn ->
+        query =
+          Query.build(
+            where: {:not, {:==, :expiration, nil}},
+            output: true,
+            batch_size: 1
+          )
+
+        cache
+        |> Cachex.stream!(query, const(:local) ++ const(:notify_false))
+        |> Enum.empty?()
+      end)
+
+    last = %{
+      count: 0,
+      started: started,
+      duration: duration
+    }
+
+    handle_activation(active, {cache, false, last})
+  end
+
+  @doc false
+  # Executes an expiration cleanup against a cache table.
+  #
+  # This will drop to the ETS level and use a select to match documents which
+  # need to be removed; they are then deleted by ETS at very high speeds.
+  def handle_info(:purge, {cache, true, _last}) do
+    started = now()
 
     {duration, {:ok, count} = result} =
       :timer.tc(fn ->
-        Cachex.purge(new_caches, const(:local))
+        Cachex.purge(cache, const(:local))
       end)
 
     case count do
       0 -> nil
-      _ -> Informant.broadcast(new_caches, const(:purge_override_call), result)
+      _ -> Informant.broadcast(cache, const(:purge_override_call), result)
     end
 
     last = %{
       count: count,
-      duration: duration,
-      started: start_time
+      started: started,
+      duration: duration
     }
 
-    {:noreply, {schedule_check(new_caches), last}}
+    {:noreply, {schedule(cache), true, last}}
   end
 
+  @doc false
+  # Receives a provisioned cache instance.
+  #
+  # The provided cache is then stored in the state and used for cache calls going
+  # forwards, in order to skip the lookups inside the cache overseer for performance.
+  def handle_provision({:cache, cache}, {nil, active, last}),
+    do: {:ok, {schedule(cache), active, last}}
+
+  def handle_provision({:cache, cache}, {_cache, active, last}),
+    do: {:ok, {cache, active, last}}
+
   ###############
   # Private API #
   ###############
 
+  @doc false
+  # Handles lazy Janitor activation paradigms.
+  #
+  # If the first argument is true, it reflects that there are no records with
+  # an expiration defined so we re-schedule and skip. In the case there are
+  # records with expiration defined, we continue to the main purge cycle.
+  defp handle_activation(true, {cache, _active, last}),
+    do: {:noreply, {schedule(cache), false, last}}
+
+  defp handle_activation(false, {cache, _active, last}),
+    do: handle_info(:purge, {cache, true, last})
+
   # Schedules a check to occur after the designated interval. Once scheduled,
   # returns the state - this is just sugar for pipelining with a state.
-  defp schedule_check(cache(expiration: expiration(interval: interval)) = cache) do
+  defp schedule(cache(expiration: expiration(interval: interval)) = cache) do
     :erlang.send_after(interval, self(), :purge)
     cache
   end
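
Reviewer note (not part of the patch): the sketch below shows the lazy-activation
check in isolation, using the same query shape the new inactive-state
handle_info(:purge, {cache, false, _last}) clause builds before deciding whether
to run a full purge. The cache name :my_cache and the setup calls are
illustrative assumptions only; the internal const(:local) ++ const(:notify_false)
options are omitted here because they are private to Cachex itself.

    # Start an illustrative cache and give one record an expiration so the
    # check below has something to find.
    Cachex.start_link(:my_cache)
    Cachex.put(:my_cache, "session", "token")
    Cachex.expire(:my_cache, "session", :timer.seconds(30))

    # Same query the Janitor builds: match only records whose expiration is
    # set, with batch_size: 1 so the stream can stop after the first match.
    query =
      Cachex.Query.build(
        where: {:not, {:==, :expiration, nil}},
        output: true,
        batch_size: 1
      )

    # An empty stream means nothing can ever expire, so the Janitor stays
    # dormant and simply re-schedules; otherwise it activates and runs the
    # usual Cachex.purge/2 cycle shown in the second handle_info clause.
    has_expirations? =
      not (:my_cache |> Cachex.stream!(query) |> Enum.empty?())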