Skip to content

Adds tee and cat commands and a supporting DataStore service #1101

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 27, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ end
# Embedded Command Bundle Version (for built-in commands)
# NOTE: Do not change this value unless you know what you're doing.
# ========================================================================
config :cog, :embedded_bundle_version, "0.14.0"

config :cog, :embedded_bundle_version, "0.15.0"

# ========================================================================
# Chat Adapters
Expand Down Expand Up @@ -57,6 +58,9 @@ config :cog, Cog.Command.Pipeline,
interactive_timeout: {60, :sec},
trigger_timeout: {300, :sec}

config :cog, Cog.Command.Service,
data_path: data_dir("service_data")

# Set these to zero (0) to disable caching
config :cog, :command_cache_ttl, {60, :sec}
config :cog, :command_rule_ttl, {10, :sec}
Expand Down
6 changes: 4 additions & 2 deletions lib/cog/command/gen_command/base.ex
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ defmodule Cog.Command.GenCommand.Base do
"""
def options(module) do
attr_values(module, :options)
|> Enum.reduce(%{}, fn(%{"name" => name, "type" => type, "required" => required, "short_flag" => short}, acc) ->
Map.put(acc, name, %{"type" => type, "required" => required, "short_flag" => short})
|> Enum.reduce(%{}, fn(%{"name" => name, "type" => type, "required" => required, "short_flag" => short, "description" => description}, acc) ->
Map.put(acc, name, %{"type" => type, "required" => required, "short_flag" => short, "description" => description})
end)
end

Expand Down Expand Up @@ -242,11 +242,13 @@ defmodule Cog.Command.GenCommand.Base do

"""
defmacro option(name, options \\ []) do
description = Keyword.get(options, :description, nil)
required = Keyword.get(options, :required, false)
type = Keyword.get(options, :type, "string")
short = Keyword.get(options, :short, nil)
quote do
@options %{"name" => unquote(name),
"description" => unquote(description),
"required" => unquote(required),
"short_flag" => unquote(short),
"type" => unquote(type)}
Expand Down
81 changes: 81 additions & 0 deletions lib/cog/command/service/data_store.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
defmodule Cog.Command.Service.DataStore do
@moduledoc """
Stores an arbitrary data structure for a given key. The only requirement is
that the data structure must be able to be encoded as JSON.

Keys may be fetched, replaced, and deleted.

The JSON data is stored on the filesystem of the Cog host. See the
Cog.NestedFile module for more details.
"""

use GenServer

alias Cog.NestedFile

defstruct [:base_path]

@doc """
Starts the #{inspect __MODULE__} service. Accepts a path to use for the
base directory to store content under.
"""
def start_link(base_path),
do: GenServer.start_link(__MODULE__, base_path, name: __MODULE__)

@doc """
Fetches the given key. Returns `{:ok, value}` if the key exists or `{:error,
:unknown_key}` if it doesn't exist.
"""
def fetch(namespace, key),
do: GenServer.call(__MODULE__, {:fetch, namespace, key})

@doc """
Replaces or sets the given key with the value. Returns `{:ok, value}`.
"""
def replace(namespace, key, value),
do: GenServer.call(__MODULE__, {:replace, namespace, key, value})

@doc """
Deletes the given key. Returns `{:ok, key}` when successfully
deleted or `{:error, :unknown_key}` if it doesn't exist.
"""
def delete(namespace, key),
do: GenServer.call(__MODULE__, {:delete, namespace, key})

def init(nil),
do: {:stop, "Unable to start #{__MODULE__}: Data path not configured"}
def init(base_path) do
state = %__MODULE__{base_path: base_path}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to verify this path exists, is a directory, can be written to, etc? Is that done somewhere else?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The NestedFile module will try to create it if it does not exist the first time it is attempted to be written to and return an error if it is unable to. That felt like the right thing in this case since not everyone is likely to use the commands that make use of it and I wouldn't want to keep Cog from working. It also defaults to be nested under the default data directory just like logs and audit_logs so it feels pretty safe to me.

Happy to hear arguments to the contrary if you think we should do something else.

{:ok, state}
end

def handle_call({:fetch, namespace, key}, _from, state) do
case NestedFile.fetch([state.base_path] ++ namespace, key, "json") do
{:ok, content} ->
data = Poison.decode!(content)
{:reply, {:ok, data}, state}
{:error, reason} ->
{:reply, {:error, reason}, state}
end
end

def handle_call({:replace, namespace, key, value}, _from, state) do
content = Poison.encode!(value)

case NestedFile.replace([state.base_path] ++ namespace, key, content, "json") do
{:ok, ^content} ->
{:reply, {:ok, value}, state}
{:error, reason} ->
{:reply, {:error, reason}, state}
end
end

def handle_call({:delete, namespace, key}, _from, state) do
case NestedFile.delete([state.base_path] ++ namespace, key, "json") do
:ok ->
{:reply, {:ok, key}, state}
{:error, reason} ->
{:reply, {:error, reason}, state}
end
end
end
10 changes: 9 additions & 1 deletion lib/cog/command/service_sup.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
defmodule Cog.Command.Service.Supervisor do
use Supervisor
require Logger

alias Cog.Command.Service

Expand All @@ -12,10 +11,19 @@ defmodule Cog.Command.Service.Supervisor do
token_monitor_table = :ets.new(:token_monitor_table, [:public])
memory_table = :ets.new(:memory_table, [:public])
memory_monitor_table = :ets.new(:memory_monitor_table, [:public])
data_path = data_path

Application.get_env(:cog, Cog.Command.Service, [])[:data_path]

children = [worker(Service.Tokens, [token_table, token_monitor_table]),
worker(Service.DataStore, [data_path]),
worker(Service.Memory, [memory_table, memory_monitor_table])]

supervise(children, strategy: :one_for_one)
end

def data_path do
Application.fetch_env!(:cog, Cog.Command.Service)
|> Keyword.fetch!(:data_path)
end
end
77 changes: 77 additions & 0 deletions lib/cog/commands/cat.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
defmodule Cog.Commands.Cat do
use Cog.Command.GenCommand.Base,
bundle: Cog.Util.Misc.embedded_bundle

alias Cog.Command.Service.DataStore

# Note, we use the `tee` namespace because the namespace we read from
# must be the same one that data was written into.
@data_namespace [ "commands", "tee" ]

@description "Retrieve saved pipeline output"

@long_description """
The cat command retrieves pipeline output that was previously saved using the tee command.
"""

@arguments "<name>"

rule "when command is #{Cog.Util.Misc.embedded_bundle}:cat allow"

option "merge", short: "m", type: "bool", required: false,
description: "Merge current pipeline map into saved pipeline map"

option "append", short: "a", type: "bool", required: false,
description: "Append current pipeline output to saved pipeline data, returning an array"

option "insert", short: "i", type: "string", required: false,
description: "Insert current pipeline output into saved pipeline map as the field specified for this option"

def handle_message(%{options: %{"merge" => true, "append" => true}} = req, state) do
{:error, req.reply_to, "The append and merge options cannot be specified together", state}
end

def handle_message(%{args: [key], options: opts} = req, state) do
case DataStore.fetch(@data_namespace, key) do
{:ok, data} ->
cond do
opts["insert"] ->
handle_transform(:insert, req, data, state)
opts["append"] ->
{:reply, req.reply_to, List.wrap(data) ++ List.wrap(req.cog_env), state}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this actually "prepend"?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's appending the current pipeline output to the saved pipeline output.

Example:

@ash seed '{ "foo": "value1" }' | tee foo
[
  {
    "foo": "value1"
  }
]
@ash seed '{ "foo": "value2" }' | cat -a foo
[
  {
    "foo": "value1"
  },
  {
    "foo": "value2"
  }
]

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

D'oh, you're right... I got a tad mixed up.

opts["merge"] ->
handle_transform(:merge, req, data, state)
true ->
{:reply, req.reply_to, data, state}
end
{:error, reason} ->
{:error, req.reply_to, "Unable to retrieve data for #{key}: #{inspect reason}", state}
end
end

def handle_message(%{args: []} = req, state),
do: {:error, req.reply_to, "#{Cog.Util.Misc.embedded_bundle}:cat requires a name to be specified", state}

defp handle_transform(action, req, data, state) do
case transform_map_data(action, req.cog_env, data, req.options) do
{:ok, result} ->
{:reply, req.reply_to, result, state}
{:error, reason} ->
{:error, req.reply_to, reason, state}
end
end

defp transform_map_data(action, [prev], curr, opts),
do: transform_map_data(action, prev, curr, opts)
defp transform_map_data(action, prev, [curr], opts),
do: transform_map_data(action, prev, curr, opts)
defp transform_map_data(:merge, prev, curr, _opts) when is_map(prev) and is_map(curr) do
{:ok, Map.merge(prev, curr)}
end
defp transform_map_data(:insert, prev, curr, opts) when is_map(prev) and is_map(curr) do
{:ok, Map.put(prev, opts["insert"], curr)}
end
defp transform_map_data(action, _prev, _curr, _opts) do
{:error, "The #{Atom.to_string(action)} option is only applicable for map values"}
end
end
71 changes: 71 additions & 0 deletions lib/cog/commands/tee.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
defmodule Cog.Commands.Tee do
use Cog.Command.GenCommand.Base,
bundle: Cog.Util.Misc.embedded_bundle

require Logger

alias Cog.Command.Service.MemoryClient
alias Cog.Command.Service.DataStore

@data_namespace [ "commands", "tee" ]

@description "Save and pass through pipeline data"

@long_description """
The tee command passes the output of a Cog pipeline through to the next command in the pipeline while also saving it using the provided name. The saved output can be retreived later using the cat command.

If the name of a previously saved object is reused, tee will overwrite the existing data. There is not currently a way to delete saved content from Cog, but you can simulate this behavior by sending a replacement object to tee again with the name of the object you wish to delete.

Think carefully about the type of data that you store using tee since it will be retrievable by default by any Cog user. Careful use of rules and naming conventions could be used to limit access, though keep in mind that a simple typo in naming could cause unexpected data to be accessible. For example, the rules below would require you to have the "site:prod-data" permission in order to save or retrieve objects whose names begin with "prod-".

operable:rule create "when command is operable:tee with arg[0] == /^prod-.*/ must have site:prod-data"
operable:rule create "when command is operable:cat with arg[0] == /^prod-.*/ must have site:prod-data"
"""

@arguments "<name>"

@examples """
seed '{ "thing": "stuff" }' | tee foo
> '{ "thing": "stuff" }'
cat foo
> '{ "thing": "stuff" }'
"""

rule "when command is #{Cog.Util.Misc.embedded_bundle}:tee allow"

def handle_message(%{args: [key]} = req, state) do
root = req.services_root
token = req.service_token
step = req.invocation_step
value = req.cog_env
memory_key = req.invocation_id

MemoryClient.accum(root, token, memory_key, value)

case step do
step when step in ["first", nil] ->
{:reply, req.reply_to, nil, state}
"last"->
data =
MemoryClient.fetch(root, token, memory_key)
|> Enum.reject(fn(value) -> value == %{} end)
|> maybe_unwrap

MemoryClient.delete(root, token, memory_key)

case DataStore.replace(@data_namespace, key, data) do
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth calling out that existing keys are replaced in the desc.

{:error, reason} ->
{:error, req.reply_to, "Unable to store pipeline content: #{inspect reason}"}
{:ok, _} ->
{:reply, req.reply_to, data, state}
end
end
end

def handle_message(%{args: []} = req, state) do
{:error, req.reply_to, "#{Cog.Util.Misc.embedded_bundle}:tee requires a name to be specified for the pipeline content", state}
end

defp maybe_unwrap([data]), do: data
defp maybe_unwrap(data), do: data
end
86 changes: 86 additions & 0 deletions lib/cog/nested_file.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
defmodule Cog.NestedFile do
@moduledoc """
Stores an arbitrary String on the filesystem of the Cog host in a file
named by combining the provided key and extension. Files are written to
a directory hierarchy that is created by splitting the filename into
two character segments and joining up to three of those segments with a
provided list of base path segments.

Note: The key is sanitized to remove dangerous characters, but the
base paths and extension are not. Make sure not to use user entered
values for these unsafe arguments.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just sanitize everything here, then?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can't really, since "everything" includes the base path which will obviously have things like slashes in it. We could sanitize the contents of the namespace list that is passed to DataStore but it felt unnecessary since this is an internal API that's not exposed outside of Cog core.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So is the key the only thing that's user-specified? The wording of the comment led me to believe otherwise.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the key is the only thing that is user specified. That note is there for future-me who wants to reuse this for something else so that I don't make a mistake an let user-specified data into the namespace for instance.


Example: If the replace function was called with the following options:

base_paths = [ "commands", "tee" ]
key = "myfilename/../foo"
ext = "json"

The following would be created on disk:

.
└── commands
└── tee
└── my
└── fi
└── le
└── myfilenamefoo.json

This directory structure is created in order to deal with the fact that
some filesystems demonstrate poor performance characteristics when working
with directories that contain a very large number of files.
"""
require Logger

def fetch(base_paths, key, ext \\ "data") do
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason to specify an extension?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was mostly there because I didn't know if we'd want to use NestedFile in other places in Cog since it's fairly generic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens in the case of simultaneous writes to the same key?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Last one wins. They're serialized through the DataStore service's GenServer.

case File.read(build_filename(base_paths, key, ext)) do
{:error, reason} ->
{:error, error_text(reason)}
{:ok, content} ->
{:ok, content}
end
end

def replace(base_paths, key, content, ext \\ "data") do
filename = build_filename(base_paths, key, ext)
File.mkdir_p(Path.dirname(filename))

case File.write(filename, content) do
{:error, reason} ->
{:error, error_text(reason)}
:ok ->
{:ok, content}
end
end

def delete(base_paths, key, ext \\ "data") do
case File.rm(build_filename(base_paths, key, ext)) do
{:error, reason} ->
{:error, error_text(reason)}
:ok ->
:ok
end
end

defp error_text(:enoent), do: "Object not found"
defp error_text(:enospc), do: "No space available to save object"
defp error_text(:eacces), do: "Permission denied"
defp error_text(reason) do
"E_" <> (Atom.to_string(reason) |> String.slice(1..-1) |> String.upcase)
end

defp build_filename(base_paths, key, ext) do
key = sanitize_filename(key)

segments =
Regex.scan(~r/.{1,2}/, key)
|> List.flatten
|> Enum.slice(0,3)
filename = key <> "." <> ext
Path.join(base_paths ++ segments ++ [filename])
end

defp sanitize_filename(name) do
Regex.replace(~r/[^A-Za-z0-9_\-]/, name, "", global: true)
end
end
Loading