Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sync-service): Clean up publication filters #2154

Merged
merged 24 commits into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add publication manager tests
  • Loading branch information
msfstef committed Dec 16, 2024
commit 3aa004576da04892e4c9ec5716d88807cceb2e52
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ defmodule Electric.Replication.PublicationManager do
:waiters,
:publication_name,
:db_pool,
:pg_version
:pg_version,
:configure_tables_for_replication_fn
]

@typep state() :: %__MODULE__{
Expand All @@ -34,7 +35,8 @@ defmodule Electric.Replication.PublicationManager do
waiters: list(GenServer.from()),
publication_name: String.t(),
db_pool: term(),
pg_version: non_neg_integer()
pg_version: non_neg_integer(),
configure_tables_for_replication_fn: fun()
}
@typep filter_operation :: :add | :remove

Expand Down Expand Up @@ -63,8 +65,13 @@ defmodule Electric.Replication.PublicationManager do
stack_id: [type: :string, required: true],
publication_name: [type: :string, required: true],
db_pool: [type: {:or, [:atom, :pid, @name_schema_tuple]}],
pg_version: [type: :integer, required: false],
pg_version: [type: {:or, [:integer, :atom]}, required: false, default: nil],
update_debounce_timeout: [type: :timeout, default: @default_debounce_timeout],
configure_tables_for_replication_fn: [
type: {:fun, 4},
required: false,
default: &Configuration.configure_tables_for_replication!/4
],
server: [type: :any, required: false]
)

Expand Down Expand Up @@ -146,7 +153,9 @@ defmodule Electric.Replication.PublicationManager do
Access.get(opts, :update_debounce_timeout, @default_debounce_timeout),
publication_name: Access.fetch!(opts, :publication_name),
db_pool: Access.fetch!(opts, :db_pool),
pg_version: Access.get(opts, :pg_version, nil)
pg_version: Access.fetch!(opts, :pg_version),
configure_tables_for_replication_fn:
Access.fetch!(opts, :configure_tables_for_replication_fn)
}

{:ok, state, {:continue, :get_pg_version}}
Expand Down Expand Up @@ -232,10 +241,11 @@ defmodule Electric.Replication.PublicationManager do
prepared_relation_filters: relation_filters,
publication_name: publication_name,
db_pool: db_pool,
pg_version: pg_version
pg_version: pg_version,
configure_tables_for_replication_fn: configure_tables_for_replication_fn
} = _state
) do
Configuration.configure_tables_for_replication!(
configure_tables_for_replication_fn.(
db_pool,
Map.values(relation_filters),
pg_version,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
defmodule Electric.Replication.PublicationManagerTest do
alias Electric.Replication.Eval.Expr
alias Electric.Replication.PublicationManager.RelationFilter
alias Electric.Shapes.Shape
alias Electric.Replication.PublicationManager

use ExUnit.Case, async: true

import Support.ComponentSetup

defp generate_shape(relation, where_clause \\ nil, selected_columns \\ nil) do
%Shape{
root_table: relation,
root_table_id: 1,
table_info: %{
relation => %{
columns:
([
%{name: "id", type: :text, type_id: {25, 1}},
%{name: "value", type: :text, type_id: {25, 1}}
] ++ (selected_columns || []))
|> Enum.map(fn col -> %{name: col, type: :text, type_id: {25, 1}} end),
pk: ["id"]
}
},
where: where_clause,
selected_columns: selected_columns
}
end

setup :with_stack_id_from_test

setup ctx do
test_pid = self()
configure_tables_fn = fn _, filters, _, _ -> send(test_pid, {:filters, filters}) end

%{publication_manager: {_, publication_manager_opts}} =
with_publication_manager(%{
module: ctx.module,
test: ctx.test,
stack_id: ctx.stack_id,
update_debounce_timeout: Access.get(ctx, :update_debounce_timeout, 0),
publication_name: "pub_#{ctx.stack_id}",
pool: :no_pool,
pg_version: 150_001,
configure_tables_for_replication_fn: configure_tables_fn
})

%{opts: publication_manager_opts}
end

describe "add_shape/2" do
test "should add filters for single shape", %{opts: opts} do
shape = generate_shape({"public", "items"}, %{query: "id = 1"})
assert :ok == PublicationManager.add_shape(shape, opts)

assert_receive {:filters,
[
%RelationFilter{
relation: {"public", "items"},
where_clauses: [%{query: "id = 1"}]
}
]}
end

test "should accept multiple shapes for different relations", %{opts: opts} do
shape1 = generate_shape({"public", "items"}, %{query: "id = 1"})
shape2 = generate_shape({"public", "other"})
assert :ok == PublicationManager.add_shape(shape1, opts)
assert :ok == PublicationManager.add_shape(shape2, opts)

assert_receive {:filters,
[
%RelationFilter{
relation: {"public", "items"},
where_clauses: [%{query: "id = 1"}]
},
%RelationFilter{relation: {"public", "other"}}
]}
end

test "should merge where clauses for same relation", %{opts: opts} do
shape1 = generate_shape({"public", "items"}, %{query: "id = 1"})
shape2 = generate_shape({"public", "items"}, %{query: "id = 2"})
shape3 = generate_shape({"public", "items"}, %{query: "id = 1"})
assert :ok == PublicationManager.add_shape(shape1, opts)
assert :ok == PublicationManager.add_shape(shape2, opts)
assert :ok == PublicationManager.add_shape(shape3, opts)

assert_receive {:filters,
[
%RelationFilter{
relation: {"public", "items"},
where_clauses: [%{query: "id = 2"}, %{query: "id = 1"}]
}
]}
end

test "should remove where clauses when one covers everything", %{opts: opts} do
shape1 = generate_shape({"public", "items"}, %{query: "id = 1"})
shape2 = generate_shape({"public", "items"}, nil)
assert :ok == PublicationManager.add_shape(shape1, opts)
assert :ok == PublicationManager.add_shape(shape2, opts)

assert_receive {:filters,
[
%RelationFilter{
relation: {"public", "items"},
where_clauses: nil
}
]}
end

test "should merge selected columns for same relation", %{opts: opts} do
shape1 = generate_shape({"public", "items"}, nil, ["id", "value"])
shape2 = generate_shape({"public", "items"}, nil, ["id", "potato"])
assert :ok == PublicationManager.add_shape(shape1, opts)
assert :ok == PublicationManager.add_shape(shape2, opts)

assert_receive {:filters,
[
%RelationFilter{
relation: {"public", "items"},
selected_columns: ["value", "potato", "id"]
}
]}
end

test "should remove selected columns when all selected by shape", %{opts: opts} do
shape1 = generate_shape({"public", "items"}, nil, ["id", "value"])
shape2 = generate_shape({"public", "items"}, nil, nil)
assert :ok == PublicationManager.add_shape(shape1, opts)
assert :ok == PublicationManager.add_shape(shape2, opts)

assert_receive {:filters,
[
%RelationFilter{
relation: {"public", "items"},
selected_columns: nil
}
]}
end

test "should include selected columns referenced in where clauses", %{opts: opts} do
shape =
generate_shape(
{"public", "items"},
%Expr{
query: "id = 1",
used_refs: %{["id"] => :int8, ["created_at"] => :timestamp}
},
["id", "value"]
)

assert :ok == PublicationManager.add_shape(shape, opts)

assert_receive {:filters,
[
%RelationFilter{
relation: {"public", "items"},
where_clauses: [%{query: "id = 1"}],
selected_columns: ["value", "id", "created_at"]
}
]}
end

@tag update_debounce_timeout: 50
test "should not update publication if new shape adds nothing", %{opts: opts} do
shape1 = generate_shape({"public", "items"}, %{query: "id = 1"})
shape2 = generate_shape({"public", "items"}, %{query: "id = 2"})
shape3 = generate_shape({"public", "items"}, %{query: "id = 1"})

task1 = Task.async(fn -> PublicationManager.add_shape(shape1, opts) end)
task2 = Task.async(fn -> PublicationManager.add_shape(shape2, opts) end)

Task.await_many([task1, task2])

assert_receive {:filters,
[
%RelationFilter{
relation: {"public", "items"},
where_clauses: [%{query: "id = 2"}, %{query: "id = 1"}]
}
]}

assert :ok == PublicationManager.add_shape(shape3, opts)

refute_receive {:filters, _}, 500
end
end

describe "remove_shape/2" do
test "should remove single shape", %{opts: opts} do
shape = generate_shape({"public", "items"}, %{query: "id = 1"})
assert :ok == PublicationManager.add_shape(shape, opts)
assert :ok == PublicationManager.remove_shape(shape, opts)

assert_receive {:filters, []}
end

@tag update_debounce_timeout: 50
test "should reference count to avoid removing needed filters", %{opts: opts} do
shape1 = generate_shape({"public", "items"}, %{query: "id = 1"})
shape2 = generate_shape({"public", "items"}, %{query: "id = 2"})
shape3 = generate_shape({"public", "items"}, %{query: "id = 1"})
task1 = Task.async(fn -> PublicationManager.add_shape(shape1, opts) end)
task2 = Task.async(fn -> PublicationManager.add_shape(shape2, opts) end)
task3 = Task.async(fn -> PublicationManager.add_shape(shape3, opts) end)

Task.await_many([task1, task2, task3])

assert_receive {:filters,
[
%RelationFilter{
relation: {"public", "items"},
where_clauses: [%{query: "id = 2"}, %{query: "id = 1"}]
}
]}

assert :ok == PublicationManager.remove_shape(shape1, opts)

refute_receive {:filters, _}, 500
end
end

describe "recover_shape/2" do
test "should add filters for single shape without updating anything", %{opts: opts} do
shape = generate_shape({"public", "items"}, %{query: "id = 1"})
assert :ok == PublicationManager.recover_shape(shape, opts)

refute_receive {:filters, _}, 500
end
end

describe "refresh_publication/2" do
test "should update publication if there are changes to add", %{opts: opts} do
shape = generate_shape({"public", "items"}, %{query: "id = 1"})
assert :ok == PublicationManager.recover_shape(shape, opts)

refute_receive {:filters, _}, 500

assert :ok == PublicationManager.refresh_publication(opts)

assert_receive {:filters,
[
%RelationFilter{
relation: {"public", "items"},
where_clauses: [%{query: "id = 1"}]
}
]}
end
end
end
21 changes: 15 additions & 6 deletions packages/sync-service/test/support/component_setup.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,21 @@ defmodule Support.ComponentSetup do
def with_publication_manager(ctx) do
server = :"publication_manager_#{full_test_name(ctx)}"

Electric.Replication.PublicationManager.start_link(
name: server,
stack_id: ctx.stack_id,
publication_name: ctx.publication_name,
db_pool: ctx.pool
)
{:ok, _} =
Electric.Replication.PublicationManager.start_link(
name: server,
stack_id: ctx.stack_id,
publication_name: ctx.publication_name,
update_debounce_timeout: Access.get(ctx, :update_debounce_timeout, 0),
db_pool: ctx.pool,
pg_version: Access.get(ctx, :pg_version, nil),
configure_tables_for_replication_fn:
Access.get(
ctx,
:configure_tables_for_replication_fn,
&Electric.Postgres.Configuration.configure_tables_for_replication!/4
)
)

%{
publication_manager:
Expand Down