Fix dropped commit fragments at batch size boundary #3728
📝 Walkthrough

Modifies commit handling in the Postgres replication message converter to detect empty commit fragments after a flush and choose the appropriate last_log_offset; adds a comprehensive integration test module exercising batch and multi-statement transaction replication across many batch sizes and operation mixes.
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~30 minutes
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@packages/sync-service/test/integration/batch_operations_test.exs`:
- Around lines 758-822: The helper `execute_sequential_transactions/2` assumes insert-only transactions (it generates IDs from an insert counter via `generate_uuid`), so update/delete operations would target non-existent rows. Add a guard at the start of `execute_sequential_transactions/2` that scans the transactions list and either (1) raises a clear error when any non-`:insert` operation is present, or (2) delegates to the mixed-op helper (e.g. the existing mixed-transactions helper used elsewhere, or the `collect_all_changes_mixed` workflow) so update/delete operations are handled correctly. Reference `execute_sequential_transactions/2` and `generate_uuid` to locate where to add the validation and the code-path switch. A minimal sketch of option (1) appears after this list.
- Around line 25-112: The tests duplicate the same batch_sizes computation in
each describe; extract it to a shared module attribute (e.g., `@batch_sizes`)
computed once using `@max_change_batch_size` and reuse that attribute in the three
describes, and optionally read `@max_change_batch_size` from shared config if
available; update references in the INSERT/UPDATE/DELETE blocks to use
`@batch_sizes` and keep the timeout/verify_all logic the same while leaving test
invocations (test_batch_insert, test_batch_update, test_batch_delete) unchanged.
📒 Files selected for processing (2)
- packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex
- packages/sync-service/test/integration/batch_operations_test.exs
🧰 Additional context used
🧬 Code graph analysis (2)
packages/sync-service/test/integration/batch_operations_test.exs (1)
- packages/sync-service/test/support/stream_consumer.ex (3): assert_up_to_date (124-135), await_count (161-166), collect_messages (242-247)
packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex (2)
- packages/sync-service/lib/electric/replication/log_offset.ex (2): new (58-60), new (62-64)
- packages/sync-service/lib/electric/postgres/lsn.ex (1): to_integer (95-98)
🔇 Additional comments (8)
packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex (2)
230-235: Commit fragment now advances offset at the batch boundary. Using `msg.end_lsn` for empty post-flush commit fragments should prevent duplicate offsets and dropped commits.
279-281: TransactionFragment struct confirms `change_count` defaults to `0`. Verified: the struct at packages/sync-service/lib/electric/replication/changes.ex initializes `change_count: 0`, so the predicate in `empty_commit_fragment_after_flush?` will work as intended.
packages/sync-service/test/integration/batch_operations_test.exs (6)
185-287: Batch insert helpers validate DB state and stream output thoroughly.
291-392: Batch update helpers look solid and are consistent with insert flow.
395-496: Batch delete helpers are consistent and correctly validate PK-only payloads.
500-594: Multi-statement transaction helpers provide clear DB and stream validation.
598-755: Mixed-operation transaction setup and expected counts are well handled.
824-941: Sequential mixed-op helpers are comprehensive and consistent with mixed-txn flow.
```elixir
  # This should match @max_change_batch_size in Electric.Postgres.ReplicationClient.
  # The value 100 is set in ReplicationClient to balance performance (avoiding message
  # passing overhead) and memory usage. If that value changes, update this constant.
  @max_change_batch_size 100

  describe "batch INSERT replication" do
    setup [:with_unique_db, :with_basic_tables, :with_sql_execute]
    setup :with_complete_stack
    setup :with_electric_client

    batch_sizes =
      [
        @max_change_batch_size - 25,
        @max_change_batch_size - 1,
        @max_change_batch_size,
        @max_change_batch_size + 1,
        @max_change_batch_size + 25,
        @max_change_batch_size * 2,
        @max_change_batch_size * 3,
        @max_change_batch_size * 4,
        @max_change_batch_size * 5
      ] |> Enum.uniq() |> Enum.sort()

    for batch_size <- batch_sizes do
      timeout = if batch_size <= 75, do: 10_000, else: 30_000
      verify_all = batch_size <= 75

      test "batch insert (#{batch_size})", %{client: client, db_conn: db_conn} do
        test_batch_insert(client, db_conn, unquote(batch_size), unquote(timeout), unquote(verify_all))
      end
    end
  end

  describe "batch UPDATE replication" do
    setup [:with_unique_db, :with_basic_tables, :with_sql_execute]
    setup :with_complete_stack
    setup :with_electric_client

    batch_sizes =
      [
        @max_change_batch_size - 25,
        @max_change_batch_size - 1,
        @max_change_batch_size,
        @max_change_batch_size + 1,
        @max_change_batch_size + 25,
        @max_change_batch_size * 2,
        @max_change_batch_size * 3,
        @max_change_batch_size * 4,
        @max_change_batch_size * 5
      ] |> Enum.uniq() |> Enum.sort()

    for batch_size <- batch_sizes do
      timeout = if batch_size <= 75, do: 10_000, else: 30_000
      verify_all = batch_size <= 75

      test "batch update (#{batch_size})", %{client: client, db_conn: db_conn} do
        test_batch_update(client, db_conn, unquote(batch_size), unquote(timeout), unquote(verify_all))
      end
    end
  end

  describe "batch DELETE replication" do
    setup [:with_unique_db, :with_basic_tables, :with_sql_execute]
    setup :with_complete_stack
    setup :with_electric_client

    batch_sizes =
      [
        @max_change_batch_size - 25,
        @max_change_batch_size - 1,
        @max_change_batch_size,
        @max_change_batch_size + 1,
        @max_change_batch_size + 25,
        @max_change_batch_size * 2,
        @max_change_batch_size * 3,
        @max_change_batch_size * 4,
        @max_change_batch_size * 5
      ] |> Enum.uniq() |> Enum.sort()

    for batch_size <- batch_sizes do
      timeout = if batch_size <= 75, do: 10_000, else: 30_000
      verify_all = batch_size <= 75

      test "batch delete (#{batch_size})", %{client: client, db_conn: db_conn} do
        test_batch_delete(client, db_conn, unquote(batch_size), unquote(timeout), unquote(verify_all))
      end
    end
  end
```
🧹 Nitpick | 🔵 Trivial
Extract the shared batch size list (and consider sourcing max size from config).
The same batch_sizes list is repeated across INSERT/UPDATE/DELETE describes. Extracting it into a module attribute keeps the tests in sync and reduces drift. If possible, consider sourcing @max_change_batch_size from runtime config or a shared constant to avoid manual updates.
♻️ Suggested refactor

```diff
@@
-  @max_change_batch_size 100
+  @max_change_batch_size 100
+
+  @batch_sizes [
+                 @max_change_batch_size - 25,
+                 @max_change_batch_size - 1,
+                 @max_change_batch_size,
+                 @max_change_batch_size + 1,
+                 @max_change_batch_size + 25,
+                 @max_change_batch_size * 2,
+                 @max_change_batch_size * 3,
+                 @max_change_batch_size * 4,
+                 @max_change_batch_size * 5
+               ] |> Enum.uniq() |> Enum.sort()
@@
-    batch_sizes =
-      [
-        @max_change_batch_size - 25,
-        @max_change_batch_size - 1,
-        @max_change_batch_size,
-        @max_change_batch_size + 1,
-        @max_change_batch_size + 25,
-        @max_change_batch_size * 2,
-        @max_change_batch_size * 3,
-        @max_change_batch_size * 4,
-        @max_change_batch_size * 5
-      ] |> Enum.uniq() |> Enum.sort()
-
-    for batch_size <- batch_sizes do
+    for batch_size <- @batch_sizes do
       timeout = if batch_size <= 75, do: 10_000, else: 30_000
       verify_all = batch_size <= 75
@@
-    batch_sizes =
-      [
-        @max_change_batch_size - 25,
-        @max_change_batch_size - 1,
-        @max_change_batch_size,
-        @max_change_batch_size + 1,
-        @max_change_batch_size + 25,
-        @max_change_batch_size * 2,
-        @max_change_batch_size * 3,
-        @max_change_batch_size * 4,
-        @max_change_batch_size * 5
-      ] |> Enum.uniq() |> Enum.sort()
-
-    for batch_size <- batch_sizes do
+    for batch_size <- @batch_sizes do
@@
-    batch_sizes =
-      [
-        @max_change_batch_size - 25,
-        @max_change_batch_size - 1,
-        @max_change_batch_size,
-        @max_change_batch_size + 1,
-        @max_change_batch_size + 25,
-        @max_change_batch_size * 2,
-        @max_change_batch_size * 3,
-        @max_change_batch_size * 4,
-        @max_change_batch_size * 5
-      ] |> Enum.uniq() |> Enum.sort()
-
-    for batch_size <- batch_sizes do
+    for batch_size <- @batch_sizes do
```

🤖 Prompt for AI Agents
In `@packages/sync-service/test/integration/batch_operations_test.exs` around
lines 25 - 112, The tests duplicate the same batch_sizes computation in each
describe; extract it to a shared module attribute (e.g., `@batch_sizes`) computed
once using `@max_change_batch_size` and reuse that attribute in the three
describes, and optionally read `@max_change_batch_size` from shared config if
available; update references in the INSERT/UPDATE/DELETE blocks to use
`@batch_sizes` and keep the timeout/verify_all logic the same while leaving test
invocations (test_batch_insert, test_batch_update, test_batch_delete) unchanged.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@packages/sync-service/test/integration/batch_operations_test.exs`:
- Around line 296-312: The execute_batch_insert helper currently builds SQL via
string interpolation and manual escaping; change it to a parameterized query:
unzip test_data into ids and values with Enum.unzip(test_data) and call
Postgrex.query! with a single SQL using unnest (e.g. "INSERT INTO items (id,
value) SELECT * FROM unnest($1::text[], $2::text[])") and pass [ids, values] as
parameters so the DB driver handles escaping and batching.
- Around line 593-601: Inside the :delete case branch the conditional if
operation == :delete is redundant; set expected_count to 0 directly (e.g.,
replace expected_count = if operation == :delete, do: 0, else: count with
expected_count = 0) and keep the subsequent assert using db_count and
expected_count, referencing the :delete branch, expected_count variable and
db_count/Postgrex.query! call to locate the change.
📒 Files selected for processing (2)
- packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex
- packages/sync-service/test/integration/batch_operations_test.exs
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2026-01-14T14:45:20.880Z
Learnt from: CR
Repo: electric-sql/electric PR: 0
File: examples/tanstack-db-web-starter/AGENTS.md:0-0
Timestamp: 2026-01-14T14:45:20.880Z
Learning: Applies to examples/tanstack-db-web-starter/src/lib/trpc/**/*.ts : Generate transaction IDs using `pg_current_xact_id()::xid::text` for Electric sync compatibility
Applied to files:
packages/sync-service/test/integration/batch_operations_test.exs
🧬 Code graph analysis (1)
packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex (3)
- packages/sync-service/lib/electric/replication/log_offset.ex (2): new (58-60), new (62-64)
- packages/sync-service/lib/electric/replication/shape_log_collector/flush_tracker.ex (1): new (84-92)
- packages/sync-service/lib/electric/postgres/lsn.ex (1): to_integer (95-98)
🔇 Additional comments (4)
packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex (2)
279-281: Well-designed predicate for detecting the boundary condition. The predicate correctly identifies the specific case:
- `fragment.change_count == 0` confirms the fragment is empty (no changes since the last flush)
- `not is_nil(state.last_log_offset)` confirms a flush occurred (this field is set in `change_received/2` and retained after `maybe_flush`)
- `not is_nil(msg.end_lsn)` ensures the commit message has a valid end LSN

This precisely targets the bug scenario without affecting normal commit processing.
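For reference, those three conditions combine into a predicate along these lines (a sketch reconstructed from the description above, not copied from `message_converter.ex`; the actual argument names may differ):

```elixir
# Sketch only: inferred from the review comment, not the real implementation.
defp empty_commit_fragment_after_flush?(fragment, state, msg) do
  # Empty since the last flush, a flush has already happened, and the commit
  # message carries a usable end LSN.
  fragment.change_count == 0 and
    not is_nil(state.last_log_offset) and
    not is_nil(msg.end_lsn)
end
```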
230-235: Correct fix for the batch boundary issue. The conditional properly addresses the root cause:
- For empty commit fragments after a flush, using `msg.end_lsn` ensures the offset is greater than the previously flushed fragment's offset
- The `op_offset` of `0` is appropriate since this is a commit marker, not an actual change
- The original fallback logic is preserved for normal cases (non-empty fragments or no prior flush)

This ensures `ShapeLogCollector` won't drop the commit fragment due to `last_log_offset <= last_processed_offset`.
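Expressed as code, the fixed branch amounts to something like the following (a hedged sketch based on the points above; the exact variable names and `LogOffset`/`Lsn` calls in the converter may differ):

```elixir
# Sketch of the commit-offset selection described above (names are assumptions).
last_log_offset =
  if empty_commit_fragment_after_flush?(fragment, state, msg) do
    # Post-flush empty commit fragment: derive the offset from the commit's
    # end LSN with op_offset 0, so it is strictly greater than the offset of
    # the fragment that was already flushed at the batch boundary.
    LogOffset.new(Lsn.to_integer(msg.end_lsn), 0)
  else
    # Normal case: keep the original fallback behaviour.
    state.last_log_offset
  end
```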
packages/sync-service/test/integration/batch_operations_test.exs (2)
1-28: Comprehensive test coverage for the batch boundary bug. The test module properly exercises the scenarios described in issue #3726:
- Batch sizes at exact boundaries (100, 200, 300, 400, 500)
- Near-boundary sizes (75, 99, 101, 125)
- Multi-statement transactions with exactly 100 operations
- Mixed operation transactions totaling 100 changes
- Sequential transactions including boundary cases

The comment at lines 25-28 explaining the relationship to `@max_change_batch_size` in `ReplicationClient` is helpful for future maintainers.
269-289: Good timeout handling with fallback collection. The pattern of attempting `await_count` with the full timeout, then falling back to `collect_messages` with a short timeout on failure, ensures tests can still report partial progress for debugging rather than failing silently.
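That pattern looks roughly like this (illustrative only; the `StreamConsumer.await_count/3` and `collect_messages/2` signatures and return shapes used here are assumptions about the test support module):

```elixir
# Illustrative sketch: fall back to collecting whatever has arrived so a
# failing assertion can report partial progress instead of timing out silently.
messages =
  case StreamConsumer.await_count(consumer, expected_count, timeout) do
    {:ok, messages} -> messages
    {:error, :timeout} -> StreamConsumer.collect_messages(consumer, 1_000)
  end

assert length(messages) == expected_count,
       "expected #{expected_count} messages, got #{length(messages)}"
```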
```elixir
  defp execute_batch_insert(db_conn, test_data) do
    values_clause =
      test_data
      |> Enum.map(fn {id, value} ->
        escaped_id = String.replace(id, "'", "''")
        escaped_value = String.replace(value, "'", "''")
        "('#{escaped_id}', '#{escaped_value}')"
      end)
      |> Enum.join(",\n ")

    sql = """
    INSERT INTO items VALUES
      #{values_clause}
    """

    Postgrex.query!(db_conn, sql, [])
  end
```
🧹 Nitpick | 🔵 Trivial
Consider using parameterized queries for SQL construction.
The current approach uses string interpolation with manual escaping (String.replace(id, "'", "''")). While safe for these controlled test inputs (generated UUIDs and hardcoded strings), parameterized queries would be more maintainable and align with best practices.
♻️ Example refactor for batch insert

```elixir
defp execute_batch_insert(db_conn, test_data) do
  # Using unnest for batch insert with parameters
  {ids, values} = Enum.unzip(test_data)

  Postgrex.query!(
    db_conn,
    "INSERT INTO items (id, value) SELECT * FROM unnest($1::text[], $2::text[])",
    [ids, values]
  )
end
```

🤖 Prompt for AI Agents
In `@packages/sync-service/test/integration/batch_operations_test.exs` around
lines 296 - 312, The execute_batch_insert helper currently builds SQL via string
interpolation and manual escaping; change it to a parameterized query: unzip
test_data into ids and values with Enum.unzip(test_data) and call
Postgrex.query! with a single SQL using unnest (e.g. "INSERT INTO items (id,
value) SELECT * FROM unnest($1::text[], $2::text[])") and pass [ids, values] as
parameters so the DB driver handles escaping and batching.
```elixir
      :delete ->
        %Postgrex.Result{rows: rows} =
          Postgrex.query!(db_conn, "SELECT COUNT(*) FROM items", [])

        db_count = List.first(List.first(rows))
        expected_count = if operation == :delete, do: 0, else: count

        assert db_count == expected_count,
               "Expected #{expected_count} rows in database, but found #{db_count}. Multi-statement delete may have failed."
```
Redundant conditional inside :delete branch.
Line 598 checks if operation == :delete when already inside the :delete case branch. This is always true and the conditional is unnecessary.
🔧 Proposed fix

```diff
 :delete ->
   %Postgrex.Result{rows: rows} =
     Postgrex.query!(db_conn, "SELECT COUNT(*) FROM items", [])

   db_count = List.first(List.first(rows))
-  expected_count = if operation == :delete, do: 0, else: count

-  assert db_count == expected_count,
-         "Expected #{expected_count} rows in database, but found #{db_count}. Multi-statement delete may have failed."
+  assert db_count == 0,
+         "Expected 0 rows in database, but found #{db_count}. Multi-statement delete may have failed."
```

🤖 Prompt for AI Agents
In `@packages/sync-service/test/integration/batch_operations_test.exs` around
lines 593 - 601, Inside the :delete case branch the conditional if operation ==
:delete is redundant; set expected_count to 0 directly (e.g., replace
expected_count = if operation == :delete, do: 0, else: count with expected_count
= 0) and keep the subsequent assert using db_count and expected_count,
referencing the :delete branch, expected_count variable and
db_count/Postgrex.query! call to locate the change.
Add comprehensive integration tests demonstrating that batch operations (INSERT, UPDATE, DELETE) fail to replicate when the number of changes reaches exactly max_change_batch_size (100). The bug occurs because commit fragments created after flushing at the batch boundary reuse the same last_log_offset as the flushed fragment, causing them to be dropped as "already processed" by ShapeLogCollector.

Test coverage includes:
- Single-statement batch operations at various sizes
- Multi-statement transactions with 100 individual statements
- Mixed operation transactions (INSERT + UPDATE + DELETE)
- Sequential transactions with varying sizes including batch boundaries

The tests verify that operations at batch size boundaries (100, 200, 300, etc.) fail to replicate, while operations just below or above the boundary work correctly.

Related to electric-sql#3726
Empty commit fragments created after flushing at max_batch_size were dropped because they reused the same last_log_offset as the flushed fragment. Use commit message's end_lsn for empty commit fragments to ensure unique offsets. Update batch insert test to reference configured batch size constant. Fixes electric-sql#3726
Problem
When a transaction's changes reach exactly `max_change_batch_size` (100), the `MessageConverter` flushes a fragment with 100 changes and creates a new empty fragment. When the commit arrives, the commit fragment is created with `last_log_offset = state.last_log_offset`, which is the same offset as the flushed fragment. `ShapeLogCollector.handle_txn_fragment/2` has a guard clause that drops fragments where `last_log_offset <= last_processed_offset`. Since the commit fragment had the same offset as the already-processed flushed fragment, it was dropped as "already processed", preventing the transaction from completing and causing zero messages to be received by clients. A sketch of that drop condition is shown below.

Affected scenarios: any transaction whose change count lands exactly on a multiple of the batch size (100, 200, 300, ...), whether produced by a single batch statement, many single-row statements in one transaction, or a mix of INSERT/UPDATE/DELETE operations.
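For illustration only, the drop condition amounts to roughly the following (a minimal sketch; the real `ShapeLogCollector.handle_txn_fragment/2` uses its own structs and offset-comparison helpers):

```elixir
# Sketch of the "already processed" check described above. Field names and the
# plain <= comparison are assumptions; Electric compares LogOffsets explicitly.
defp already_processed?(%{last_log_offset: offset}, %{last_processed_offset: last}) do
  # A fragment whose offset is not strictly greater than the last processed
  # offset is skipped, which is what happened to the empty commit fragment.
  offset <= last
end
```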
Solution
For empty commit fragments created after a flush, use `msg.end_lsn` (the commit's final LSN) instead of `state.last_log_offset` to set `last_log_offset`. This ensures the commit fragment has a unique offset greater than any previous fragment, so it won't be dropped. The fix is minimal and only affects the offset calculation for empty commit fragments, leaving all other behavior unchanged.
Testing
Added comprehensive integration tests in `batch_operations_test.exs` covering single-statement batch operations, multi-statement transactions, mixed-operation transactions, and sequential transactions at and around the batch boundary. All tests verify that operations at batch size boundaries (100, 200, 300, etc.) are properly replicated, while operations just below or above the boundary continue to work correctly.
Fixes #3726
Summary by CodeRabbit

Bug Fixes
- Commit fragments produced at the change-batch-size boundary now advance the log offset, so transactions with exactly max_change_batch_size changes are no longer dropped.

Tests
- Added integration tests covering batch INSERT/UPDATE/DELETE, multi-statement, mixed-operation, and sequential transactions at and around the batch-size boundary.