From 4f939f430d4bfc95a3cfc4f0979001bc778bcd0c Mon Sep 17 00:00:00 2001 From: mkaruza Date: Fri, 27 Jan 2023 15:42:39 +0100 Subject: [PATCH] [columnar] Critical fixes in columnar storage * When a parallel scan is executing we need to flush in-memory changes (stripe, row_mask) at the beginning of the custom scan node, because updates & deletes that happen cannot be done in parallel mode. * alter_table_set_access_method needs to flush changes after renaming is done. * The difference between consecutive stripe ids is not always 1, so assigning the next stripe id to workers must also work in this scenario. --- .../backend/columnar/columnar_customscan.c | 54 +++- .../src/backend/columnar/columnar_metadata.c | 10 +- .../src/backend/columnar/columnar_reader.c | 93 +++---- .../columnar/sql/columnar--11.1-4--11.1-5.sql | 2 + .../alter_table_set_access_method/11.1-5.sql | 231 ++++++++++++++++++ .../alter_table_set_access_method/latest.sql | 5 + .../backend/columnar/write_state_interface.c | 50 ++++ columnar/src/include/columnar/columnar.h | 9 +- .../expected/columnar_update_delete.out | 43 ++++ .../regress/sql/columnar_update_delete.sql | 28 +++ 10 files changed, 463 insertions(+), 62 deletions(-) create mode 100644 columnar/src/backend/columnar/sql/udfs/alter_table_set_access_method/11.1-5.sql diff --git a/columnar/src/backend/columnar/columnar_customscan.c b/columnar/src/backend/columnar/columnar_customscan.c index 7fcb8a15..cf46300c 100644 --- a/columnar/src/backend/columnar/columnar_customscan.c +++ b/columnar/src/backend/columnar/columnar_customscan.c @@ -21,6 +21,7 @@ #include "access/amapi.h" #include "access/skey.h" +#include "access/xact.h" #include "catalog/pg_am.h" #include "catalog/pg_statistic.h" #include "commands/defrem.h" @@ -77,6 +78,10 @@ typedef struct ColumnarScanState List *vectorizedQualList; List *constructedVectorizedQualList; } vectorization; + + /* Scan snapshot*/ + Snapshot snapshot; + bool snapshotRegisteredByUs; } ColumnarScanState; typedef bool (*PathPredicate)(Path *path); 
@@ -1820,6 +1825,23 @@ ColumnarScan_BeginCustomScan(CustomScanState *cscanstate, EState *estate, int ef columnarScanState->attrNeeded = ColumnarAttrNeeded(&cscanstate->ss, columnarScanState->vectorization.vectorizedQualList); + /* + * If we have pending changes that need to be flushed (row_mask after update/delete) + * or a new stripe, we need to flush them here because a sequential columnar scan + * can have parallel execution and updates are not allowed in parallel mode. + */ + columnarScanState->snapshot = estate->es_snapshot; + columnarScanState->snapshotRegisteredByUs = false; + Oid relationOid = cscanstate->ss.ss_currentRelation->rd_node.relNode; + + if(!IsInParallelMode()) + { + RowMaskFlushWriteStateForRelfilenode(relationOid, GetCurrentSubTransactionId()); + + FlushWriteStateWithNewSnapshot(relationOid, &columnarScanState->snapshot, + &columnarScanState->snapshotRegisteredByUs); + } + /* scan slot is already initialized */ } @@ -2207,7 +2229,7 @@ ColumnarScanNext(ColumnarScanState *columnarScanState) * executing a scan that was planned to be parallel. 
*/ scandesc = columnar_beginscan_extended(node->ss.ss_currentRelation, - estate->es_snapshot, + columnarScanState->snapshot, 0, NULL, NULL, flags, columnarScanState->attrNeeded, columnarScanState->qual, @@ -2274,10 +2296,8 @@ ColumnarScan_ExecCustomScan(CustomScanState *node) static void ColumnarScan_EndCustomScan(CustomScanState *node) { - /* - * get information from node - */ TableScanDesc scanDesc = node->ss.ss_currentScanDesc; + ColumnarScanState *columnarScanState = (ColumnarScanState *) node; /* * Cleanup BMS of selected scan attributes @@ -2305,6 +2325,12 @@ ColumnarScan_EndCustomScan(CustomScanState *node) { table_endscan(scanDesc); } + + /* Unregister snapshot */ + if (columnarScanState->snapshotRegisteredByUs) + { + UnregisterSnapshot(columnarScanState->snapshot); + } } @@ -2381,7 +2407,15 @@ static Size Columnar_EstimateDSMCustomScan(CustomScanState *node, ParallelContext *pcxt) { - return sizeof(ParallelColumnarScanData); + Size nbytes; + + ColumnarScanState *columnarScanState = (ColumnarScanState *) node; + + nbytes = offsetof(ParallelColumnarScanData, snapshotData); + nbytes = add_size(nbytes, EstimateSnapshotSpace(columnarScanState->snapshot)); + nbytes = MAXALIGN(nbytes); + + return nbytes; } @@ -2393,6 +2427,15 @@ Columnar_InitializeDSMCustomScan(CustomScanState *node, ParallelColumnarScan pscan = (ParallelColumnarScan) coordinate; ColumnarScanState *columnarScanState = (ColumnarScanState *) node; + /* + * Serialize scan snapshot for workers so they see changes + * if we have flushed stripe / row_mask during this scan. 
+ */ + SerializeSnapshot(columnarScanState->snapshot, pscan->snapshotData); + + /* Initialize parallel scan mutex */ + SpinLockInit(&pscan->mutex); + /* Stripe numbers are starting from index 1 */ pg_atomic_init_u64(&pscan->nextStripeId, 1); @@ -2429,6 +2472,7 @@ Columnar_InitializeWorkerCustomScan(CustomScanState *node, ColumnarScanState *columnarScanState = (ColumnarScanState *) node; ParallelColumnarScan pscan = (ParallelColumnarScan) coordinate; columnarScanState->parallelColumnarScan = pscan; + columnarScanState->snapshot = RestoreSnapshot(pscan->snapshotData); } diff --git a/columnar/src/backend/columnar/columnar_metadata.c b/columnar/src/backend/columnar/columnar_metadata.c index 111410f1..e7f72323 100644 --- a/columnar/src/backend/columnar/columnar_metadata.c +++ b/columnar/src/backend/columnar/columnar_metadata.c @@ -1040,7 +1040,8 @@ FindStripeWithMatchingFirstRowNumber(Relation relation, uint64 rowNumber, StripeMetadata * FindNextStripeForParallelWorker(Relation relation, Snapshot snapshot, - uint32 nextStripeId) + uint64 nextStripeId, + uint64 * nextHigherStripeId) { StripeMetadata *foundStripeMetadata = NULL; @@ -1066,8 +1067,15 @@ FindNextStripeForParallelWorker(Relation relation, if (HeapTupleIsValid(heapTuple)) { foundStripeMetadata = BuildStripeMetadata(columnarStripes, heapTuple); + if (foundStripeMetadata->id == nextStripeId) break; + + if (foundStripeMetadata->id > nextStripeId) + { + *nextHigherStripeId = foundStripeMetadata->id; + break; + } } else { diff --git a/columnar/src/backend/columnar/columnar_reader.c b/columnar/src/backend/columnar/columnar_reader.c index 309d44fb..53394ddc 100644 --- a/columnar/src/backend/columnar/columnar_reader.c +++ b/columnar/src/backend/columnar/columnar_reader.c @@ -236,19 +236,25 @@ ColumnarBeginRead(Relation relation, TupleDesc tupleDescriptor, if (!randomAccess) { /* - * Flush any pending in-memory changes to row_mask metadata - * for next scan. + * If we have a parallel columnar scan we don't flush states. 
*/ - RowMaskFlushWriteStateForRelfilenode(readState->relation->rd_node.relNode, - GetCurrentSubTransactionId()); + if (readState->parallelColumnarScan == NULL) + { + /* + * Flush any pending in-memory changes to row_mask metadata + * for next scan. + */ + RowMaskFlushWriteStateForRelfilenode(readState->relation->rd_node.relNode, + GetCurrentSubTransactionId()); - /* - * When doing random access (i.e.: index scan), we don't need to flush - * pending writes until we need to read them. - * columnar_index_fetch_tuple would do so when needed. - */ - ColumnarReadFlushPendingWrites(readState); + /* + * When doing random access (i.e.: index scan), we don't need to flush + * pending writes until we need to read them. + * columnar_index_fetch_tuple would do so when needed. + */ + ColumnarReadFlushPendingWrites(readState); + } /* * AdvanceStripeRead sets currentStripeMetadata for the first stripe @@ -290,49 +296,9 @@ ColumnarReadFlushPendingWrites(ColumnarReadState *readState) Assert(!readState->snapshotRegisteredByUs); Oid relfilenode = readState->relation->rd_node.relNode; - FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId()); - - if (readState->snapshot == InvalidSnapshot || !IsMVCCSnapshot(readState->snapshot)) - { - return; - } - - /* - * If we flushed any pending writes, then we should guarantee that - * those writes are visible to us too. For this reason, if given - * snapshot is an MVCC snapshot, then we set its curcid to current - * command id. - * - * For simplicity, we do that even if we didn't flush any writes - * since we don't see any problem with that. - * - * XXX: We should either not update cid if we are executing a FETCH - * (from cursor) command, or we should have a better way to deal with - * pending writes, see the discussion in - * https://github.com/citusdata/citus/issues/5231. 
- */ - PushCopiedSnapshot(readState->snapshot); - /* now our snapshot is the active one */ - UpdateActiveSnapshotCommandId(); - Snapshot newSnapshot = GetActiveSnapshot(); - RegisterSnapshot(newSnapshot); - - /* - * To be able to use UpdateActiveSnapshotCommandId, we pushed the - * copied snapshot to the stack. However, we don't need to keep it - * there since we will anyway rely on ColumnarReadState->snapshot - * during read operation. - * - * Note that since we registered the snapshot already, we guarantee - * that PopActiveSnapshot won't free it. - */ - PopActiveSnapshot(); - - readState->snapshot = newSnapshot; - - /* not forget to unregister it when finishing read operation */ - readState->snapshotRegisteredByUs = true; + FlushWriteStateWithNewSnapshot(relfilenode, &readState->snapshot, + &readState->snapshotRegisteredByUs); } @@ -753,13 +719,30 @@ AdvanceStripeRead(ColumnarReadState *readState) readState->stripeReadState->chunkGroupsFiltered; } + SpinLockAcquire(&readState->parallelColumnarScan->mutex); + /* Fetch atomic next stripe id to be read by this scan. */ - uint32 nextStripeId = + uint64 nextStripeId = pg_atomic_fetch_add_u64(&readState->parallelColumnarScan->nextStripeId, 1); + uint64 nextHigherStripeId = nextStripeId; + readState->currentStripeMetadata = FindNextStripeForParallelWorker(readState->relation, - readState->snapshot, - nextStripeId); + readState->snapshot, + nextStripeId, + &nextHigherStripeId); + + /* + * There exists higher stripe id than this one so adjust and + * add +1 for next workers. 
+ */ + if (nextHigherStripeId != nextStripeId) + { + pg_atomic_write_u64(&readState->parallelColumnarScan->nextStripeId, + nextHigherStripeId + 1); + } + + SpinLockRelease(&readState->parallelColumnarScan->mutex); } if (readState->currentStripeMetadata && diff --git a/columnar/src/backend/columnar/sql/columnar--11.1-4--11.1-5.sql b/columnar/src/backend/columnar/sql/columnar--11.1-4--11.1-5.sql index cf1d2d65..5e013f3b 100644 --- a/columnar/src/backend/columnar/sql/columnar--11.1-4--11.1-5.sql +++ b/columnar/src/backend/columnar/sql/columnar--11.1-4--11.1-5.sql @@ -1,5 +1,7 @@ -- columnar--11.1-4--11.1-5.sql +#include "udfs/alter_table_set_access_method/11.1-5.sql" + SET search_path TO columnar; CREATE SEQUENCE row_mask_seq START WITH 1 INCREMENT BY 1; diff --git a/columnar/src/backend/columnar/sql/udfs/alter_table_set_access_method/11.1-5.sql b/columnar/src/backend/columnar/sql/udfs/alter_table_set_access_method/11.1-5.sql new file mode 100644 index 00000000..dfa08f47 --- /dev/null +++ b/columnar/src/backend/columnar/sql/udfs/alter_table_set_access_method/11.1-5.sql @@ -0,0 +1,231 @@ +DROP FUNCTION IF EXISTS columnar.alter_table_set_access_method; + +CREATE OR REPLACE FUNCTION columnar.alter_table_set_access_method(t TEXT, method TEXT) + RETURNS BOOLEAN LANGUAGE plpgsql +AS $func$ + +DECLARE + + tbl_exists BOOLEAN; + tbl_schema TEXT = 'public'; + tbl_name TEXT; + tbl_array TEXT[] = (parse_ident(t)); + tbl_oid INT; + tbl_am_oid INT; + temp_tbl_name TEXT; + + trigger_list_definition TEXT[]; + trigger TEXT; + + index_list_definition TEXT[]; + idx TEXT; + + constraint_list_name_and_definition TEXT[]; + constraint_name_and_definition TEXT; + constraint_name_and_definition_split TEXT[]; + +BEGIN + + CASE + WHEN CARDINALITY(tbl_array) = 1 THEN + SELECT tbl_array[1] INTO tbl_name; + WHEN CARDINALITY(tbl_array) = 2 THEN + SELECT tbl_array[1] INTO tbl_schema; + SELECT tbl_array[2] INTO tbl_name; + ELSE + RAISE WARNING 'Argument should provided as table or schema.table.'; + 
RETURN 0; + END CASE; + + -- Allow only convert to columnar / heap access method + + IF method NOT IN ('columnar', 'heap') THEN + RAISE WARNING 'Cannot convert table: Allowed access methods are heap and columnar.'; + RETURN 0; + END IF; + + -- Check if table exists + + SELECT EXISTS + (SELECT FROM pg_catalog.pg_tables WHERE schemaname = tbl_schema AND tablename = tbl_name) + INTO tbl_exists; + + IF tbl_exists = False THEN + RAISE WARNING 'Table %.% does not exist.', tbl_schema, tbl_name; + RETURN 0; + END IF; + + -- Get table OID + + EXECUTE FORMAT('SELECT %L::regclass::oid'::text, tbl_schema || '.' || tbl_name) INTO tbl_oid; + + -- Get table AM oid + + SELECT relam FROM pg_class WHERE oid = tbl_oid INTO tbl_am_oid; + + -- Check that table is heap or columnar + + IF (tbl_am_oid != (SELECT oid FROM pg_am WHERE amname = 'columnar')) AND + (tbl_am_oid != (SELECT oid FROM pg_am WHERE amname = 'heap')) THEN + RAISE WARNING 'Cannot convert table: table %.% is not heap or colummnar', tbl_schema, tbl_name; + RETURN 0; + END IF; + + -- Check that we can convert only from 'heap' to 'columnar' and vice versa + + IF tbl_am_oid = (SELECT oid FROM pg_am WHERE amname = method) THEN + RAISE WARNING 'Cannot convert table: conversion to same access method.'; + RETURN 0; + END IF; + + -- Check if table has FOREIGN KEY + + IF (SELECT COUNT(1) FROM pg_constraint WHERE contype = 'f' AND conrelid = tbl_oid) THEN + RAISE WARNING 'Cannot convert table: table %.% has a FOREIGN KEY constraint.', tbl_schema, tbl_name; + RETURN 0; + END IF; + + -- Check if table is REFERENCED by FOREIGN KEY + + IF (SELECT COUNT(1) FROM pg_constraint WHERE contype = 'f' AND confrelid = tbl_oid) THEN + RAISE WARNING 'Cannot convert table: table %.% is referenced by FOREIGN KEY.', tbl_schema, tbl_name; + RETURN 0; + END IF; + + -- Check if table has identity columns + + IF (SELECT COUNT(1) FROM pg_attribute WHERE attrelid = tbl_oid AND attidentity <> '') THEN + RAISE WARNING 'Cannot convert table: table %.% must 
not use GENERATED ... AS IDENTITY.', tbl_schema, tbl_name; + RETURN 0; + END IF; + + -- Collect triggers definitions + + SELECT ARRAY_AGG(pg_get_triggerdef(oid)) FROM pg_trigger + WHERE tgrelid = tbl_oid INTO trigger_list_definition; + + -- Collect constraint names and definitions (delimiter is `?`) + -- Search for constraints that depend on index AM which is supported by columnar AM + + SELECT ARRAY_AGG(pg_constraint.conname || '?' || pg_get_constraintdef(pg_constraint.oid)) + + FROM pg_constraint, pg_class + + WHERE + pg_constraint.conindid = pg_class.oid + AND + pg_constraint.conrelid = tbl_oid + AND + pg_class.relam IN (SELECT oid FROM pg_am WHERE amname IN ('btree', 'hash')) + + INTO constraint_list_name_and_definition; + + -- Collect index definitions which are not constraints + + SELECT ARRAY_AGG(indexdef) FROM pg_indexes + + WHERE + + schemaname = tbl_schema AND tablename = tbl_name + + AND + + indexname::regclass::oid IN + ( + SELECT indexrelid FROM pg_index + + WHERE + indexrelid IN + (SELECT indexname::regclass::oid FROM pg_indexes + WHERE schemaname = tbl_schema AND tablename = tbl_name) + + AND + + indexrelid NOT IN + (SELECT conindid FROM pg_constraint + WHERE pg_constraint.conrelid = tbl_oid) + ) + + INTO index_list_definition; + + -- Generate random name for new table + + SELECT 't_' || substr(md5(random()::text), 0, 25) INTO temp_tbl_name; + + -- Create new table + + EXECUTE FORMAT(' + CREATE TABLE %I (LIKE %I.%I + INCLUDING GENERATED + INCLUDING DEFAULTS + ) USING %s'::text, temp_tbl_name, tbl_schema, tbl_name, method); + + -- Insert all data from original table + + EXECUTE FORMAT('INSERT INTO %I SELECT * FROM %I.%I'::text, temp_tbl_name, tbl_schema, tbl_name); + + -- Drop original table + + EXECUTE FORMAT('DROP TABLE %I'::text, tbl_name); + + -- Rename new table to original name + + EXECUTE FORMAT('ALTER TABLE %I RENAME TO %I;'::text, temp_tbl_name, tbl_name); + + -- Since we inserted rows before they are not flushed so trigger flushing + + 
EXECUTE FORMAT('SELECT COUNT(1) FROM %I LIMIT 1;'::text, tbl_name); + + -- Set indexes + + IF CARDINALITY(index_list_definition) <> 0 THEN + FOREACH idx IN ARRAY index_list_definition + LOOP + BEGIN + EXECUTE idx; + EXCEPTION WHEN feature_not_supported THEN + RAISE WARNING 'Index `%` cannot be created.', idx; + END; + END LOOP; + END IF; + + -- Set constraints + + IF CARDINALITY(constraint_list_name_and_definition) <> 0 THEN + FOREACH constraint_name_and_definition IN ARRAY constraint_list_name_and_definition + LOOP + SELECT string_to_array(constraint_name_and_definition, '?') INTO constraint_name_and_definition_split; + BEGIN + EXECUTE 'ALTER TABLE ' || tbl_name || ' ADD CONSTRAINT ' + || constraint_name_and_definition_split[1] || ' ' + || constraint_name_and_definition_split[2]; + EXCEPTION WHEN feature_not_supported THEN + RAISE WARNING 'Constraint `%` cannot be added.', constraint_name_and_definition_split[2]; + END; + END LOOP; + END IF; + + -- Set triggers + + IF CARDINALITY(trigger_list_definition) <> 0 THEN + FOREACH trigger IN ARRAY trigger_list_definition + LOOP + BEGIN + EXECUTE trigger; + EXCEPTION WHEN feature_not_supported THEN + RAISE WARNING 'Trigger `%` cannot be applied.', trigger; + RAISE WARNING + 'Foreign keys and AFTER ROW triggers are not supported for columnar tables.' 
+ ' Consider an AFTER STATEMENT trigger instead.'; + END; + END LOOP; + END IF; + + RETURN 1; + +END; + +$func$; + +COMMENT ON FUNCTION columnar.alter_table_set_access_method(t text, method text) + IS 'alters a table''s access method'; \ No newline at end of file diff --git a/columnar/src/backend/columnar/sql/udfs/alter_table_set_access_method/latest.sql b/columnar/src/backend/columnar/sql/udfs/alter_table_set_access_method/latest.sql index dfb8e20b..f84c9b66 100644 --- a/columnar/src/backend/columnar/sql/udfs/alter_table_set_access_method/latest.sql +++ b/columnar/src/backend/columnar/sql/udfs/alter_table_set_access_method/latest.sql @@ -172,6 +172,11 @@ BEGIN EXECUTE FORMAT('ALTER TABLE %I RENAME TO %I;'::text, temp_tbl_name, tbl_name); + -- Since we inserted rows before they are not flushed so trigger flushing + -- by running columnar scan + + EXECUTE FORMAT('SELECT COUNT(1) FROM %I LIMIT 1;'::text, tbl_name); + -- Set indexes IF CARDINALITY(index_list_definition) <> 0 THEN diff --git a/columnar/src/backend/columnar/write_state_interface.c b/columnar/src/backend/columnar/write_state_interface.c index e110acd9..f77aafab 100644 --- a/columnar/src/backend/columnar/write_state_interface.c +++ b/columnar/src/backend/columnar/write_state_interface.c @@ -12,9 +12,59 @@ */ #include "postgres.h" +#include "access/xact.h" #include "columnar/columnar.h" +void +FlushWriteStateWithNewSnapshot(Oid relfilenode, + Snapshot * snapshot, + bool * snapshotRegisteredByUs) +{ + FlushWriteStateForRelfilenode(relfilenode, GetCurrentSubTransactionId()); + + if (*snapshot == InvalidSnapshot || !IsMVCCSnapshot(*snapshot)) + { + return; + } + + /* + * If we flushed any pending writes, then we should guarantee that + * those writes are visible to us too. For this reason, if given + * snapshot is an MVCC snapshot, then we set its curcid to current + * command id. + * + * For simplicity, we do that even if we didn't flush any writes + * since we don't see any problem with that. 
+ * + * XXX: We should either not update cid if we are executing a FETCH + * (from cursor) command, or we should have a better way to deal with + * pending writes, see the discussion in + * https://github.com/citusdata/citus/issues/5231. + */ + PushCopiedSnapshot(*snapshot); + + /* now our snapshot is the active one */ + UpdateActiveSnapshotCommandId(); + Snapshot newSnapshot = GetActiveSnapshot(); + RegisterSnapshot(newSnapshot); + + /* + * To be able to use UpdateActiveSnapshotCommandId, we pushed the + * copied snapshot to the stack. However, we don't need to keep it + * there since we will anyway rely on ColumnarReadState->snapshot + * during read operation. + * + * Note that since we registered the snapshot already, we guarantee + * that PopActiveSnapshot won't free it. + */ + PopActiveSnapshot(); + + *snapshot = newSnapshot; + *snapshotRegisteredByUs = true; +} + + /* * Called when current subtransaction is committed. */ diff --git a/columnar/src/include/columnar/columnar.h b/columnar/src/include/columnar/columnar.h index 8a423513..50ad50d5 100644 --- a/columnar/src/include/columnar/columnar.h +++ b/columnar/src/include/columnar/columnar.h @@ -21,6 +21,7 @@ #include "storage/bufpage.h" #include "storage/lockdefs.h" #include "storage/relfilenode.h" +#include "storage/s_lock.h" #include "utils/relcache.h" #include "utils/snapmgr.h" @@ -203,7 +204,9 @@ typedef enum StripeWriteStateEnum /* Parallel Custom Scan shared data */ typedef struct ParallelColumnarScanData { + slock_t mutex; pg_atomic_uint64 nextStripeId; /* Fetch next stripe id to be read and increment */ + char snapshotData[FLEXIBLE_ARRAY_MEMBER]; } ParallelColumnarScanData; typedef struct ParallelColumnarScanData *ParallelColumnarScan; @@ -334,7 +337,8 @@ extern StripeMetadata * FindStripeWithMatchingFirstRowNumber(Relation relation, Snapshot snapshot); extern StripeMetadata * FindNextStripeForParallelWorker(Relation relation, Snapshot snapshot, - uint32 nextStripeId); + uint64 nextStripeId, + uint64 * 
nextHigherStripeId); extern StripeWriteStateEnum StripeWriteState(StripeMetadata *stripeMetadata); extern uint64 StripeGetHighestRowNumber(StripeMetadata *stripeMetadata); extern StripeMetadata * FindStripeWithHighestRowNumber(Relation relation, @@ -351,6 +355,9 @@ extern bytea * ReadChunkRowMask(RelFileNode relfilenode, Snapshot snapshot, extern Datum create_table_row_mask(PG_FUNCTION_ARGS); /* write_state_interface.c */ +extern void FlushWriteStateWithNewSnapshot(Oid relfilenode, + Snapshot * snapshot, + bool * snapShotRegisteredByUs); extern void FlushWriteStateForAllRels(SubTransactionId currentSubXid, SubTransactionId parentSubXid); extern void DiscardWriteStateForAllRels(SubTransactionId currentSubXid, diff --git a/columnar/src/test/regress/expected/columnar_update_delete.out b/columnar/src/test/regress/expected/columnar_update_delete.out index 58db9128..b1d32eae 100644 --- a/columnar/src/test/regress/expected/columnar_update_delete.out +++ b/columnar/src/test/regress/expected/columnar_update_delete.out @@ -80,3 +80,46 @@ SELECT * FROM parent; ALTER TABLE parent DETACH PARTITION p0; DROP TABLE p0; DROP TABLE parent; +-- delete / update in transaction +CREATE TABLE columnar_transaction_update(i INT, j INT) using columnar; +INSERT INTO columnar_transaction_update VALUES (1, 10); +INSERT INTO columnar_transaction_update VALUES (2, 20); +INSERT INTO columnar_transaction_update VALUES (3, 30); +INSERT INTO columnar_transaction_update SELECT g, g * 10 FROM generate_series(4, 100) g; +INSERT INTO columnar_transaction_update SELECT g, g * 10 FROM generate_series(101, 1000) g; +INSERT INTO columnar_transaction_update SELECT g, g * 10 FROM generate_series(1001, 20000) g; +SELECT COUNT(*) from columnar_transaction_update; + count +------- + 20000 +(1 row) + +START TRANSACTION; +DELETE FROM columnar_transaction_update where i % 2 = 0; +SELECT COUNT(*) from columnar_transaction_update; + count +------- + 10000 +(1 row) + +UPDATE columnar_transaction_update SET j = -1 
WHERE i % 3 = 0; +SELECT COUNT(*) from columnar_transaction_update WHERE j = -1; + count +------- + 3333 +(1 row) + +COMMIT; +SELECT COUNT(*) from columnar_transaction_update; + count +------- + 10000 +(1 row) + +SELECT COUNT(*) from columnar_transaction_update WHERE j = -1; + count +------- + 3333 +(1 row) + +DROP TABLE columnar_transaction_update; diff --git a/columnar/src/test/regress/sql/columnar_update_delete.sql b/columnar/src/test/regress/sql/columnar_update_delete.sql index eea58685..13928eda 100644 --- a/columnar/src/test/regress/sql/columnar_update_delete.sql +++ b/columnar/src/test/regress/sql/columnar_update_delete.sql @@ -75,3 +75,31 @@ DROP TABLE p0; DROP TABLE parent; +-- delete / update in transaction + +CREATE TABLE columnar_transaction_update(i INT, j INT) using columnar; + +INSERT INTO columnar_transaction_update VALUES (1, 10); +INSERT INTO columnar_transaction_update VALUES (2, 20); +INSERT INTO columnar_transaction_update VALUES (3, 30); + +INSERT INTO columnar_transaction_update SELECT g, g * 10 FROM generate_series(4, 100) g; +INSERT INTO columnar_transaction_update SELECT g, g * 10 FROM generate_series(101, 1000) g; +INSERT INTO columnar_transaction_update SELECT g, g * 10 FROM generate_series(1001, 20000) g; + +SELECT COUNT(*) from columnar_transaction_update; + +START TRANSACTION; + +DELETE FROM columnar_transaction_update where i % 2 = 0; +SELECT COUNT(*) from columnar_transaction_update; + +UPDATE columnar_transaction_update SET j = -1 WHERE i % 3 = 0; +SELECT COUNT(*) from columnar_transaction_update WHERE j = -1; + +COMMIT; + +SELECT COUNT(*) from columnar_transaction_update; +SELECT COUNT(*) from columnar_transaction_update WHERE j = -1; + +DROP TABLE columnar_transaction_update;