Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: concurrency support #68

Merged
merged 9 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ This is the README for `main` branch, which is for pre `0.2.0` release.
</p>
<p>
<a href="https://github.com/darwin67/ex-inngest/actions/workflows/ci.yml"><img src="https://github.com/darwin67/ex-inngest/actions/workflows/ci.yml/badge.svg"></a>
<a href="https://codecov.io/gh/inngest/ex_inngest" ><img src="https://codecov.io/gh/inngest/ex_inngest/graph/badge.svg?token=t7231eD24T"/></a>
<a href="https://hex.pm/packages/inngest"><img src="https://img.shields.io/hexpm/v/inngest.svg" /></a>
<a href="https://hexdocs.pm/inngest/"><img src="https://img.shields.io/badge/hex-docs-lightgreen.svg" /></a>
<br/>
Expand Down
4 changes: 2 additions & 2 deletions codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ coverage:
status:
patch:
default:
target: 50%
target: 60%
project:
default:
target: 50%
target: 60%
4 changes: 4 additions & 0 deletions lib/inngest/error.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,7 @@ end
defmodule Inngest.RateLimitConfigError do
defexception [:message]
end

defmodule Inngest.ConcurrencyConfigError do
defexception [:message]
end
6 changes: 6 additions & 0 deletions lib/inngest/function.ex
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ defmodule Inngest.Function do
|> maybe_debounce()
|> maybe_batch_events()
|> maybe_rate_limit()
|> maybe_concurrency()
] ++ handler
end

Expand All @@ -188,6 +189,11 @@ defmodule Inngest.Function do
|> Inngest.FnOpts.validate_rate_limit(config)
end

defp maybe_concurrency(config) do
fn_opts()
|> Inngest.FnOpts.validate_concurrency(config)
end

defp fn_opts() do
case __MODULE__.__info__(:attributes) |> Keyword.get(:func) |> List.first() do
nil -> %Inngest.FnOpts{}
Expand Down
56 changes: 55 additions & 1 deletion lib/inngest/function/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ defmodule Inngest.FnOpts do
:debounce,
:batch_events,
:rate_limit,
:concurrency,
retries: 3
]

Expand All @@ -20,7 +21,8 @@ defmodule Inngest.FnOpts do
retries: number() | nil,
debounce: debounce() | nil,
batch_events: batch_events() | nil,
rate_limit: rate_limit() | nil
rate_limit: rate_limit() | nil,
concurrency: concurrency() | nil
}

@type debounce() :: %{
Expand All @@ -39,6 +41,18 @@ defmodule Inngest.FnOpts do
key: binary() | nil
}

@type concurrency() ::
number()
| concurrency_option()
| list(concurrency_option())

@type concurrency_option() :: %{
limit: number(),
key: binary() | nil,
scope: binary() | nil
}
@concurrency_scopes ["fn", "env", "account"]

@doc """
Validate the debounce configuration
"""
Expand Down Expand Up @@ -139,4 +153,44 @@ defmodule Inngest.FnOpts do
Map.put(config, :rateLimit, rate_limit)
end
end

@doc """
Validate the concurrency config
"""
@spec validate_concurrency(t(), map()) :: map()
def validate_concurrency(fnopts, config) do
case fnopts |> Map.get(:concurrency) do
nil ->
config

%{} = setting ->
validate_concurrency(setting)
Map.put(config, :concurrency, setting)

[_ | _] = settings ->
Enum.each(settings, &validate_concurrency/1)
Map.put(config, :concurrency, settings)

setting ->
if is_number(setting) do
Map.put(config, :concurrency, setting)
else
raise Inngest.ConcurrencyConfigError, message: "invalid concurrency setting"
end
end
end

defp validate_concurrency(%{} = setting) do
limit = Map.get(setting, :limit)
scope = Map.get(setting, :scope)

if is_nil(limit) do
raise Inngest.ConcurrencyConfigError, message: "'limit' must be set for concurrency"
end

if !is_nil(scope) && !Enum.member?(@concurrency_scopes, scope) do
raise Inngest.ConcurrencyConfigError,
message: "invalid scope '#{scope}', needs to be \"fn\"|\"env\"|\"account\""
end
end
end
11 changes: 11 additions & 0 deletions test/inngest/function/cases/concurrency_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defmodule Inngest.Function.Cases.ConcurrencyTest do
use ExUnit.Case, async: true

# alias Inngest.Test.DevServer
# import Inngest.Test.Helper

# NOTE:
# no good way to test this in this SDK
# don't have access to atomic global variables to verify
# the concurrency
end
67 changes: 67 additions & 0 deletions test/inngest/function/config_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,71 @@ defmodule Inngest.FnOptsTest do
end
end
end

describe "validate_concurrency/2" do
@fn_opts %FnOpts{
id: "foobar",
name: "Foobar",
concurrency: %{
limit: 2
}
}

test "should succeed with just number" do
opts = %{@fn_opts | concurrency: 10}

assert %{
concurrency: 10
} = FnOpts.validate_concurrency(opts, @config)
end

test "should succeed with valid settings" do
assert %{
concurrency: %{
limit: 2
}
} = FnOpts.validate_concurrency(@fn_opts, @config)
end

test "should succeed with multiple settings" do
opts = %{@fn_opts | concurrency: [%{limit: 2, scope: "fn"}, %{limit: 10, scope: "account"}]}

assert %{
concurrency: [
%{limit: 2, scope: "fn"},
%{limit: 10, scope: "account"}
]
} = FnOpts.validate_concurrency(opts, @config)
end

test "should raise when limit is missing" do
opts = drop_at(@fn_opts, [:concurrency, :limit])

assert_raise Inngest.ConcurrencyConfigError,
"'limit' must be set for concurrency",
fn ->
FnOpts.validate_concurrency(opts, @config)
end
end

test "should raise if scope is invalid" do
opts = %{@fn_opts | concurrency: %{limit: 2, scope: "hello"}}

assert_raise Inngest.ConcurrencyConfigError,
"invalid scope 'hello', needs to be \"fn\"|\"env\"|\"account\"",
fn ->
FnOpts.validate_concurrency(opts, @config)
end
end

test "should raise if provided invalid setting" do
opts = %{@fn_opts | concurrency: "foobar"}

assert_raise Inngest.ConcurrencyConfigError,
"invalid concurrency setting",
fn ->
FnOpts.validate_concurrency(opts, @config)
end
end
end
end
27 changes: 27 additions & 0 deletions test/support/cases/concurrency_fn.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
defmodule Inngest.Test.Case.ConcurrencyFn do
@moduledoc false

use Inngest.Function
alias Inngest.{FnOpts, Trigger}

@func %FnOpts{
id: "concurrency-fn",
name: "Concurrency Function",
concurrency: %{
limit: 2
}
}
@trigger %Trigger{event: "test/plug.throttle"}

@impl true
@spec exec(any, %{:step => atom, optional(any) => any}) :: {:ok, <<_::72>>}
def exec(ctx, %{step: step} = _args) do
_ =
step.run(ctx, "wait", fn ->
Process.sleep(3_000)
"waited"
end)

{:ok, "Throttled"}
end
end