
Conversation


@blj blj commented Jan 16, 2026

Problem

When a transaction's changes reach exactly max_change_batch_size (100), the MessageConverter flushes a fragment containing those 100 changes and creates a new, empty fragment. When the commit arrives, the commit fragment is created with last_log_offset = state.last_log_offset, which is the same offset as the already-flushed fragment.

ShapeLogCollector.handle_txn_fragment/2 has a guard clause that drops fragments where last_log_offset <= last_processed_offset. Because the commit fragment carries the same offset as the already-processed flushed fragment, it is dropped as "already processed", preventing the transaction from completing and leaving clients with zero messages.
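A minimal sketch of that interaction (illustrative only, not the actual ShapeLogCollector source; the function head and state fields are assumptions based on the description above):

```elixir
# Illustrative sketch -- simplified from the behaviour described above, not
# copied from ShapeLogCollector. The field names follow the PR description.
defmodule OffsetGuardSketch do
  # Fragments whose last_log_offset is not strictly greater than the last
  # processed offset are treated as replays and dropped.
  def handle_txn_fragment(%{last_log_offset: offset}, %{last_processed_offset: processed} = state)
      when offset <= processed do
    {:drop, state}
  end

  def handle_txn_fragment(fragment, state) do
    {:process, %{state | last_processed_offset: fragment.last_log_offset}}
  end
end
```

Because the commit fragment reuses the flushed fragment's offset, it matches the first clause and never reaches the shape log.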

Affected scenarios:

  • Batch operations (INSERT/UPDATE/DELETE) with exactly 100, 200, 300, etc. changes
  • Multi-statement transactions with exactly 100 individual statements
  • Mixed operation transactions that total exactly 100 changes
  • Sequential transactions where any transaction reaches exactly 100 changes

Solution

For empty commit fragments created after a flush, use msg.end_lsn (the commit's final LSN) instead of state.last_log_offset to set last_log_offset. This ensures the commit fragment has a unique offset greater than any previous fragment, so it won't be dropped.

The fix is minimal and only affects the offset calculation for empty commit fragments, leaving all other behavior unchanged.
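A hedged sketch of the offset selection (the helper name commit_last_log_offset/3 is hypothetical; LogOffset.new/2 and Lsn.to_integer/1 are the helpers cited in the review's code-graph analysis, and the clause is schematic rather than the literal diff):

```elixir
# Simplified sketch of the commit-offset choice, assuming the names above.
defp commit_last_log_offset(fragment, msg, state) do
  if empty_commit_fragment_after_flush?(fragment, msg, state) do
    # The fragment is empty because its changes were just flushed, so derive a
    # fresh offset from the commit's end LSN. It is strictly greater than the
    # flushed fragment's offset, so the commit is not dropped as already
    # processed.
    LogOffset.new(Lsn.to_integer(msg.end_lsn), 0)
  else
    # Prior behaviour: reuse the last converted change's offset, falling back
    # to the fragment's own LSN when no change has been seen.
    state.last_log_offset || LogOffset.new(Lsn.to_integer(fragment.lsn), 0)
  end
end
```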

Testing

Added comprehensive integration tests in batch_operations_test.exs covering:

  • Single-statement batch operations (INSERT/UPDATE/DELETE) at various sizes
  • Multi-statement transactions with 100 individual statements
  • Mixed operation transactions (INSERT + UPDATE + DELETE combinations)
  • Sequential transactions with varying sizes including batch boundaries

All tests verify that operations at batch size boundaries (100, 200, 300, etc.) are properly replicated, while operations just below or above the boundary continue to work correctly.
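For reference, a hypothetical minimal reproduction of the boundary case (assumes a Postgrex connection db_conn and an items(id, value) table like the one used by the integration tests; both names are assumptions):

```elixir
# Exactly 100 inserts in a single transaction: the converter flushes a
# 100-change fragment and then sees a commit that carries no further changes,
# which is the case that previously produced a dropped commit fragment.
Postgrex.transaction(db_conn, fn conn ->
  for i <- 1..100 do
    Postgrex.query!(conn, "INSERT INTO items (id, value) VALUES ($1, $2)", [
      "item-#{i}",
      "value-#{i}"
    ])
  end
end)
```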

Fixes #3726

Summary by CodeRabbit

  • Bug Fixes

    • Corrected handling of empty commit fragments created after a flush, so transactions whose change count lands exactly on the batch boundary advance the replication offset and complete as expected.
  • Tests

    • Added a comprehensive integration test suite covering batch operations, multi-statement and mixed-operation transactions, sequential workloads, and verification helpers to validate replication behavior across varied scenarios.



coderabbitai bot commented Jan 16, 2026

📝 Walkthrough

Walkthrough

Modifies commit handling in the Postgres replication message converter to detect empty commit fragments after flush and choose the appropriate last_log_offset; adds a new comprehensive integration test module exercising batch and multi-statement transaction replication across many sizes and operation mixes.

Changes

  • Message Converter Logic (packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex): Added a private predicate empty_commit_fragment_after_flush?/3 and updated Commit handling to set last_log_offset from msg.end_lsn when the predicate is true; otherwise the prior logic is preserved (use state.last_log_offset if present, else derive from fragment.lsn).
  • Batch Operations Integration Tests (packages/sync-service/test/integration/batch_operations_test.exs): New integration test module Electric.Integration.BatchOperationsTest exercising batch INSERT/UPDATE/DELETE across many sizes (including boundary sizes), multi-statement and mixed transactions, with helpers for generating data, executing Postgrex batches/transactions, streaming ChangeMessage events, and validating the emitted replication messages.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~30 minutes

Poem

🐰 I nibble logs and count each row,
Through flush and commit my whiskers glow,
Batches hop, some big, some small,
I watch their messages — I catch them all! ✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)

  • Docstring Coverage (⚠️ Warning): Docstring coverage is 0.00%, below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them to satisfy the coverage threshold.

✅ Passed checks (4 passed)

  • Description Check (✅ Passed): Check skipped because CodeRabbit's high-level summary is enabled.
  • Title check (✅ Passed): The title clearly and concisely describes the main fix: addressing dropped commit fragments that occur at batch size boundaries.
  • Linked Issues check (✅ Passed): The PR fully addresses issue #3726 by fixing the root cause of batch replication failures at multiples of 100, implementing the identified solution, and adding comprehensive integration test coverage.
  • Out of Scope Changes check (✅ Passed): All changes are directly scoped to fixing the identified bug: the core fix in message_converter.ex and comprehensive integration tests in batch_operations_test.exs.



📜 Recent review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a787aac and 3ad9803.

📒 Files selected for processing (2)
  • packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex
  • packages/sync-service/test/integration/batch_operations_test.exs
🧰 Additional context used
🧠 Learnings (6)
📚 Learning: 2026-01-14T14:45:20.880Z
Learnt from: CR
Repo: electric-sql/electric PR: 0
File: examples/tanstack-db-web-starter/AGENTS.md:0-0
Timestamp: 2026-01-14T14:45:20.880Z
Learning: Applies to examples/tanstack-db-web-starter/src/lib/trpc/**/*.ts : Generate transaction IDs using `pg_current_xact_id()::xid::text` for Electric sync compatibility

Applied to files:

  • packages/sync-service/test/integration/batch_operations_test.exs
📚 Learning: 2026-01-14T14:45:20.880Z
Learnt from: CR
Repo: electric-sql/electric PR: 0
File: examples/tanstack-db-web-starter/AGENTS.md:0-0
Timestamp: 2026-01-14T14:45:20.880Z
Learning: Applies to examples/tanstack-db-web-starter/**/*.{ts,tsx} : All reads from the Postgres database must be done via the Electric sync engine, not tRPC

Applied to files:

  • packages/sync-service/test/integration/batch_operations_test.exs
📚 Learning: 2026-01-14T14:45:05.838Z
Learnt from: CR
Repo: electric-sql/electric PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-14T14:45:05.838Z
Learning: Applies to **/{api,server,routes,db}/**/*.{ts,tsx} : Backend API endpoints must return Postgres txid (via `SELECT pg_current_xact_id()::xid::text as txid`) as an integer for client-side synchronization

Applied to files:

  • packages/sync-service/test/integration/batch_operations_test.exs
📚 Learning: 2026-01-14T14:45:20.880Z
Learnt from: CR
Repo: electric-sql/electric PR: 0
File: examples/tanstack-db-web-starter/AGENTS.md:0-0
Timestamp: 2026-01-14T14:45:20.880Z
Learning: Applies to examples/tanstack-db-web-starter/**/*.{ts,tsx} : NEVER use tRPC for data reads - only use Electric SQL and useLiveQuery

Applied to files:

  • packages/sync-service/test/integration/batch_operations_test.exs
📚 Learning: 2026-01-14T14:45:05.838Z
Learnt from: CR
Repo: electric-sql/electric PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-14T14:45:05.838Z
Learning: Avoid old Electric patterns (bidirectional SQLite sync, `electrify()` API) – use Electric HTTP streaming with TanStack DB collections instead

Applied to files:

  • packages/sync-service/test/integration/batch_operations_test.exs
📚 Learning: 2026-01-14T14:45:20.880Z
Learnt from: CR
Repo: electric-sql/electric PR: 0
File: examples/tanstack-db-web-starter/AGENTS.md:0-0
Timestamp: 2026-01-14T14:45:20.880Z
Learning: Applies to examples/tanstack-db-web-starter/src/lib/collections.ts : Use TanStack DB with Electric Sync Collections for client-side data management

Applied to files:

  • packages/sync-service/test/integration/batch_operations_test.exs
🔇 Additional comments (6)
packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex (2)

230-235: LGTM! Correct fix for the batch boundary commit fragment issue.

The conditional properly detects when a commit follows a flush with no additional changes, and uses msg.end_lsn to give the commit fragment a unique offset that won't be dropped by ShapeLogCollector.handle_txn_fragment/2's guard clause.


279-281: Clear predicate encapsulating the edge case detection.

The function name and conditions clearly express when an empty commit fragment follows a flush, making the fix logic in convert/2 easy to understand.

packages/sync-service/test/integration/batch_operations_test.exs (4)

1-28: Well-structured test module with good documentation.

The moduledoc clearly explains the test coverage scope, and the comment documenting @max_change_batch_size helps future maintainers understand the dependency on ReplicationClient.


234-267: Solid test helper structure with proper verification.

The helper properly:

  1. Sets up the live stream and waits for up-to-date state
  2. Executes the batch operation
  3. Verifies database state independently
  4. Collects and validates replicated messages

This pattern is consistently applied across INSERT, UPDATE, and DELETE tests, ensuring reliable regression coverage for the batch boundary fix.


844-880: Test structure correctly separates insert-only and mixed-op sequential transaction flows.

The design uses test_sequential_transactions for insert-only scenarios (lines 189-214) and test_sequential_transactions_with_mixed_ops for mixed operations (line 216), avoiding the complexity of handling updates/deletes on non-existent rows in the simpler helper.


667-715: Mixed operation transaction handling is well-designed.

The separation of concerns between prepare_mixed_operation_data (tracking which IDs need pre-insertion) and execute_mixed_operation_txn (executing operations with correct ID references) ensures UPDATE and DELETE operations target existing rows.



Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@packages/sync-service/test/integration/batch_operations_test.exs`:
- Around line 758-822: The helper execute_sequential_transactions/2 assumes only
inserts (it generates IDs from an insert counter via generate_uuid) so
update/delete ops will target non-existent rows; add a guard at the start of
execute_sequential_transactions/2 that scans the transactions list and either
(1) raises a clear error when any non-:insert operation is present, or (2)
delegates to the mixed-op helper (e.g., the existing mixed transactions helper
used elsewhere, or collect_all_changes_mixed workflow) so update/delete
operations are handled correctly; reference execute_sequential_transactions/2
and generate_uuid to locate where to add the validation and the code path
switch.
- Around line 25-112: The tests duplicate the same batch_sizes computation in
each describe; extract it to a shared module attribute (e.g., `@batch_sizes`)
computed once using `@max_change_batch_size` and reuse that attribute in the three
describes, and optionally read `@max_change_batch_size` from shared config if
available; update references in the INSERT/UPDATE/DELETE blocks to use
`@batch_sizes` and keep the timeout/verify_all logic the same while leaving test
invocations (test_batch_insert, test_batch_update, test_batch_delete) unchanged.
📜 Review details

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between f10e5a4 and 83d067a.

📒 Files selected for processing (2)
  • packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex
  • packages/sync-service/test/integration/batch_operations_test.exs
🧰 Additional context used
🧬 Code graph analysis (2)
packages/sync-service/test/integration/batch_operations_test.exs (1)
packages/sync-service/test/support/stream_consumer.ex (3)
  • assert_up_to_date (124-135)
  • await_count (161-166)
  • collect_messages (242-247)
packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex (2)
packages/sync-service/lib/electric/replication/log_offset.ex (2)
  • new (58-60)
  • new (62-64)
packages/sync-service/lib/electric/postgres/lsn.ex (1)
  • to_integer (95-98)
🔇 Additional comments (8)
packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex (2)

230-235: Commit fragment now advances offset at the batch boundary.

Using msg.end_lsn for empty post-flush commit fragments should prevent duplicate offsets and dropped commits.


279-281: TransactionFragment struct confirms change_count defaults to 0.

Verified: the struct at packages/sync-service/lib/electric/replication/changes.ex initializes change_count: 0. The predicate in empty_commit_fragment_after_flush? will work as intended.

packages/sync-service/test/integration/batch_operations_test.exs (6)

185-287: Batch insert helpers validate DB state and stream output thoroughly.


291-392: Batch update helpers look solid and are consistent with insert flow.


395-496: Batch delete helpers are consistent and correctly validate PK-only payloads.


500-594: Multi-statement transaction helpers provide clear DB and stream validation.


598-755: Mixed-operation transaction setup and expected counts are well handled.


824-941: Sequential mixed-op helpers are comprehensive and consistent with mixed-txn flow.


Comment on lines 25 to 136
  # This should match @max_change_batch_size in Electric.Postgres.ReplicationClient.
  # The value 100 is set in ReplicationClient to balance performance (avoiding message
  # passing overhead) and memory usage. If that value changes, update this constant.
  @max_change_batch_size 100

  describe "batch INSERT replication" do
    setup [:with_unique_db, :with_basic_tables, :with_sql_execute]
    setup :with_complete_stack
    setup :with_electric_client

    batch_sizes =
      [
        @max_change_batch_size - 25,
        @max_change_batch_size - 1,
        @max_change_batch_size,
        @max_change_batch_size + 1,
        @max_change_batch_size + 25,
        @max_change_batch_size * 2,
        @max_change_batch_size * 3,
        @max_change_batch_size * 4,
        @max_change_batch_size * 5
      ] |> Enum.uniq() |> Enum.sort()

    for batch_size <- batch_sizes do
      timeout = if batch_size <= 75, do: 10_000, else: 30_000
      verify_all = batch_size <= 75

      test "batch insert (#{batch_size})", %{client: client, db_conn: db_conn} do
        test_batch_insert(client, db_conn, unquote(batch_size), unquote(timeout), unquote(verify_all))
      end
    end
  end

  describe "batch UPDATE replication" do
    setup [:with_unique_db, :with_basic_tables, :with_sql_execute]
    setup :with_complete_stack
    setup :with_electric_client

    batch_sizes =
      [
        @max_change_batch_size - 25,
        @max_change_batch_size - 1,
        @max_change_batch_size,
        @max_change_batch_size + 1,
        @max_change_batch_size + 25,
        @max_change_batch_size * 2,
        @max_change_batch_size * 3,
        @max_change_batch_size * 4,
        @max_change_batch_size * 5
      ] |> Enum.uniq() |> Enum.sort()

    for batch_size <- batch_sizes do
      timeout = if batch_size <= 75, do: 10_000, else: 30_000
      verify_all = batch_size <= 75

      test "batch update (#{batch_size})", %{client: client, db_conn: db_conn} do
        test_batch_update(client, db_conn, unquote(batch_size), unquote(timeout), unquote(verify_all))
      end
    end
  end

  describe "batch DELETE replication" do
    setup [:with_unique_db, :with_basic_tables, :with_sql_execute]
    setup :with_complete_stack
    setup :with_electric_client

    batch_sizes =
      [
        @max_change_batch_size - 25,
        @max_change_batch_size - 1,
        @max_change_batch_size,
        @max_change_batch_size + 1,
        @max_change_batch_size + 25,
        @max_change_batch_size * 2,
        @max_change_batch_size * 3,
        @max_change_batch_size * 4,
        @max_change_batch_size * 5
      ] |> Enum.uniq() |> Enum.sort()

    for batch_size <- batch_sizes do
      timeout = if batch_size <= 75, do: 10_000, else: 30_000
      verify_all = batch_size <= 75

      test "batch delete (#{batch_size})", %{client: client, db_conn: db_conn} do
        test_batch_delete(client, db_conn, unquote(batch_size), unquote(timeout), unquote(verify_all))
      end
    end
  end

🧹 Nitpick | 🔵 Trivial

Extract the shared batch size list (and consider sourcing max size from config).

The same batch_sizes list is repeated across INSERT/UPDATE/DELETE describes. Extracting it into a module attribute keeps the tests in sync and reduces drift. If possible, consider sourcing @max_change_batch_size from runtime config or a shared constant to avoid manual updates.

♻️ Suggested refactor
@@
-  @max_change_batch_size 100
+  @max_change_batch_size 100
+  @batch_sizes [
+    @max_change_batch_size - 25,
+    @max_change_batch_size - 1,
+    @max_change_batch_size,
+    @max_change_batch_size + 1,
+    @max_change_batch_size + 25,
+    @max_change_batch_size * 2,
+    @max_change_batch_size * 3,
+    @max_change_batch_size * 4,
+    @max_change_batch_size * 5
+  ] |> Enum.uniq() |> Enum.sort()
@@
-    batch_sizes =
-      [
-        @max_change_batch_size - 25,
-        @max_change_batch_size - 1,
-        @max_change_batch_size,
-        @max_change_batch_size + 1,
-        @max_change_batch_size + 25,
-        @max_change_batch_size * 2,
-        @max_change_batch_size * 3,
-        @max_change_batch_size * 4,
-        @max_change_batch_size * 5
-      ] |> Enum.uniq() |> Enum.sort()
-
-    for batch_size <- batch_sizes do
+    for batch_size <- @batch_sizes do
       timeout = if batch_size <= 75, do: 10_000, else: 30_000
       verify_all = batch_size <= 75
@@
-    batch_sizes =
-      [
-        @max_change_batch_size - 25,
-        @max_change_batch_size - 1,
-        @max_change_batch_size,
-        @max_change_batch_size + 1,
-        @max_change_batch_size + 25,
-        @max_change_batch_size * 2,
-        @max_change_batch_size * 3,
-        @max_change_batch_size * 4,
-        @max_change_batch_size * 5
-      ] |> Enum.uniq() |> Enum.sort()
-
-    for batch_size <- batch_sizes do
+    for batch_size <- @batch_sizes do
@@
-    batch_sizes =
-      [
-        @max_change_batch_size - 25,
-        @max_change_batch_size - 1,
-        @max_change_batch_size,
-        @max_change_batch_size + 1,
-        @max_change_batch_size + 25,
-        @max_change_batch_size * 2,
-        @max_change_batch_size * 3,
-        @max_change_batch_size * 4,
-        @max_change_batch_size * 5
-      ] |> Enum.uniq() |> Enum.sort()
-
-    for batch_size <- batch_sizes do
+    for batch_size <- @batch_sizes do
🤖 Prompt for AI Agents
In `@packages/sync-service/test/integration/batch_operations_test.exs` around
lines 25 - 112, The tests duplicate the same batch_sizes computation in each
describe; extract it to a shared module attribute (e.g., `@batch_sizes`) computed
once using `@max_change_batch_size` and reuse that attribute in the three
describes, and optionally read `@max_change_batch_size` from shared config if
available; update references in the INSERT/UPDATE/DELETE blocks to use
`@batch_sizes` and keep the timeout/verify_all logic the same while leaving test
invocations (test_batch_insert, test_batch_update, test_batch_delete) unchanged.


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Fix all issues with AI agents
In `@packages/sync-service/test/integration/batch_operations_test.exs`:
- Around line 296-312: The execute_batch_insert helper currently builds SQL via
string interpolation and manual escaping; change it to a parameterized query:
unzip test_data into ids and values with Enum.unzip(test_data) and call
Postgrex.query! with a single SQL using unnest (e.g. "INSERT INTO items (id,
value) SELECT * FROM unnest($1::text[], $2::text[])") and pass [ids, values] as
parameters so the DB driver handles escaping and batching.
- Around line 593-601: Inside the :delete case branch the conditional if
operation == :delete is redundant; set expected_count to 0 directly (e.g.,
replace expected_count = if operation == :delete, do: 0, else: count with
expected_count = 0) and keep the subsequent assert using db_count and
expected_count, referencing the :delete branch, expected_count variable and
db_count/Postgrex.query! call to locate the change.
📜 Review details

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 83d067a and a787aac.

📒 Files selected for processing (2)
  • packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex
  • packages/sync-service/test/integration/batch_operations_test.exs
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2026-01-14T14:45:20.880Z
Learnt from: CR
Repo: electric-sql/electric PR: 0
File: examples/tanstack-db-web-starter/AGENTS.md:0-0
Timestamp: 2026-01-14T14:45:20.880Z
Learning: Applies to examples/tanstack-db-web-starter/src/lib/trpc/**/*.ts : Generate transaction IDs using `pg_current_xact_id()::xid::text` for Electric sync compatibility

Applied to files:

  • packages/sync-service/test/integration/batch_operations_test.exs
🧬 Code graph analysis (1)
packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex (3)
packages/sync-service/lib/electric/replication/log_offset.ex (2)
  • new (58-60)
  • new (62-64)
packages/sync-service/lib/electric/replication/shape_log_collector/flush_tracker.ex (1)
  • new (84-92)
packages/sync-service/lib/electric/postgres/lsn.ex (1)
  • to_integer (95-98)
🔇 Additional comments (4)
packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex (2)

279-281: Well-designed predicate for detecting the boundary condition.

The predicate correctly identifies the specific case:

  • fragment.change_count == 0 confirms the fragment is empty (no changes since last flush)
  • not is_nil(state.last_log_offset) confirms a flush occurred (this field is set in change_received/2 and retained after maybe_flush)
  • not is_nil(msg.end_lsn) ensures the commit message has a valid end LSN

This precisely targets the bug scenario without affecting normal commit processing.
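A hedged reconstruction of the predicate, assembled from the three conditions above (the actual implementation in message_converter.ex may differ in shape):

```elixir
defp empty_commit_fragment_after_flush?(fragment, msg, state) do
  # No changes since the last flush, a flush has happened (last_log_offset is
  # set), and the commit message carries a usable end LSN.
  fragment.change_count == 0 and
    not is_nil(state.last_log_offset) and
    not is_nil(msg.end_lsn)
end
```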


230-235: Correct fix for the batch boundary issue.

The conditional properly addresses the root cause:

  • For empty commit fragments after flush, using msg.end_lsn ensures the offset is greater than the previously flushed fragment's offset
  • The op_offset of 0 is appropriate since this is a commit marker, not an actual change
  • Original fallback logic preserved for normal cases (non-empty fragments or no prior flush)

This ensures ShapeLogCollector won't drop the commit fragment due to last_log_offset <= last_processed_offset.

packages/sync-service/test/integration/batch_operations_test.exs (2)

1-28: Comprehensive test coverage for the batch boundary bug.

The test module properly exercises the scenarios described in issue #3726:

  • Batch sizes at exact boundaries (100, 200, 300, 400, 500)
  • Near-boundary sizes (75, 99, 101, 125)
  • Multi-statement transactions with exactly 100 operations
  • Mixed operation transactions totaling 100 changes
  • Sequential transactions including boundary cases

The comment at lines 25-28 explaining the relationship to @max_change_batch_size in ReplicationClient is helpful for future maintainers.


269-289: Good timeout handling with fallback collection.

The pattern of attempting await_count with the full timeout, then falling back to collect_messages with a short timeout on failure, ensures a failing test still reports the partial set of messages it received, which is far more useful for debugging than a bare timeout.
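A rough sketch of that pattern (the StreamConsumer module name is inferred from the support file path, and the helper signatures and their timeout behaviour are assumptions):

```elixir
messages =
  try do
    # Wait for the full expected message count within the configured timeout.
    StreamConsumer.await_count(consumer, expected_count, timeout)
  rescue
    _assertion_or_timeout ->
      # On failure, collect whatever arrived within a short window so the test
      # output shows partial progress instead of nothing.
      StreamConsumer.collect_messages(consumer, 1_000)
  end
```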


Comment on lines +296 to +312
  defp execute_batch_insert(db_conn, test_data) do
    values_clause =
      test_data
      |> Enum.map(fn {id, value} ->
        escaped_id = String.replace(id, "'", "''")
        escaped_value = String.replace(value, "'", "''")
        "('#{escaped_id}', '#{escaped_value}')"
      end)
      |> Enum.join(",\n ")

    sql = """
    INSERT INTO items VALUES
    #{values_clause}
    """

    Postgrex.query!(db_conn, sql, [])
  end

🧹 Nitpick | 🔵 Trivial

Consider using parameterized queries for SQL construction.

The current approach uses string interpolation with manual escaping (String.replace(id, "'", "''")). While safe for these controlled test inputs (generated UUIDs and hardcoded strings), parameterized queries would be more maintainable and align with best practices.

♻️ Example refactor for batch insert
defp execute_batch_insert(db_conn, test_data) do
  # Using unnest for batch insert with parameters
  {ids, values} = Enum.unzip(test_data)
  
  Postgrex.query!(
    db_conn,
    "INSERT INTO items (id, value) SELECT * FROM unnest($1::text[], $2::text[])",
    [ids, values]
  )
end
🤖 Prompt for AI Agents
In `@packages/sync-service/test/integration/batch_operations_test.exs` around
lines 296 - 312, The execute_batch_insert helper currently builds SQL via string
interpolation and manual escaping; change it to a parameterized query: unzip
test_data into ids and values with Enum.unzip(test_data) and call
Postgrex.query! with a single SQL using unnest (e.g. "INSERT INTO items (id,
value) SELECT * FROM unnest($1::text[], $2::text[])") and pass [ids, values] as
parameters so the DB driver handles escaping and batching.

Comment on lines +593 to +601
      :delete ->
        %Postgrex.Result{rows: rows} =
          Postgrex.query!(db_conn, "SELECT COUNT(*) FROM items", [])

        db_count = List.first(List.first(rows))
        expected_count = if operation == :delete, do: 0, else: count

        assert db_count == expected_count,
               "Expected #{expected_count} rows in database, but found #{db_count}. Multi-statement delete may have failed."

⚠️ Potential issue | 🟡 Minor

Redundant conditional inside :delete branch.

Line 598 checks if operation == :delete when already inside the :delete case branch. This is always true and the conditional is unnecessary.

🔧 Proposed fix
      :delete ->
        %Postgrex.Result{rows: rows} =
          Postgrex.query!(db_conn, "SELECT COUNT(*) FROM items", [])

        db_count = List.first(List.first(rows))
-        expected_count = if operation == :delete, do: 0, else: count
+        expected_count = 0

        assert db_count == expected_count,
               "Expected #{expected_count} rows in database, but found #{db_count}. Multi-statement delete may have failed."
🤖 Prompt for AI Agents
In `@packages/sync-service/test/integration/batch_operations_test.exs` around
lines 593 - 601, Inside the :delete case branch the conditional if operation ==
:delete is redundant; set expected_count to 0 directly (e.g., replace
expected_count = if operation == :delete, do: 0, else: count with expected_count
= 0) and keep the subsequent assert using db_count and expected_count,
referencing the :delete branch, expected_count variable and
db_count/Postgrex.query! call to locate the change.

blj and others added 2 commits January 20, 2026 10:21
Add comprehensive integration tests demonstrating that batch operations
(INSERT, UPDATE, DELETE) fail to replicate when the number of changes
reaches exactly max_change_batch_size (100).

The bug occurs because commit fragments created after flushing at the
batch boundary reuse the same last_log_offset as the flushed fragment,
causing them to be dropped as "already processed" by ShapeLogCollector.

Test coverage includes:
- Single-statement batch operations at various sizes
- Multi-statement transactions with 100 individual statements
- Mixed operation transactions (INSERT + UPDATE + DELETE)
- Sequential transactions with varying sizes including batch boundaries

The tests verify that operations at batch size boundaries (100, 200, 300,
etc.) fail to replicate, while operations just below or above the
boundary work correctly.

Related to electric-sql#3726
Empty commit fragments created after flushing at max_batch_size were
dropped because they reused the same last_log_offset as the flushed
fragment. Use commit message's end_lsn for empty commit fragments to
ensure unique offsets.

Update batch insert test to reference configured batch size constant.

Fixes electric-sql#3726


Development

Successfully merging this pull request may close these issues.

Batch INSERT Replication Fails for Sizes Divisible by 100
