Skip to content

Implement efficient prefetch for parallel bitmap heap scan #258

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 64 additions & 130 deletions src/backend/executor/nodeBitmapHeapscan.c
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ BitmapHeapNext(BitmapHeapScanState *node)
*/
pstate->tbmiterator = tbm_prepare_shared_iterate(tbm);
#ifdef USE_PREFETCH
node->n_prefetch_requests = 0;
node->prefetch_request_pos = 0;
if (node->prefetch_maximum > 0)
{
pstate->prefetch_iterator =
Expand All @@ -173,13 +175,6 @@ BitmapHeapNext(BitmapHeapScanState *node)
tbm_attach_shared_iterate(dsa, pstate->tbmiterator);
node->tbmres = tbmres = NULL;

#ifdef USE_PREFETCH
if (node->prefetch_maximum > 0)
{
node->shared_prefetch_iterator =
tbm_attach_shared_iterate(dsa, pstate->prefetch_iterator);
}
#endif /* USE_PREFETCH */
}
node->initialized = true;
}
Expand All @@ -198,15 +193,24 @@ BitmapHeapNext(BitmapHeapScanState *node)
if (!pstate)
node->tbmres = tbmres = tbm_iterate(tbmiterator);
else
node->tbmres = tbmres = tbm_shared_iterate(shared_tbmiterator);
{
if (node->n_prefetch_requests != 0)
{
node->tbmres = tbmres = (TBMIterateResult *)&node->prefetch_requests[node->prefetch_request_pos];
node->n_prefetch_requests -= 1;
node->prefetch_request_pos = (node->prefetch_request_pos + 1) % MAX_IO_CONCURRENCY;
if (node->prefetch_pages != 0)
node->prefetch_pages -= 1;
}
else
node->tbmres = tbmres = tbm_shared_iterate(shared_tbmiterator);
}
if (tbmres == NULL)
{
/* no more entries in the bitmap */
break;
}

BitmapAdjustPrefetchIterator(node, tbmres);

/*
* We can skip fetching the heap page if we don't need any fields
* from the heap, and the bitmap entries don't need rechecking,
Expand Down Expand Up @@ -361,54 +365,21 @@ BitmapAdjustPrefetchIterator(BitmapHeapScanState *node,
TBMIterateResult *tbmres)
{
#ifdef USE_PREFETCH
ParallelBitmapHeapState *pstate = node->pstate;
TBMIterator *prefetch_iterator = node->prefetch_iterator;
Assert(node->pstate == NULL);

if (pstate == NULL)
if (node->prefetch_pages > 0)
{
TBMIterator *prefetch_iterator = node->prefetch_iterator;

if (node->prefetch_pages > 0)
{
/* The main iterator has closed the distance by one page */
node->prefetch_pages--;
}
else if (prefetch_iterator)
{
/* Do not let the prefetch iterator get behind the main one */
TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);

if (tbmpre == NULL || tbmpre->blockno != tbmres->blockno)
elog(ERROR, "prefetch and main iterators are out of sync");
}
return;
/* The main iterator has closed the distance by one page */
node->prefetch_pages--;
}

if (node->prefetch_maximum > 0)
else if (prefetch_iterator)
{
TBMSharedIterator *prefetch_iterator = node->shared_prefetch_iterator;
/* Do not let the prefetch iterator get behind the main one */
TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);

SpinLockAcquire(&pstate->mutex);
if (pstate->prefetch_pages > 0)
{
pstate->prefetch_pages--;
SpinLockRelease(&pstate->mutex);
}
else
{
/* Release the mutex before iterating */
SpinLockRelease(&pstate->mutex);

/*
* In case of shared mode, we can not ensure that the current
* blockno of the main iterator and that of the prefetch iterator
* are same. It's possible that whatever blockno we are
* prefetching will be processed by another process. Therefore,
* we don't validate the blockno here as we do in non-parallel
* case.
*/
if (prefetch_iterator)
tbm_shared_iterate(prefetch_iterator);
}
if (tbmpre == NULL || tbmpre->blockno != tbmres->blockno)
elog(ERROR, "prefetch and main iterators are out of sync");
}
#endif /* USE_PREFETCH */
}
Expand All @@ -425,35 +396,14 @@ static inline void
BitmapAdjustPrefetchTarget(BitmapHeapScanState *node)
{
#ifdef USE_PREFETCH
ParallelBitmapHeapState *pstate = node->pstate;

if (pstate == NULL)
{
if (node->prefetch_target >= node->prefetch_maximum)
/* don't increase any further */ ;
else if (node->prefetch_target >= node->prefetch_maximum / 2)
node->prefetch_target = node->prefetch_maximum;
else if (node->prefetch_target > 0)
node->prefetch_target *= 2;
else
node->prefetch_target++;
return;
}

/* Do an unlocked check first to save spinlock acquisitions. */
if (pstate->prefetch_target < node->prefetch_maximum)
{
SpinLockAcquire(&pstate->mutex);
if (pstate->prefetch_target >= node->prefetch_maximum)
/* don't increase any further */ ;
else if (pstate->prefetch_target >= node->prefetch_maximum / 2)
pstate->prefetch_target = node->prefetch_maximum;
else if (pstate->prefetch_target > 0)
pstate->prefetch_target *= 2;
else
pstate->prefetch_target++;
SpinLockRelease(&pstate->mutex);
}
if (node->prefetch_target >= node->prefetch_maximum)
/* don't increase any further */ ;
else if (node->prefetch_target >= node->prefetch_maximum / 2)
node->prefetch_target = node->prefetch_maximum;
else if (node->prefetch_target > 0)
node->prefetch_target *= 2;
else
node->prefetch_target++;
#endif /* USE_PREFETCH */
}

Expand Down Expand Up @@ -507,56 +457,46 @@ BitmapPrefetch(BitmapHeapScanState *node, TableScanDesc scan)
PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno);
}
}

return;
}

if (pstate->prefetch_pages < pstate->prefetch_target)
else
{
TBMSharedIterator *prefetch_iterator = node->shared_prefetch_iterator;

if (prefetch_iterator)
while (1)
{
while (1)
{
TBMIterateResult *tbmpre;
bool do_prefetch = false;
bool skip_fetch;
TBMIterateResult *tbmpre;
bool do_prefetch = false;
bool skip_fetch;

/*
* Recheck under the mutex. If some other process has already
* done enough prefetching then we need not to do anything.
*/
SpinLockAcquire(&pstate->mutex);
if (pstate->prefetch_pages < pstate->prefetch_target)
{
pstate->prefetch_pages++;
do_prefetch = true;
}
SpinLockRelease(&pstate->mutex);
if (node->prefetch_pages < node->prefetch_target)
{
Assert(node->n_prefetch_requests < MAX_IO_CONCURRENCY);
node->prefetch_pages++;
do_prefetch = true;
}

if (!do_prefetch)
return;
if (!do_prefetch)
return;

tbmpre = tbm_shared_iterate(prefetch_iterator);
if (tbmpre == NULL)
{
/* No more pages to prefetch */
tbm_end_shared_iterate(prefetch_iterator);
node->shared_prefetch_iterator = NULL;
break;
}
tbmpre = tbm_shared_iterate(node->shared_tbmiterator);
if (tbmpre != NULL)
{
memcpy(&node->prefetch_requests[(node->prefetch_request_pos + node->n_prefetch_requests) % MAX_IO_CONCURRENCY], tbmpre, sizeof(TBMIteratePrefetchResult));
node->n_prefetch_requests += 1;
}
else
{
/* No more pages to prefetch */
break;
}

/* As above, skip prefetch if we expect not to need page */
skip_fetch = (node->can_skip_fetch &&
(node->tbmres ? !node->tbmres->recheck : false) &&
VM_ALL_VISIBLE(node->ss.ss_currentRelation,
tbmpre->blockno,
&node->pvmbuffer));
/* As above, skip prefetch if we expect not to need page */
skip_fetch = (node->can_skip_fetch &&
(node->tbmres ? !node->tbmres->recheck : false) &&
VM_ALL_VISIBLE(node->ss.ss_currentRelation,
tbmpre->blockno,
&node->pvmbuffer));

if (!skip_fetch)
PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno);
}
if (!skip_fetch)
PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno);
}
}
#endif /* USE_PREFETCH */
Expand Down Expand Up @@ -613,8 +553,6 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node)
tbm_end_iterate(node->prefetch_iterator);
if (node->shared_tbmiterator)
tbm_end_shared_iterate(node->shared_tbmiterator);
if (node->shared_prefetch_iterator)
tbm_end_shared_iterate(node->shared_prefetch_iterator);
if (node->tbm)
tbm_free(node->tbm);
if (node->vmbuffer != InvalidBuffer)
Expand All @@ -627,7 +565,6 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node)
node->prefetch_iterator = NULL;
node->initialized = false;
node->shared_tbmiterator = NULL;
node->shared_prefetch_iterator = NULL;
node->vmbuffer = InvalidBuffer;
node->pvmbuffer = InvalidBuffer;

Expand Down Expand Up @@ -683,8 +620,6 @@ ExecEndBitmapHeapScan(BitmapHeapScanState *node)
tbm_free(node->tbm);
if (node->shared_tbmiterator)
tbm_end_shared_iterate(node->shared_tbmiterator);
if (node->shared_prefetch_iterator)
tbm_end_shared_iterate(node->shared_prefetch_iterator);
if (node->vmbuffer != InvalidBuffer)
ReleaseBuffer(node->vmbuffer);
if (node->pvmbuffer != InvalidBuffer)
Expand Down Expand Up @@ -739,7 +674,6 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
scanstate->pscan_len = 0;
scanstate->initialized = false;
scanstate->shared_tbmiterator = NULL;
scanstate->shared_prefetch_iterator = NULL;
scanstate->pstate = NULL;

/*
Expand Down
16 changes: 14 additions & 2 deletions src/include/nodes/execnodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "nodes/plannodes.h"
#include "nodes/tidbitmap.h"
#include "partitioning/partdefs.h"
#include "storage/bufmgr.h"
#include "storage/condition_variable.h"
#include "utils/hsearch.h"
#include "utils/queryenvironment.h"
Expand Down Expand Up @@ -1647,6 +1648,15 @@ typedef struct ParallelBitmapHeapState
char phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
} ParallelBitmapHeapState;

/*
 * TBMIteratePrefetchResult
 *
 * Fixed-size, by-value copy of one bitmap iteration result.  Instances are
 * stored in the BitmapHeapScanState.prefetch_requests ring buffer.
 *
 * Entries are filled with memcpy() from the TBMIterateResult returned by
 * tbm_shared_iterate() and are later handed back to the scan through a
 * (TBMIterateResult *) cast, so the field order and types here must remain
 * layout-compatible with TBMIterateResult.
 *
 * NOTE(review): TBMIterateResult is declared elsewhere (nodes/tidbitmap.h);
 * confirm that the two definitions still match whenever either one changes.
 */
typedef struct TBMIteratePrefetchResult
{
BlockNumber blockno; /* page number containing tuples */
int ntuples; /* -1 indicates lossy result */
bool recheck; /* should the tuples be rechecked? */
/* Note: recheck is always true if ntuples < 0 */
/*
 * Fixed capacity of MaxHeapTuplesPerPage offsets.
 * NOTE(review): presumably only the first ntuples entries are meaningful --
 * confirm against the iterator that produces the result.
 */
OffsetNumber offsets[MaxHeapTuplesPerPage];
} TBMIteratePrefetchResult;

/* ----------------
* BitmapHeapScanState information
*
Expand All @@ -1667,7 +1677,6 @@ typedef struct ParallelBitmapHeapState
* pscan_len size of the shared memory for parallel bitmap
* initialized is node is ready to iterate
* shared_tbmiterator shared iterator
* shared_prefetch_iterator shared iterator for prefetching
* pstate shared state for parallel bitmap scan
* ----------------
*/
Expand All @@ -1691,7 +1700,10 @@ typedef struct BitmapHeapScanState
Size pscan_len;
bool initialized;
TBMSharedIterator *shared_tbmiterator;
TBMSharedIterator *shared_prefetch_iterator;
/* Ring buffer private to each parallel worker: holds the bitmap results for which this worker issued prefetch requests, so that the same worker later processes those pages */
TBMIteratePrefetchResult prefetch_requests[MAX_IO_CONCURRENCY];
int n_prefetch_requests; /* number of used elements in prefetch_requests ring buffer */
int prefetch_request_pos; /* head position in ring buffer */
ParallelBitmapHeapState *pstate;
} BitmapHeapScanState;

Expand Down
4 changes: 2 additions & 2 deletions src/include/storage/bufmgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ extern PGDLLIMPORT int NLocBuffer;
extern PGDLLIMPORT Block *LocalBufferBlockPointers;
extern PGDLLIMPORT int32 *LocalRefCount;

/* upper limit for effective_io_concurrency */
#define MAX_IO_CONCURRENCY 1000
/* upper limit for effective_io_concurrency (preferably a power of 2) */
#define MAX_IO_CONCURRENCY 1024

/* special block number for ReadBuffer() */
#define P_NEW InvalidBlockNumber /* grow the file to get a new page */
Expand Down