|
defmodule FileCache do
  @moduledoc """
  File-backed cache: supervisor plus public API.

  Starting the supervisor validates the cache options, stores them through
  `FileCache.Config`, and supervises the async pool and the two cleaners.
  Payloads are written to a temp path (`FileCache.Temp`) first and then moved
  to their permanent path (`FileCache.Perm`).

  NOTE(review): the lookup side (`do_get/2`) and `remove_stale_files/1` are
  still stubs, so `execute/3`, `exists?/2` and `del/2` cannot succeed until
  they are implemented.
  """

  @type id :: term
  @type cache_name :: atom

  use Supervisor

  alias FileCache.AsyncPool
  alias FileCache.Temp
  alias FileCache.Perm
  alias FileCache.TempCleaner
  alias FileCache.StaleCleaner
  alias FileCache.Utils
  alias FileCache.Config

  # Chunk size (bytes) of the stream handed back after a successful put.
  # The original passed `:byte`, which is not a documented `line_or_bytes`
  # value (`:line` or a positive integer); 64 KiB is an arbitrary tunable.
  @stream_chunk_bytes 64 * 1024

  @doc """
  Starts the cache supervisor.

  `opts` must satisfy the start-up schema: `:name`, `:dir` and `:temp_dir` are
  required; `:ttl`, `:namespace`, `:stale_clean_interval`, `:temp_namespace`
  and `:temp_clean_interval` are optional. Validation happens in `init/1`.
  """
  @spec start_link(keyword) :: Supervisor.on_start()
  def start_link(opts) do
    Supervisor.start_link(__MODULE__, opts)
  end

  # NimbleOptions schema for the options accepted when a cache is started.
  defp options_schema do
    # A namespace is nil, :host, a zero-arity function, an MFA, or a list of those.
    namespace_single_type = {:or, [{:in, [nil, :host]}, {:fun, 0}, :mfa]}
    namespace_type = {:or, [namespace_single_type, {:list, namespace_single_type}]}

    [
      name: [
        type: {:custom, FileCache.Common, :validate_cache_name, []},
        required: true
      ],
      dir: [
        type: :string,
        required: true
      ],
      ttl: [
        type: :pos_integer,
        # 1 hour
        default: :timer.hours(1)
      ],
      namespace: [
        type: namespace_type,
        default: nil
      ],
      stale_clean_interval: [
        type: :pos_integer,
        # 1 hour
        default: :timer.hours(1)
      ],
      temp_dir: [
        type: :string,
        required: true
      ],
      # Temp directory prefix (e.g. by hostname) for NFS-like storages that
      # might be shared between hosts.
      temp_namespace: [
        type: namespace_type,
        default: nil
      ],
      temp_clean_interval: [
        type: :pos_integer,
        # 15 minutes
        default: :timer.minutes(15)
      ]
    ]
  end

  @doc """
  Supervisor callback: validates `opts`, stores the resulting config under the
  cache name and returns the supervision spec.

  Raises `NimbleOptions.ValidationError` when `opts` do not match the schema.
  """
  @impl true
  def init(opts) do
    config =
      opts
      |> NimbleOptions.validate!(options_schema())
      |> Map.new()

    # The schema key is `:name`; reading `:cache_name` always produced nil.
    cache_name = config.name

    Config.store(cache_name, config)

    # Flat list: a nested list is not a valid child specification.
    children = [
      {AsyncPool, cache_name},
      TempCleaner,
      StaleCleaner
    ]

    # Inside the callback the spec must be returned via Supervisor.init/2;
    # Supervisor.start_link/2 would try to start a second supervisor instead.
    Supervisor.init(children, strategy: :one_for_one)
  end

  # NimbleOptions schema for the per-operation options of the public API.
  defp op_options_schema do
    [
      cache: [
        type: :atom,
        required: true
      ],
      # The id is passed positionally to every public function, so it is no
      # longer required here; it is still accepted for older call sites.
      id: [
        type: :string
      ],
      ttl: [
        type: :pos_integer
      ],
      owner: [
        type: :pid
      ],
      return: [
        # `{:in, ...}` matches the type used by the start-up schema above.
        type: {:in, [:filename, :data]},
        default: :data
      ]
    ]
  end

  # Validates operation options; raises NimbleOptions.ValidationError on mismatch.
  defp validate_op_options(opts) do
    NimbleOptions.validate!(opts, op_options_schema())
  end

  @doc """
  Reads `id` from the cache, falling back to `enum` on a miss.

  `enum` may be iodata, an enumerable, or a zero-arity function returning one
  of those. On a hit the cached stream is returned; on a miss the data is
  stored and a stream over the stored file is returned.

  `opts` must contain `:cache`, so calling this with the default `[]` raises.
  """
  def execute(enum, id, opts \\ []) do
    opts = validate_op_options(opts)

    case do_get(id, opts) do
      {:ok, cached_stream} -> cached_stream
      {:error, :not_found} -> do_put(enum, id, opts)
    end
  end

  @doc """
  Stores `enum` under `id` unconditionally and returns a stream over the
  stored file.
  """
  def put(enum, id, opts) do
    do_put(enum, id, validate_op_options(opts))
  end

  @doc """
  Looks up `id` in the cache named by `opts[:cache]`.

  Returns whatever `FileCache.Perm.find_for_id/2` returns.
  """
  def get(id, opts \\ []) do
    opts = validate_op_options(opts)

    Perm.find_for_id(id, opts[:cache])
  end

  @doc """
  Returns `true` when a cached file for `id` exists on disk, `false` when the
  lookup reports `{:error, :not_found}`.
  """
  @spec exists?(id, keyword) :: boolean
  def exists?(id, opts) do
    # do_get/2 is consumed as `{:ok, stream} | {:error, :not_found}` by
    # execute/3, so the tuple has to be unwrapped before reading `:path`.
    case do_get(id, validate_op_options(opts)) do
      {:ok, %{path: path}} -> File.exists?(path)
      {:error, :not_found} -> false
    end
  end

  @doc """
  Removes the cached file for `id`.

  Returns the result of `FileCache.Utils.rm_ignore_missing/1` when an entry is
  found and `:ok` when there is nothing to delete.
  """
  def del(id, opts) do
    case do_get(id, validate_op_options(opts)) do
      {:ok, %{path: path}} -> Utils.rm_ignore_missing(path)
      {:error, :not_found} -> :ok
    end
  end

  # Writes `enum` to a temp file, moves it to its permanent location and
  # returns a byte-chunked stream over the permanent file.
  #
  # Steps:
  #   1. schedule stale-file cleaning for `id`
  #   2. resolve the data (iodata / enumerable / zero-arity function)
  #   3. write it to the temp path
  #   4. move the temp file to the permanent path
  defp do_put(enum, id, opts) do
    cache_name = opts[:cache]

    StaleCleaner.schedule_clean(id, cache_name)

    temp_filepath = Temp.file_path(id, cache_name, opts)

    enum
    |> data_stream!()
    |> write_to_temp(temp_filepath)

    perm_filepath = Perm.file_path(id, cache_name, opts)
    move_from_temp(temp_filepath, perm_filepath)

    File.stream!(perm_filepath, [:binary], @stream_chunk_bytes)
  end

  # NOTE(review): not implemented yet. Callers expect
  # `{:ok, stream} | {:error, :not_found}`, but this still returns the cache
  # name, so their pattern matches fail. Planned algorithm:
  #   1. get a list of all files matching the id pattern
  #   2. sort them (higher timestamp goes first)
  #   3. try evaluating them one by one
  #   4. as soon as one is found, schedule removal of the rest
  #      (StaleCleaner.schedule_file_removal(files))
  defp do_get(_id, opts) do
    opts[:cache]
  end

  # Resolves the payload: a zero-arity function is called first, then the
  # result (or the value itself) is checked to be iodata or an enumerable.
  defp data_stream!(fun) when is_function(fun, 0), do: do_data_stream!(fun.())
  defp data_stream!(other), do: do_data_stream!(other)

  # Returns `data` unchanged when it is a list, a binary or implements
  # Enumerable; raises ArgumentError otherwise.
  defp do_data_stream!(data) when is_list(data) or is_binary(data), do: data

  defp do_data_stream!(data) do
    if Enumerable.impl_for(data) do
      data
    else
      raise ArgumentError,
        message:
          "Passed data is not iodata, stream, or function that yields them. Got: #{inspect(data)}"
    end
  end

  # Writes the payload to `filepath`: lists and binaries are written in one
  # go as iodata, anything else is streamed into the file.
  # TODO: any rescue wrappers?
  defp write_to_temp(iodata, filepath) when is_list(iodata) or is_binary(iodata) do
    File.write!(filepath, iodata, [:binary])
  end

  defp write_to_temp(stream, filepath) do
    # The chunk-size argument only affects reading, so it is omitted here.
    Enum.into(stream, File.stream!(filepath, [:binary]))
  end

  # Moves a fully written temp file to its permanent location, creating the
  # destination directory when missing. Raises on failure.
  # NOTE(review): File.rename!/2 fails across filesystems (:exdev); confirm
  # that temp_dir and dir always live on the same device.
  defp move_from_temp(temp_path, perm_path) do
    perm_path |> Path.dirname() |> File.mkdir_p!()
    File.rename!(temp_path, perm_path)
  end

  @doc """
  Placeholder for stale-file removal; currently does nothing and returns `nil`.

  Planned behaviour: list all files containing `id` sorted by timestamp and
  remove the stale ones asynchronously.
  """
  def remove_stale_files(_id) do
    nil
  end

  # Returns `id` when `Utils.validate_filepath/1` accepts it; raises
  # ArgumentError with the reported reason otherwise.
  # NOTE(review): not called anywhere in this module yet.
  defp validate_id!(id) do
    case Utils.validate_filepath(id) do
      :ok ->
        id

      {:error, reason} ->
        raise ArgumentError,
          message: "FileCache ID #{reason}. Got: #{inspect(id)}"
    end
  end
end
0 commit comments