You can add fluvio
to your list of dependencies in mix.exs
:
def deps do
[
{:fluvio, "~> 0.2"}
]
end
The following Fluvio Rust interfaces are supported on Elixir:
Rust | Elixir |
---|---|
fluvio::PartitionConsumer |
Fluvio.Consumer |
fluvio::TopicProducer |
Fluvio.Producer |
fluvio::FluvioAdmin |
Fluvio.Admin |
Each Elixir module tends to provide equivalent functionalities that fluvio
crate exposes, although Elixir-oriented to be well integrated with Elixir ecosystem and OTP.
fluvio::FluvioAdmin
is minimally supported since provisioning cluster resources is typically done upfront and not at the application level, but for experimentation it's useful to haveFluvio.Admin
.
This snippet illustrates a Fluvio.Consumer
connected to a "lobby"
topic and using an optional SmartModule filter. Initially, it lazily unfolds the infinite stream, taking 4 records and chunking every 2. Finally, it keeps consuming it.
alias Fluvio.Consumer
{:ok, pid} =
Consumer.start_link(%{
topic: "lobby",
offset: [from_beginning: 0],
smartmodule_path: "./examples/wasm/map_reverse.wasm"
})
Consumer.stream_unfold(pid)
|> Stream.take(4)
|> Stream.chunk_every(2)
|> Enum.to_list()
|> IO.inspect()
Consumer.stream_each(pid, fn result ->
case result do
{:ok, record} -> IO.inspect(record)
{:error, msg} -> IO.inspect("Error: #{msg}")
end
end)
MIX_ENV=prod mix run examples/consumer_smartmodule.exs
This snippet illustrates a Fluvio.Producer
connected to a "lobby"
topic. Initially, a string value "hello"
is sent and flushed synchronously. Also, 20 values are sent asynchronously and flushed in chunks of 10.
alias Fluvio.Producer
{:ok, pid} = Producer.start_link(%{topic: "lobby"})
{:ok, _} = Producer.send(pid, "hello")
{:ok, _} = Producer.flush(pid)
[] =
1..20
|> Stream.chunk_every(10)
|> Stream.flat_map(fn chunk ->
[
chunk
|> Enum.map(fn value ->
Task.async(fn -> {Producer.send(pid, to_string(value)), value} end)
end)
|> Task.await_many()
|> Enum.filter(&match?({{:error, _msg}, _value}, &1)),
[{Producer.flush(pid), :flush}]
|> Enum.filter(&match?({{:error, _msg}, _value}, &1))
]
end)
|> Stream.concat()
|> Enum.to_list()
MIX_ENV=prod mix run examples/producer.exs
This example demonstrates a ping pong application that uses two pairs of a Fluvio.Producer
and Fluvio.Consumer
, which have been linked to an Elixir Supervisor
(Task.Supervisor
) to provide process fault-tolerance inside your app. You could also strategically restart and init with a different Fluvio.Consumer
offset.
alias Fluvio.Producer
alias Fluvio.Consumer
alias Fluvio.Record
defmodule App do
def start(topic_1 \\ "ping", topic_2 \\ "pong", initial_value \\ "1") do
children = [{Task.Supervisor, name: TaskSup}]
{:ok, sup_pid} = Supervisor.start_link(children, strategy: :one_for_one)
{:ok, pid_one} =
Task.Supervisor.start_child(
TaskSup,
fn ->
{:ok, p1_pid} = Producer.start_link(%{topic: topic_1})
{:ok, c2_pid} = Consumer.start_link(%{topic: topic_2, offset: [from_end: 0]})
IO.inspect("Bootstrapping '#{topic_1}' by sending '#{initial_value}'")
{:ok, _} = Producer.send(p1_pid, initial_value)
{:ok, _} = Producer.flush(p1_pid)
App.PingPong.keep_consuming(c2_pid, p1_pid)
end,
restart: :permanent
)
{:ok, pid_two} =
Task.Supervisor.start_child(
TaskSup,
fn ->
{:ok, p2_pid} = Producer.start_link(%{topic: topic_2})
{:ok, c1_pid} = Consumer.start_link(%{topic: topic_1, offset: [from_end: 0]})
App.PingPong.keep_consuming(c1_pid, p2_pid)
end,
restart: :permanent
)
{sup_pid, pid_one, pid_two}
end
defmodule PingPong do
defp produce(p_pid, value) do
IO.inspect("Producing value: '#{value}'")
{:ok, _} = Producer.send(p_pid, value)
{:ok, _} = Producer.flush(p_pid)
end
defp do_consume({:ok, %Record{value: "4"}}, _c_pid, _p_pid) do
raise("simulating an unrecoverable error")
end
defp do_consume({:ok, record}, _c_pid, p_pid) do
IO.inspect("Consumed value: '#{record.value}'")
produce(p_pid, to_string(String.to_integer(record.value) + 1))
end
defp do_consume({:error, _msg}, c_pid, _p_pid), do: Process.exit(c_pid, :kill)
defp do_consume({:stop_next, _}, _c_pid, _p_pid), do: nil
def keep_consuming(c_pid, p_pid, min_freq_ms \\ 1000) do
Consumer.stream_each(c_pid, fn result ->
Process.sleep(min_freq_ms)
do_consume(result, c_pid, p_pid)
end)
end
end
end
If you were to run this example, you'd see that once the pairs of producer and consumer start, the initial value "1"
is sent, and each pair will keep incrementing the value and sending it. Once the value "4"
is reached, the ping consumer simulates an unrecoverable error, which will crash this process and the supervisor will restart it:
MIX_ENV=prod iex -S mix
iex(1)> c "examples/ping_pong.exs"
[App, App.PingPong]
iex(2)> App.start()
{#PID<0.220.0>, #PID<0.222.0>, #PID<0.223.0>}
"Bootstrapping 'ping' by sending '1'"
"Consumed value: '1'"
"Producing value: '2'"
"Consumed value: '2'"
"Producing value: '3'"
"Consumed value: '3'"
"Producing value: '4'"
iex(3)>
01:31:46.911 [error] Task #PID<0.222.0> started from #PID<0.207.0> terminating
** (RuntimeError) simulating an unrecoverable error
examples/ping_pong.exs:46: App.PingPong.do_consume/3
(fluvio 0.2.0) lib/fluvio/consumer.ex:172: Fluvio.Consumer.stream_each/3
(elixir 1.14.0) lib/task/supervised.ex:89: Task.Supervised.invoke_mfa/2
(stdlib 4.1) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Function: #Function<0.131083353/0 in App.start/3>
Args: []
"Bootstrapping 'ping' by sending '1'"
"Consumed value: '1'"
"Producing value: '2'"
"Consumed value: '2'"