Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQSWatcher: Ignore failures in deleting recieved messages #3783

Merged
merged 4 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions libs/types-common-aws/default.nix
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
, lib
, proto-lens
, resourcet
, safe
, text
, time
, unliftio
Expand All @@ -33,7 +32,6 @@ mkDerivation {
lens
proto-lens
resourcet
safe
text
time
unliftio
Expand Down
19 changes: 9 additions & 10 deletions libs/types-common-aws/src/Util/Test/SQS.hs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import Data.List (delete)
import Data.ProtoLens
import Data.Text.Encoding qualified as Text
import Imports
import Safe (headDef)
import UnliftIO (Async, async, throwIO)
import UnliftIO (Async, async)
import UnliftIO.Async qualified as Async
import UnliftIO.Exception
import UnliftIO.Resource (MonadResource, ResourceT)
import UnliftIO.Timeout (timeout)

Expand Down Expand Up @@ -142,27 +142,26 @@ receive n url =
. set SQS.receiveMessage_maxNumberOfMessages (Just n)
. set SQS.receiveMessage_visibilityTimeout (Just 1)

fetchMessage :: (Message a, MonadReader AWS.Env m, MonadResource m) => Text -> String -> (String -> Maybe a -> IO ()) -> m ()
fetchMessage url label callback = do
msgs <- fromMaybe [] . view SQS.receiveMessageResponse_messages <$> sendEnv (receive 1 url)
events <- mapM (parseDeleteMessage url) msgs
liftIO $ callback label (headDef Nothing events)

deleteMessage :: (MonadReader AWS.Env m, MonadResource m) => Text -> SQS.Message -> m ()
deleteMessage url m = do
for_
(m ^. SQS.message_receiptHandle)
(void . sendEnv . SQS.newDeleteMessage url)

parseDeleteMessage :: (Message a, MonadReader AWS.Env m, MonadResource m) => Text -> SQS.Message -> m (Maybe a)
parseDeleteMessage :: (Message a, MonadReader AWS.Env m, MonadResource m, MonadUnliftIO m) => Text -> SQS.Message -> m (Maybe a)
parseDeleteMessage url m = do
let decodedMessage = decodeMessage <=< (B64.decode . Text.encodeUtf8)
evt <- case decodedMessage <$> (m ^. SQS.message_body) of
Just (Right e) -> pure (Just e)
_ -> do
liftIO $ print ("Failed to parse SQS message or event" :: String)
liftIO $ putStrLn "Failed to parse SQS message or event"
pure Nothing
deleteMessage url m
`catch` \case
(fromException @SomeAsyncException -> Just asyncExc) ->
throwIO asyncExc
e ->
liftIO $ putStrLn $ "Failed to delete message, this error will be ignored. Message: " <> show m <> ", Exception: " <> displayException e
pure evt

sendEnv :: (MonadReader AWS.Env m, MonadResource m, AWS.AWSRequest a) => a -> m (AWS.AWSResponse a)
Expand Down
1 change: 0 additions & 1 deletion libs/types-common-aws/types-common-aws.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ library
, lens >=4.10
, proto-lens
, resourcet
, safe >=0.3
, text >=0.11
, time
, unliftio
Expand Down
Loading