-
Notifications
You must be signed in to change notification settings - Fork 71
Adds tee and cat commands and a supporting DataStore service #1101
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
defmodule Cog.Command.Service.DataStore do | ||
@moduledoc """ | ||
Stores an arbitrary data structure for a given key. The only requirement is | ||
that the data structure must be able to be encoded as JSON. | ||
|
||
Keys may be fetched, replaced, and deleted. | ||
|
||
The JSON data is stored on the filesystem of the Cog host. See the | ||
Cog.NestedFile module for more details. | ||
""" | ||
|
||
use GenServer | ||
|
||
alias Cog.NestedFile | ||
|
||
defstruct [:base_path] | ||
|
||
@doc """
Starts the #{inspect __MODULE__} service as a process registered under the
module name.

`base_path` is the base directory that content is stored under; it is handed
unchanged to `init/1`.
"""
def start_link(base_path) do
  GenServer.start_link(__MODULE__, base_path, name: __MODULE__)
end
|
||
@doc """
Fetches the value stored under `key` in `namespace`.

The request is a synchronous call to the process registered as
#{inspect __MODULE__}.

Returns `{:ok, value}` when the key exists, or `{:error, reason}` otherwise.
For a missing key the reason is the `"Object not found"` text produced by
`Cog.NestedFile`.
"""
def fetch(namespace, key) do
  request = {:fetch, namespace, key}
  GenServer.call(__MODULE__, request)
end
|
||
@doc """
Stores `value` under `key` in `namespace`, overwriting anything previously
saved under that key.

`value` must be encodable as JSON. Returns `{:ok, value}` on success or
`{:error, reason}` when the content could not be written.
"""
def replace(namespace, key, value) do
  request = {:replace, namespace, key, value}
  GenServer.call(__MODULE__, request)
end
|
||
@doc """
Deletes the content stored under `key` in `namespace`.

Returns `{:ok, key}` when the content was removed, or `{:error, reason}`
otherwise. For a missing key the reason is the `"Object not found"` text
produced by `Cog.NestedFile`.
"""
def delete(namespace, key) do
  request = {:delete, namespace, key}
  GenServer.call(__MODULE__, request)
end
|
||
# GenServer callback. Refuses to start when no data path was supplied;
# otherwise the server state is a struct that carries the base path.
def init(nil) do
  {:stop, "Unable to start #{__MODULE__}: Data path not configured"}
end

def init(base_path), do: {:ok, %__MODULE__{base_path: base_path}}
|
||
# Handles `fetch/2`: reads the JSON file for `key` beneath the base path and
# namespace, then decodes it.
#
# Replies `{:ok, data}` on success. Read failures are forwarded as the
# `{:error, reason}` returned by NestedFile. Content that cannot be decoded
# (e.g. a truncated or hand-edited file) is reported as an error reply rather
# than raised, so one bad file cannot crash this named service for every
# other caller.
def handle_call({:fetch, namespace, key}, _from, state) do
  reply =
    case NestedFile.fetch([state.base_path] ++ namespace, key, "json") do
      {:ok, content} ->
        case Poison.decode(content) do
          {:ok, data} ->
            {:ok, data}

          # Poison's error shape differs between releases, so match anything
          # that is not a successful decode.
          _invalid ->
            {:error, "Stored content is not valid JSON"}
        end

      {:error, reason} ->
        {:error, reason}
    end

  {:reply, reply, state}
end
|
||
# Handles `replace/3`: encodes `value` as JSON and writes it to the file for
# `key` beneath the base path and namespace.
#
# Replies `{:ok, value}` (the original term, not the JSON text) on success,
# or forwards the `{:error, reason}` returned by NestedFile.
def handle_call({:replace, namespace, key, value}, _from, state) do
  json = Poison.encode!(value)
  location = [state.base_path] ++ namespace

  reply =
    case NestedFile.replace(location, key, json, "json") do
      # The pin asserts that NestedFile echoes back exactly what was written.
      {:ok, ^json} -> {:ok, value}
      {:error, reason} -> {:error, reason}
    end

  {:reply, reply, state}
end
|
||
# Handles `delete/2`: removes the JSON file for `key` beneath the base path
# and namespace. Replies `{:ok, key}` on success, or forwards the
# `{:error, reason}` returned by NestedFile.
def handle_call({:delete, namespace, key}, _from, state) do
  location = [state.base_path] ++ namespace

  reply =
    case NestedFile.delete(location, key, "json") do
      :ok -> {:ok, key}
      {:error, reason} -> {:error, reason}
    end

  {:reply, reply, state}
end
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
defmodule Cog.Commands.Cat do | ||
use Cog.Command.GenCommand.Base, | ||
bundle: Cog.Util.Misc.embedded_bundle | ||
|
||
alias Cog.Command.Service.DataStore | ||
|
||
# Note, we use the `tee` namespace because the namespace we read from | ||
# must be the same one that data was written into. | ||
@data_namespace [ "commands", "tee" ] | ||
|
||
@description "Retrieve saved pipeline output" | ||
|
||
@long_description """ | ||
The cat command retrieves pipeline output that was previously saved using the tee command. | ||
""" | ||
|
||
@arguments "<name>" | ||
|
||
rule "when command is #{Cog.Util.Misc.embedded_bundle}:cat allow" | ||
|
||
option "merge", short: "m", type: "bool", required: false, | ||
description: "Merge current pipeline map into saved pipeline map" | ||
|
||
option "append", short: "a", type: "bool", required: false, | ||
description: "Append current pipeline output to saved pipeline data, returning an array" | ||
|
||
option "insert", short: "i", type: "string", required: false, | ||
description: "Insert current pipeline output into saved pipeline map as the field specified for this option" | ||
|
||
# Rejects requests that set both the merge and append options. This clause
# comes first so the conflict is reported before any data is fetched.
def handle_message(%{options: %{"merge" => true, "append" => true}} = req, state) do
  message = "The append and merge options cannot be specified together"
  {:error, req.reply_to, message, state}
end
|
||
# Retrieves the content saved under `key` and replies with it, optionally
# combined with the current pipeline input (`req.cog_env`).
#
# Option precedence is insert, then append, then merge; with no option the
# saved content is returned unchanged. A failed lookup becomes an error reply.
def handle_message(%{args: [key], options: options} = req, state) do
  case DataStore.fetch(@data_namespace, key) do
    {:error, reason} ->
      {:error, req.reply_to, "Unable to retrieve data for #{key}: #{inspect reason}", state}

    {:ok, saved} ->
      cond do
        options["insert"] ->
          handle_transform(:insert, req, saved, state)

        options["append"] ->
          # "Append" means the current pipeline output is added after the
          # saved content, so the saved content comes first in the result.
          combined = List.wrap(saved) ++ List.wrap(req.cog_env)
          {:reply, req.reply_to, combined, state}

        options["merge"] ->
          handle_transform(:merge, req, saved, state)

        true ->
          {:reply, req.reply_to, saved, state}
      end
  end
end
|
||
# Error replies for requests that do not carry exactly one name.

# No name given.
def handle_message(%{args: []} = req, state),
  do: {:error, req.reply_to, "#{Cog.Util.Misc.embedded_bundle}:cat requires a name to be specified", state}

# Two or more names given. Without this clause such a request matches no
# clause at all and raises a FunctionClauseError instead of telling the user
# what went wrong.
def handle_message(%{args: [_first, _second | _rest]} = req, state),
  do: {:error, req.reply_to, "#{Cog.Util.Misc.embedded_bundle}:cat accepts only a single name", state}
|
||
# Applies the :merge or :insert transformation to the current pipeline input
# (`req.cog_env`) and the saved `data`, then wraps the outcome in the reply
# tuple: `:reply` with the transformed value, or `:error` with the reason.
defp handle_transform(action, req, data, state) do
  outcome = transform_map_data(action, req.cog_env, data, req.options)

  case outcome do
    {:error, reason} -> {:error, req.reply_to, reason, state}
    {:ok, result} -> {:reply, req.reply_to, result, state}
  end
end
|
||
# Combines two map values according to `action`.
#
# As called from handle_transform/4, `prev` is the current pipeline input and
# `curr` is the saved content. Either may arrive wrapped in single-element
# lists, which are unwrapped first.
#
#   * :merge  - `Map.merge(prev, curr)`; on key conflicts the value from
#               `curr` wins.
#   * :insert - puts `curr` into `prev` under the key named by
#               `opts["insert"]`.
#
# Returns `{:ok, map}`, or `{:error, message}` when either value is not a map.
#
# NOTE(review): the option descriptions say the pipeline output is merged or
# inserted *into* the saved map, which reads as the opposite direction to
# what happens here - confirm which is intended.
defp transform_map_data(action, prev, curr, opts) do
  apply_map_transform(action, unwrap_singleton(prev), unwrap_singleton(curr), opts)
end

# Peels single-element list wrappers until something else is reached.
defp unwrap_singleton([only]), do: unwrap_singleton(only)
defp unwrap_singleton(value), do: value

defp apply_map_transform(:merge, %{} = left, %{} = right, _opts),
  do: {:ok, Map.merge(left, right)}

defp apply_map_transform(:insert, %{} = left, %{} = right, opts),
  do: {:ok, Map.put(left, opts["insert"], right)}

defp apply_map_transform(action, _left, _right, _opts),
  do: {:error, "The " <> Atom.to_string(action) <> " option is only applicable for map values"}
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
defmodule Cog.Commands.Tee do | ||
use Cog.Command.GenCommand.Base, | ||
bundle: Cog.Util.Misc.embedded_bundle | ||
|
||
require Logger | ||
|
||
alias Cog.Command.Service.MemoryClient | ||
alias Cog.Command.Service.DataStore | ||
|
||
@data_namespace [ "commands", "tee" ] | ||
|
||
@description "Save and pass through pipeline data" | ||
|
||
# Long-form description of the tee command, including guidance on limiting
# access to saved content with rules.
@long_description """
The tee command passes the output of a Cog pipeline through to the next command in the pipeline while also saving it using the provided name. The saved output can be retrieved later using the cat command.

If the name of a previously saved object is reused, tee will overwrite the existing data. There is not currently a way to delete saved content from Cog, but you can simulate this behavior by sending a replacement object to tee again with the name of the object you wish to delete.

Think carefully about the type of data that you store using tee since it will be retrievable by default by any Cog user. Careful use of rules and naming conventions could be used to limit access, though keep in mind that a simple typo in naming could cause unexpected data to be accessible. For example, the rules below would require you to have the "site:prod-data" permission in order to save or retrieve objects whose names begin with "prod-".

operable:rule create "when command is operable:tee with arg[0] == /^prod-.*/ must have site:prod-data"
operable:rule create "when command is operable:cat with arg[0] == /^prod-.*/ must have site:prod-data"
"""
|
||
@arguments "<name>" | ||
|
||
@examples """ | ||
seed '{ "thing": "stuff" }' | tee foo | ||
> '{ "thing": "stuff" }' | ||
cat foo | ||
> '{ "thing": "stuff" }' | ||
""" | ||
|
||
rule "when command is #{Cog.Util.Misc.embedded_bundle}:tee allow" | ||
|
||
# Accumulates this invocation's pipeline input and, on the last invocation,
# saves everything accumulated under `key` while also passing it through.
#
# Every invocation adds `req.cog_env` to the memory service under the
# invocation id. For steps "first" and nil the reply is `nil`. For "last" the
# accumulated values are fetched, empty maps are dropped, a single remaining
# value is unwrapped, and the temporary entry is deleted before the data is
# stored and returned.
#
# Any other step value matches no branch and raises a CaseClauseError.
def handle_message(%{args: [key]} = req, state) do
  root = req.services_root
  token = req.service_token
  step = req.invocation_step
  value = req.cog_env
  memory_key = req.invocation_id

  MemoryClient.accum(root, token, memory_key, value)

  case step do
    step when step in ["first", nil] ->
      {:reply, req.reply_to, nil, state}

    "last" ->
      data =
        MemoryClient.fetch(root, token, memory_key)
        |> Enum.reject(fn(value) -> value == %{} end)
        |> maybe_unwrap

      MemoryClient.delete(root, token, memory_key)

      # DataStore.replace/3 overwrites whatever was previously saved under
      # `key`; reusing a name replaces the earlier content.
      case DataStore.replace(@data_namespace, key, data) do
        {:error, reason} ->
          # The reply must carry `state` as its fourth element, like every
          # other error reply in this module.
          {:error, req.reply_to, "Unable to store pipeline content: #{inspect reason}", state}

        {:ok, _} ->
          {:reply, req.reply_to, data, state}
      end
  end
end
|
||
# Error replies for requests that do not carry exactly one name.

# No name given.
def handle_message(%{args: []} = req, state) do
  {:error, req.reply_to, "#{Cog.Util.Misc.embedded_bundle}:tee requires a name to be specified for the pipeline content", state}
end

# Two or more names given. Without this clause such a request matches no
# clause at all and raises a FunctionClauseError instead of telling the user
# what went wrong.
def handle_message(%{args: [_first, _second | _rest]} = req, state) do
  {:error, req.reply_to, "#{Cog.Util.Misc.embedded_bundle}:tee accepts only a single name for the pipeline content", state}
end
|
||
# Returns the sole element of a one-element list; anything else (including
# the empty list and longer lists) is returned unchanged.
defp maybe_unwrap(data) do
  case data do
    [single] -> single
    _ -> data
  end
end
end |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
defmodule Cog.NestedFile do | ||
@moduledoc """ | ||
Stores an arbitrary String on the filesystem of the Cog host in a file | ||
named by combining the provided key and extension. Files are written to | ||
a directory hierarchy that is created by splitting the filename into | ||
two character segments and joining up to three of those segments with a | ||
provided list of base path segments. | ||
|
||
Note: The key is sanitized to remove dangerous characters, but the | ||
base paths and extension are not. Make sure not to use user entered | ||
values for these unsafe arguments. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we just sanitize everything here, then? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can't really, since "everything" includes the base path which will obviously have things like slashes in it. We could sanitize the contents of the namespace list that is passed to DataStore but it felt unnecessary since this is an internal API that's not exposed outside of Cog core. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So is the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, the key is the only thing that is user specified. That note is there for future-me who wants to reuse this for something else so that I don't make a mistake and let user-specified data into the namespace for instance. |
||
|
||
Example: If the replace function was called with the following options: | ||
|
||
base_paths = [ "commands", "tee" ] | ||
key = "myfilename/../foo" | ||
ext = "json" | ||
|
||
The following would be created on disk: | ||
|
||
. | ||
└── commands | ||
└── tee | ||
└── my | ||
└── fi | ||
└── le | ||
└── myfilenamefoo.json | ||
|
||
This directory structure is created in order to deal with the fact that | ||
some filesystems demonstrate poor performance characteristics when working | ||
with directories that contain a very large number of files. | ||
""" | ||
require Logger | ||
|
||
# Reads the content stored for `key` beneath `base_paths`.
#
# `ext` is the file extension and defaults to "data". Returns
# `{:ok, content}` or `{:error, text}`, where `text` is the readable form of
# the file error.
#
# No locking is done here. Callers that need ordered access have to
# serialize their calls themselves; with competing writes to one key, the
# last write wins.
def fetch(base_paths, key, ext \\ "data") do
  path = build_filename(base_paths, key, ext)

  case File.read(path) do
    {:ok, _content} = found -> found
    {:error, reason} -> {:error, error_text(reason)}
  end
end
|
||
# Writes `content` to the file for `key` beneath `base_paths`, creating the
# nested directories first and overwriting any existing file.
#
# `ext` is the file extension and defaults to "data". Returns
# `{:ok, content}` or `{:error, text}`, where `text` is the readable form of
# the file error.
def replace(base_paths, key, content, ext \\ "data") do
  filename = build_filename(base_paths, key, ext)

  # Report a directory-creation failure directly. If it were ignored, the
  # write below would fail with :enoent and a permission problem would be
  # misreported as "Object not found".
  case File.mkdir_p(Path.dirname(filename)) do
    :ok ->
      case File.write(filename, content) do
        :ok -> {:ok, content}
        {:error, reason} -> {:error, error_text(reason)}
      end

    {:error, reason} ->
      {:error, error_text(reason)}
  end
end
|
||
# Removes the file for `key` beneath `base_paths`.
#
# `ext` is the file extension and defaults to "data". Returns `:ok` or
# `{:error, text}`, where `text` is the readable form of the file error.
# The directories that held the file are left in place.
def delete(base_paths, key, ext \\ "data") do
  path = build_filename(base_paths, key, ext)

  case File.rm(path) do
    :ok -> :ok
    {:error, reason} -> {:error, error_text(reason)}
  end
end
|
||
# Turns a file error atom into text. Three common reasons get a friendly
# message; any other atom loses its first character and is upcased behind an
# "E_" prefix (e.g. :eisdir becomes "E_ISDIR").
defp error_text(reason) do
  case reason do
    :enoent ->
      "Object not found"

    :enospc ->
      "No space available to save object"

    :eacces ->
      "Permission denied"

    other ->
      {_first, rest} = other |> Atom.to_string() |> String.split_at(1)
      "E_" <> String.upcase(rest)
  end
end
|
||
# Builds the full path for `key`: the base path segments, then up to three
# directories named after consecutive two-character pieces of the sanitized
# key, then "<sanitized key>.<ext>".
#
# Only the key is sanitized; `base_paths` and `ext` are used as given.
# A key made up solely of stripped characters sanitizes to "", which yields
# no directories and a file named ".<ext>" directly under the base path.
defp build_filename(base_paths, key, ext) do
  safe_key = sanitize_filename(key)

  # Regex.scan/2 returns one single-element list per match, so taking the
  # first three matches and concatenating gives at most three segments.
  directories =
    ~r/.{1,2}/
    |> Regex.scan(safe_key)
    |> Enum.take(3)
    |> Enum.concat()

  file = safe_key <> "." <> ext

  Path.join(base_paths ++ directories ++ [file])
end
|
||
# Strips every character that is not an ASCII letter, digit, underscore or
# hyphen, so the result cannot contain path separators or dots.
defp sanitize_filename(name) do
  String.replace(name, ~r/[^A-Za-z0-9_\-]/, "")
end
end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to verify this path exists, is a directory, can be written to, etc? Is that done somewhere else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
NestedFile
module will try to create it, if it does not exist, the first time a write is attempted, and return an error if it is unable to. That felt like the right thing in this case since not everyone is likely to use the commands that make use of it and I wouldn't want to keep Cog from working. It also defaults to being nested under the default data directory just like logs and audit_logs, so it feels pretty safe to me. Happy to hear arguments to the contrary if you think we should do something else.