Skip to content

Commit c58b5bd

Browse files
authored
Merge pull request #1101 from operable/imbriaco/tee-command
Adds tee and cat commands and a supporting DataStore service
2 parents 8d73dc6 + b199ff7 commit c58b5bd

File tree

11 files changed

+495
-9
lines changed

11 files changed

+495
-9
lines changed

config/config.exs

+5-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ end
1616
# Embedded Command Bundle Version (for built-in commands)
1717
# NOTE: Do not change this value unless you know what you're doing.
1818
# ========================================================================
19-
config :cog, :embedded_bundle_version, "0.14.0"
19+
20+
config :cog, :embedded_bundle_version, "0.15.0"
2021

2122
# ========================================================================
2223
# Chat Adapters
@@ -57,6 +58,9 @@ config :cog, Cog.Command.Pipeline,
5758
interactive_timeout: {60, :sec},
5859
trigger_timeout: {300, :sec}
5960

61+
config :cog, Cog.Command.Service,
62+
data_path: data_dir("service_data")
63+
6064
# Set these to zero (0) to disable caching
6165
config :cog, :command_cache_ttl, {60, :sec}
6266
config :cog, :command_rule_ttl, {10, :sec}

lib/cog/command/gen_command/base.ex

+4-2
Original file line numberDiff line numberDiff line change
@@ -198,8 +198,8 @@ defmodule Cog.Command.GenCommand.Base do
198198
"""
199199
def options(module) do
200200
attr_values(module, :options)
201-
|> Enum.reduce(%{}, fn(%{"name" => name, "type" => type, "required" => required, "short_flag" => short}, acc) ->
202-
Map.put(acc, name, %{"type" => type, "required" => required, "short_flag" => short})
201+
|> Enum.reduce(%{}, fn(%{"name" => name, "type" => type, "required" => required, "short_flag" => short, "description" => description}, acc) ->
202+
Map.put(acc, name, %{"type" => type, "required" => required, "short_flag" => short, "description" => description})
203203
end)
204204
end
205205

@@ -242,11 +242,13 @@ defmodule Cog.Command.GenCommand.Base do
242242
243243
"""
244244
defmacro option(name, options \\ []) do
245+
description = Keyword.get(options, :description, nil)
245246
required = Keyword.get(options, :required, false)
246247
type = Keyword.get(options, :type, "string")
247248
short = Keyword.get(options, :short, nil)
248249
quote do
249250
@options %{"name" => unquote(name),
251+
"description" => unquote(description),
250252
"required" => unquote(required),
251253
"short_flag" => unquote(short),
252254
"type" => unquote(type)}

lib/cog/command/service/data_store.ex

+81
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
defmodule Cog.Command.Service.DataStore do
  @moduledoc """
  Stores an arbitrary data structure for a given key. The only requirement is
  that the data structure must be able to be encoded as JSON.

  Keys may be fetched, replaced, and deleted. Every key lives inside a
  namespace, which is a list of path segments appended to the base path the
  service was started with.

  The JSON data is stored on the filesystem of the Cog host. See the
  Cog.NestedFile module for more details.
  """

  use GenServer

  alias Cog.NestedFile

  defstruct [:base_path]

  @doc """
  Starts the #{inspect __MODULE__} service. Accepts a path to use for the
  base directory to store content under.

  The process is registered under the module name. Startup is refused when
  `base_path` is `nil`.
  """
  def start_link(base_path),
    do: GenServer.start_link(__MODULE__, base_path, name: __MODULE__)

  @doc """
  Fetches the given key. Returns `{:ok, value}` with the decoded JSON data if
  the key exists, or `{:error, reason}` otherwise. `reason` is a descriptive
  string, for example `"Object not found"` when nothing is stored under the
  key, or a decode failure message when the stored content is not valid JSON.
  """
  def fetch(namespace, key),
    do: GenServer.call(__MODULE__, {:fetch, namespace, key})

  @doc """
  Replaces or sets the given key with the value. Returns `{:ok, value}` on
  success or `{:error, reason}` if the content could not be written.
  """
  def replace(namespace, key, value),
    do: GenServer.call(__MODULE__, {:replace, namespace, key, value})

  @doc """
  Deletes the given key. Returns `{:ok, key}` when successfully deleted or
  `{:error, reason}` (for example `"Object not found"`) if it could not be
  removed.
  """
  def delete(namespace, key),
    do: GenServer.call(__MODULE__, {:delete, namespace, key})

  def init(nil),
    do: {:stop, "Unable to start #{__MODULE__}: Data path not configured"}
  def init(base_path) do
    state = %__MODULE__{base_path: base_path}
    {:ok, state}
  end

  def handle_call({:fetch, namespace, key}, _from, state) do
    case NestedFile.fetch(storage_path(state, namespace), key, "json") do
      {:ok, content} ->
        {:reply, decode(content), state}
      {:error, reason} ->
        {:reply, {:error, reason}, state}
    end
  end

  def handle_call({:replace, namespace, key, value}, _from, state) do
    content = Poison.encode!(value)

    case NestedFile.replace(storage_path(state, namespace), key, content, "json") do
      {:ok, ^content} ->
        {:reply, {:ok, value}, state}
      {:error, reason} ->
        {:reply, {:error, reason}, state}
    end
  end

  def handle_call({:delete, namespace, key}, _from, state) do
    case NestedFile.delete(storage_path(state, namespace), key, "json") do
      :ok ->
        {:reply, {:ok, key}, state}
      {:error, reason} ->
        {:reply, {:error, reason}, state}
    end
  end

  # Builds the list of path segments handed to NestedFile: the configured
  # base path followed by the caller supplied namespace segments.
  defp storage_path(state, namespace),
    do: [state.base_path] ++ namespace

  # Decodes content read from disk. On-disk data is external input, so a
  # malformed file is reported to the caller as an error instead of raising
  # inside the server process and taking the shared service down.
  defp decode(content) do
    case Poison.decode(content) do
      {:ok, data} ->
        {:ok, data}
      error ->
        {:error, "Unable to decode stored data: #{inspect error}"}
    end
  end
end

lib/cog/command/service_sup.ex

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
defmodule Cog.Command.Service.Supervisor do
22
use Supervisor
3-
require Logger
43

54
alias Cog.Command.Service
65

@@ -12,10 +11,19 @@ defmodule Cog.Command.Service.Supervisor do
1211
token_monitor_table = :ets.new(:token_monitor_table, [:public])
1312
memory_table = :ets.new(:memory_table, [:public])
1413
memory_monitor_table = :ets.new(:memory_monitor_table, [:public])
14+
data_path = data_path
15+
16+
Application.get_env(:cog, Cog.Command.Service, [])[:data_path]
1517

1618
children = [worker(Service.Tokens, [token_table, token_monitor_table]),
19+
worker(Service.DataStore, [data_path]),
1720
worker(Service.Memory, [memory_table, memory_monitor_table])]
1821

1922
supervise(children, strategy: :one_for_one)
2023
end
24+
25+
# Returns the `:data_path` entry of the `Cog.Command.Service` configuration
# of the `:cog` application. Raises when either the configuration or the
# entry is missing.
def data_path do
  service_config = Application.fetch_env!(:cog, Cog.Command.Service)
  Keyword.fetch!(service_config, :data_path)
end
2129
end

lib/cog/commands/cat.ex

+77
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
defmodule Cog.Commands.Cat do
  use Cog.Command.GenCommand.Base,
    bundle: Cog.Util.Misc.embedded_bundle

  alias Cog.Command.Service.DataStore

  # Reads must target the namespace the data was written into, which is
  # the one used by the `tee` command.
  @data_namespace [ "commands", "tee" ]

  @description "Retrieve saved pipeline output"

  @long_description """
  The cat command retrieves pipeline output that was previously saved using the tee command.
  """

  @arguments "<name>"

  rule "when command is #{Cog.Util.Misc.embedded_bundle}:cat allow"

  option "merge", short: "m", type: "bool", required: false,
    description: "Merge current pipeline map into saved pipeline map"

  option "append", short: "a", type: "bool", required: false,
    description: "Append current pipeline output to saved pipeline data, returning an array"

  option "insert", short: "i", type: "string", required: false,
    description: "Insert current pipeline output into saved pipeline map as the field specified for this option"

  # Rejects the conflicting combination of merge and append up front. This
  # clause must stay ahead of the general one because both match such requests.
  def handle_message(%{options: %{"merge" => true, "append" => true}} = req, state),
    do: {:error, req.reply_to, "The append and merge options cannot be specified together", state}

  # Looks up the saved data for the single name argument and replies with
  # it, optionally combined with the current pipeline output.
  def handle_message(%{args: [key], options: opts} = req, state) do
    case DataStore.fetch(@data_namespace, key) do
      {:error, reason} ->
        {:error, req.reply_to, "Unable to retrieve data for #{key}: #{inspect reason}", state}
      {:ok, saved} ->
        build_reply(selected_action(opts), req, saved, state)
    end
  end

  def handle_message(%{args: []} = req, state) do
    {:error, req.reply_to, "#{Cog.Util.Misc.embedded_bundle}:cat requires a name to be specified", state}
  end

  # Picks the single action to perform. When several options are set the
  # precedence is insert, then append, then merge.
  defp selected_action(opts) do
    cond do
      opts["insert"] -> :insert
      opts["append"] -> :append
      opts["merge"] -> :merge
      true -> :none
    end
  end

  # Saved data first, followed by the current pipeline output.
  defp build_reply(:append, req, saved, state),
    do: {:reply, req.reply_to, List.wrap(saved) ++ List.wrap(req.cog_env), state}
  defp build_reply(:none, req, saved, state),
    do: {:reply, req.reply_to, saved, state}
  defp build_reply(action, req, saved, state) do
    case transform_map_data(action, req.cog_env, saved, req.options) do
      {:error, message} ->
        {:error, req.reply_to, message, state}
      {:ok, combined} ->
        {:reply, req.reply_to, combined, state}
    end
  end

  # Combines the current pipeline output with the saved data after stripping
  # single-element list wrappers from both sides.
  defp transform_map_data(action, pipeline, saved, opts),
    do: combine(action, unwrap_single(pipeline), unwrap_single(saved), opts)

  # Merge: on key conflicts the saved value wins, since it is the second
  # argument to Map.merge/2.
  # Insert: the saved data is placed into the pipeline map under the field
  # named by the option.
  # NOTE(review): the option descriptions read as the opposite direction
  # (pipeline into saved) - confirm which one is intended.
  defp combine(:merge, pipeline, saved, _opts) when is_map(pipeline) and is_map(saved),
    do: {:ok, Map.merge(pipeline, saved)}
  defp combine(:insert, pipeline, saved, opts) when is_map(pipeline) and is_map(saved),
    do: {:ok, Map.put(pipeline, opts["insert"], saved)}
  defp combine(action, _pipeline, _saved, _opts),
    do: {:error, "The #{action} option is only applicable for map values"}

  # Repeatedly unwraps lists holding exactly one element.
  defp unwrap_single([only]), do: unwrap_single(only)
  defp unwrap_single(other), do: other
end

lib/cog/commands/tee.ex

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
defmodule Cog.Commands.Tee do
  use Cog.Command.GenCommand.Base,
    bundle: Cog.Util.Misc.embedded_bundle

  require Logger

  alias Cog.Command.Service.MemoryClient
  alias Cog.Command.Service.DataStore

  # Namespace in the data store under which saved pipeline output is kept.
  @data_namespace [ "commands", "tee" ]

  @description "Save and pass through pipeline data"

  @long_description """
  The tee command passes the output of a Cog pipeline through to the next command in the pipeline while also saving it using the provided name. The saved output can be retrieved later using the cat command.

  If the name of a previously saved object is reused, tee will overwrite the existing data. There is not currently a way to delete saved content from Cog, but you can simulate this behavior by sending a replacement object to tee again with the name of the object you wish to delete.

  Think carefully about the type of data that you store using tee since it will be retrievable by default by any Cog user. Careful use of rules and naming conventions could be used to limit access, though keep in mind that a simple typo in naming could cause unexpected data to be accessible. For example, the rules below would require you to have the "site:prod-data" permission in order to save or retrieve objects whose names begin with "prod-".

    operable:rule create "when command is operable:tee with arg[0] == /^prod-.*/ must have site:prod-data"
    operable:rule create "when command is operable:cat with arg[0] == /^prod-.*/ must have site:prod-data"
  """

  @arguments "<name>"

  @examples """
  seed '{ "thing": "stuff" }' | tee foo
  > '{ "thing": "stuff" }'
  cat foo
  > '{ "thing": "stuff" }'
  """

  rule "when command is #{Cog.Util.Misc.embedded_bundle}:tee allow"

  # Accumulates the output of every invocation under the invocation id.
  #
  # For the steps "first" and nil the command replies with nil. On the
  # "last" step the accumulated values are fetched, empty maps are dropped,
  # a single remaining value is unwrapped, the accumulator is deleted, and
  # the result is stored under `key` before being passed on as the reply.
  def handle_message(%{args: [key]} = req, state) do
    root = req.services_root
    token = req.service_token
    step = req.invocation_step
    value = req.cog_env
    memory_key = req.invocation_id

    MemoryClient.accum(root, token, memory_key, value)

    case step do
      step when step in ["first", nil] ->
        {:reply, req.reply_to, nil, state}
      "last" ->
        data =
          MemoryClient.fetch(root, token, memory_key)
          |> Enum.reject(fn(value) -> value == %{} end)
          |> maybe_unwrap

        MemoryClient.delete(root, token, memory_key)

        case DataStore.replace(@data_namespace, key, data) do
          {:error, reason} ->
            # The state must be part of the reply, matching every other
            # return value of handle_message in this module.
            {:error, req.reply_to, "Unable to store pipeline content: #{inspect reason}", state}
          {:ok, _} ->
            {:reply, req.reply_to, data, state}
        end
    end
  end

  def handle_message(%{args: []} = req, state) do
    {:error, req.reply_to, "#{Cog.Util.Misc.embedded_bundle}:tee requires a name to be specified for the pipeline content", state}
  end

  # A list holding exactly one value is reduced to that value; anything
  # else is returned untouched.
  defp maybe_unwrap([data]), do: data
  defp maybe_unwrap(data), do: data
end

lib/cog/nested_file.ex

+86
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
defmodule Cog.NestedFile do
  @moduledoc """
  Stores an arbitrary String on the filesystem of the Cog host in a file
  named by combining the provided key and extension. Files are written to
  a directory hierarchy that is created by splitting the filename into
  two character segments and joining up to three of those segments with a
  provided list of base path segments.

  Note: The key is sanitized to remove dangerous characters, but the
  base paths and extension are not. Make sure not to use user entered
  values for these unsafe arguments.

  Example: If the replace function was called with the following options:

      base_paths = [ "commands", "tee" ]
      key = "myfilename/../foo"
      ext = "json"

  The following would be created on disk:

      .
      └── commands
          └── tee
              └── my
                  └── fi
                      └── le
                          └── myfilenamefoo.json

  This directory structure is created in order to deal with the fact that
  some filesystems demonstrate poor performance characteristics when working
  with directories that contain a very large number of files.

  All functions report failures as `{:error, text}` where `text` is a
  human readable description of the underlying file error.
  """
  require Logger

  @doc """
  Reads the content stored for `key`. Returns `{:ok, content}` or
  `{:error, text}`.
  """
  def fetch(base_paths, key, ext \\ "data") do
    case File.read(build_filename(base_paths, key, ext)) do
      {:error, reason} ->
        {:error, error_text(reason)}
      {:ok, content} ->
        {:ok, content}
    end
  end

  @doc """
  Writes `content` for `key`, creating the nested directories as needed and
  overwriting any existing file. Returns `{:ok, content}` or `{:error, text}`.
  """
  def replace(base_paths, key, content, ext \\ "data") do
    filename = build_filename(base_paths, key, ext)

    # A failure to create the directory hierarchy is reported directly;
    # otherwise the subsequent write would fail with a misleading reason.
    case File.mkdir_p(Path.dirname(filename)) do
      {:error, reason} ->
        {:error, error_text(reason)}
      :ok ->
        write_content(filename, content)
    end
  end

  @doc """
  Removes the file stored for `key`. Returns `:ok` or `{:error, text}`.
  """
  def delete(base_paths, key, ext \\ "data") do
    case File.rm(build_filename(base_paths, key, ext)) do
      {:error, reason} ->
        {:error, error_text(reason)}
      :ok ->
        :ok
    end
  end

  # Writes the content and echoes it back on success.
  defp write_content(filename, content) do
    case File.write(filename, content) do
      {:error, reason} ->
        {:error, error_text(reason)}
      :ok ->
        {:ok, content}
    end
  end

  # Translates an error atom into text. Well known reasons get a friendly
  # message; any other reason becomes "E_" followed by the upcased name,
  # with a leading "e" removed when present (:eisdir -> "E_ISDIR").
  defp error_text(:enoent), do: "Object not found"
  defp error_text(:enospc), do: "No space available to save object"
  defp error_text(:eacces), do: "Permission denied"
  defp error_text(reason) do
    name =
      case Atom.to_string(reason) do
        "e" <> rest -> rest
        other -> other
      end

    "E_" <> String.upcase(name)
  end

  # Builds the full path: base paths, up to three two-character segments
  # taken from the sanitized key, then "<sanitized key>.<ext>".
  defp build_filename(base_paths, key, ext) do
    key = sanitize_filename(key)

    segments =
      Regex.scan(~r/.{1,2}/, key)
      |> List.flatten
      |> Enum.slice(0,3)
    filename = key <> "." <> ext
    Path.join(base_paths ++ segments ++ [filename])
  end

  # Drops every character other than ASCII letters, digits, "_" and "-".
  defp sanitize_filename(name) do
    Regex.replace(~r/[^A-Za-z0-9_\-]/, name, "", global: true)
  end
end

0 commit comments

Comments
 (0)