Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 38 additions & 82 deletions lib/logflare/backends.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ defmodule Logflare.Backends do
@moduledoc false

import Ecto.Query
import Logflare.Utils.Guards

alias Ecto.Changeset
alias Logflare.Backends.Adaptor
Expand Down Expand Up @@ -185,12 +184,9 @@ defmodule Logflare.Backends do
default_ingest_modified? =
Map.has_key?(attrs, "default_ingest?") or Map.has_key?(attrs, :default_ingest?)

source_id = Map.get(attrs, "source_id") || Map.get(attrs, :source_id)

changeset =
backend
|> Backend.changeset(attrs)
|> validate_default_ingest_source(source_id)
|> then(fn changeset ->
if alerts_modified do
Changeset.put_assoc(changeset, :alert_queries, Map.get(attrs, :alert_queries))
Expand All @@ -207,7 +203,7 @@ defmodule Logflare.Backends do
if default_ingest_modified? do
was_enabled? = backend.default_ingest?
is_enabled? = updated.default_ingest?
handle_default_ingest_associations(updated, source_id, was_enabled?, is_enabled?)
handle_default_ingest_flag_change(updated, was_enabled?, is_enabled?)
sync_backend_across_cluster(updated.id)

# force refresh after association changes
Expand All @@ -225,61 +221,20 @@ defmodule Logflare.Backends do
end
end

@spec validate_default_ingest_source(Changeset.t(), String.t() | integer() | nil) ::
Changeset.t()
defp validate_default_ingest_source(%{changes: %{default_ingest?: true}} = changeset, source_id)
when is_non_empty_binary(source_id) or is_integer(source_id) do
case Sources.get(source_id) do
%Source{default_ingest_backend_enabled?: true} ->
changeset

%Source{default_ingest_backend_enabled?: false} ->
Changeset.add_error(
changeset,
:default_ingest?,
"Source must have default ingest backend support enabled"
)

nil ->
Changeset.add_error(
changeset,
:default_ingest?,
"Source not found"
)
end
end

defp validate_default_ingest_source(
%Changeset{changes: %{default_ingest?: true}} = changeset,
_source_id
) do
Changeset.add_error(
changeset,
:default_ingest?,
"Please select a source when enabling default ingest"
)
end

defp validate_default_ingest_source(changeset, _source_id), do: changeset

@spec handle_default_ingest_associations(
@spec handle_default_ingest_flag_change(
Backend.t(),
source_id :: String.t() | integer() | nil,
was_enabled :: boolean(),
is_enabled :: boolean()
) :: :ok
defp handle_default_ingest_associations(backend, source_id, _was_enabled, true)
when is_non_empty_binary(source_id) or is_integer(source_id) do
source = Sources.get(source_id) |> Sources.preload_backends()

if not Enum.any?(source.backends, &(&1.id == backend.id)) do
update_source_backends(source, source.backends ++ [backend])
end

defp handle_default_ingest_flag_change(_backend, false, true) do
# When enabling default_ingest?, backend will now receive logs from all user sources
# No need to manage explicit source associations
:ok
end

defp handle_default_ingest_associations(backend, _source_id, true, false) do
defp handle_default_ingest_flag_change(backend, true, false) do
# When disabling default_ingest?, remove all source associations
# User can manually add specific sources later if needed
backend_with_sources =
backend
|> Repo.reload!()
Expand All @@ -294,8 +249,7 @@ defmodule Logflare.Backends do
:ok
end

defp handle_default_ingest_associations(_backend, _source_id, _was_enabled, _is_enabled),
do: :ok
defp handle_default_ingest_flag_change(_backend, _was_enabled, _is_enabled), do: :ok

@doc """
Updates the backends of a source wholly. Does not work on partial data, all backends must be provided.
Expand Down Expand Up @@ -539,7 +493,7 @@ defmodule Logflare.Backends do
end

defp dispatch_to_backends(source, nil, log_events) do
backends = __MODULE__.Cache.list_backends(source_id: source.id)
backends = __MODULE__.Cache.list_dispatch_backends(source)

for backend <- [nil | backends] do
{backend_id, backend_type} =
Expand Down Expand Up @@ -714,43 +668,30 @@ defmodule Logflare.Backends do

For sources with `default_ingest_backend_enabled? = true`:
- Checks the system default backend queue
- Checks user-designated default backends
- Returns true if ANY of these are full
- Falls back to checking all queues if no user default backends are configured
- Checks all dispatch backends (including default_ingest backends)
- Returns true if ANY of these queues are full

For normal sources:
- Checks ALL backend queues
- Returns true only if ALL queues are full
"""
@spec cached_local_pending_buffer_full?(Source.t()) :: boolean()
def cached_local_pending_buffer_full?(%Source{
id: source_id,
default_ingest_backend_enabled?: true
}) do
default_backend_ids =
__MODULE__.Cache.list_backends(source_id: source_id)
|> Enum.filter(& &1.default_ingest?)
|> MapSet.new(& &1.id)
def cached_local_pending_buffer_full?(
%Source{
id: source_id
} = source
) do
# Get all dispatch backends using direct query
dispatch_backends = __MODULE__.Cache.list_dispatch_backends(source)
backend_ids = Enum.map(dispatch_backends, & &1.id)

# Check system default backend (nil backend_id)
system_default_full? = buffer_full_for_backend?(source_id, nil)

# Check user-configured default backends
user_defaults_full? = Enum.any?(default_backend_ids, &buffer_full_for_backend?(source_id, &1))

system_default_full? || user_defaults_full?
end
# Check if ANY dispatch backend queue is full
any_backend_full? = Enum.any?(backend_ids, &buffer_full_for_backend?(source_id, &1))

def cached_local_pending_buffer_full?(%Source{id: source_id}) do
PubSubRates.Cache.get_local_buffer(source_id, nil)
|> Map.get(:queues, [])
|> case do
[] ->
false

queues ->
Enum.all?(queues, fn {_key, v} -> v > @max_pending_buffer_len_per_queue end)
end
system_default_full? || any_backend_full?
end

@spec buffer_full_for_backend?(
Expand Down Expand Up @@ -806,6 +747,21 @@ defmodule Logflare.Backends do
PubSubRates.Cache.get_cluster_buffers(source.id, Map.get(backend || %{}, :id))
end

@doc """
Lists all backends that should receive logs for a given source during dispatch.
This includes backends explicitly associated with the source and default ingest backends.
Uses a direct Ecto query to fetch from the backends table.
"""
@spec list_dispatch_backends(Source.t()) :: [Backend.t()]
def list_dispatch_backends(%Source{id: source_id, user_id: user_id}) do
from(b in Backend,
left_join: sb in assoc(b, :sources),
where: (b.default_ingest? == true or sb.id == ^source_id) and b.user_id == ^user_id
)
|> Repo.all()
|> Enum.map(&typecast_config_string_map_to_atom_map/1)
end

@doc """
Lists the latest recent logs of all caches across the cluster.
Performs a check to ensure that the cache is started. If not started yet globally, it will start the cache locally.
Expand Down
1 change: 1 addition & 0 deletions lib/logflare/backends/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ defmodule Logflare.Backends.Cache do
end

def list_backends(arg), do: apply_repo_fun(__ENV__.function, [arg])
def list_dispatch_backends(arg), do: apply_repo_fun(__ENV__.function, [arg])
def get_backend(arg), do: apply_repo_fun(__ENV__.function, [arg])

defp apply_repo_fun(arg1, arg2) do
Expand Down
2 changes: 1 addition & 1 deletion lib/logflare/backends/source_sup.ex
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ defmodule Logflare.Backends.SourceSup do
end

def init(source) do
ingest_backends = Backends.Cache.list_backends(source_id: source.id)
ingest_backends = Backends.Cache.list_dispatch_backends(source)

rules_backends =
Backends.Cache.list_backends(rules_source_id: source.id)
Expand Down
3 changes: 0 additions & 3 deletions lib/logflare/sources.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,6 @@ defmodule Logflare.Sources do
{:user_id, user_id}, q when is_integer(user_id) ->
where(q, [s], s.user_id == ^user_id)

{:default_ingest_backend_enabled?, enabled}, q when is_boolean(enabled) ->
where(q, [s], s.default_ingest_backend_enabled? == ^enabled)

_, q ->
q
end)
Expand Down
9 changes: 1 addition & 8 deletions lib/logflare/sources/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ defmodule Logflare.Sources.Source do
:backends,
:retention_days,
:transform_copy_fields,
:bigquery_clustering_fields,
:default_ingest_backend_enabled?
:bigquery_clustering_fields
]}

defp env_dataset_id_append,
Expand Down Expand Up @@ -136,10 +135,6 @@ defmodule Logflare.Sources.Source do
field :transform_copy_fields, :string
field :bigquery_clustering_fields, :string

field :default_ingest_backend_enabled?, :boolean,
source: :default_ingest_backend_enabled,
default: false

# Causes a shitstorm
# field :bigquery_schema, Ecto.Term

Expand Down Expand Up @@ -192,7 +187,6 @@ defmodule Logflare.Sources.Source do
:retention_days,
:transform_copy_fields,
:disable_tailing,
:default_ingest_backend_enabled?,
:bq_storage_write_api
])
|> cast_embed(:notifications, with: &Notifications.changeset/2)
Expand Down Expand Up @@ -221,7 +215,6 @@ defmodule Logflare.Sources.Source do
:retention_days,
:transform_copy_fields,
:disable_tailing,
:default_ingest_backend_enabled?,
:bq_storage_write_api
])
|> cast_embed(:notifications, with: &Notifications.changeset/2)
Expand Down
1 change: 1 addition & 0 deletions lib/logflare_web/live/backends/actions/index.html.heex
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
</p>
<div class="tw-flex tw-gap-2 tw-mt-1">
<span class="badge badge-pill badge-secondary tw-leading-normal"><%= backend.type %></span>
<span :if={backend.default_ingest?} class="badge badge-pill badge-primary tw-leading-normal">default ingest</span>
<span>drain rules: <%= Enum.count(backend.rules) %></span>
<span :for={{k, v} when is_binary(v) <- backend.metadata || %{}} :if={backend.metadata}><%= k %>: <%= v %></span>
</div>
Expand Down
80 changes: 45 additions & 35 deletions lib/logflare_web/live/backends/actions/show.html.heex
Original file line number Diff line number Diff line change
Expand Up @@ -29,47 +29,57 @@
</section>

<section :if={@flag_multibackend == true and Logflare.Backends.Adaptor.supports_default_ingest?(@backend)} class="mx-auto container pt-3 tw-flex tw-flex-col tw-gap-4">
<h4>Default Ingest Sources</h4>
<p>
Sources configured to use this backend as a default ingest destination.
</p>
<h4>Ingest Configuration</h4>

<div :if={not Enum.empty?(@available_sources || [])}>
<.button phx-click="toggle_default_ingest_form" variant="secondary">
<%= if @show_default_ingest_form? do %>
Cancel
<% else %>
Add a Source
<% end %>
</.button>
</div>
<%= if @backend.default_ingest? do %>
<div class="alert alert-info">
<i class="fas fa-info-circle"></i>
<strong>Ingest from all sources:</strong> This backend is configured to receive logs from ALL your sources automatically.
To limit this backend to specific sources, disable the "Ingest from all sources" option in the backend settings.
</div>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure if you folks decide the UI but I wonder if you should move the toggle here, so someone who wants to manage sources does not need to go to another page, then come back here, to control them.

<% else %>
<p>
This backend only receives logs from sources explicitly configured below.
To make this backend receive logs from ALL your sources, enable the "Ingest from all sources" option in the backend settings.
</p>

<.form :let={f} :if={@show_default_ingest_form?} id="default_ingest" for={%{}} as={:default_ingest} action="#" phx-submit="save_default_ingest">
<div class="form-group">
<%= label(f, :source_id, "Source") %>
<%= select(f, :source_id, Enum.map(@available_sources || [], fn s -> {s.name, s.id} end),
class: "form-control",
prompt: [key: "Choose a source"]
) %>
<small id="source_id" class="form-text text-muted">This source will use this backend as its default ingest destination. Only sources with default ingest backend support enabled are shown.</small>
<div :if={length(@available_sources || []) > 0}>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
<div :if={length(@available_sources || []) > 0}>
<div :if={@available_sources != []}>

<.button phx-click="toggle_default_ingest_form" variant="secondary">
<%= if @show_default_ingest_form? do %>
Cancel
<% else %>
Add a Source
<% end %>
</.button>
</div>

<%= hidden_input(f, :backend_id, value: @backend.id) %>
<%= submit("Save", class: "btn btn-primary") %>
</.form>
<.form :let={f} :if={@show_default_ingest_form?} id="default_ingest" for={%{}} as={:default_ingest} action="#" phx-submit="save_default_ingest">
<div class="form-group">
<%= label(f, :source_id, "Source") %>
<%= select(f, :source_id, Enum.map(@available_sources || [], fn s -> {s.name, s.id} end),
class: "form-control",
prompt: [key: "Choose a source"]
) %>
<small id="source_id" class="form-text text-muted">Select a source to send its logs to this backend.</small>
</div>

<p :if={Enum.empty?(@default_ingest_sources || [])}>No sources configured for default ingest</p>
<ul :for={source <- @default_ingest_sources || []} class="tw-flex tw-flex-row tw-gap-4">
<li class="list-group-item tw-pointer-none tw-w-full tw-flex tw-flex-row tw-justify-between">
<span>
Source <code><%= source.name %></code> uses this backend as default ingest
</span>
<%= hidden_input(f, :backend_id, value: @backend.id) %>
<%= submit("Save", class: "btn btn-primary") %>
</.form>

<.button phx-click="remove_default_ingest" phx-value-source_id={source.id} variant="danger">
Remove
</.button>
</li>
</ul>
<p :if={Enum.empty?(@backend.sources || [])}>No sources configured yet</p>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this is a pattern from before this pull request, but it would be great to remove the conditionals such as || [], by making sure they are always initialized to a list.

<ul :for={source <- @backend.sources || []} class="tw-flex tw-flex-row tw-gap-4">
<li class="list-group-item tw-pointer-none tw-w-full tw-flex tw-flex-row tw-justify-between">
<span>
Source <code><%= source.name %></code> sends logs to this backend
</span>

<.button phx-click="remove_source" phx-value-source_id={source.id} variant="danger">
Remove
</.button>
</li>
</ul>
<% end %>
</section>

<section class="mx-auto container pt-3 tw-flex tw-flex-col tw-gap-4">
Expand Down
Loading
Loading