Skip to content

Commit

Permalink
Merge pull request #107 from MattExact/postgres-always-send-feedback
Browse files Browse the repository at this point in the history
always send feedback for postgres replication messages
  • Loading branch information
long2ice authored May 23, 2024
2 parents f837e60 + b690b16 commit fc1877d
Showing 1 changed file with 48 additions and 43 deletions.
91 changes: 48 additions & 43 deletions meilisync/source/postgres.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import json
from asyncio import Queue
from typing import List
from typing import List, Any

import psycopg2
import psycopg2.errors
Expand Down Expand Up @@ -88,51 +88,56 @@ def _():

def _consumer(self, msg: ReplicationMessage):
payload = json.loads(msg.payload)
changes = payload.get("change")
if not changes:
return
next_lsn = payload["nextlsn"]

changes = payload.get("change", [])
for change in changes:
kind = change.get("kind")
table = change.get("table")
if table not in self.tables:
continue
columnnames = change.get("columnnames", [])
columnvalues = change.get("columnvalues", [])
columntypes = change.get("columntypes", [])

for i in range(len(columntypes)):
if columntypes[i] == "json":
columnvalues[i] = json.loads(columnvalues[i])

if kind == "update":
values = dict(zip(columnnames, columnvalues))
event_type = EventType.update
elif kind == "delete":
values = (
dict(zip(columnnames, columnvalues))
if columnvalues
else {change["oldkeys"]["keynames"][0]: change["oldkeys"]["keyvalues"][0]}
)
event_type = EventType.delete
elif kind == "insert":
values = dict(zip(columnnames, columnvalues))
event_type = EventType.create
else:
return
asyncio.new_event_loop().run_until_complete(
self.queue.put( # type: ignore
Event(
type=event_type,
table=table,
data=values,
progress={"start_lsn": payload.get("nextlsn")},
)
)
self.__handle_change(change, next_lsn)

# Always report success to the server to avoid a “disk full” condition.
# https://www.psycopg.org/docs/extras.html#psycopg2.extras.ReplicationCursor.consume_stream
msg.cursor.send_feedback(flush_lsn=msg.data_start)

def __handle_change(self, change: dict[str, Any], next_lsn: str):
table = change.get("table")
if table not in self.tables:
return

columnnames = change.get("columnnames", [])
columnvalues = change.get("columnvalues", [])
columntypes = change.get("columntypes", [])

for i in range(len(columntypes)):
if columntypes[i] == "json":
columnvalues[i] = json.loads(columnvalues[i])

kind = change.get("kind")
if kind == "update":
values = dict(zip(columnnames, columnvalues))
event_type = EventType.update
elif kind == "delete":
values = (
dict(zip(columnnames, columnvalues))
if columnvalues
else {change["oldkeys"]["keynames"][0]: change["oldkeys"]["keyvalues"][0]}
)
event_type = EventType.delete
elif kind == "insert":
values = dict(zip(columnnames, columnvalues))
event_type = EventType.create
else:
return

# Report success to the server to avoid a “disk full” condition.
# https://www.psycopg.org/docs/extras.html#psycopg2.extras.ReplicationCursor.consume_stream
msg.cursor.send_feedback(flush_lsn=msg.data_start)
asyncio.new_event_loop().run_until_complete(
self.queue.put( # type: ignore
Event(
type=event_type,
table=table,
data=values,
progress={"start_lsn": next_lsn},
)
)
)

async def get_count(self, sync: Sync):
with self.conn_dict.cursor() as cur:
Expand Down

0 comments on commit fc1877d

Please sign in to comment.