Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
dist
.cabal-sandbox
*DS*
.stack-work/
2 changes: 1 addition & 1 deletion aws-kinesis.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ Library
base64-bytestring >= 1.0,
blaze-builder >= 0.3,
bytestring >= 0.10,
conduit >= 1.1,
conduit >= 1.2.1,
conduit-extra >= 1.1,
deepseq >= 1.3,
http-conduit >= 2.1,
Expand Down
5 changes: 5 additions & 0 deletions src/Aws/Kinesis/Commands/CreateStream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
--
-- <http://docs.aws.amazon.com/kinesis/2013-12-02/APIReference/API_CreateStream.html>

{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
Expand Down Expand Up @@ -127,7 +128,11 @@ instance NFData CreateStreamResponse

instance ResponseConsumer r CreateStreamResponse where
type ResponseMetadata CreateStreamResponse = KinesisMetadata
#if MIN_VERSION_aws(0,15,0)
responseConsumer _ _ = kinesisResponseConsumer
#else
responseConsumer _ = kinesisResponseConsumer
#endif

instance FromJSON CreateStreamResponse where
parseJSON _ = return CreateStreamResponse
Expand Down
5 changes: 5 additions & 0 deletions src/Aws/Kinesis/Commands/DeleteStream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
--
-- <http://docs.aws.amazon.com/kinesis/2013-12-02/APIReference/API_DeleteStream.html>

{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
Expand Down Expand Up @@ -99,7 +100,11 @@ instance FromJSON DeleteStreamResponse where

instance ResponseConsumer r DeleteStreamResponse where
type ResponseMetadata DeleteStreamResponse = KinesisMetadata
#if MIN_VERSION_aws(0,15,0)
responseConsumer _ _ = kinesisResponseConsumer
#else
responseConsumer _ = kinesisResponseConsumer
#endif

instance SignQuery DeleteStream where
type ServiceConfiguration DeleteStream = KinesisConfiguration
Expand Down
4 changes: 4 additions & 0 deletions src/Aws/Kinesis/Commands/DescribeStream.hs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,11 @@ instance NFData DescribeStreamResponse

instance ResponseConsumer r DescribeStreamResponse where
type ResponseMetadata DescribeStreamResponse = KinesisMetadata
#if MIN_VERSION_aws(0,15,0)
responseConsumer _ _ = kinesisResponseConsumer
#else
responseConsumer _ = kinesisResponseConsumer
#endif

instance FromJSON DescribeStreamResponse where
parseJSON = withObject "DescribeStreamResponse" $ \o -> DescribeStreamResponse
Expand Down
4 changes: 4 additions & 0 deletions src/Aws/Kinesis/Commands/GetRecords.hs
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,11 @@ instance FromJSON GetRecordsResponse where

instance ResponseConsumer r GetRecordsResponse where
type ResponseMetadata GetRecordsResponse = KinesisMetadata
#if MIN_VERSION_aws(0,15,0)
responseConsumer _ _ = kinesisResponseConsumer
#else
responseConsumer _ = kinesisResponseConsumer
#endif

instance SignQuery GetRecords where
type ServiceConfiguration GetRecords = KinesisConfiguration
Expand Down
4 changes: 4 additions & 0 deletions src/Aws/Kinesis/Commands/GetShardIterator.hs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,11 @@ instance FromJSON GetShardIteratorResponse where

instance ResponseConsumer r GetShardIteratorResponse where
type ResponseMetadata GetShardIteratorResponse = KinesisMetadata
#if MIN_VERSION_aws(0,15,0)
responseConsumer _ _ = kinesisResponseConsumer
#else
responseConsumer _ = kinesisResponseConsumer
#endif

instance SignQuery GetShardIterator where
type ServiceConfiguration GetShardIterator = KinesisConfiguration
Expand Down
4 changes: 4 additions & 0 deletions src/Aws/Kinesis/Commands/ListStreams.hs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,11 @@ instance FromJSON ListStreamsResponse where

instance ResponseConsumer r ListStreamsResponse where
type ResponseMetadata ListStreamsResponse = KinesisMetadata
#if MIN_VERSION_aws(0,15,0)
responseConsumer _ _ = kinesisResponseConsumer
#else
responseConsumer _ = kinesisResponseConsumer
#endif

instance SignQuery ListStreams where
type ServiceConfiguration ListStreams = KinesisConfiguration
Expand Down
5 changes: 5 additions & 0 deletions src/Aws/Kinesis/Commands/MergeShards.hs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
--
-- <http://docs.aws.amazon.com/kinesis/2013-12-02/APIReference/API_MergeShards.html>

{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE OverloadedStrings #-}
Expand Down Expand Up @@ -124,7 +125,11 @@ instance NFData MergeShardsResponse

instance ResponseConsumer r MergeShardsResponse where
type ResponseMetadata MergeShardsResponse = KinesisMetadata
#if MIN_VERSION_aws(0,15,0)
responseConsumer _ _ = kinesisResponseConsumer
#else
responseConsumer _ = kinesisResponseConsumer
#endif

instance FromJSON MergeShardsResponse where
parseJSON _ = return MergeShardsResponse
Expand Down
4 changes: 4 additions & 0 deletions src/Aws/Kinesis/Commands/PutRecord.hs
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,11 @@ instance FromJSON PutRecordResponse where

instance ResponseConsumer r PutRecordResponse where
type ResponseMetadata PutRecordResponse = KinesisMetadata
#if MIN_VERSION_aws(0,15,0)
responseConsumer _ _ = kinesisResponseConsumer
#else
responseConsumer _ = kinesisResponseConsumer
#endif

instance SignQuery PutRecord where
type ServiceConfiguration PutRecord = KinesisConfiguration
Expand Down
5 changes: 5 additions & 0 deletions src/Aws/Kinesis/Commands/PutRecords.hs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
--
-- <http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html>

{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleInstances #-}
Expand Down Expand Up @@ -208,7 +209,11 @@ instance Transaction PutRecords PutRecordsResponse where

instance ResponseConsumer r PutRecordsResponse where
type ResponseMetadata PutRecordsResponse = KinesisMetadata
#if MIN_VERSION_aws(0,15,0)
responseConsumer _ _ = kinesisResponseConsumer
#else
responseConsumer _ = kinesisResponseConsumer
#endif

instance AsMemoryResponse PutRecordsResponse where
type MemoryResponse PutRecordsResponse = PutRecordsResponse
Expand Down
5 changes: 5 additions & 0 deletions src/Aws/Kinesis/Commands/SplitShard.hs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
--
-- <http://docs.aws.amazon.com/kinesis/2013-12-02/APIReference/API_SplitShard.html>

{-# LANGUAGE CPP #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
Expand Down Expand Up @@ -138,7 +139,11 @@ instance NFData SplitShardResponse

instance ResponseConsumer r SplitShardResponse where
type ResponseMetadata SplitShardResponse = KinesisMetadata
#if MIN_VERSION_aws(0,15,0)
responseConsumer _ _ = kinesisResponseConsumer
#else
responseConsumer _ = kinesisResponseConsumer
#endif

instance FromJSON SplitShardResponse where
parseJSON _ = return SplitShardResponse
Expand Down
16 changes: 10 additions & 6 deletions src/Aws/Kinesis/Core.hs
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,12 @@ import Data.Aeson
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as LB
import qualified Data.ByteString.Char8 as B8
import Data.Conduit (($$+-))
import Data.Conduit (runConduit, (.|))
import Data.Conduit.Binary (sinkLbs)
import Data.IORef
import Data.Maybe
import Data.Monoid
import Data.Monoid as Monoid
import Data.Semigroup as Semigroup
import Data.String
import Data.Time.Clock
import Data.Typeable
Expand Down Expand Up @@ -213,9 +214,12 @@ instance Loggable KinesisMetadata where
"Kinesis: request ID=" <> fromMaybe "<none>" rid
<> ", x-amz-id-2=" <> fromMaybe "<none>" id2

instance Monoid KinesisMetadata where
instance Semigroup.Semigroup KinesisMetadata where
KinesisMetadata id1 r1 <> KinesisMetadata id2 r2 = KinesisMetadata (id1 <|> id2) (r1 <|> r2)

instance Monoid.Monoid KinesisMetadata where
mempty = KinesisMetadata Nothing Nothing
KinesisMetadata id1 r1 `mappend` KinesisMetadata id2 r2 = KinesisMetadata (id1 <|> id2) (r1 <|> r2)
mappend = (<>)

-- -------------------------------------------------------------------------- --
-- Kinesis Configuration
Expand Down Expand Up @@ -306,7 +310,7 @@ jsonResponseConsumer
:: FromJSON a
=> HTTPResponseConsumer a
jsonResponseConsumer res = do
doc <- HTTP.responseBody res $$+- sinkLbs
doc <- runConduit (HTTP.responseBody res .| sinkLbs)
case eitherDecode (if doc == mempty then "{}" else doc) of
Left err -> throwM . KinesisResponseJsonError $ T.pack err
Right v -> return v
Expand Down Expand Up @@ -335,7 +339,7 @@ kinesisResponseConsumer metadata resp = do
--
errorResponseConsumer :: HTTPResponseConsumer a
errorResponseConsumer resp = do
doc <- HTTP.responseBody resp $$+- sinkLbs
doc <- runConduit (HTTP.responseBody resp .| sinkLbs)
if HTTP.responseStatus resp == HTTP.status400
then kinesisError doc
else throwM KinesisOtherError
Expand Down
6 changes: 6 additions & 0 deletions stack.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
resolver: lts-13.15
packages:
- .
extra-deps:
- aws-0.21.1
- aws-general-0.2.2
67 changes: 27 additions & 40 deletions tests/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,12 @@ import Aws
import Aws.Kinesis

import Control.Error
import Control.Exception
import Control.Monad
import Control.Monad.IO.Class

import qualified Data.ByteString as B
import qualified Data.List as L
import Data.Monoid
import Data.Monoid as Monoid
import Data.Proxy
import qualified Data.Text as T

Expand Down Expand Up @@ -120,25 +119,13 @@ simpleKinesis command = do
simpleKinesisT
:: (AsMemoryResponse a, Transaction r a, ServiceConfiguration r ~ KinesisConfiguration, MonadIO m)
=> r
-> EitherT T.Text m (MemoryResponse a)
-> ExceptT T.Text m (MemoryResponse a)
simpleKinesisT = tryT . simpleKinesis

testStreamName :: StreamName -> StreamName
testStreamName = either (error . T.unpack) id
. streamName . T.take 128 . testData . streamNameText

-- |
--
withStream
:: StreamName -- ^ Stream Name
-> Int -- ^ Shard count
-> IO a
-> IO a
withStream stream shardCount = bracket_ createStream deleteStream
where
createStream = simpleKinesis $ CreateStream shardCount stream
deleteStream = void $ simpleKinesis (DeleteStream stream)

-- | The function 'withResource' from "Tasty" synchronizes the aquired
-- resource through a 'TVar'. We don't need that for a stream. So instead
-- of passing the 'IO StreamName' from 'withResource' we directly pass
Expand Down Expand Up @@ -167,12 +154,12 @@ waitActiveT
-- The actual maximal number of seconds is closest smaller
-- power of two.
-> StreamName
-> EitherT T.Text IO StreamDescription
-> ExceptT T.Text IO StreamDescription
waitActiveT sec stream = retryT maxRetry $ do
DescribeStreamResponse d <- simpleKinesisT
$ DescribeStream Nothing Nothing stream
unless (streamDescriptionStreamStatus d == StreamStatusActive)
$ left "Stream is not active"
$ throwE "Stream is not active"
return d
where
maxRetry = floor $ logBase 2 (fromIntegral sec :: Double)
Expand Down Expand Up @@ -201,37 +188,37 @@ test_jsonRoundtrips = testGroup "JSON encoding roundtrips"
test_stream1 :: TestTree
test_stream1 = withStreamTest defaultStreamName 1 $ \stream ->
testGroup "Perform a series of tests on a single stream"
[ eitherTOnceTest0 "list streams" (prop_streamList stream)
, eitherTOnceTest0 "describe stream" (prop_streamDescribe 1 stream)
, eitherTOnceTest2 "put and get stream" (prop_streamPutGet stream)
[ exceptTOnceTest0 "list streams" (prop_streamList stream)
, exceptTOnceTest0 "describe stream" (prop_streamDescribe 1 stream)
, exceptTOnceTest2 "put and get stream" (prop_streamPutGet stream)
]

prop_streamList :: StreamName -> EitherT T.Text IO ()
prop_streamList :: StreamName -> ExceptT T.Text IO ()
prop_streamList stream = do
ListStreamsResponse _ streams <- simpleKinesisT $ ListStreams Nothing Nothing
unless (stream `elem` streams) $
left $ "stream " <> streamNameText stream <> " is not listed"
throwE $ "stream " Monoid.<> streamNameText stream <> " is not listed"

prop_streamDescribe
:: Int -- ^ expected number of shards
-> StreamName
-> EitherT T.Text IO ()
-> ExceptT T.Text IO ()
prop_streamDescribe shardNum stream = do
desc <- waitActiveT 64 stream

unless (streamDescriptionStreamName desc == stream)
. left $ "unexpected stream name in description: "
. throwE $ "unexpected stream name in description: "
<> streamNameText (streamDescriptionStreamName desc)

let l = length $ streamDescriptionShards desc
unless (l == shardNum)
. left $ "unexpected number of shards in stream description: " <> sshow l
. throwE $ "unexpected number of shards in stream description: " <> sshow l

prop_streamPutGet
:: StreamName
-> B.ByteString -- ^ Message data
-> PartitionKey
-> EitherT T.Text IO ()
-> ExceptT T.Text IO ()
prop_streamPutGet stream dat key = do
desc <- waitActiveT 64 stream

Expand All @@ -246,7 +233,7 @@ prop_streamPutGet stream dat key = do
}

let shardIds = map shardShardId shards
unless (putShard `elem` shardIds) . left
unless (putShard `elem` shardIds) . throwE
$ "unexpected shard id: expected on of " <> sshow shardIds <> "; got " <> sshow putShard

record <- retryT 5 $ do
Expand All @@ -261,21 +248,21 @@ prop_streamPutGet stream dat key = do
, getRecordsShardIterator = it
}
case records of
[] -> left "no record found in stream"
[] -> throwE "no record found in stream"
[r] -> return r
t -> left $ "unexpected records found in stream: " <> sshow t
t -> throwE $ "unexpected records found in stream: " <> sshow t

let getData = recordData record
unless (getData == dat) . left
unless (getData == dat) . throwE
$ "data does not match: expected " <> sshow dat <> "; got " <> sshow getData

let getSeqNr = recordSequenceNumber record
unless (getSeqNr == putSeqNr) . left
unless (getSeqNr == putSeqNr) . throwE
$ "sequence numbers don't match: expected " <> sshow putSeqNr
<> "; got " <> sshow getSeqNr

let getPartKey = recordPartitionKey record
unless (getPartKey == key) . left
unless (getPartKey == key) . throwE
$ "partition keys don't match: expected " <> sshow key
<> "; got " <> sshow getPartKey

Expand All @@ -284,20 +271,20 @@ prop_streamPutGet stream dat key = do

test_createStream :: TestTree
test_createStream = testGroup "Stream creation"
[ eitherTOnceTest1 "create list delete" prop_createListDelete
[ exceptTOnceTest1 "create list delete" prop_createListDelete
]

prop_createListDelete
:: StreamName -- ^ stream name
-> EitherT T.Text IO ()
-> ExceptT T.Text IO ()
prop_createListDelete stream = do
CreateStreamResponse <- simpleKinesisT $ CreateStream 1 tstream
handleT (\e -> deleteStream >> left e) $ do
ListStreamsResponse _ allStreams <- simpleKinesisT
$ ListStreams Nothing Nothing
unless (tstream `elem` allStreams)
. left $ "stream " <> streamNameText tstream <> " not listed"
deleteStream
flip catchE (\e -> deleteStream >> throwE e) $ do
ListStreamsResponse _ allStreams <- simpleKinesisT
$ ListStreams Nothing Nothing
unless (tstream `elem` allStreams)
. throwE $ "stream " <> streamNameText tstream <> " not listed"
deleteStream
where
deleteStream = void $ simpleKinesisT (DeleteStream tstream)
tstream = testStreamName stream
Expand Down
Loading