Skip to content

Commit

Permalink
Merge pull request #31 from haskell-works/use-avro-0.5
Browse files Browse the repository at this point in the history
Use avro 0.5
  • Loading branch information
AlexeyRaga authored Mar 25, 2020
2 parents 6b1d0b7 + 6a1930d commit 901739c
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 119 deletions.
16 changes: 8 additions & 8 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
@@ -1,19 +1,13 @@
version: 2.1

orbs:
haskell: haskell-works/haskell-build@2.0.2
haskell: haskell-works/haskell-build@4.1.6
github: haskell-works/github-release@1.2.1
hackage: haskell-works/hackage@1.0.0

workflows:
multiple-ghc-build:
jobs:
- haskell/build-with-binary-cache:
name: GHC 8.2.2
executor: haskell/ghc-8_2_2
context: haskell-ci
binary-cache-uri: ${BINARY_CACHE_URI-"http://hw-binary-cache-us-west-2-a.s3-website-us-west-2.amazonaws.com/archive"}

- haskell/build-with-binary-cache:
name: GHC 8.4.4
executor: haskell/ghc-8_4_4
Expand All @@ -26,13 +20,19 @@ workflows:
context: haskell-ci
binary-cache-uri: ${BINARY_CACHE_URI-"http://hw-binary-cache-us-west-2-a.s3-website-us-west-2.amazonaws.com/archive"}

- haskell/build-with-binary-cache:
name: GHC 8.8.3
executor: haskell/ghc-8_8_3
context: haskell-ci
binary-cache-uri: ${BINARY_CACHE_URI-"http://hw-binary-cache-us-west-2-a.s3-website-us-west-2.amazonaws.com/archive"}

- github/release-cabal:
name: GitHub Release
context: haskell-ci
requires:
- GHC 8.2.2
- GHC 8.4.4
- GHC 8.6.5
- GHC 8.8.3
checkout: true
filters:
branches:
Expand Down
4 changes: 2 additions & 2 deletions .vscode/tasks.json
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"label": "Build",
"type": "shell",
"command": "bash",
"args": ["-lc", "cabal new-build && echo 'Done'"],
"args": ["-lc", "cabal v2-build --enable-tests && echo 'Done'"],
"group": {
"kind": "build",
"isDefault": true
Expand Down Expand Up @@ -37,7 +37,7 @@
"label": "Test",
"type": "shell",
"command": "bash",
"args": ["-lc", "cabal new-test && echo 'Done'"],
"args": ["-lc", "cabal v2-test --enable-tests --test-show-details=direct && echo 'Done'"],
"group": {
"kind": "test",
"isDefault": true
Expand Down
1 change: 1 addition & 0 deletions cabal.project
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
packages: .
11 changes: 4 additions & 7 deletions example/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ module Main where

import Control.Monad.Trans.Except
import qualified Data.Aeson as J
import qualified Data.Avro as A
import Data.Avro.Schema as S
import qualified Data.Avro.Types as AT
import Data.Monoid

import Data.Int
Expand All @@ -27,9 +24,9 @@ main = do

roundtrip :: SchemaRegistry -> ExceptT AppError IO TestMessage
roundtrip sr = do
enc <- withExceptT EncError (encode exampleMessage)
dec <- withExceptT DecError (decode enc)
enc <- withExceptT EncError (encode' exampleMessage)
dec <- withExceptT DecError (decode' enc)
return dec
where
encode msg = ExceptT $ encodeWithSchema sr (Subject "example-subject") exampleMessage
decode msg = ExceptT $ decodeWithSchema sr msg
encode' msg = ExceptT $ encode sr (Subject "example-subject") exampleMessage
decode' msg = ExceptT $ decode sr msg
54 changes: 19 additions & 35 deletions example/Message.hs
Original file line number Diff line number Diff line change
@@ -1,42 +1,26 @@
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE TemplateHaskell #-}
module Message
( TestMessage(..)
, schema'TestMessage
) where
--

import Data.Avro
import Data.Avro.Schema
import qualified Data.Avro.Types as AT
import Data.Int
import Data.Text
import Data.Avro
import Data.Avro.Deriving

data TestMessage = TestMessage Int64 Text Bool Int64 deriving (Show, Eq, Ord)
deriveAvroFromByteString [r|
{
"type": "record",
"name": "TestMessage",
"namespace": "hw.kafka.avro.test",
"fields": [
{ "name": "id", "type": "long" },
{ "name": "name", "type": "string" },
{ "name": "is_active", "type": "boolean" },
{ "name": "timestamp", "type": "long" }
]
}
|]

testMessageSchema =
let fld nm = Field nm [] Nothing Nothing
in Record (TN "TestMessage" ["hw", "kafka", "avro", "test"]) [] Nothing Nothing
[ fld "id" Long Nothing
, fld "name" String Nothing
, fld "is_active" Boolean Nothing
, fld "timestamp" Long Nothing
]

instance HasAvroSchema TestMessage where
schema = pure testMessageSchema

instance FromAvro TestMessage where
fromAvro (AT.Record _ r) =
TestMessage <$> r .: "id"
<*> r .: "name"
<*> r .: "is_active"
<*> r .: "timestamp"
fromAvro v = badValue v "TestMessage"

instance ToAvro TestMessage where
toAvro (TestMessage i s d t) =
record testMessageSchema
[ "id" .= i
, "name" .= s
, "is_active" .= d
, "timestamp" .= t
]
10 changes: 5 additions & 5 deletions hw-kafka-avro.cabal
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
cabal-version: 1.12
cabal-version: 2.4

name: hw-kafka-avro
version: 4.0.1
Expand All @@ -10,7 +10,7 @@ bug-reports: https://github.com/haskell-works/hw-kafka-avro/issues
author: Alexey Raga
maintainer: alexey.raga@gmail.com
copyright: Alexey Raga
license: BSD3
license: BSD-3-Clause
license-file: LICENSE
build-type: Simple
extra-source-files:
Expand All @@ -34,7 +34,7 @@ library
hs-source-dirs: src
build-depends:
aeson
, avro >=0.4
, avro >=0.5 && <0.6
, base >=4.7 && <5
, binary
, bytestring
Expand Down Expand Up @@ -63,7 +63,7 @@ executable kafka-avro-example
ghc-options: -threaded -rtsopts -with-rtsopts=-N
build-depends:
aeson
, avro
, avro >=0.5 && <0.6
, base
, binary
, bytestring
Expand Down Expand Up @@ -92,7 +92,7 @@ test-suite kafka-avro-test
build-depends:
QuickCheck
, aeson
, avro
, avro >=0.5 && <0.6
, base
, binary
, bytestring
Expand Down
2 changes: 1 addition & 1 deletion src/Kafka/Avro.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ propagateSchema :: MonadIO m
-> Subject
-> ByteString
-> m (Either SchemaRegistryError (Maybe SchemaId))
propagateSchema sr subj bs = do
propagateSchema sr subj bs =
case extractSchemaId bs of
Nothing -> return $ Right Nothing
Just (sid, _) -> do
Expand Down
66 changes: 32 additions & 34 deletions src/Kafka/Avro/Decode.hs
Original file line number Diff line number Diff line change
@@ -1,53 +1,57 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeApplications #-}
module Kafka.Avro.Decode
(
DecodeError(..)
, decodeWithSchema, extractSchemaId
, decode
, decodeWithSchema
, extractSchemaId
) where

import Control.Arrow (left)
import Control.Monad.IO.Class (MonadIO)
import Data.Avro as A (FromAvro, HasAvroSchema (..), Result (..), fromAvro)
import qualified Data.Avro as A (decodeWithSchema)
import qualified Data.Avro.Decode as A (decodeAvro)
import qualified Data.Avro.Deconflict as A (deconflict)
import Data.Avro.Schema (Schema)
import Data.Bits (shiftL)
import Data.ByteString.Lazy (ByteString)
import qualified Data.ByteString.Lazy as BL hiding (zipWith)
import Control.Arrow (left)
import Control.Monad.IO.Class (MonadIO)
import Control.Monad.Trans.Except
import Data.Avro (FromAvro, HasAvroSchema (..), Schema, decodeValueWithSchema, deconflict)
import Data.Bits (shiftL)
import Data.ByteString.Lazy (ByteString)
import qualified Data.ByteString.Lazy as BL hiding (zipWith)
import Data.Int
import Data.Tagged (Tagged, untag)
import Data.Tagged (untag)
import Kafka.Avro.SchemaRegistry

data DecodeError = DecodeRegistryError SchemaRegistryError
| BadPayloadNoSchemaId
| DecodeError Schema String
| IncompatibleSchema Schema String
deriving (Show, Eq)

-- | Decodes a provided Avro-encoded value.
-- The serialised value is expected to be in a "confluent-compatible" format
-- where the "real" value bytestring is prepended with extra 5 bytes:
-- a "magic" byte and 4 bytes representing the schema ID.
decodeWithSchema :: (MonadIO m, FromAvro a)
=> SchemaRegistry
-> ByteString
-> m (Either DecodeError a)
decodeWithSchema sr bs =
decode :: forall a m. (MonadIO m, HasAvroSchema a, FromAvro a)
=> SchemaRegistry
-> ByteString
-> m (Either DecodeError a)
decode sr = decodeWithSchema sr (untag @a schema)
{-# INLINE decode #-}

decodeWithSchema :: forall a m. (MonadIO m, FromAvro a)
=> SchemaRegistry
-> Schema
-> ByteString
-> m (Either DecodeError a)
decodeWithSchema sr readerSchema bs =
case schemaData of
Left err -> return $ Left err
Right (sid, payload) -> do
res <- left DecodeRegistryError <$> loadSchema sr sid
return $ res >>= flip decodeWithDeconflict payload
Right (sid, payload) -> runExceptT $ do
writerSchema <- withError DecodeRegistryError (loadSchema sr sid)
readSchema <- withPureError (IncompatibleSchema writerSchema) $ deconflict writerSchema readerSchema
withPureError (DecodeError writerSchema) (decodeValueWithSchema readSchema payload)
where
schemaData = maybe (Left BadPayloadNoSchemaId) Right (extractSchemaId bs)

decodeWithDeconflict :: forall a. (FromAvro a) => Schema -> ByteString -> Either DecodeError a
decodeWithDeconflict writerSchema bs =
let readerSchema = untag (schema :: Tagged a Schema)
in left (DecodeError readerSchema) $ do
raw <- A.decodeAvro writerSchema bs
val <- A.deconflict writerSchema readerSchema raw
resultToEither readerSchema (fromAvro val)
withError f = withExceptT f . ExceptT
withPureError f = withError f . pure

extractSchemaId :: ByteString -> Maybe (SchemaId, ByteString)
extractSchemaId bs = do
Expand All @@ -60,9 +64,3 @@ extractSchemaId bs = do
let int = sum $ zipWith shiftL ints [0, 8, 16, 24]
return (SchemaId int, b4)

resultToEither :: Schema -> A.Result a -> Either String a
resultToEither sc res = case res of
Success a -> Right a
Error msg -> Left msg


Loading

0 comments on commit 901739c

Please sign in to comment.