Skip to content

Commit

Permalink
feat: draft of influx integration
Browse files Browse the repository at this point in the history
To run:

- Make sure you have Docker installed/running
- Run `implementations/elixir/influx.sh`
- Run `cd implementations/elixir && mix test --no-start
test/integration/influx_test.exs`

The way this is put together is not ideal, but works as a rough draft to
iterate from.
  • Loading branch information
bitwalker authored and mrinalwadhwa committed May 14, 2020
1 parent 2bf2156 commit 6bc23e5
Show file tree
Hide file tree
Showing 25 changed files with 1,150 additions and 26 deletions.
1 change: 1 addition & 0 deletions implementations/elixir/config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ import Config

config :ockam, vault: []
config :ockam, transports: []
config :ockam, services: []

import_config "#{Mix.env()}.exs"
16 changes: 16 additions & 0 deletions implementations/elixir/config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,19 @@ config :ockam, :transports,
listen_address: "0.0.0.0",
listen_port: 4000
]

config :ockam, :services,
influx_example: [
service: Ockam.Services.Influx,
database: "test",
http: [
host: "127.0.0.1",
port: 8086
]
]

config :fluxter, Ockam.Services.Influx.Fluxter,
host: "127.0.0.1",
port: 8089,
pool_size: 5,
prefix: nil
6 changes: 5 additions & 1 deletion implementations/elixir/config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,8 @@ import Config
config :logger,
level: :warn

config :ockam, :vault, curve: :curve25519
config :fluxter, Ockam.Services.Influx.Fluxter,
host: "127.0.0.1",
port: 8089,
pool_size: 5,
prefix: nil
22 changes: 22 additions & 0 deletions implementations/elixir/influx.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/usr/bin/env bash

set -e
set -o pipefail

SCRIPT_DIR="$(cd "$(dirname "$0")" && pwd -P)"
BUILD_DIR="${SCRIPT_DIR}/_build"
VAR_DIR="${BUILD_DIR}/influxdb"

mkdir -p "${VAR_DIR}"

exec docker run -p 8086:8086 -p 8089:8089 \
-v "${VAR_DIR}:/var/lib/influxdb" \
-e INFLUXDB_DB=test \
-e INFLUXDB_HTTP_ENABLED=true \
-e INFLUXDB_HTTP_FLUX_ENABLED=true \
-e INFLUXDB_HTTP_BIND_ADDRESS=':8086' \
-e INFLUXDB_UDP_ENABLED=true \
-e INFLUXDB_UDP_BIND_ADDRESS=':8089' \
-e INFLUXDB_UDP_DATABASE=test \
-e INFLUXDB_REPORTING_DISABLED=true \
influxdb:1.8-alpine
3 changes: 3 additions & 0 deletions implementations/elixir/lib/app.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ defmodule Ockam.App do
Logger.info("Starting #{__MODULE__}")

transports = Application.get_env(:ockam, :transports, [])
services = Application.get_env(:ockam, :services, [])

children = [
{Ockam.Registry, []},
{Ockam.Services, [services]},
{Ockam.Transport.Supervisor, [transports]}
]

Expand Down
26 changes: 26 additions & 0 deletions implementations/elixir/lib/registry.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule Ockam.Registry do
@type service :: module()
@type name :: term()

def child_spec([]) do
Registry.child_spec(keys: :unique, name: __MODULE__)
end

@doc "Lookup a registered service by name"
@spec lookup(name()) :: nil | {pid(), service()}
def lookup(name) do
case Registry.lookup(__MODULE__, name) do
[] ->
nil

[registered] ->
registered
end
end

@doc "Lookup registered instances of the given service"
@spec lookup_by_service(service()) :: [{pid(), name()}]
def lookup_by_service(service) do
Registry.select(__MODULE__, [{{:"$1", :"$2", service}, [], [{:"$2", :"$1"}]}])
end
end
39 changes: 39 additions & 0 deletions implementations/elixir/lib/router.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
defmodule Ockam.Router do
alias __MODULE__.Protocol.Endpoint

@opaque connection :: %{__struct__: module()}

@doc """
Connects to the given endpoint
Certain endpoints may be treated specially, such as when Influx is hosted
within the router itself; this allows the router to act in both its primary
capacity, and as the service shim layer for such services.
"""
@spec connect(Endpoint.t()) :: {:ok, connection} | {:error, term}
def connect(dest)

def connect(%Endpoint{value: %Endpoint.Local{data: name}}) do
case Ockam.Registry.lookup(name) do
nil ->
{:error, :not_found}

{pid, service} when is_pid(pid) and is_atom(service) ->
service.connect(pid, self())
end
end

def connect(%Endpoint{}), do: {:error, :unsupported}

@doc "Send a message to the given connection"
@spec send(connection, term) :: :ok | {:error, term}
def send(%mod{} = connection, message) do
mod.send(connection, message)
end

@doc "Execute a request to the given connection"
@spec request(connection, term) :: {:ok, reply :: term} | {:error, term}
def request(%mod{} = connection, req) do
mod.request(connection, req)
end
end
2 changes: 1 addition & 1 deletion implementations/elixir/lib/router/protocol/encoder.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defprotocol Ockam.Router.Protocol.Encoder do
@type t :: term()
@type opts :: Keyword.t()
@type opts :: map()
@type reason :: term()

@fallback_to_any true
Expand Down
4 changes: 4 additions & 0 deletions implementations/elixir/lib/router/protocol/message/ack.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
defmodule Ockam.Router.Protocol.Message.Ack do
use Ockam.Router.Protocol.Message,
type_id: 5
end
16 changes: 16 additions & 0 deletions implementations/elixir/lib/router/protocol/message/error.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
defmodule Ockam.Router.Protocol.Message.Error do
use Ockam.Router.Protocol.Message,
type_id: 6,
schema: [code: :integer, description: :string]

def new(code, reason) when is_integer(code) and is_binary(reason) do
%__MODULE__{code: code, description: reason}
end

def new(code, reason) when is_integer(code) do
%__MODULE__{code: code, description: to_string(reason)}
rescue
Protocol.UndefinedError ->
%__MODULE__{code: code, description: inspect(reason)}
end
end
5 changes: 5 additions & 0 deletions implementations/elixir/lib/router/protocol/message/request.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
defmodule Ockam.Router.Protocol.Message.Request do
use Ockam.Router.Protocol.Message,
type_id: 8,
schema: [data: :raw]
end
5 changes: 5 additions & 0 deletions implementations/elixir/lib/router/protocol/message/send.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
defmodule Ockam.Router.Protocol.Message.Send do
use Ockam.Router.Protocol.Message,
type_id: 7,
schema: [data: :raw]
end
42 changes: 42 additions & 0 deletions implementations/elixir/lib/services.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
defmodule Ockam.Services do
use DynamicSupervisor

require Logger

def start_link([services]) when is_list(services) do
Logger.info("Starting #{__MODULE__} with #{inspect(services)}")

{:ok, pid} = DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__)

for {service_name, service_config} <- services do
case start_service(service_name, service_config) do
{:ok, _} ->
:ok

{:error, reason} ->
Logger.error(
"Failed to start preconfigured service #{service_name}: #{inspect(reason)}"
)

exit(reason)
end
end

{:ok, pid}
end

def init(_) do
DynamicSupervisor.init(strategy: :one_for_one)
end

@spec start_service(atom, Keyword.t()) :: {:ok, pid} | {:error, term}
def start_service(name, config) do
{service, service_opts} = Keyword.pop!(config, :service)

Logger.info("Starting service #{inspect(service)} with config: #{inspect(service_opts)}")

meta = [name: {:via, Registry, {Ockam.Registry, to_string(name), service}}]
spec = {service, [meta, service_opts]}
DynamicSupervisor.start_child(__MODULE__, spec)
end
end
50 changes: 50 additions & 0 deletions implementations/elixir/lib/services/influx.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
defmodule Ockam.Services.Influx do
use Supervisor

alias __MODULE__.Connection

def start_link([meta, opts]) when is_list(meta) and is_list(opts) do
name = Keyword.get(meta, :name, __MODULE__)

Supervisor.start_link(__MODULE__, [name, opts])
end

@impl true
def init([name, opts]) do
children = [
__MODULE__.Fluxter.child_spec(),
{__MODULE__.ConnectionSupervisor, [name, opts]}
]

Supervisor.init(children, strategy: :rest_for_one)
end

@doc """
Connect the given process to Influx as a psuedo-client
All client operations are performed via the resulting connection, which
multiplexes operations over the connection pool started by the service.
Connections are automatically terminated if the originating process dies
"""
@spec connect(pid | atom, pid) :: {:ok, Connection.t()} | {:error, term}
defdelegate connect(sup, connecting_pid), to: __MODULE__.ConnectionSupervisor

@doc "Disconnects an active connection"
@spec disconnect(Connection.t()) :: :ok
defdelegate disconnect(conn), to: __MODULE__.Connection

@doc """
Writes the given measurement with the provided tags and fields.
Tags may be an empty list.
"""
@spec write(Connection.t(), binary(), Keyword.t(), Keyword.t()) :: :ok
defdelegate write(conn, measurement, tags, fields), to: __MODULE__.Connection

@doc """
Executes the given query (InfluxQL)
"""
@spec query(Connection.t(), binary()) :: {:ok, binary()} | {:error, term}
defdelegate query(conn, query_text), to: __MODULE__.Connection
end
Loading

0 comments on commit 6bc23e5

Please sign in to comment.