Skip to content

Commit

Permalink
use a server side cursor for performant/efficient full table retrieval
Browse files Browse the repository at this point in the history
  • Loading branch information
MattExact committed May 20, 2024
1 parent e144418 commit 2acbc90
Showing 1 changed file with 12 additions and 14 deletions.
26 changes: 12 additions & 14 deletions meilisync/source/postgres.py
Original file line number Diff line number Diff line change
async def get_full_data(self, sync: Sync, size: int):
    """Stream every row of ``sync.table`` in batches of ``size``.

    Opens a *named* (server-side) psycopg2 cursor so PostgreSQL keeps the
    result set on the server and hands it over ``fetchmany(size)`` rows at
    a time, instead of materializing the whole table in client memory.
    The blocking driver calls run in the default thread-pool executor so
    the event loop is never stalled.

    :param sync: sync configuration; provides the table name, primary key
        and optional field -> alias mapping.
    :param size: number of rows yielded per batch.
    :yields: lists of rows (driver row type), ``size`` rows per batch
        except possibly the last.
    """
    if sync.fields:
        # Build "col AS alias" pairs; a falsy alias keeps the original name.
        fields = ", ".join(f"{field} as {sync.fields[field] or field}" for field in sync.fields)
    else:
        fields = "*"

    # get_event_loop() is deprecated inside coroutines (Python 3.10+);
    # get_running_loop() is the correct call here. Hoisted out of the loop.
    loop = asyncio.get_running_loop()

    def execute():
        # NOTE(review): identifiers come from the sync config and are
        # interpolated, not parameterized — the config must be trusted.
        cur.execute(
            f"SELECT {fields} FROM {sync.table}"
        )

    def fetch():
        return cur.fetchmany(size)

    # A named cursor is psycopg2's server-side cursor; closing it (via the
    # context manager) releases the server-side portal so the name can be
    # reused on the next call.
    with self.conn_dict.cursor(name="get_full_data") as cur:
        await loop.run_in_executor(None, execute)
        while rows := await loop.run_in_executor(None, fetch):
            yield rows

def _consumer(self, msg: ReplicationMessage):
payload = json.loads(msg.payload)
Expand Down

0 comments on commit 2acbc90

Please sign in to comment.