Async Stream to compress/uncompress gzip, bzip, zstd, parquet, orc

zebrabug/async-stream

 
 

async-stream

A simple library to compress and uncompress async streams using file iterators and readers.

It supports the following compression and file formats:

  • gzip
  • bzip2
  • snappy
  • zstandard
  • parquet (experimental)
  • orc (experimental)

Getting started

Install the library as follows:

pip install asyncstream

Compress a regular file to gzip (examples/simple_compress_gzip.py):

import asyncstream
import asyncio


async def run():
    async with asyncstream.open('samples/animals.txt', 'rb') as fd:
        # Open the destination in binary write mode so the compressed bytes can be written
        async with asyncstream.open('samples/animals.txt.gz', 'wb', compression='gzip') as gzfd:
            async for line in fd:
                await gzfd.write(line)

if __name__ == '__main__':
    asyncio.run(run())

You can also open from an async file descriptor using aiofiles (examples/simple_compress_gzip_with_aiofiles.py). First install aiofiles:

pip install aiofiles

And then run the following code:

import aiofiles
import asyncstream
import asyncio


async def run():
    async with aiofiles.open('examples/animals.txt', 'rb') as fd:
        async with aiofiles.open('/tmp/animals.txt.gz', 'wb') as wfd:
            async with asyncstream.open(wfd, 'wb', compression='gzip') as gzfd:
                async for line in fd:
                    await gzfd.write(line)


if __name__ == '__main__':
    asyncio.run(run())
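
Either example above writes a gzip file. One way to check the result is to decompress it with Python's standard `gzip` module and compare it with the original. The sketch below is not part of asyncstream; the helper name `gzip_matches_original` is hypothetical, and it reads both files fully into memory, so it only suits small samples.

```python
import gzip


def gzip_matches_original(original_path, gz_path):
    """Return True if decompressing gz_path yields the bytes of original_path.

    Hypothetical helper for illustration; not part of asyncstream.
    """
    with open(original_path, 'rb') as src:
        expected = src.read()
    # gzip.open transparently decompresses while reading
    with gzip.open(gz_path, 'rb') as gz:
        actual = gz.read()
    return expected == actual
```

For example, `gzip_matches_original('samples/animals.txt', 'samples/animals.txt.gz')` should return `True` after the first example has run.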

You can also uncompress an S3 file on the fly using aiobotocore:

pip install aiobotocore

And then run the following code (examples/simple_uncompress_bzip2_from_s3.py):

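from aiobotocore.session import get_session
import asyncstream
import asyncio

async def run():
    # get_session lives in aiobotocore.session; recent releases no longer export it at package level
    session = get_session()
    async with session.create_client('s3') as s3:
        # The key points at a bzip2 file, matching compression='bzip2' below
        obj = await s3.get_object(Bucket='test-bucket', Key='path/to/file.bz2')
        async with asyncstream.open(obj['Body'], 'rt', compression='bzip2') as fd:
            async for line in fd:
                print(line)
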

if __name__ == '__main__':
    asyncio.run(run())
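
To illustrate what "on the fly" means here, the sketch below decompresses bzip2 data chunk by chunk using only the standard library, yielding lines as soon as they are complete. It is not how asyncstream is implemented; `iter_bz2_lines`, `fake_body`, and `collect` are hypothetical names, and `fake_body` stands in for a streaming response body. It assumes a single bzip2 stream containing UTF-8 text.

```python
import asyncio
import bz2


async def iter_bz2_lines(chunks):
    """Yield text lines from an async iterable of bzip2-compressed byte chunks."""
    decompressor = bz2.BZ2Decompressor()
    pending = b''  # decompressed bytes that do not yet end in a newline
    async for chunk in chunks:
        pending += decompressor.decompress(chunk)
        # Everything before the last newline is a complete line
        *lines, pending = pending.split(b'\n')
        for line in lines:
            yield line.decode('utf-8') + '\n'
    if pending:
        # Final line without a trailing newline
        yield pending.decode('utf-8')


async def fake_body(data, size=16):
    """Hypothetical stand-in for a streaming body: yields fixed-size chunks."""
    for start in range(0, len(data), size):
        yield data[start:start + size]
        await asyncio.sleep(0)  # give control back to the event loop


async def collect(data):
    """Gather all decompressed lines into a list."""
    return [line async for line in iter_bz2_lines(fake_body(data))]
```

Because only the current chunk and the unfinished line are held in memory, this pattern works for objects larger than available RAM.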

Convert a gzip file to a snappy file:

import asyncstream
import asyncio

async def run():
    async with asyncstream.open('samples/animals.txt.gz', 'rb', compression='gzip') as inc_fd:
        async with asyncstream.open('samples/animals.txt.snappy', 'wb', compression='snappy') as outc_fd:
            async for line in inc_fd:
                await outc_fd.write(line)


if __name__ == '__main__':
    asyncio.run(run())
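
For comparison, the same kind of format conversion can be sketched synchronously with the standard library. Snappy is not available in the standard library, so this sketch substitutes bzip2 as the target format; the function name `gzip_to_bz2` is hypothetical and the code does not use asyncstream.

```python
import bz2
import gzip
import shutil


def gzip_to_bz2(src_path, dst_path, chunk_size=64 * 1024):
    """Re-compress a gzip file as bzip2, copying chunk_size bytes at a time."""
    with gzip.open(src_path, 'rb') as src, bz2.open(dst_path, 'wb') as dst:
        # copyfileobj streams the decompressed data, so the whole file
        # never has to fit in memory
        shutil.copyfileobj(src, dst, chunk_size)
```

From async code on Python 3.9 or later, the call can be moved off the event loop with `await asyncio.to_thread(gzip_to_bz2, src_path, dst_path)`.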

Use an async reader and writer to filter and update data on the fly:

import asyncstream
import asyncio

async def run():
    async with asyncstream.open('/tmp/animals.txt.bz2', 'rb') as in_fd:
        async with asyncstream.open('/tmp/animals.txt.snappy', 'wb') as out_fd:
            async with asyncstream.reader(in_fd) as reader:
                async with asyncstream.writer(out_fd) as writer:
                    async for name, color, age in reader:
                        if color != 'white':
                            await writer.writerow([name, color, age * 2])

asyncio.run(run())
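
The filter-and-update step can be tried in isolation with the standard `csv` module. This is a minimal sketch, not asyncstream code: `filter_and_double` is a hypothetical name, and it assumes three-column rows (name, color, age) with no header row and an integer age. Note that in Python, multiplying a string by 2 repeats it, so the sketch converts the age with `int()` before doubling.

```python
import csv
import io


def filter_and_double(csv_text):
    """Drop rows whose color is 'white' and double the age of the rest."""
    reader = csv.reader(io.StringIO(csv_text))
    out = io.StringIO()
    writer = csv.writer(out, lineterminator='\n')
    for name, color, age in reader:
        if color != 'white':
            # csv yields strings, so convert before doing arithmetic
            writer.writerow([name, color, int(age) * 2])
    return out.getvalue()
```

For instance, `filter_and_double('cat,black,3\ndog,white,5\n')` returns `'cat,black,6\n'`.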

Simple parquet encoding

You will need to install pyarrow and pandas:

pip install pyarrow pandas

To compress using snappy, install the Python snappy bindings, which are published on PyPI as python-snappy:

pip install python-snappy

The code below converts a CSV file to Parquet:

import asyncstream
import asyncio

async def run():
    async with asyncstream.open('examples/animals.txt', 'rb') as fd:
        async with asyncstream.open('output.parquet', 'wb', encoding='parquet', compression='snappy') as wfd:
            async with asyncstream.writer(wfd) as writer:
                async for line in fd:
                    await writer.write(line)

asyncio.run(run())

Simple parquet decoding

import asyncstream
import asyncio

async def run():
    async with asyncstream.open('output.parquet', 'rb', encoding='parquet') as fd:
        async with asyncstream.reader(fd) as reader:
            async for line in reader:
                print(line)

asyncio.run(run())

Simple orc encoding

import asyncstream
import asyncio

async def run():
    async with asyncstream.open('examples/animals.txt', 'rb') as fd:
        async with asyncstream.open('output.orc.snappy', 'wb', encoding='orc', compression='snappy') as wfd:
            async with asyncstream.writer(wfd) as writer:
                async for line in fd:
                    await writer.write(line)

asyncio.run(run())

Simple orc decoding

import asyncstream
import asyncio

async def run():
    async with asyncstream.open('output.orc.snappy', 'rb', encoding='orc') as fd:
        async with asyncstream.reader(fd) as reader:
            async for line in reader:
                print(line)

asyncio.run(run())

Other compression schemes are also supported: zlib and brotli.

Compression supported

Compression Status
gzip / zlib
bzip2
snappy
zstd

Parquet

Compression Status
none
brotli
bzip2
gzip
snappy
zstd
zlib

Orc

Compression Status
none
bzip2
gzip / zlib
snappy
zlib
zstd
