diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
new file mode 100644
index 0000000..22f2200
--- /dev/null
+++ b/.github/workflows/ci.yaml
@@ -0,0 +1,24 @@
+name: CI
+
+on: [push, pull_request]
+
+jobs:
+  test:
+
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        pg-version: ['12', '13', '14', '15', '16']
+
+    steps:
+      - uses: actions/checkout@v3
+      - uses: cachix/install-nix-action@v18
+        with:
+          nix_path: nixpkgs=channel:nixos-unstable
+      - name: Run tests
+        run: nix-shell --run "with-pg-${{ matrix.pg-version }} make installcheck"
+      # On failure, print the regression diff. REGRESS_OPTS sets no --outputdir,
+      # so pg_regress leaves regression.diffs in the directory make was run from.
+      - name: Show regression diffs
+        if: ${{ failure() }}
+        run: cat regression.diffs
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..8c5f64f
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,7 @@
+*.zst
+*.csv
+*.o
+.history
+results/
+regression.diffs
+regression.out
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..485425a
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,20 @@
+Copyright (c) 2023 Steve Chavez
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be included
+in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
+IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
+CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
+TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
+SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..9746d61
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,33 @@
+EXTENSION = bzip
+EXTVERSION = 0.1.0
+
+all: sql/$(EXTENSION)--$(EXTVERSION).sql $(EXTENSION).control
+
+sql/$(EXTENSION)--$(EXTVERSION).sql: sql/$(EXTENSION).sql
+	cp $< $@
+
+# The control file is generated from its template. Naming the template and
+# this Makefile as prerequisites regenerates it when either one changes,
+# e.g. after EXTVERSION is bumped.
+$(EXTENSION).control: $(EXTENSION).control.in Makefile
+	sed "s/@EXTVERSION@/$(EXTVERSION)/g" $< > $@
+
+DATA = $(wildcard sql/*--*.sql)
+
+# Files generated by `all`; listed here so that `make clean` removes them.
+EXTRA_CLEAN = sql/$(EXTENSION)--$(EXTVERSION).sql $(EXTENSION).control
+
+MODULE_big = $(EXTENSION)
+OBJS = src/pg_bzip.o
+
+TESTS = $(wildcard test/sql/*.sql)
+REGRESS = $(patsubst test/sql/%.sql,%,$(TESTS))
+REGRESS_OPTS = --inputdir=test
+
+PG_CONFIG = pg_config
+SHLIB_LINK = -lbz2
+
+PG_CFLAGS = -std=c99 -Wno-declaration-after-statement -Wall -Werror -Wshadow
+
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..f06705e
--- /dev/null
+++ b/README.md
@@ -0,0 +1,87 @@
+# pg_bzip
+
+## Motivation
+
+If you obtain data compressed as bzip2, whether through HTTP (with [pgsql-http](https://github.com/pramsey/pgsql-http)) or from a file
+(with [pgsql-fio](https://github.com/csimsek/pgsql-fio) or the native [pg_read_binary_file](https://pgpedia.info/p/pg_read_binary_file.html)), it's convenient to
+decompress it in SQL directly. This extension is just for that, it provides functions to decompress and compress data using bzip2.
+
+## Functions
+
+- `bzcat(data bytea) returns bytea`
+
+This function mimics the [bzcat](https://linux.die.net/man/1/bzcat) command, which decompresses data using bzip2.
+
+```sql
+select convert_from(bzcat(pg_read_binary_file('/path/to/all_movies.csv.bz2')), 'utf8') as contents;
+
+ contents
+--------------------------------------------------------------------------------------------------------------------------------------------
+ "id","name","parent_id","date" +
+ "2","Ariel","8384","1988-10-21" +
+ "3","Varjoja paratiisissa","8384","1986-10-17" +
+ "4","État de siège",\N,"1972-12-30" +
+ "5","Four Rooms",\N,"1995-12-22" +
+ "6","Judgment Night",\N,"1993-10-15" +
+ "8","Megacities - Life in Loops",\N,"2006-01-01" +
+ "9","Sonntag, im August",\N,"2004-09-22" +
+ "11","Star Wars: Episode IV – A New Hope","10","1977-05-25" +
+ "12","Finding Nemo","112246","2003-05-30" +
+ ...
+ ....
+ .....
+```
+
+- `bzip2(data bytea, compression_level int default 9) returns bytea`
+
+This function is a simplified version of the [bzip2](https://linux.die.net/man/1/bzip2) command. It compresses data using bzip2.
+
+For this example we'll use `fio_writefile` from [pgsql-fio](https://github.com/csimsek/pgsql-fio), which offers a convenient way to write a file from SQL.
+
+```sql
+select fio_writefile('/home/stevechavez/Projects/pg_bzip/my_text.bz2', bzip2(repeat('my secret text to be compressed', 1000)::bytea)) as writesize;
+
+ writesize
+-----------
+ 109
+```
+
+## Installation
+
+bzip2 is required. Under Debian/Ubuntu you can get it with
+
+```
+sudo apt install libbz2-dev
+```
+
+Then on this repo
+
+```
+make && make install
+```
+
+Now on SQL you can do:
+
+```
+CREATE EXTENSION bzip;
+```
+
+`pg_bzip` is tested to work on PostgreSQL >= 12.
+
+## Development
+
+[Nix](https://nixos.org/download.html) is used to get an isolated and reproducible environment with multiple postgres versions.
+
+```
+# enter the Nix environment
+$ nix-shell
+
+# to run the tests
+$ with-pg-16 make installcheck
+
+# to interact with the isolated pg
+$ with-pg-16 psql
+
+# you can choose the pg version
+$ with-pg-15 psql
+```
diff --git a/bzip.control.in b/bzip.control.in
new file mode 100644
index 0000000..775c4d5
--- /dev/null
+++ b/bzip.control.in
@@ -0,0 +1,2 @@
+default_version = '@EXTVERSION@'
+relocatable = false
diff --git a/nix/pgExtension.nix b/nix/pgExtension.nix
new file mode 100644
index 0000000..1049a4b
--- /dev/null
+++ b/nix/pgExtension.nix
@@ -0,0 +1,15 @@
+{ stdenv, postgresql, bzip2, extensionName }:
+
+stdenv.mkDerivation {
+  name = extensionName;
+
+  buildInputs = [ postgresql bzip2 ];
+
+  src = ../.;
+
+  installPhase = ''
+    install -D *.so -t $out/lib
+    install -D -t $out/share/postgresql/extension sql/*.sql
+    install -D -t $out/share/postgresql/extension ${extensionName}.control
+  '';
+}
diff --git a/nix/pgScript.nix b/nix/pgScript.nix
new file mode 100644
index 0000000..07159ab
--- /dev/null
+++ b/nix/pgScript.nix
@@ -0,0 +1,29 @@
+{ postgresql, writeShellScriptBin, options ? "" } :
+
+let
+  ver = builtins.head (builtins.splitVersion postgresql.version);
+  script = ''
+    export PATH=${postgresql}/bin:"$PATH"
+
+    tmpdir="$(mktemp -d)"
+
+    export PGDATA="$tmpdir"
+    export PGHOST="$tmpdir"
+    export PGUSER=postgres
+    export PGDATABASE=postgres
+
+    trap 'pg_ctl stop -m i && rm -rf "$tmpdir"' sigint sigterm exit
+
+    PGTZ=UTC initdb --no-locale --encoding=UTF8 --nosync -U "$PGUSER"
+
+    default_options="-F -c listen_addresses=\"\" -k $PGDATA"
+
+    pg_ctl start -o "$default_options" -o "${options}"
+
+    cp ${../test/samples/all_movies.csv} $tmpdir/all_movies.csv
+    cp ${../test/samples/all_movies.csv.bz2} $tmpdir/all_movies.csv.bz2
+
+    "$@"
+  '';
+in
+writeShellScriptBin "with-pg-${ver}" script
diff --git a/nix/pgsql-fio.nix b/nix/pgsql-fio.nix
new file mode 100644
index 0000000..1339f26
--- /dev/null
+++ b/nix/pgsql-fio.nix
@@ -0,0 +1,20 @@
+{ stdenv, lib, fetchFromGitHub, postgresql }:
+
+stdenv.mkDerivation rec {
+  name = "pgsql-fio";
+
+  buildInputs = [ postgresql ];
+
+  src = fetchFromGitHub {
+    owner = "csimsek";
+    repo = name;
+    rev = "9f6133c7ac4c50a14cf983943cb9916f994034bd";
+    hash = "sha256-uoWoFfm8iM/FzBtIH5SF6TPRhDXDMVftueWjMYggiJY=";
+  };
+
+  installPhase = ''
+    install -D fio.so -t $out/lib
+    install -D fio--1.0.sql -t $out/share/postgresql/extension
+    install -D fio.control -t $out/share/postgresql/extension
+  '';
+}
diff --git a/shell.nix b/shell.nix
new file mode 100644
index 0000000..7e34bf9
--- /dev/null
+++ b/shell.nix
@@ -0,0 +1,29 @@
+with import (builtins.fetchTarball {
+  name = "2023-09-16";
+  url = "https://github.com/NixOS/nixpkgs/archive/ae5b96f3ab6aabb60809ab78c2c99f8dd51ee678.tar.gz";
+  sha256 = "11fpdcj5xrmmngq0z8gsc3axambqzvyqkfk23jn3qkx9a5x56xxk";
+}) {};
+mkShell {
+  buildInputs =
+    let
+      extensionName = "bzip";
+      supportedPgVersions = [
+        postgresql_16
+        postgresql_15
+        postgresql_14
+        postgresql_13
+        postgresql_12
+      ];
+      pgWExtension = { postgresql }:
+        postgresql.withPackages (p: [
+          (callPackage ./nix/pgExtension.nix { inherit postgresql extensionName; })
+          (callPackage ./nix/pgsql-fio.nix { inherit postgresql; }) # only used for manual tests where writing to a file is required
+        ]);
+      extAll = map (x: callPackage ./nix/pgScript.nix { postgresql = pgWExtension { postgresql = x;}; }) supportedPgVersions;
+    in
+    extAll;
+
+  shellHook = ''
+    export HISTFILE=.history
+  '';
+}
diff --git a/sql/bzip.sql b/sql/bzip.sql
new file mode 100644
index 0000000..ff3733f
--- /dev/null
+++ b/sql/bzip.sql
@@ -0,0 +1,13 @@
+create or replace function bzcat(data bytea)
+returns bytea
+language 'c'
+immutable
+strict
+as 'bzip';
+
+create or replace function bzip2(data bytea, compression_level int default 9)
+returns bytea
+language 'c'
+immutable
+strict
+as 'bzip';
diff --git a/src/pg_bzip.c b/src/pg_bzip.c
new file mode 100644
index 0000000..064d2f3
--- /dev/null
+++ b/src/pg_bzip.c
@@ -0,0 +1,168 @@
+/*system*/
+#include
+#include
+
+/*postgres*/
+#include
+#include
+#include
+#include
+
+/* bzip2 https://sourceware.org/bzip2/manual/manual.html */
+#include
+
+#define BZ2_VERBOSITY 0
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(bzcat);
+PG_FUNCTION_INFO_V1(bzip2);
+
+/*
+ * custom memory allocators for bzip2, backed by palloc/pfree
+ *
+ * bzip2 asks for n * m bytes; the product is computed as Size so that it
+ * cannot overflow int
+ */
+static void *
+pg_bz2alloc(void *opaque, int n, int m) {
+  return palloc((Size) n * (Size) m);
+}
+
+static void
+pg_bz2free(void *opaque, void *p) {
+  pfree(p);
+}
+
+/*
+ * bzip2(data bytea, compression_level int) returns bytea
+ *
+ * compress data using bzip2; raises an error when compression_level is not
+ * between 1 and 9 or when libbz2 reports a failure
+ */
+Datum bzip2(PG_FUNCTION_ARGS){
+  bytea *arg_data = PG_GETARG_BYTEA_P(0);
+  int32 compression_level = PG_GETARG_INT32(1);
+
+  if (compression_level < 1 || compression_level > 9)
+    ereport(ERROR,
+      errmsg("compression level out of range: %d", compression_level),
+      errdetail("the compression level should be an int between 1 and 9 inclusive"));
+
+  char buffer[BZ_MAX_UNUSED];
+  int status;
+
+  bz_stream stream =
+  { .next_in = VARDATA(arg_data)
+  , .avail_in = VARSIZE_ANY_EXHDR(arg_data)
+  , .next_out = buffer
+  , .avail_out = BZ_MAX_UNUSED
+  , .bzalloc = pg_bz2alloc
+  , .bzfree = pg_bz2free
+  , .opaque = NULL
+  };
+
+  status = BZ2_bzCompressInit(&stream,
+    compression_level,
+    BZ2_VERBOSITY,
+    0); // according to the man pages (on --repetitive-fast --repetitive-best), the workFactor is unused, so we just leave it at 0 which will take the default.
+
+  if ( status != BZ_OK ) {
+    ereport(ERROR, errmsg("bzip2 compression initialization failed"));
+  }
+
+  StringInfoData si;
+  initStringInfo(&si);
+
+  do {
+    // when avail_in is 0, the compression will be finished but the whole data won't necessarily be transferred because of the buffer size.
+    // changing the mode to BZ_FINISH transfers the remaining data to the buffer
+    status = BZ2_bzCompress(&stream, (stream.avail_in) ? BZ_RUN : BZ_FINISH);
+
+    // avail_out indicates how much of the buffer is empty, so we do a subtraction to append the buffer part that is filled
+    appendBinaryStringInfo(&si, (char*)buffer, BZ_MAX_UNUSED - stream.avail_out);
+
+    // hand the whole buffer back to bzip2 for the next iteration
+    stream.avail_out = BZ_MAX_UNUSED;
+    stream.next_out = buffer;
+  } while (status == BZ_RUN_OK || status == BZ_FINISH_OK);
+
+  if ( status != BZ_STREAM_END) {
+    BZ2_bzCompressEnd(&stream);
+    ereport(ERROR, errmsg("bzip2 compression failed"));
+  }
+
+  BZ2_bzCompressEnd(&stream);
+
+  bytea *result = palloc(si.len + VARHDRSZ);
+  memcpy(VARDATA(result), si.data, si.len);
+  SET_VARSIZE(result, si.len + VARHDRSZ);
+  PG_FREE_IF_COPY(arg_data, 0);
+  PG_RETURN_POINTER(result);
+}
+
+/*
+ * bzcat(data bytea) returns bytea
+ *
+ * decompress bzip2 data; raises an error when the input is not valid bzip2
+ * or ends before the end of the stream
+ */
+Datum bzcat(PG_FUNCTION_ARGS){
+  bytea *arg_data = PG_GETARG_BYTEA_P(0);
+
+  char buffer[BZ_MAX_UNUSED];
+  int status;
+  bool stalled = false;
+
+  bz_stream stream =
+  { .next_in = VARDATA(arg_data)
+  , .avail_in = VARSIZE_ANY_EXHDR(arg_data)
+  , .next_out = buffer
+  , .avail_out = BZ_MAX_UNUSED
+  , .bzalloc = pg_bz2alloc
+  , .bzfree = pg_bz2free
+  , .opaque = NULL
+  };
+
+  status = BZ2_bzDecompressInit(&stream, BZ2_VERBOSITY, 0);
+
+  if ( status != BZ_OK ) {
+    ereport(ERROR, errmsg("bzip2 decompression initialization failed"));
+  }
+
+  StringInfoData si;
+  initStringInfo(&si);
+
+  do {
+    unsigned int avail_in_before = stream.avail_in;
+
+    status = BZ2_bzDecompress(&stream);
+
+    unsigned int produced = BZ_MAX_UNUSED - stream.avail_out;
+    appendBinaryStringInfo(&si, (char*)buffer, produced);
+
+    // on truncated input BZ2_bzDecompress keeps returning BZ_OK while it waits for more data;
+    // a call that neither consumed input nor produced output can never make progress, so stop instead of looping forever
+    stalled = (status == BZ_OK) && (produced == 0) && (stream.avail_in == avail_in_before);
+
+    stream.avail_out = BZ_MAX_UNUSED;
+    stream.next_out = buffer;
+  } while (status == BZ_OK && !stalled);
+
+  if ( status != BZ_STREAM_END ){
+    BZ2_bzDecompressEnd(&stream);
+    if (stalled)
+      ereport(ERROR,
+        errmsg("bzip2 decompression failed"),
+        errdetail("the compressed data is truncated"));
+    ereport(ERROR, errmsg("bzip2 decompression failed"));
+  }
+
+  BZ2_bzDecompressEnd(&stream);
+
+  bytea *result = palloc(si.len + VARHDRSZ);
+  memcpy(VARDATA(result), si.data, si.len);
+  SET_VARSIZE(result, si.len + VARHDRSZ);
+  PG_FREE_IF_COPY(arg_data, 0);
+  PG_RETURN_POINTER(result);
+}
diff --git a/test/expected/basic.out b/test/expected/basic.out
new file mode 100644
index 0000000..abf05fc
--- /dev/null
+++ b/test/expected/basic.out
@@ -0,0 +1,21 @@
+create extension if not exists bzip;
+SELECT encode(bzcat(bzip2('this will be compresssed and then uncompressed')), 'escape') as result;
+                     result                     
+------------------------------------------------
+ this will be compresssed and then uncompressed
+(1 row)
+
+with repeated_str as (
+  select repeat('some text', 100000) as val
+)
+select convert_from(bzcat(bzip2(val::bytea)), 'utf8') = val as succesful_compression_decompression
+from repeated_str;
+ succesful_compression_decompression 
+-------------------------------------
+ t
+(1 row)
+
+-- truncated input must raise an error instead of looping forever
+select bzcat('\x425a6839'::bytea);
+ERROR:  bzip2 decompression failed
+DETAIL:  the compressed data is truncated
diff --git a/test/expected/samples.out b/test/expected/samples.out
new file mode 100644
index 0000000..35d5edc
--- /dev/null
+++ b/test/expected/samples.out
@@ -0,0 +1,14 @@
+create extension if not exists bzip;
+NOTICE:  extension "bzip" already exists, skipping
+select pg_read_file('./all_movies.csv') = convert_from(bzcat(pg_read_binary_file('./all_movies.csv.bz2')), 'utf8') as successful_decompression;
+ successful_decompression 
+--------------------------
+ t
+(1 row)
+
+select pg_read_binary_file('./all_movies.csv.bz2') = bzip2(pg_read_binary_file('./all_movies.csv')) as successful_compression;
+ successful_compression 
+------------------------
+ t
+(1 row)
+
diff --git a/test/samples/all_movies.csv.bz2 b/test/samples/all_movies.csv.bz2
new file mode 100644
index 0000000..102b426
Binary files /dev/null and b/test/samples/all_movies.csv.bz2 differ
diff --git a/test/sql/basic.sql b/test/sql/basic.sql
new file mode 100644
index 0000000..fff7eab
--- /dev/null
+++ b/test/sql/basic.sql
@@ -0,0 +1,12 @@
+create extension if not exists bzip;
+
+SELECT encode(bzcat(bzip2('this will be compresssed and then uncompressed')), 'escape') as result;
+
+with repeated_str as (
+  select repeat('some text', 100000) as val
+)
+select convert_from(bzcat(bzip2(val::bytea)), 'utf8') = val as succesful_compression_decompression
+from repeated_str;
+
+-- truncated input must raise an error instead of looping forever
+select bzcat('\x425a6839'::bytea);
diff --git a/test/sql/samples.sql b/test/sql/samples.sql
new file mode 100644
index 0000000..eda2827
--- /dev/null
+++ b/test/sql/samples.sql
@@ -0,0 +1,5 @@
+create extension if not exists bzip;
+
+select pg_read_file('./all_movies.csv') = convert_from(bzcat(pg_read_binary_file('./all_movies.csv.bz2')), 'utf8') as successful_decompression;
+
+select pg_read_binary_file('./all_movies.csv.bz2') = bzip2(pg_read_binary_file('./all_movies.csv')) as successful_compression;