From 03760bfc02d65e1b769b3f19e15fe579df59f884 Mon Sep 17 00:00:00 2001
From: Ian Vaughan
Date: Fri, 5 Oct 2018 15:32:00 +0100
Subject: [PATCH] Add kafka

---
 .env.example                          |   8 ++
 .iex.exs                              |  30 +++++++
 README.md                             |   3 +
 config/config.exs                     |  65 ++++++++++++++-
 config/dev.exs                        |   1 +
 config/prod.exs                       |   5 ++
 config/test.exs                       |   7 ++
 lib/slot_sync/application.ex          |   6 ++
 lib/slot_sync/kafka_config.ex         |  28 +++++++
 lib/slot_sync/publishers/publisher.ex | 116 ++++++++++++++++++++++++++
 mix.exs                               |  11 ++-
 mix.lock                              |   2 +-
 priv/schemas/slots-key.json           |  11 +++
 priv/schemas/slots-value.json         | 110 ++++++++++++++++++++++++
 14 files changed, 400 insertions(+), 3 deletions(-)
 create mode 100644 .iex.exs
 create mode 100644 config/dev.exs
 create mode 100644 config/prod.exs
 create mode 100644 config/test.exs
 create mode 100644 lib/slot_sync/kafka_config.ex
 create mode 100644 lib/slot_sync/publishers/publisher.ex
 create mode 100644 priv/schemas/slots-key.json
 create mode 100644 priv/schemas/slots-value.json

diff --git a/.env.example b/.env.example
index 28c89d4..a8b2805 100644
--- a/.env.example
+++ b/.env.example
@@ -1,3 +1,11 @@
+export KAFKA_BROKERS="localhost:9092,localhost:9093"
+export KAFKA_HEARTBEAT_INTERVAL=1000
+export KAFKA_COMMIT_INTERVAL=1000
+export KAFKA_SCHEMA_REGISTRY_URL="http://localhost:8081"
+export KAFKA_TOPIC_NAME="uk.london.quiqup.slots"
+export START_WORKERS=true
+export SCHEMA_REGISTRY_URL="http://localhost:8081"
+export AVLIZER_CONFLUENT_SCHEMAREGISTRY_URL="http://localhost:8081"
 export STATSD_HOST="localhost"
 export STATSD_PORT=8125
 export WIW_KEY=123
\ No newline at end of file
diff --git a/.iex.exs b/.iex.exs
new file mode 100644
index 0000000..ecd7945
--- /dev/null
+++ b/.iex.exs
@@ -0,0 +1,30 @@
+# shift = %{"account_id" => 77967, "acknowledged" => 1, "acknowledged_at" => "Mon, 01 Oct 2018 15:27:30 +0100", "actionable" => false, "alerted" => false, "block_id" => 0, "break_time" => 0, "color" => "74a611", "created_at" => "Wed, 26 Sep 2018 12:24:58 +0100", "creator_id" => 5526232, "end_time" => "Mon, 08 Oct 2018 00:15:00 +0100", "id" => 2076303948, "instances" => 0, "is_open" => false, "linked_users" => nil, "location_id" => 3999871, "notes" => "", "notified_at" => "Tue, 02 Oct 2018 22:32:58 +0100", "position_id" => 709909, "published" => true, "published_date" => "Wed, 26 Sep 2018 16:59:05 +0100", "shiftchain_key" => "", "site_id" => 3530221, "start_time" => "Sun, 07 Oct 2018 21:00:00 +0100", "updated_at" => "Mon, 01 Oct 2018 15:27:30 +0100", "user_id" => 29205213}
+
+shift = %{
+  "account_id" => 77967,
+  "acknowledged" => 1,
+  "acknowledged_at" => nil,
+  "actionable" => false,
+  "alerted" => false,
+  "block_id" => 0,
+  "break_time" => 0,
+  "color" => "74a611",
+  "created_at" => "Wed, 26 Sep 2018 12:24:58 +0100",
+  "creator_id" => 5_526_232,
+  "end_time" => "Mon, 08 Oct 2018 00:15:00 +0100",
+  "id" => 2_076_303_948,
+  "instances" => 0,
+  "is_open" => false,
+  "linked_users" => nil,
+  "location_id" => 3_999_871,
+  "notes" => "",
+  "notified_at" => "Tue, 02 Oct 2018 22:32:58 +0100",
+  "position_id" => 709_909,
+  "published" => true,
+  "published_date" => "Wed, 26 Sep 2018 16:59:05 +0100",
+  "shiftchain_key" => "",
+  "site_id" => 3_530_221,
+  "start_time" => "Sun, 07 Oct 2018 21:00:00 +0100",
+  "updated_at" => "Mon, 01 Oct 2018 15:27:30 +0100",
+  "user_id" => 29_205_213
+}
diff --git a/README.md b/README.md
index 9e442b7..8b6be03 100644
--- a/README.md
+++ b/README.md
@@ -1,2 +1,5 @@
 # Slot Sync
 
+* Syncs slot data from external provider
+* Caches the data locally in Redis
+* If remote data has changed, publishes to Kafka
diff --git a/config/config.exs b/config/config.exs
index c40b433..0ba8501 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -8,15 +8,78 @@ use Mix.Config
 # if you want to provide default values for your application for
 # 3rd-party users, it should be done in your "mix.exs" file.
 
-config :slot_sync, SlotSync.Application, dummy: true
+config :slot_sync, SlotSync.Application, start_workers: true
 
 config :slot_sync, SlotSync.WIW,
   http_adaptor: HTTPoison,
   key: {:system, :string, "WIW_KEY"}
 
+config :slot_sync, brokers: {:system, :string, "KAFKA_BROKERS", "localhost:9092"}
+
+config :kafka_ex,
+  # Don't change this value, the actual value is being set by the statement above (config :slot_sync, brokers:),
+  # to bypass limitation of kafka_ex 0.8.3, which doesn't use Confex
+  brokers: [{"localhost", 9092}],
+
+  # Ensure that the schema registry url is set and the host and port are valid.
+  # schema_versions: %{"com.quiqup.tracking_locations": "1"},
+
+  # Set this value to true if you do not want the default
+  # `KafkaEx.Server` worker to start during application start-up -
+  # i.e., if you want to start your own set of named workers
+  disable_default_worker: true,
+  # Timeout value, in msec, for synchronous operations (e.g., network calls).
+  # If this value is greater than GenServer's default timeout of 5000, it will also
+  # be used as the timeout for work dispatched via KafkaEx.Server.call (e.g., KafkaEx.metadata).
+  # In those cases, it should be considered a 'total timeout', encompassing both network calls and
+  # wait time for the genservers.
+  sync_timeout: 3000,
+  # Supervision max_restarts - the maximum amount of restarts allowed in a time frame
+  max_restarts: 10,
+  # Supervision max_seconds - the time frame in which :max_restarts applies
+  max_seconds: 60,
+  # Interval in milliseconds that GenConsumer waits to commit offsets.
+  commit_interval: 5_000,
+  # Threshold number of messages consumed for GenConsumer to commit offsets
+  # to the broker.
+  commit_threshold: 100,
+  use_ssl: false,
+  kafka_version: "1.0.1"
+
+config :avlizer,
+  avlizer_confluent: %{
+    schema_registry_url:
+      {:system, "AVLIZER_CONFLUENT_SCHEMAREGISTRY_URL", "http://localhost:8081"}
+  }
+
+config :event_serializer,
+  schema_registry_url: {:system, "AVLIZER_CONFLUENT_SCHEMAREGISTRY_URL", "http://localhost:8081"},
+  topic_name: {:system, "KAFKA_TOPIC_NAME", "uk.london.quiqup.slots"}
+
 config :slot_sync, SlotSync.Datadog,
   host: {:system, "STATSD_HOST"},
   port: {:system, :integer, "STATSD_PORT"},
   namespace: "slot_sync",
   module: DogStatsd
 
+config :slot_sync, SlotSync.Publishers.Kafka,
+  event_serializer_encoder: EventSerializer.Encoder,
+  kafka_client: KafkaEx,
+  topic_name: {:system, :string, "KAFKA_TOPIC_NAME", "uk.london.quiqup.slots"}
+
+config :slot_sync, SlotSync.Processor.Shift, publisher: SlotSync.Publishers.Kafka
+
+config :ktsllex,
+  run_migrations: true,
+  schema_registry_host: {:system, "AVLIZER_CONFLUENT_SCHEMAREGISTRY_URL", "http://localhost:8081"},
+  base_path: {:system, "KAFKA_SCHEMA_BASE_PATH", "./schemas/slots"},
+  schema_name: {:system, "KAFKA_SCHEMA_NAME", "uk.london.quiqup.slots"},
+  app_name: :slot_sync,
+  lenses_host: {:system, "LENSES_HOST", "http://localhost:3030"},
+  lenses_user: {:system, "LENSES_USER", "admin"},
+  lenses_pass: {:system, "LENSES_PASS", "admin"},
+  lenses_topic: {:system, "LENSES_TOPIC", "uk.london.quiqup.slots"}
+
+# Import environment specific config. This must remain at the bottom
+# of this file so it overrides the configuration defined above.
+import_config "#{Mix.env()}.exs"
diff --git a/config/dev.exs b/config/dev.exs
new file mode 100644
index 0000000..d2d855e
--- /dev/null
+++ b/config/dev.exs
@@ -0,0 +1 @@
+use Mix.Config
diff --git a/config/prod.exs b/config/prod.exs
new file mode 100644
index 0000000..5000af1
--- /dev/null
+++ b/config/prod.exs
@@ -0,0 +1,5 @@
+use Mix.Config
+
+# Do not print debug messages in production
+config :logger, level: :info
+
diff --git a/config/test.exs b/config/test.exs
new file mode 100644
index 0000000..bb9a712
--- /dev/null
+++ b/config/test.exs
@@ -0,0 +1,7 @@
+use Mix.Config
+config :logger, level: :debug
+
+config :slot_sync, SlotSync.Application, start_workers: false
+
+
+config :event_serializer, enabled: false
diff --git a/lib/slot_sync/application.ex b/lib/slot_sync/application.ex
index 3ff0814..5e18322 100644
--- a/lib/slot_sync/application.ex
+++ b/lib/slot_sync/application.ex
@@ -3,8 +3,10 @@ defmodule SlotSync.Application do
   use Confex, otp_app: :slot_sync
 
   import Supervisor.Spec
+  import SlotSync.KafkaConfig
   require DogStatsd
 
+  @spec start(any(), any()) :: :ignore | {:error, any()} | {:ok, pid()}
   def start(_type, _args) do
     opts = [strategy: :one_for_one, name: SlotSync.Supervisor]
 
@@ -14,6 +16,10 @@ defmodule SlotSync.Application do
       tags: ["application_start"]
     })
 
+    if config()[:start_workers] do
+      KafkaEx.create_worker(:kafka_ex, uris: brokers())
+    end
+
     app
   end
 
diff --git a/lib/slot_sync/kafka_config.ex b/lib/slot_sync/kafka_config.ex
new file mode 100644
index 0000000..ada9af9
--- /dev/null
+++ b/lib/slot_sync/kafka_config.ex
@@ -0,0 +1,28 @@
+defmodule SlotSync.KafkaConfig do
+  require Logger
+
+  def brokers do
+    :slot_sync
+    |> Confex.get_env(:brokers)
+    |> brokers()
+    |> IO.inspect(label: "brokers")
+  end
+
+  defp brokers(nil), do: nil
+  defp brokers(list) when is_list(list), do: list
+
+  defp brokers(value) when is_binary(value) do
+    for line <- String.split(value, ","), into: [] do
+      case line |> String.trim() |> String.split(":") do
+        [host] ->
+          msg = "Port not set for kafka broker #{host}"
+          Logger.warn(msg)
+          raise msg
+
+        [host, port] ->
+          {port, _} = Integer.parse(port)
+          {host, port}
+      end
+    end
+  end
+end
diff --git a/lib/slot_sync/publishers/publisher.ex b/lib/slot_sync/publishers/publisher.ex
new file mode 100644
index 0000000..75b799d
--- /dev/null
+++ b/lib/slot_sync/publishers/publisher.ex
@@ -0,0 +1,116 @@
+defmodule SlotSync.Publishers.Kafka do
+  @moduledoc """
+  Formats and encodes the payload and then publishes it to Kafka
+  """
+  use Confex, otp_app: :slot_sync
+
+  alias SlotSync.TrackingLocationsFormatter
+  alias KafkaEx.Protocol.Produce.Request
+  alias KafkaEx.Protocol.Produce.Message
+  require Logger
+
+  # @type key() :: String.t()
+  # @type event() :: String.t()
+  # @type topic_name() :: String.t() | atom()
+  # @type encoded_message() :: String.t()
+
+  @doc """
+  This is the main function to trigger a message to be published to Kafka
+
+  ## Examples
+
+  """
+
+  # @type message_number() :: integer()
+  # @spec call(map(), integer(), topic_name()) :: {:ok, message_number()} | {:error, String.t()}
+  def call(data, id) do
+    id
+    |> build_event_key()
+    |> encode_event_message(data)
+    |> build_request(data)
+    |> publish()
+  end
+
+  defp build_event_key(id), do: [{"id", id}]
+
+  # @spec encode_event_message(key, event, topic_name) ::
+  #         {:encoded_key, encoded_message, encoded_message: encoded_message} | {:error, key}
+  defp encode_event_message(key, event) do
+    with {:ok, encoded_key} <- encoder().call(key_schema_name(), key),
+         {:ok, encoded_message} <- encoder().call(value_schema_name(), transformed(event)) do
+      %{encoded_key: encoded_key, encoded_message: encoded_message}
+    else
+      {:error, error} -> {:error, error}
+    end
+  end
+
+  defp transformed(event) do
+    event["linked_users"] |> IO.inspect(label: "linked_users")
+
+    [
+      {"account_id", event["account_id"]},
+      {"acknowledged", event["acknowledged"]},
+      {"acknowledged_at", string_or_nil(event["acknowledged_at"])},
+      {"actionable", event["actionable"]},
+      {"alerted", event["alerted"]},
+      {"block_id", event["block_id"]},
+      {"break_time", event["break_time"]},
+      {"color", event["color"]},
+      {"created_at", event["created_at"]},
+      {"creator_id", event["creator_id"]},
+      {"end_time", event["end_time"]},
+      {"id", event["id"]},
+      {"instances", event["instances"]},
+      {"is_open", event["is_open"]},
+      # {"linked_users", event["linked_users"]},
+      {"linked_users", :null},
+      {"location_id", event["location_id"]},
+      {"notes", event["notes"]},
+      {"notified_at", string_or_nil(event["notified_at"])},
+      {"position_id", event["position_id"]},
+      {"published", event["published"]},
+      {"published_date", event["published_date"]},
+      {"shiftchain_key", event["shiftchain_key"]},
+      {"site_id", event["site_id"]},
+      {"start_time", event["start_time"]},
+      {"updated_at", event["updated_at"]},
+      {"user_id", event["user_id"]}
+    ]
+  end
+
+  defp string_or_nil(nil), do: {"null", :null}
+  defp string_or_nil(value), do: {"string", value}
+
+  defp build_request({:error, error}, event) do
+    stats("publisher.encoding.error")
+    Logger.warn("Failed to encode event:#{inspect(event)}, error:#{inspect(error)}")
+    {:error, error}
+  end
+
+  defp build_request(%{encoded_key: encoded_key, encoded_message: encoded_message}, _event) do
+    stats("publisher.encoding.success")
+
+    messages = [%Message{key: encoded_key, value: encoded_message}]
+
+    {:ok,
+     %Request{
+       topic: topic_name(),
+       partition: 0,
+       required_acks: 1,
+       messages: messages
+     }}
+  end
+
+  defp publish({:error, error}), do: {:error, error}
+  defp publish({:ok, request}), do: request |> kafka_client().produce()
+  defp kafka_client(), do: config()[:kafka_client]
+
+  defp key_schema_name, do: topic_name() <> "-key"
+  defp value_schema_name, do: topic_name() <> "-value"
+
+  defp topic_name, do: config()[:topic_name]
+
+  defp encoder(), do: config()[:event_serializer_encoder]
+
+  defp stats(name), do: DogStatsd.increment(:datadogstatsd, name)
+end
diff --git a/mix.exs b/mix.exs
index 45b713c..4ae8263 100644
--- a/mix.exs
+++ b/mix.exs
@@ -22,6 +22,9 @@ defmodule SlotSync.Mixfile do
         :logger,
         :confex,
         :dogstatsd,
+        :ktsllex,
+        :event_serializer,
+        :kafka_ex
       ]
     ]
   end
@@ -38,7 +41,13 @@ defmodule SlotSync.Mixfile do
       {:dogstatsd, "~> 0.0.3"},
       {:redix, ">= 0.0.0"},
       {:sched_ex, "~> 1.0"},
-      {:timex, "~> 3.1"}
+      {:timex, "~> 3.1"},
+      {:kafka_ex,
+       git: "https://github.com/quiqupltd/kafka_ex.git",
+       branch: "remove-raise-and-handle-reconnection"},
+      {:avlizer, "~> 0.2.1"},
+      {:ktsllex, github: "quiqupltd/ktsllex"},
+      {:event_serializer, git: "git@gitlab.quiqup.com:backend/event_serializer.git"}
     ]
   end
 
diff --git a/mix.lock b/mix.lock
index 84dcfc5..682b7c5 100644
--- a/mix.lock
+++ b/mix.lock
@@ -14,7 +14,7 @@
   "idna": {:hex, :idna, "6.0.0", "689c46cbcdf3524c44d5f3dde8001f364cd7608a99556d8fbd8239a5798d4c10", [:rebar3], [{:unicode_util_compat, "0.4.1", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm"},
   "jsone": {:hex, :jsone, "1.4.6", "644d6d57befb22c8e19b324dee19d73b1c004565009861a8f64c68b7b9e64dbf", [:rebar3], [], "hexpm"},
   "kafka_ex": {:git, "https://github.com/quiqupltd/kafka_ex.git", "2a2ece78fcd5383026f7dfb59714f82229827951", [branch: "remove-raise-and-handle-reconnection"]},
-  "ktsllex": {:git, "https://github.com/quiqupltd/ktsllex.git", "8a303915088cd7f0d9ec55f48442796a9b7ffaa2", []},
+  "ktsllex": {:git, "https://github.com/quiqupltd/ktsllex.git", "37ba9f033833e470b3dc227147435d99ce5db5dc", []},
   "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"},
   "mime": {:hex, :mime, "1.3.0", "5e8d45a39e95c650900d03f897fbf99ae04f60ab1daa4a34c7a20a5151b7a5fe", [:mix], [], "hexpm"},
   "mimerl": {:hex, :mimerl, "1.0.2", "993f9b0e084083405ed8252b99460c4f0563e41729ab42d9074fd5e52439be88", [:rebar3], [], "hexpm"},
diff --git a/priv/schemas/slots-key.json b/priv/schemas/slots-key.json
new file mode 100644
index 0000000..c5043bd
--- /dev/null
+++ b/priv/schemas/slots-key.json
@@ -0,0 +1,11 @@
+{
+  "type": "record",
+  "name": "Key",
+  "namespace": "uk.london.quiqup.slots",
+  "fields": [{
+    "name": "id",
+    "type": "int",
+    "doc": " The id of the user from which the slot was generated"
+  }],
+  "connect.name": "uk.london.quiqup.slots.Key"
+}
\ No newline at end of file
diff --git a/priv/schemas/slots-value.json b/priv/schemas/slots-value.json
new file mode 100644
index 0000000..5672240
--- /dev/null
+++ b/priv/schemas/slots-value.json
@@ -0,0 +1,110 @@
+{
+  "type": "record",
+  "name": "slot",
+  "namespace": "uk.london.quiqup.slots",
+  "fields": [{
+    "name": "account_id",
+    "type": "long"
+  },
+  {
+    "name": "acknowledged",
+    "type": "long"
+  },
+  {
+    "name": "acknowledged_at",
+    "type": ["null", "string"]
+  },
+  {
+    "name": "actionable",
+    "type": "boolean"
+  },
+  {
+    "name": "alerted",
+    "type": "boolean"
+  },
+  {
+    "name": "block_id",
+    "type": "long"
+  },
+  {
+    "name": "break_time",
+    "type": "long"
+  },
+  {
+    "name": "color",
+    "type": "string"
+  },
+  {
+    "name": "created_at",
+    "type": "string"
+  },
+  {
+    "name": "creator_id",
+    "type": "long"
+  },
+  {
+    "name": "end_time",
+    "type": "string"
+  },
+  {
+    "name": "id",
+    "type": "long"
+  },
+  {
+    "name": "instances",
+    "type": "long"
+  },
+  {
+    "name": "is_open",
+    "type": "boolean"
+  },
+  {
+    "name": "linked_users",
+    "type": ["null", "string"]
+  },
+  {
+    "name": "location_id",
+    "type": "long"
+  },
+  {
+    "name": "notes",
+    "type": "string"
+  },
+  {
+    "name": "notified_at",
+    "type": ["null", "string"]
+  },
+  {
+    "name": "position_id",
+    "type": "long"
+  },
+  {
+    "name": "published",
+    "type": "boolean"
+  },
+  {
+    "name": "published_date",
+    "type": "string"
+  },
+  {
+    "name": "shiftchain_key",
+    "type": "string"
+  },
+  {
+    "name": "site_id",
+    "type": "long"
+  },
+  {
+    "name": "start_time",
+    "type": "string"
+  },
+  {
+    "name": "updated_at",
+    "type": "string"
+  },
+  {
+    "name": "user_id",
+    "type": "long"
+  }
+  ]
+}
\ No newline at end of file