Skip to content
This repository was archived by the owner on Nov 20, 2020. It is now read-only.

LZ4 compression support #219

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

[Dd]ebug/
[Rr]elease/
x64/
build/
[Bb]in/
[Oo]bj/
Expand Down
13 changes: 7 additions & 6 deletions .paket/Paket.Restore.targets
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,16 @@

<!-- Because ReadAllText is slow on osx/linux, try to find shasum and awk -->
<PropertyGroup>
<PaketRestoreCachedHasher Condition="'$(OS)' != 'Windows_NT' And '$(PaketRestoreCachedHasher)' == '' And Exists('/usr/bin/shasum') And Exists('/usr/bin/awk')">/usr/bin/shasum $(PaketRestoreCacheFile) | /usr/bin/awk '{ print $1 }'</PaketRestoreCachedHasher>
<PaketRestoreLockFileHasher Condition="'$(OS)' != 'Windows_NT' And '$(PaketRestoreLockFileHash)' == '' And Exists('/usr/bin/shasum') And Exists('/usr/bin/awk')">/usr/bin/shasum $(PaketLockFilePath) | /usr/bin/awk '{ print $1 }'</PaketRestoreLockFileHasher>
<PaketRestoreCachedHasher Condition="'$(OS)' != 'Windows_NT' And '$(PaketRestoreCachedHasher)' == '' And Exists('/usr/bin/shasum') And Exists('/usr/bin/awk')">/usr/bin/shasum $(PaketRestoreCacheFile) | /usr/bin/awk '{ print $1 }'</PaketRestoreCachedHasher>
<PaketRestoreLockFileHasher Condition="'$(OS)' != 'Windows_NT' And '$(PaketRestoreLockFileHash)' == '' And Exists('/usr/bin/shasum') And Exists('/usr/bin/awk')">/usr/bin/shasum $(PaketLockFilePath) | /usr/bin/awk '{ print $1 }'</PaketRestoreLockFileHasher>
</PropertyGroup>

<!-- If shasum and awk exist get the hashes -->
<Exec StandardOutputImportance="Low" Condition=" '$(PaketRestoreCachedHasher)' != '' " Command="$(PaketRestoreCachedHasher)" ConsoleToMSBuild='true'>
<Output TaskParameter="ConsoleOutput" PropertyName="PaketRestoreCachedHash" />
<Output TaskParameter="ConsoleOutput" PropertyName="PaketRestoreCachedHash" />
</Exec>
<Exec StandardOutputImportance="Low" Condition=" '$(PaketRestoreLockFileHasher)' != '' " Command="$(PaketRestoreLockFileHasher)" ConsoleToMSBuild='true'>
<Output TaskParameter="ConsoleOutput" PropertyName="PaketRestoreLockFileHash" />
<Output TaskParameter="ConsoleOutput" PropertyName="PaketRestoreLockFileHash" />
</Exec>

<PropertyGroup Condition="Exists('$(PaketRestoreCacheFile)') ">
Expand Down Expand Up @@ -127,6 +127,7 @@
<PackageReference Include="%(PaketReferencesFileLinesInfo.PackageName)">
<Version>%(PaketReferencesFileLinesInfo.PackageVersion)</Version>
<PrivateAssets Condition="%(PaketReferencesFileLinesInfo.AllPrivateAssets) == 'true'">All</PrivateAssets>
<ExcludeAssets Condition="%(PaketReferencesFileLinesInfo.AllPrivateAssets) == 'exclude'">runtime</ExcludeAssets>
</PackageReference>
</ItemGroup>

Expand Down Expand Up @@ -183,8 +184,8 @@

<ConvertToAbsolutePath Condition="@(_NuspecFiles) != ''" Paths="@(_NuspecFiles)">
<Output TaskParameter="AbsolutePaths" PropertyName="NuspecFileAbsolutePath" />
</ConvertToAbsolutePath>
</ConvertToAbsolutePath>


<!-- Call Pack -->
<PackTask Condition="$(UseNewPack)"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Please also join the [F# Open Source Group](http://fsharp.github.com)
| ----------|----------|
| GZip | Complete |
| Snappy | Complete |
| LZ4 | https://github.com/jet/kafunk/issues/125 |
| LZ4 | Complete |
| TLS | https://github.com/jet/kafunk/issues/66 |
| SASL | https://github.com/jet/kafunk/issues/139 |
| ACL | https://github.com/jet/kafunk/issues/140 |
Expand Down
3 changes: 3 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
### 0.1.17-alpha1 - 12.4.2018
* LZ4 compression support is implemented

### 0.1.16 - 27.3.2018
* Tested 0.1.16-alpha01

Expand Down
8 changes: 4 additions & 4 deletions src/kafunk/AssemblyInfo.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ open System.Reflection
[<assembly: AssemblyTitleAttribute("kafunk")>]
[<assembly: AssemblyProductAttribute("kafunk")>]
[<assembly: AssemblyDescriptionAttribute("F# client for Kafka")>]
[<assembly: AssemblyVersionAttribute("0.1.16")>]
[<assembly: AssemblyFileVersionAttribute("0.1.16")>]
[<assembly: AssemblyVersionAttribute("0.1.17")>]
[<assembly: AssemblyFileVersionAttribute("0.1.17")>]
do ()

module internal AssemblyVersionInformation =
let [<Literal>] AssemblyTitle = "kafunk"
let [<Literal>] AssemblyProduct = "kafunk"
let [<Literal>] AssemblyDescription = "F# client for Kafka"
let [<Literal>] AssemblyVersion = "0.1.16"
let [<Literal>] AssemblyFileVersion = "0.1.16"
let [<Literal>] AssemblyVersion = "0.1.17"
let [<Literal>] AssemblyFileVersion = "0.1.17"
5 changes: 5 additions & 0 deletions src/kafunk/AssemblyInfoVisibility.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
namespace System
open System.Runtime.CompilerServices

// Makes this assembly's `internal` declarations accessible to the kafunk.Tests assembly.
[<assembly: InternalsVisibleTo("kafunk.Tests")>]
do ()
56 changes: 12 additions & 44 deletions src/kafunk/Compression.fs
Original file line number Diff line number Diff line change
Expand Up @@ -134,47 +134,15 @@ module Snappy =

#endif

// TODO: implement lz4
//[<Compile(Module)>]
//module LZ4 =

// open LZ4

// //let compress ver ms =
// // Stream.compress
// // CompressionCodec.LZ4
// // (fun memStream -> upcast new LZ4Stream(memStream, LZ4StreamMode.Compress, LZ4StreamFlags.IsolateInnerStream))
// // ver
// // ms

// //let decompress ver m =
// // Stream.decompress
// // (fun memStream -> upcast new LZ4Stream(memStream, LZ4StreamMode.Decompress, LZ4StreamFlags.IsolateInnerStream))
// // ver
// // m

// let compress (value:Binary.Segment) =
// let maxLen = LZ4Codec.MaximumOutputLength value.Count
// let outBuf = Binary.zeros maxLen
// let written = LZ4Codec.Encode(value.Array, value.Offset, value.Count, outBuf.Array, outBuf.Offset, outBuf.Count)
// if written <= 0 then failwith "compression failed" else
// ArraySegment(outBuf.Array, outBuf.Offset, written)

// let decompress (value:Binary.Segment) =
// let guessedOutputLength = value.Count * 10
// //let buf = Binary.zeros outputLength
// //let decoded = LZ4Codec.Decode(m.value.Array, m.value.Offset, m.value.Count, buf.Array, buf.Offset, buf.Count, false)
// //let buf = ArraySegment(buf.Array, buf.Count, decoded)
// let buf = LZ4Codec.Decode(value.Array, value.Offset, value.Count, guessedOutputLength)
// Binary.ofArray buf

//let compress (messageVer:ApiVersion) (ms:MessageSet) =
// let buf = MessageSet.Size (messageVer,ms) |> Binary.zeros
// MessageSet.Write (messageVer,ms,BinaryZipper(buf))
// let compressed = LZ4.LZ4Codec.Wrap (buf.Array, buf.Offset, buf.Count)
// createMessage (Binary.ofArray compressed) CompressionCodec.LZ4

//let decompress (messageVer:ApiVersion) (m:Message) =
// let decompressed = LZ4.LZ4Codec.Unwrap(m.value.Array, m.value.Offset)
// let buf = Binary.ofArray decompressed
// MessageSet.Read (messageVer, 0, 0s, buf.Count, true, BinaryZipper(buf))
[<Compile(Module)>]
module LZ4 =
    open Kafunk.Native

    /// Compresses `value` with Lz4Framing.compressFrame into a freshly allocated buffer
    /// sized by Lz4Framing.compressFrameBound, and returns the segment produced by that call.
    let compress (value: ArraySegment<byte>) : ArraySegment<byte> =
        // TODO: consider preallocated buffer for compression
        value.Count
        |> Lz4Framing.compressFrameBound
        |> Array.zeroCreate<byte>
        |> Lz4Framing.compressFrame value

    /// Decompresses `value` with Lz4Framing.decompress and wraps the resulting array
    /// in an ArraySegment covering the whole array.
    let decompress (value: ArraySegment<byte>) : ArraySegment<byte> =
        ArraySegment<byte>(Lz4Framing.decompress value)
44 changes: 44 additions & 0 deletions src/kafunk/Native/Loader.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/// On Windows, pre-loads the native dll from the x86/x64 folder depending on Environment.Is64BitProcess
module internal Kafunk.Native.Loader

open System
open System.Runtime.InteropServices
open System.IO

// P/Invoke declaration of `LoadLibrary` imported from Kernel32.dll.
// Takes the path of the library to load; `loadWin` treats an IntPtr.Zero result as failure.
[<DllImport("Kernel32.dll")>]
extern IntPtr private LoadLibrary(string _path)

//
// Unix
//
// Flag value for the `_flags` argument of `dlopen` — presumably libdl's RTLD_NOW;
// TODO confirm the numeric value against the target platform's dlfcn.h.
let RTLD_NOW = 2

// P/Invoke declaration of `dlopen` imported from libdl.
// NOTE(review): neither `dlopen` nor `RTLD_NOW` is called in the visible part of this
// file; `load` performs no action on non-Windows platforms.
[<DllImport("libdl")>]
extern IntPtr private dlopen(string _fileName, int _flags)

/// Builds the path of `name` relative to the directory of the executing assembly's CodeBase.
/// This function will not work for multi-assembly configurations, but is OK for kafunk for now.
/// More elaborate loading strategies can be found here:
/// https://github.com/mellinoe/nativelibraryloader
let private resolveLibPath name =
    let codeBase = System.Reflection.Assembly.GetExecutingAssembly().CodeBase
    // CodeBase is a URI string; convert it to a local file-system path first.
    let assemblyFile = (Uri(codeBase)).LocalPath
    let assemblyDir = Path.GetDirectoryName assemblyFile
    Path.Combine(assemblyDir, name)

/// Loads the native library `name` (a path relative to the executing assembly's directory,
/// as resolved by `resolveLibPath`) through `LoadLibrary`.
/// Raises an exception when `LoadLibrary` returns a null handle.
let private loadWin name =
    let path = resolveLibPath name
    let handle = LoadLibrary path

    if handle = IntPtr.Zero then
        // Report the resolved path as well: it is what was actually handed to LoadLibrary,
        // and `name` alone does not reveal which directory was probed.
        failwithf "Failed to load native dll '%s' (resolved path: '%s')" name path

/// Returns a lazy value. When forced on Windows (PlatformID.Win32NT) it loads `<name>.dll`
/// from the `x64` or `x86` subfolder, chosen by Environment.Is64BitProcess.
/// On any other platform forcing it does nothing.
let load name =
    lazy (
        if Environment.OSVersion.Platform = PlatformID.Win32NT then
            let archFolder = if Environment.Is64BitProcess then "x64" else "x86"
            loadWin (sprintf "%s\\%s.dll" archFolder name)
    )



172 changes: 172 additions & 0 deletions src/kafunk/Native/lz4.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
module internal Kafunk.Native.Lz4Framing
/// For C API details, see:
/// https://github.com/lz4/lz4/blob/dev/lib/lz4frame.h
module private native =
// P/Invoke surface for the LZ4 frame functions imported from the native `liblz4` library.
// The struct and extern declarations are meant to mirror lz4frame.h (linked above); field
// order and sizes matter for marshalling, so check the header before changing any of them.
open System
open System.Runtime.InteropServices

// Version value handed to LZ4F_createDecompressionContext by `decompress`.
let LZ4F_VERSION = nativeint 100

// NOTE(review): this alias is not referenced by the declarations in this module,
// which use nativeint for result codes instead.
type LZ4F_errorCode_t = uint64

// Shortcut these enums to int because we do not use them at this time
type LZ4F_blockSizeID_t = int
type LZ4F_blockMode_t = int
type LZ4F_contentChecksum_t = int
type LZ4F_frameType_t = int
type LZ4F_blockChecksum_t = int

// Frame header description. Only `contentSize` is touched from F#: `compressFrame`
// assigns it and `decompress` reads it back after LZ4F_getFrameInfo.
[<StructLayout(LayoutKind.Sequential)>]
type LZ4F_frameInfo_t =
struct
val blockSizeID: LZ4F_blockSizeID_t
val blockMode: LZ4F_blockMode_t
val contentChecksumFlag: LZ4F_contentChecksum_t
val frameType: LZ4F_frameType_t
val mutable contentSize: uint64
val dictID: uint32
val blockChecksumFlag: LZ4F_blockChecksum_t
end

// Compression preferences passed by reference to LZ4F_compressFrame.
// `frameInfo` is mutable so that `frameInfo.contentSize` can be assigned;
// every other field keeps its zero-initialised value.
[<StructLayout(LayoutKind.Sequential)>]
type LZ4F_preferences_t =
struct
val mutable frameInfo: LZ4F_frameInfo_t
val compressionLevel: int32
val autoFlush: uint32
val reserved1: uint32
val reserved2: uint32
val reserved3: uint32
val reserved4: uint32
end

// NOTE(review): not referenced by the visible code — LZ4F_decompress below takes its
// options argument as a raw nativeint and `decompress` passes IntPtr.Zero.
[<StructLayout(LayoutKind.Sequential)>]
type LZ4F_decompressOptions_t =
struct
val mutable stableDst: uint32
val reserved1: uint32
val reserved2: uint32
val reserved3: uint32
end


// Used by `compressFrameBound` (with IntPtr.Zero preferences) to size the destination buffer.
[<DllImport("liblz4", CallingConvention=CallingConvention.Cdecl)>]
extern nativeint LZ4F_compressFrameBound(nativeint _srcSize, IntPtr _preferencesPtr);

// Used by `compressFrame`; its result is checked with failIfError and then taken as the
// number of bytes written to the destination buffer.
[<DllImport("liblz4", CallingConvention=CallingConvention.Cdecl)>]
extern nativeint LZ4F_compressFrame(nativeint _dstBuffer, nativeint _dstCapacity,
nativeint _srcBuffer, nativeint _srcSize,
LZ4F_preferences_t& _preferences);

// A non-zero result is interpreted by `isError` as "`_code` is an error code".
[<DllImport("liblz4", CallingConvention=CallingConvention.Cdecl)>]
extern uint32 LZ4F_isError(nativeint _code);

// Returns a pointer that `getErrorName` marshals as an ANSI string.
[<DllImport("liblz4", CallingConvention=CallingConvention.Cdecl)>]
extern nativeint LZ4F_getErrorName(nativeint _code);

// The context handle is written through the byref `_dctxPtr`.
[<DllImport("liblz4", CallingConvention=CallingConvention.Cdecl)>]
extern nativeint LZ4F_createDecompressionContext(nativeint& _dctxPtr, nativeint _version);

// Called from the `finally` block of `decompress` to release the context.
[<DllImport("liblz4", CallingConvention=CallingConvention.Cdecl)>]
extern nativeint LZ4F_freeDecompressionContext(nativeint _dctx);

// `_dstSize` and `_srcSizePtr` are byref: `decompress` passes the available sizes in and
// reads the updated values back after the call.
[<DllImport("liblz4", CallingConvention=CallingConvention.Cdecl)>]
extern nativeint LZ4F_decompress(nativeint _dctx, nativeint _dstBuffer, nativeint& _dstSize,
nativeint _srcBuffer, nativeint& _srcSizePtr, nativeint _optionsPtr);

// Fills `_frameInfoPtr`; `decompress` relies on `_srcSizePtr` holding the number of
// consumed bytes after the call.
[<DllImport("liblz4", CallingConvention=CallingConvention.Cdecl)>]
extern nativeint LZ4F_getFrameInfo(nativeint _dctx, LZ4F_frameInfo_t& _frameInfoPtr, nativeint _srcBuffer, nativeint& _srcSizePtr);

open System
open FSharp.NativeInterop
open native
#nowarn "9"

// Lazy value obtained from Loader.load for "liblz4". The public functions below force
// `.Value` before calling into the native API, so the load step runs on first use.
let private ensureNativeIsLoaded = Loader.load "liblz4"

//
// liblz4 error reporting
//
/// True when LZ4F_isError returns a non-zero value for `code`.
let private isError code =
    not (LZ4F_isError(code) = 0u)

/// Asks LZ4F_getErrorName for the name of `code` and marshals the returned pointer
/// as an ANSI string.
let private getErrorName code =
    LZ4F_getErrorName(code)
    |> System.Runtime.InteropServices.Marshal.PtrToStringAnsi


/// Passes `code` through unchanged unless `isError` flags it; in that case raises an
/// exception naming the native function (`funcName`) and the liblz4 error name.
let failIfError funcName code =
    if not (isError code) then
        code
    else
        failwithf "LZ4 native call '%s' failed: '%s'" funcName (getErrorName code)

//
// Public API
//

/// Returns the result of LZ4F_compressFrameBound for `srcSize` bytes of input, queried
/// with no preferences (IntPtr.Zero), converted to int.
let compressFrameBound (srcSize: int) : int =
    ensureNativeIsLoaded.Value
    let bound = LZ4F_compressFrameBound(nativeint srcSize, IntPtr.Zero)
    int bound

/// Compresses the bytes of `src` into `dst` with a single LZ4F_compressFrame call and
/// returns the segment of `dst`, starting at offset 0, whose length is the value
/// returned by that call.
/// The source length is stored in the preferences' `frameInfo.contentSize` before the call.
/// Raises an exception (via failIfError) when the native call reports an error.
let compressFrame (src: ArraySegment<byte>) (dst: byte[]) =
    ensureNativeIsLoaded.Value

    let mutable prefs = LZ4F_preferences_t()
    prefs.frameInfo.contentSize <- uint64 src.Count

    // Pin both arrays for the duration of the native call.
    use pinnedSrc = fixed src.Array
    use pinnedDst = fixed dst

    let srcAddr = NativePtr.toNativeInt (NativePtr.add pinnedSrc src.Offset)
    let dstAddr = NativePtr.toNativeInt pinnedDst

    let written =
        LZ4F_compressFrame(dstAddr, nativeint dst.Length, srcAddr, nativeint src.Count, &prefs)
        |> failIfError "LZ4F_compressFrame"

    ArraySegment<byte>(dst, 0, int written)

/// Decompresses the LZ4 frame contained in `src` and returns the decoded bytes.
///
/// When the frame header declares a non-zero content size, the output array is allocated
/// with exactly that size and filled by a single LZ4F_decompress call.
/// A content size of 0 in the header is NOT treated as "empty": the frame format also uses
/// 0 when the size was not recorded, so in that case the frame is decoded chunk by chunk
/// until LZ4F_decompress reports the end of the frame (a genuinely empty frame still
/// yields an empty array).
///
/// Raises an exception when a native call reports an error, when the declared content size
/// does not fit into an int, or when the input ends before the frame is complete.
let decompress (src: ArraySegment<byte>): byte[] =
    ensureNativeIsLoaded.Value

    let mutable ctx = IntPtr.Zero
    do LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION) |> failIfError "LZ4F_createDecompressionContext" |> ignore
    try
        // read frame info to get uncompressed size
        let mutable frameInfo = LZ4F_frameInfo_t()
        let mutable srcSize = nativeint src.Count
        use srcPtr = fixed src.Array
        let srcAddr = NativePtr.add srcPtr src.Offset |> NativePtr.toNativeInt

        do LZ4F_getFrameInfo(ctx, &frameInfo, srcAddr, &srcSize) |> failIfError "LZ4F_getFrameInfo" |> ignore

        // LZ4F_getFrameInfo has updated srcSize to the number of bytes it consumed,
        // so the frame body starts right after them.
        let dataAddr = srcAddr + srcSize
        srcSize <- (nativeint src.Count) - srcSize

        let declaredSize = frameInfo.contentSize
        if declaredSize > uint64 Int32.MaxValue then
            // The size comes from the (untrusted) frame header; refuse it explicitly
            // instead of letting the int conversion wrap around.
            failwithf "LZ4 frame declares a content size of %d bytes, which exceeds the maximum array size" declaredSize
        elif declaredSize > 0UL then
            // Known size: decode straight into an exactly-sized array.
            let decompressed = Array.zeroCreate<byte> (int declaredSize)
            use decompressedPtr = fixed decompressed
            let decompressedAddr = decompressedPtr |> NativePtr.toNativeInt
            let mutable dstSize = nativeint decompressed.Length

            let before = sprintf "dstSize: %d; srcSize: %d" dstSize srcSize
            let res = LZ4F_decompress(ctx, decompressedAddr, &dstSize, dataAddr, &srcSize, IntPtr.Zero) |> failIfError "LZ4F_decompress"
            let after = sprintf "dstSize: %d; srcSize: %d" dstSize srcSize
            if res <> nativeint 0 then
                failwithf "Expected LZ4F_decompress to return 0 but got %d. Buffer too small?\n %s\n %s" res before after
            else
                decompressed
        else
            // Unknown (or zero) size: decode through a fixed-size chunk and accumulate.
            use decoded = new System.IO.MemoryStream()
            let chunk = Array.zeroCreate<byte> (64 * 1024)
            use chunkPtr = fixed chunk
            let chunkAddr = chunkPtr |> NativePtr.toNativeInt

            let mutable cursor = dataAddr
            let mutable remaining = srcSize
            let mutable finished = false
            let mutable stalled = false
            // Keep calling even when no input is left: the decoder may still hold
            // decoded bytes that did not fit into the previous chunk.
            while not finished && not stalled do
                let mutable produced = nativeint chunk.Length
                let mutable consumed = remaining
                let hint = LZ4F_decompress(ctx, chunkAddr, &produced, cursor, &consumed, IntPtr.Zero) |> failIfError "LZ4F_decompress"
                decoded.Write(chunk, 0, int produced)
                cursor <- cursor + consumed
                remaining <- remaining - consumed
                // A result of 0 marks the end of the frame.
                finished <- (hint = nativeint 0)
                // No bytes in and no bytes out means more input is needed but none is left.
                stalled <- (produced = nativeint 0 && consumed = nativeint 0)

            if not finished then
                failwith "LZ4 frame is incomplete: input ended before the end of the frame"
            decoded.ToArray()
    finally
        // protect context from leaking
        do LZ4F_freeDecompressionContext(ctx) |> failIfError "LZ4F_freeDecompressionContext" |> ignore


4 changes: 2 additions & 2 deletions src/kafunk/Protocol.fs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ module Protocol =
match compression with
| None -> value
| GZIP -> Compression.GZip.compress value
//| LZ4 -> Compression.LZ4.compress value
| LZ4 -> Compression.LZ4.compress value
#if !NETSTANDARD2_0
| Snappy -> Compression.Snappy.compress value
#endif
Expand All @@ -92,7 +92,7 @@ module Protocol =
match compression with
| None -> value
| GZIP -> Compression.GZip.decompress value
//| LZ4 -> Compression.LZ4.decompress value
| LZ4 -> Compression.LZ4.decompress value
#if !NETSTANDARD2_0
| Snappy -> Compression.Snappy.decompress value
#endif
Expand Down
Loading