Skip to content

Commit

Permalink
Support display of all types of Broadway names (#13)
Browse files Browse the repository at this point in the history
This change adds support for Broadway names that are pids
or names that uses `:via`.
  • Loading branch information
feng19 authored Sep 7, 2021
1 parent 5bddcee commit 7996eea
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 49 deletions.
43 changes: 13 additions & 30 deletions lib/broadway_dashboard.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ defmodule BroadwayDashboard do
|> String.split("<!-- MDOC !-->")
|> Enum.fetch!(1)

alias BroadwayDashboard.Metrics
alias BroadwayDashboard.PipelineGraph
alias BroadwayDashboard.{Metrics, PipelineGraph}

# We check the Broadway version installed on remote nodes.
# This should match mix.exs.
Expand Down Expand Up @@ -52,11 +51,11 @@ defmodule BroadwayDashboard do

defp running_pipelines(node) do
case :rpc.call(node, Broadway, :all_running, []) do
[] ->
{:error, :no_pipelines_available}

pipelines when is_list(pipelines) ->
case Enum.filter(pipelines, &is_atom/1) do
[] -> {:error, :no_pipelines_available}
pipelines_names -> {:ok, pipelines_names}
end
{:ok, pipelines}

{:badrpc, _error} ->
{:error, :cannot_list_running_pipelines}
Expand All @@ -68,9 +67,7 @@ defmodule BroadwayDashboard do
case pipelines_or_auto_discover(pipelines, socket.assigns.page.node) do
{:ok, pipelines} ->
socket = assign(socket, :pipelines, pipelines)

nav_pipeline = nav_pipeline(params)
pipeline = nav_pipeline && Enum.find(pipelines, fn name -> name == nav_pipeline end)
pipeline = nav_pipeline(params, pipelines)

cond do
pipeline ->
Expand All @@ -95,7 +92,8 @@ defmodule BroadwayDashboard do
end

true ->
to = live_dashboard_path(socket, socket.assigns.page, nav: hd(pipelines))
nav = pipelines |> hd() |> inspect()
to = live_dashboard_path(socket, socket.assigns.page, nav: nav)
{:ok, push_redirect(socket, to: to)}
end

Expand All @@ -104,19 +102,10 @@ defmodule BroadwayDashboard do
end
end

defp nav_pipeline(params) do
defp nav_pipeline(params, pipelines) do
nav = params["nav"]
nav = if nav && nav != "", do: nav

if nav do
to_existing_atom_or_nil(nav)
end
end

defp to_existing_atom_or_nil(nav) do
String.to_existing_atom(nav)
rescue
ArgumentError -> nil
nav && Enum.find(pipelines, fn name -> inspect(name) == nav end)
end

defp check_socket_connection(socket) do
Expand Down Expand Up @@ -154,22 +143,16 @@ defmodule BroadwayDashboard do
else
items =
for name <- assigns.pipelines do
name = inspect(name)

{name,
name: format_nav_name(name),
render: fn -> render_pipeline_or_error(assigns) end,
method: :redirect}
name: name, render: fn -> render_pipeline_or_error(assigns) end, method: :redirect}
end

nav_bar(items: items)
end
end

defp format_nav_name(pipeline_name) do
"Elixir." <> name = Atom.to_string(pipeline_name)

name
end

defp render_pipeline_or_error(assigns) do
if assigns[:error] do
render_error(assigns)
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ defmodule BroadwayDashboard.MixProject do
defp deps do
[
{:broadway, "~> 1.0"},
{:phoenix_live_dashboard, "~> 0.5.0"},
{:phoenix_live_dashboard, "~> 0.5.1"},
{:phoenix_live_reload, "~> 1.2", only: :dev},
{:plug_cowboy, "~> 2.0", only: :dev},
{:jason, "~> 1.0", only: [:dev, :test, :docs]},
Expand Down
10 changes: 5 additions & 5 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"cowboy_telemetry": {:hex, :cowboy_telemetry, "0.3.1", "ebd1a1d7aff97f27c66654e78ece187abdc646992714164380d8a041eda16754", [:rebar3], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "3a6efd3366130eab84ca372cbd4a7d3c3a97bdfcfb4911233b035d117063f0af"},
"cowlib": {:hex, :cowlib, "2.11.0", "0b9ff9c346629256c42ebe1eeb769a83c6cb771a6ee5960bd110ab0b9b872063", [:make, :rebar3], [], "hexpm", "2b3e9da0b21c4565751a6d4901c20d1b4cc25cbb7fd50d91d2ab6dd287bc86a9"},
"earmark_parser": {:hex, :earmark_parser, "1.4.15", "b29e8e729f4aa4a00436580dcc2c9c5c51890613457c193cc8525c388ccb2f06", [:mix], [], "hexpm", "044523d6438ea19c1b8ec877ec221b008661d3c27e3b848f4c879f500421ca5c"},
"ex_doc": {:hex, :ex_doc, "0.25.1", "4b736fa38dc76488a937e5ef2944f5474f3eff921de771b25371345a8dc810bc", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "3200b0a69ddb2028365281fbef3753ea9e728683863d8cdaa96580925c891f67"},
"ex_doc": {:hex, :ex_doc, "0.25.2", "4f1cae793c4d132e06674b282f1d9ea3bf409bcca027ddb2fe177c4eed6a253f", [:mix], [{:earmark_parser, "~> 1.4.0", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "5b0c172e87ac27f14dfd152d52a145238ec71a95efbf29849550278c58a393d6"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"floki": {:hex, :floki, "0.27.0", "6b29a14283f1e2e8fad824bc930eaa9477c462022075df6bea8f0ad811c13599", [:mix], [{:html_entities, "~> 0.5.0", [hex: :html_entities, repo: "hexpm", optional: false]}], "hexpm", "583b8c13697c37179f1f82443bcc7ad2f76fbc0bf4c186606eebd658f7f2631b"},
"gen_stage": {:hex, :gen_stage, "1.1.1", "78d83b14ca742f4c252770bcdf674d83378ca41579c387c57e2f06d70f596317", [:mix], [], "hexpm", "eb90d2d72609050a66ce42b7d4a69323a60c892a09ead0680d5d8ef16b9a034e"},
Expand All @@ -16,11 +16,11 @@
"mime": {:hex, :mime, "2.0.1", "0de4c81303fe07806ebc2494d5321ce8fb4df106e34dd5f9d787b637ebadc256", [:mix], [], "hexpm", "7a86b920d2aedce5fb6280ac8261ac1a739ae6c1a1ad38f5eadf910063008942"},
"nimble_options": {:hex, :nimble_options, "0.3.6", "365d03c05d43483d3eacf820671dafce5b49d692667b3bb8cae28447fd2414ef", [:mix], [], "hexpm", "1c1d3536c4aee1be2c8f3c691bf27c62dbd88d9bb3a0b1a011913453932e8c15"},
"nimble_parsec": {:hex, :nimble_parsec, "1.1.0", "3a6fca1550363552e54c216debb6a9e95bd8d32348938e13de5eda962c0d7f89", [:mix], [], "hexpm", "08eb32d66b706e913ff748f11694b17981c0b04a33ef470e33e11b3d3ac8f54b"},
"phoenix": {:hex, :phoenix, "1.5.10", "3ee7d5c17ff9626d72d374d8fc8909bf00f4323fd15549fbe3abbbd38b5299c8", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_html, "~> 2.13 or ~> 3.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.10", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.2", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.1.2 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "f9c2eaa5a8fe5a412610c6aa84ccdb6f3e92f333d4df7fbaeb0d5a157dbfb48d"},
"phoenix_html": {:hex, :phoenix_html, "3.0.1", "1bf7351b0c5f95965fba1f54ec38bf31798703dfe44026fe0e27dce65b534da9", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "89c5e350558d695d4f175b181a766b54beb9bf9b8e010a008ceb02037ab02e3a"},
"phoenix_live_dashboard": {:hex, :phoenix_live_dashboard, "0.5.0", "3282d8646e1bfc1ef1218f508d9fcefd48cf47f9081b7667bd9b281b688a49cf", [:mix], [{:ecto, "~> 3.6.2 or ~> 3.7", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_psql_extras, "~> 0.6", [hex: :ecto_psql_extras, repo: "hexpm", optional: true]}, {:phoenix_live_view, "~> 0.16.0", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "609740be43de94ae0abd2c4300ff0356a6e8a9487bf340e69967643a59fa7ec8"},
"phoenix": {:hex, :phoenix, "1.5.12", "75fddb14c720388eea93d33886166a690416a7ff8633fbd93f364355b6fe1166", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_html, "~> 2.13 or ~> 3.0", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.0", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:plug, "~> 1.10", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 1.0 or ~> 2.2", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.1.2 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "8f0ae6734fcc18bbaa646c161e2febc46fb899eae43f82679b92530983324113"},
"phoenix_html": {:hex, :phoenix_html, "3.0.3", "32812d70841c7e975e01edb591989b2b002b69797db1005b8d0adc1fe717be30", [:mix], [{:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "e8152ae9e8c60705659761edb8d8c4bb7e29130a9b0803ec1854fe137ec62dde"},
"phoenix_live_dashboard": {:hex, :phoenix_live_dashboard, "0.5.1", "eec48a6e07a34a5692f0a3bd0810999b3467efd9e79145569357f18cd9081af9", [:mix], [{:ecto, "~> 3.6.2 or ~> 3.7", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_psql_extras, "~> 0.6", [hex: :ecto_psql_extras, repo: "hexpm", optional: true]}, {:phoenix_live_view, "~> 0.16.0", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "b9e6a4101aec4b7a970ab3dae66dccc0bbbf133cc3be596bec953d405f903f8e"},
"phoenix_live_reload": {:hex, :phoenix_live_reload, "1.3.3", "3a53772a6118d5679bf50fc1670505a290e32a1d195df9e069d8c53ab040c054", [:mix], [{:file_system, "~> 0.2.1 or ~> 0.3", [hex: :file_system, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.4", [hex: :phoenix, repo: "hexpm", optional: false]}], "hexpm", "766796676e5f558dbae5d1bdb066849673e956005e3730dfd5affd7a6da4abac"},
"phoenix_live_view": {:hex, :phoenix_live_view, "0.16.0", "913d5a51b72fb6035762a4b443f22b515b4047b9872f82cb9afe44df850ee968", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix, "~> 1.5.9 or ~> 1.6.0", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 3.0.0", [hex: :phoenix_html, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.2 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "eed6ba0aae9dd3936b72b46444e580896d740c0dc1d4c42049fb95ea5009db3c"},
"phoenix_live_view": {:hex, :phoenix_live_view, "0.16.3", "f6f597c74cfc8b00919eb717852b9b750fc326f815ef6b8d6ae503c7d9d09871", [:mix], [{:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix, "~> 1.5.9 or ~> 1.6.0", [hex: :phoenix, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 3.0", [hex: :phoenix_html, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.2 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "42bac6b82fd182f10b0b3b39d57aaf2fbe5eab423f8216afeeea9c6b1bd14554"},
"phoenix_pubsub": {:hex, :phoenix_pubsub, "2.0.0", "a1ae76717bb168cdeb10ec9d92d1480fec99e3080f011402c0a2d68d47395ffb", [:mix], [], "hexpm", "c52d948c4f261577b9c6fa804be91884b381a7f8f18450c5045975435350f771"},
"plug": {:hex, :plug, "1.12.1", "645678c800601d8d9f27ad1aebba1fdb9ce5b2623ddb961a074da0b96c35187d", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:plug_crypto, "~> 1.1.1 or ~> 1.2", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "d57e799a777bc20494b784966dc5fbda91eb4a09f571f76545b72a634ce0d30b"},
"plug_cowboy": {:hex, :plug_cowboy, "2.5.1", "7cc96ff645158a94cf3ec9744464414f02287f832d6847079adfe0b58761cbd0", [:mix], [{:cowboy, "~> 2.7", [hex: :cowboy, repo: "hexpm", optional: false]}, {:cowboy_telemetry, "~> 0.3", [hex: :cowboy_telemetry, repo: "hexpm", optional: false]}, {:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "107d0a5865fa92bcb48e631cc0729ae9ccfa0a9f9a1bd8f01acb513abf1c2d64"},
Expand Down
28 changes: 15 additions & 13 deletions test/broadway_dashboard_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -34,19 +34,19 @@ defmodule BroadwayDashboardTest do
end

test "redirects to the first pipeline if no pipeline is provided" do
assert {:error, {:live_redirect, %{to: "/dashboard/broadway?nav=Elixir.Demo.Pipeline"}}} =
assert {:error, {:live_redirect, %{to: "/dashboard/broadway?nav=Demo.Pipeline"}}} =
live(build_conn(), "/dashboard/broadway")
end

test "redirects to the first pipeline if pipeline provided does not exist" do
assert {:error, {:live_redirect, %{to: "/dashboard/broadway?nav=Elixir.Demo.Pipeline"}}} =
live(build_conn(), "/dashboard/broadway?nav=Elixir.IDontExist")
assert {:error, {:live_redirect, %{to: "/dashboard/broadway?nav=Demo.Pipeline"}}} =
live(build_conn(), "/dashboard/broadway?nav=IDontExist")
end

test "redirects to the first pipeline if no pipeline is provided keeping node" do
base_path = URI.encode("/dashboard/#{node()}/broadway", &(&1 != ?@))

path_with_node_and_pipeline = "#{base_path}?nav=Elixir.Demo.Pipeline"
path_with_node_and_pipeline = "#{base_path}?nav=Demo.Pipeline"

assert {:error, {:live_redirect, %{to: ^path_with_node_and_pipeline}}} =
live(build_conn(), base_path)
Expand All @@ -68,22 +68,23 @@ defmodule BroadwayDashboardTest do
{:live_redirect, %{to: "/dashboard/broadway_auto_discovery?nav=" <> nav_name}}} =
live(build_conn(), "/dashboard/broadway_auto_discovery")

assert nav_name == to_string(name)
assert nav_name == inspect(name)
end

test "shows the pipeline after auto discover" do
name = new_unique_name()
{:ok, _broadway} = start_supervised({Demo.Pipeline, [broadway_name: name]})

{:ok, live, _} = live(build_conn(), "/dashboard/broadway_auto_discovery?nav=#{name}")
{:ok, live, _} =
live(build_conn(), "/dashboard/broadway_auto_discovery?nav=#{inspect(name)}")

rendered = render(live)
assert rendered =~ "Updates automatically"
assert rendered =~ "Throughput"
assert rendered =~ "All time"
end

test "renders error if auto discover is enabled but pipeline is registered using via" do
test "auto discover is enabled when pipeline is registered using via" do
{:ok, registry} = Registry.start_link(keys: :unique, name: MyRegistry)
name = via_tuple(:broadway)

Expand All @@ -99,10 +100,11 @@ defmodule BroadwayDashboardTest do
batchers: [default: []]
)

{:ok, live, _} = live(build_conn(), "/dashboard/broadway_auto_discovery")
nav_name = inspect(name) |> URI.encode_www_form()

rendered = render(live)
assert rendered =~ "There is no pipeline running"
assert {:error,
{:live_redirect, %{to: "/dashboard/broadway_auto_discovery?nav=" <> ^nav_name}}} =
live(build_conn(), "/dashboard/broadway_auto_discovery")

Process.exit(registry, :normal)
end
Expand All @@ -113,7 +115,7 @@ defmodule BroadwayDashboardTest do
test "shows the pipeline" do
start_supervised!(Demo.Pipeline)

{:ok, live, _} = live(build_conn(), "/dashboard/broadway?nav=Elixir.Demo.Pipeline")
{:ok, live, _} = live(build_conn(), "/dashboard/broadway?nav=Demo.Pipeline")

rendered = render(live)
assert rendered =~ "Updates automatically"
Expand Down Expand Up @@ -151,7 +153,7 @@ defmodule BroadwayDashboardTest do
end

test "renders an error message when pipeline does not exist" do
{:ok, live, _} = live(build_conn(), "/dashboard/broadway?nav=Elixir.MyDummy")
{:ok, live, _} = live(build_conn(), "/dashboard/broadway?nav=MyDummy")

rendered = render(live)

Expand All @@ -170,7 +172,7 @@ defmodule BroadwayDashboardTest do
Node.connect(remote_node)

base_path = URI.encode("/dashboard/#{remote_node}/broadway", &(&1 != ?@))
{:ok, live, _} = live(build_conn(), "#{base_path}?nav=Elixir.MyDummyOutdated")
{:ok, live, _} = live(build_conn(), "#{base_path}?nav=MyDummyOutdated")

rendered = render(live)

Expand Down

0 comments on commit 7996eea

Please sign in to comment.