Skip to content

Commit

Permalink
Optimize Janitor and enable provisioning
Browse files Browse the repository at this point in the history
  • Loading branch information
whitfin committed Sep 20, 2024
1 parent cee51f2 commit 55873d3
Showing 1 changed file with 72 additions and 13 deletions.
85 changes: 72 additions & 13 deletions lib/cachex/services/janitor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@ defmodule Cachex.Services.Janitor do
scans, so it should be expected that this can take a while to execute.
"""
use GenServer
use Cachex.Provision

# import parent macros
import Cachex.Errors
import Cachex.Spec

# add some aliases
alias Cachex.Query
alias Cachex.Services
alias Services.Informant
alias Services.Overseer

##############
# Public API #
Expand Down Expand Up @@ -88,51 +89,109 @@ defmodule Cachex.Services.Janitor do
#
# This will create the structure used to store metadata about
# the run cycles of the Janitor, and schedule the first run.
def init(cache),
do: {:ok, {schedule_check(cache), %{}}}
def init(_),
do: {:ok, {nil, false, nil}}

@doc false
# Defines provisions required by this service.
def provisions,
do: [:cache]

@doc false
# Returns metadata about the last run of this Janitor.
#
# The returned information should be treated as non-guaranteed.
def handle_call(:last, _ctx, {_cache, last} = state),
def handle_call(:last, _ctx, {_cache, _active, last} = state),
do: {:reply, {:ok, last}, state}

@doc false
# Executes an expiration cleanup against a cache table.
#
# This will drop to the ETS level and use a select to match documents which
# need to be removed; they are then deleted by ETS at very high speeds.
def handle_info(:purge, {cache(name: name), _last}) do
start_time = now()
new_caches = Overseer.retrieve(name)
def handle_info(:purge, {cache, false, _last}) do
started = now()

{duration, active} =
:timer.tc(fn ->
query =
Query.build(
where: {:not, {:==, :expiration, nil}},
output: true,
batch_size: 1
)

cache
|> Cachex.stream!(query, const(:local) ++ const(:notify_false))
|> Enum.empty?()
end)

last = %{
count: 0,
started: started,
duration: duration
}

handle_activation(active, {cache, false, last})
end

@doc false
# Executes an expiration cleanup against a cache table.
#
# This will drop to the ETS level and use a select to match documents which
# need to be removed; they are then deleted by ETS at very high speeds.
def handle_info(:purge, {cache, true, _last}) do
started = now()

{duration, {:ok, count} = result} =
:timer.tc(fn ->
Cachex.purge(new_caches, const(:local))
Cachex.purge(cache, const(:local))
end)

case count do
0 -> nil
_ -> Informant.broadcast(new_caches, const(:purge_override_call), result)
_ -> Informant.broadcast(cache, const(:purge_override_call), result)
end

last = %{
count: count,
duration: duration,
started: start_time
started: started,
duration: duration
}

{:noreply, {schedule_check(new_caches), last}}
{:noreply, {schedule(cache), true, last}}
end

@doc false
# Receives a provisioned cache instance.
#
# The provided cache is then stored in the state and used for cache calls going
# forwards, in order to skip the lookups inside the cache overseer for performance.
def handle_provision({:cache, cache}, {nil, active, last}),
do: {:ok, {schedule(cache), active, last}}

def handle_provision({:cache, cache}, {_cache, active, last}),
do: {:ok, {cache, active, last}}

###############
# Private API #
###############

@doc false
# Handles lazy Janitor activation paradigms.
#
# If the first argument is true, it reflects that there are no records with
# an expiration defined so we re-schedule and skip. In the case there are
# records with expiration defined, we continue to the main purge cycle.
defp handle_activation(true, {cache, _active, last}),
do: {:noreply, {schedule(cache), false, last}}

defp handle_activation(false, {cache, _active, last}),
do: handle_info(:purge, {cache, true, last})

# Schedules a check to occur after the designated interval. Once scheduled,
# returns the state - this is just sugar for pipelining with a state.
defp schedule_check(cache(expiration: expiration(interval: interval)) = cache) do
defp schedule(cache(expiration: expiration(interval: interval)) = cache) do
:erlang.send_after(interval, self(), :purge)
cache
end
Expand Down

0 comments on commit 55873d3

Please sign in to comment.