Skip to content

Commit

Permalink
Add kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
Ian Vaughan committed Oct 5, 2018
1 parent 5881802 commit 03760bf
Show file tree
Hide file tree
Showing 14 changed files with 400 additions and 3 deletions.
8 changes: 8 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
export KAFKA_BROKERS="localhost:9092,localhost:9093"
export KAFKA_HEARTBEAT_INTERVAL=1000
export KAFKA_COMMIT_INTERVAL=1000
export KAFKA_SCHEMA_REGISTRY_URL="http://localhost:8081"
export KAFKA_TOPIC_NAME="uk.london.quiqup.slots"
export START_WORKERS=true
export SCHEMA_REGISTRY_URL="http://localhost:8081"
export AVLIZER_CONFLUENT_SCHEMAREGISTRY_URL="http://localhost:8081"
export STATSD_HOST="localhost"
export STATSD_PORT=8125
export WIW_KEY=123
30 changes: 30 additions & 0 deletions .iex.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# .iex.exs helper: binds a sample WhenIWork shift payload (same keys the
# Kafka publisher's transformer reads) so publishing can be exercised
# manually from an IEx session — presumably via
#   SlotSync.Publishers.Kafka.call(shift, shift["id"])
# TODO(review): confirm intended usage with the module's caller.
#
# Original one-line capture of the payload, kept for reference:
# shift = %{"account_id" => 77967, "acknowledged" => 1, "acknowledged_at" => "Mon, 01 Oct 2018 15:27:30 +0100", "actionable" => false, "alerted" => false, "block_id" => 0, "break_time" => 0, "color" => "74a611", "created_at" => "Wed, 26 Sep 2018 12:24:58 +0100", "creator_id" => 5526232, "end_time" => "Mon, 08 Oct 2018 00:15:00 +0100", "id" => 2076303948, "instances" => 0, "is_open" => false, "linked_users" => nil, "location_id" => 3999871, "notes" => "", "notified_at" => "Tue, 02 Oct 2018 22:32:58 +0100", "position_id" => 709909, "published" => true, "published_date" => "Wed, 26 Sep 2018 16:59:05 +0100", "shiftchain_key" => "", "site_id" => 3530221, "start_time" => "Sun, 07 Oct 2018 21:00:00 +0100", "updated_at" => "Mon, 01 Oct 2018 15:27:30 +0100", "user_id" => 29205213}

shift = %{
  "account_id" => 77967,
  "acknowledged" => 1,
  "acknowledged_at" => nil,
  "actionable" => false,
  "alerted" => false,
  "block_id" => 0,
  "break_time" => 0,
  "color" => "74a611",
  "created_at" => "Wed, 26 Sep 2018 12:24:58 +0100",
  "creator_id" => 5_526_232,
  "end_time" => "Mon, 08 Oct 2018 00:15:00 +0100",
  "id" => 2_076_303_948,
  "instances" => 0,
  "is_open" => false,
  "linked_users" => nil,
  "location_id" => 3_999_871,
  "notes" => "",
  "notified_at" => "Tue, 02 Oct 2018 22:32:58 +0100",
  "position_id" => 709_909,
  "published" => true,
  "published_date" => "Wed, 26 Sep 2018 16:59:05 +0100",
  "shiftchain_key" => "",
  "site_id" => 3_530_221,
  "start_time" => "Sun, 07 Oct 2018 21:00:00 +0100",
  "updated_at" => "Mon, 01 Oct 2018 15:27:30 +0100",
  "user_id" => 29_205_213
}
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
# Slot Sync

* Syncs slot data from external provider
* Caches the data locally in Redis
* If remote data changed then publishes to Kafka
65 changes: 64 additions & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,78 @@ use Mix.Config
# if you want to provide default values for your application for
# 3rd-party users, it should be done in your "mix.exs" file.

config :slot_sync, SlotSync.Application, dummy: true
config :slot_sync, SlotSync.Application, start_workers: true

config :slot_sync, SlotSync.WIW,
http_adaptor: HTTPoison,
key: {:system, :string, "WIW_KEY"}

config :slot_sync, brokers: {:system, :string, "KAFKA_BROKERS", "localhost:9092"}

config :kafka_ex,
  # Don't change this value; the actual broker list is set at runtime from the
  # `config :slot_sync, brokers:` statement above, to bypass a limitation of
  # kafka_ex 0.8.3, which doesn't use Confex
brokers: [{"localhost", 9092}],

# Ensure that the schema registry url is set and the host and port are valid.
# schema_versions: %{"com.quiqup.tracking_locations": "1"},

# Set this value to true if you do not want the default
# `KafkaEx.Server` worker to start during application start-up -
# i.e., if you want to start your own set of named workers
disable_default_worker: true,
# Timeout value, in msec, for synchronous operations (e.g., network calls).
# If this value is greater than GenServer's default timeout of 5000, it will also
# be used as the timeout for work dispatched via KafkaEx.Server.call (e.g., KafkaEx.metadata).
# In those cases, it should be considered a 'total timeout', encompassing both network calls and
# wait time for the genservers.
sync_timeout: 3000,
# Supervision max_restarts - the maximum amount of restarts allowed in a time frame
max_restarts: 10,
# Supervision max_seconds - the time frame in which :max_restarts applies
max_seconds: 60,
# Interval in milliseconds that GenConsumer waits to commit offsets.
commit_interval: 5_000,
# Threshold number of messages consumed for GenConsumer to commit offsets
# to the broker.
commit_threshold: 100,
use_ssl: false,
kafka_version: "1.0.1"

config :avlizer,
avlizer_confluent: %{
schema_registry_url:
{:system, "AVLIZER_CONFLUENT_SCHEMAREGISTRY_URL", "http://localhost:8081"}
}

config :event_serializer,
schema_registry_url: {:system, "AVLIZER_CONFLUENT_SCHEMAREGISTRY_URL", "http://localhost:8081"},
topic_name: {:system, "KAFKA_TOPIC_NAME", "uk.london.quiqup.slots"}

config :slot_sync, SlotSync.Datadog,
host: {:system, "STATSD_HOST"},
port: {:system, :integer, "STATSD_PORT"},
namespace: "slot_sync",
module: DogStatsd

config :slot_sync, SlotSync.Publishers.Kafka,
event_serializer_encoder: EventSerializer.Encoder,
kafka_client: KafkaEx,
topic_name: {:system, :string, "KAFKA_TOPIC_NAME", "uk.london.quiqup.slots"}

config :slot_sync, SlotSync.Processor.Shift, publisher: SlotSync.Publishers.Kafka

config :ktsllex,
run_migrations: true,
schema_registry_host: {:system, "AVLIZER_CONFLUENT_SCHEMAREGISTRY_URL", "http://localhost:8081"},
base_path: {:system, "KAFKA_SCHEMA_BASE_PATH", "./schemas/slots"},
schema_name: {:system, "KAFKA_SCHEMA_NAME", "uk.london.quiqup.slots"},
app_name: :slot_sync,
lenses_host: {:system, "LENSES_HOST", "http://localhost:3030"},
lenses_user: {:system, "LENSES_USER", "admin"},
lenses_pass: {:system, "LENSES_PASS", "admin"},
lenses_topic: {:system, "LENSES_TOPIC", "uk.london.quiqup.slots"}

# Import environment specific config. This must remain at the bottom
# of this file so it overrides the configuration defined above.
import_config "#{Mix.env()}.exs"
1 change: 1 addition & 0 deletions config/dev.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
use Mix.Config
5 changes: 5 additions & 0 deletions config/prod.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use Mix.Config

# Do not print debug messages in production
config :logger, level: :info

7 changes: 7 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use Mix.Config
config :logger, level: :debug

config :slot_sync, SlotSync.Application, start_workers: false


config :event_serializer, enabled: false
6 changes: 6 additions & 0 deletions lib/slot_sync/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,10 @@ defmodule SlotSync.Application do
use Confex, otp_app: :slot_sync

import Supervisor.Spec
import SlotSync.KafkaConfig
require DogStatsd

@spec start(any(), any()) :: :ignore | {:error, any()} | {:ok, pid()}
def start(_type, _args) do
opts = [strategy: :one_for_one, name: SlotSync.Supervisor]

Expand All @@ -14,6 +16,10 @@ defmodule SlotSync.Application do
tags: ["application_start"]
})

if config()[:start_workers] do
KafkaEx.create_worker(:kafka_ex, uris: brokers())
end

app
end

Expand Down
28 changes: 28 additions & 0 deletions lib/slot_sync/kafka_config.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
defmodule SlotSync.KafkaConfig do
  @moduledoc """
  Reads the `:brokers` configuration for `:slot_sync` and normalises it into
  the `[{host, port}]` tuple list shape expected by KafkaEx workers.
  """

  require Logger

  @doc """
  Returns the configured Kafka brokers as a list of `{host, port}` tuples.

  The `:brokers` config value (resolved through `Confex.get_env/2`) may be:

    * `nil` — returned as-is (no brokers configured)
    * a list — assumed to already be `[{host, port}]` tuples, passed through
    * a binary — a comma-separated `"host:port,host:port"` string, parsed

  Raises a `RuntimeError` (after logging a warning) when a broker entry in a
  binary value is missing its `:port` part.
  """
  def brokers do
    :slot_sync
    |> Confex.get_env(:brokers)
    |> normalize()
  end

  defp normalize(nil), do: nil
  defp normalize(list) when is_list(list), do: list

  defp normalize(value) when is_binary(value) do
    value
    |> String.split(",")
    |> Enum.map(&parse_broker/1)
  end

  # "host:port" -> {host, port_as_integer}; a bare "host" is a config error.
  defp parse_broker(entry) do
    case entry |> String.trim() |> String.split(":") do
      [host, port] ->
        {port, _rest} = Integer.parse(port)
        {host, port}

      [host] ->
        msg = "Port not set for kafka broker #{host}"
        Logger.warn(msg)
        raise msg
    end
  end
end
116 changes: 116 additions & 0 deletions lib/slot_sync/publishers/publisher.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
defmodule SlotSync.Publishers.Kafka do
  @moduledoc """
  Formats and encodes a shift payload and then publishes it to Kafka.

  The key and value are Avro-encoded via the configured
  `:event_serializer_encoder` against the `<topic>-key` / `<topic>-value`
  schema names, then produced to partition 0 of the configured topic through
  the configured `:kafka_client`.
  """
  use Confex, otp_app: :slot_sync

  alias KafkaEx.Protocol.Produce.Request
  alias KafkaEx.Protocol.Produce.Message
  require Logger

  @doc """
  Encodes the shift `data` (keyed by `id`) and publishes it to Kafka.

  Returns the configured Kafka client's `produce/1` result on success, or
  `{:error, reason}` when Avro encoding of the key or value fails (the
  failure is also logged and counted in Datadog).
  """
  @spec call(map(), integer()) :: any() | {:error, any()}
  def call(data, id) do
    id
    |> build_event_key()
    |> encode_event_message(data)
    |> build_request(data)
    |> publish()
  end

  # The Avro key schema has a single "id" field.
  defp build_event_key(id), do: [{"id", id}]

  # Encodes key and value against their registered Avro schemas.
  # Returns %{encoded_key: ..., encoded_message: ...} or {:error, reason}.
  defp encode_event_message(key, event) do
    with {:ok, encoded_key} <- encoder().call(key_schema_name(), key),
         {:ok, encoded_message} <- encoder().call(value_schema_name(), transformed(event)) do
      %{encoded_key: encoded_key, encoded_message: encoded_message}
    else
      {:error, error} -> {:error, error}
    end
  end

  # Maps the incoming shift map into the ordered field list the Avro value
  # schema expects. Timestamp fields that may be absent go through
  # string_or_nil/1 so they encode as the correct union branch.
  defp transformed(event) do
    [
      {"account_id", event["account_id"]},
      {"acknowledged", event["acknowledged"]},
      {"acknowledged_at", string_or_nil(event["acknowledged_at"])},
      {"actionable", event["actionable"]},
      {"alerted", event["alerted"]},
      {"block_id", event["block_id"]},
      {"break_time", event["break_time"]},
      {"color", event["color"]},
      {"created_at", event["created_at"]},
      {"creator_id", event["creator_id"]},
      {"end_time", event["end_time"]},
      {"id", event["id"]},
      {"instances", event["instances"]},
      {"is_open", event["is_open"]},
      # TODO(review): linked_users is hard-coded to :null rather than
      # event["linked_users"] — confirm whether the value schema supports it
      # before passing the real value through.
      {"linked_users", :null},
      {"location_id", event["location_id"]},
      {"notes", event["notes"]},
      {"notified_at", string_or_nil(event["notified_at"])},
      {"position_id", event["position_id"]},
      {"published", event["published"]},
      {"published_date", event["published_date"]},
      {"shiftchain_key", event["shiftchain_key"]},
      {"site_id", event["site_id"]},
      {"start_time", event["start_time"]},
      {"updated_at", event["updated_at"]},
      {"user_id", event["user_id"]}
    ]
  end

  # Tags an optional value with its Avro union branch.
  # NOTE: the nil clause must come first — the original code had the
  # catch-all clause first, which made the string branch unreachable and
  # encoded every value (including real timestamps) as null.
  defp string_or_nil(nil), do: {"null", :null}
  defp string_or_nil(value), do: {"string", value}

  # Encoding failed: count it, log the offending event, propagate the error.
  defp build_request({:error, error}, event) do
    stats("publisher.encoding.error")
    Logger.warn("Failed to decode event:#{inspect(event)}, error:#{inspect(error)}")
    {:error, error}
  end

  defp build_request(%{encoded_key: encoded_key, encoded_message: encoded_message}, _event) do
    stats("publisher.encoding.success")

    messages = [%Message{key: encoded_key, value: encoded_message}]

    {:ok,
     %Request{
       topic: topic_name(),
       partition: 0,
       required_acks: 1,
       messages: messages
     }}
  end

  defp publish({:error, error}), do: {:error, error}
  defp publish({:ok, request}), do: request |> kafka_client().produce()

  defp kafka_client(), do: config()[:kafka_client]

  # Confluent schema registry convention: subject names are <topic>-key/-value.
  defp key_schema_name, do: topic_name() <> "-key"
  defp value_schema_name, do: topic_name() <> "-value"

  defp topic_name, do: config()[:topic_name]

  defp encoder(), do: config()[:event_serializer_encoder]

  defp stats(name), do: DogStatsd.increment(:datadogstatsd, name)
end
11 changes: 10 additions & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ defmodule SlotSync.Mixfile do
:logger,
:confex,
:dogstatsd,
:ktsllex,
:event_serializer,
:kafka_ex
]
]
end
Expand All @@ -38,7 +41,13 @@ defmodule SlotSync.Mixfile do
{:dogstatsd, "~> 0.0.3"},
{:redix, ">= 0.0.0"},
{:sched_ex, "~> 1.0"},
{:timex, "~> 3.1"}
{:timex, "~> 3.1"},
{:kafka_ex,
git: "https://github.com/quiqupltd/kafka_ex.git",
branch: "remove-raise-and-handle-reconnection"},
{:avlizer, "~> 0.2.1"},
{:ktsllex, github: "quiqupltd/ktsllex"},
{:event_serializer, git: "git@gitlab.quiqup.com:backend/event_serializer.git"}
]
end

Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
"idna": {:hex, :idna, "6.0.0", "689c46cbcdf3524c44d5f3dde8001f364cd7608a99556d8fbd8239a5798d4c10", [:rebar3], [{:unicode_util_compat, "0.4.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"},
"jsone": {:hex, :jsone, "1.4.6", "644d6d57befb22c8e19b324dee19d73b1c004565009861a8f64c68b7b9e64dbf", [:rebar3], [], "hexpm"},
"kafka_ex": {:git, "https://github.com/quiqupltd/kafka_ex.git", "2a2ece78fcd5383026f7dfb59714f82229827951", [branch: "remove-raise-and-handle-reconnection"]},
"ktsllex": {:git, "https://github.com/quiqupltd/ktsllex.git", "8a303915088cd7f0d9ec55f48442796a9b7ffaa2", []},
"ktsllex": {:git, "https://github.com/quiqupltd/ktsllex.git", "37ba9f033833e470b3dc227147435d99ce5db5dc", []},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"},
"mime": {:hex, :mime, "1.3.0", "5e8d45a39e95c650900d03f897fbf99ae04f60ab1daa4a34c7a20a5151b7a5fe", [:mix], [], "hexpm"},
"mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], [], "hexpm"},
Expand Down
11 changes: 11 additions & 0 deletions priv/schemas/slots-key.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
{
"type": "record",
"name": "Key",
"namespace": "uk.london.quiqup.slots",
"fields": [{
"name": "id",
"type": "int",
    "doc": "The id of the user from which the slot was generated"
}],
"connect.name": "uk.london.quiqup.slots.Key"
}
Loading

0 comments on commit 03760bf

Please sign in to comment.