2020-- only deliver messages that were not seen before. In case we are not connected
2121-- to our 'etcd' instance or not enough peers (= on a minority cluster), we
2222-- retry sending, but also store messages to broadcast in a 'PersistentQueue',
23- -- which makes the node resilient against crashes while sending. TODO: Is this
24- -- needed? performance limitation?
23+ -- which makes the node resilient against crashes while sending.
2524--
2625-- Connectivity and compatibility with other nodes on the cluster is tracked
2726-- using the key-value service as well:
@@ -93,7 +92,6 @@ import Network.GRPC.Client (
9392 )
9493import Network.GRPC.Client.StreamType.IO (biDiStreaming , nonStreaming )
9594import Network.GRPC.Common (GrpcError (.. ), GrpcException (.. ), HTTP2Settings (.. ), NextElem (.. ), def , defaultHTTP2Settings )
96- import Network.GRPC.Common.NextElem (whileNext_ )
9795import Network.GRPC.Common.Protobuf (Proto (.. ), Protobuf , defMessage , (.~) )
9896import Network.GRPC.Etcd (
9997 Compare'CompareResult (.. ),
@@ -102,6 +100,7 @@ import Network.GRPC.Etcd (
102100 Lease ,
103101 Watch ,
104102 )
103+ import Network.Socket (PortNumber )
105104import System.Directory (createDirectoryIfMissing , listDirectory , removeFile )
106105import System.Environment.Blank (getEnvironment )
107106import System.FilePath ((</>) )
@@ -175,7 +174,7 @@ withEtcdNetwork tracer protocolVersion config callback action = do
175174 traceWith tracer Reconnecting
176175 pure $ reconnectPolicy doneVar
177176
178- clientHost = Host {hostname = " 127.0.0.1" , port = clientPort }
177+ clientHost = Host {hostname = " 127.0.0.1" , port = getClientPort config }
179178
180179 grpcServer =
181180 ServerInsecure $
@@ -185,11 +184,6 @@ withEtcdNetwork tracer protocolVersion config callback action = do
185184 , addressAuthority = Nothing
186185 }
187186
188- -- NOTE: Offset client port by the same amount as configured 'port' is offset
189- -- from the default '5001'. This will result in the default client port 2379
190- -- be used by default still.
191- clientPort = 2379 + port listen - 5001
192-
193187 traceStderr p NetworkCallback {onConnectivity} =
194188 forever $ do
195189 bs <- BS. hGetLine (getStderr p)
@@ -249,6 +243,14 @@ withEtcdNetwork tracer protocolVersion config callback action = do
249243
250244 NetworkConfiguration {persistenceDir, listen, advertise, peers, whichEtcd} = config
251245
-- | Derive the etcd client port from the configured listen address.
--
-- The etcd instance we start serves clients on a port that is shifted away
-- from the default client port 2379 by the same distance the listen port is
-- shifted from the default listen port 5001. A node listening on 5001
-- therefore keeps using the default client port 2379.
getClientPort :: NetworkConfiguration -> PortNumber
getClientPort NetworkConfiguration{listen} =
  defaultClientPort + port listen - defaultListenPort
 where
  defaultClientPort = 2379
  defaultListenPort = 5001
253+
252254-- | Check and write version on etcd cluster. This will retry until we are on a
253255-- majority cluster and succeed. If the version does not match a corresponding
254256-- 'Connectivity' message is sent via 'NetworkCallback'.
@@ -282,8 +284,7 @@ checkVersion tracer conn ourVersion NetworkCallback{onConnectivity} = do
282284 Right theirVersion ->
283285 unless (theirVersion == ourVersion) $
284286 onConnectivity VersionMismatch {ourVersion, theirVersion = Just theirVersion}
285- else
286- traceWith tracer $ MatchingProtocolVersion {version = ourVersion}
287+ else traceWith tracer $ MatchingProtocolVersion {version = ourVersion}
287288 where
288289 versionKey = " version"
289290
@@ -361,11 +362,13 @@ waitMessages ::
361362 NetworkCallback msg IO ->
362363 IO ()
363364waitMessages tracer conn directory NetworkCallback {deliver} = do
364- revision <- getLastKnownRevision directory
365365 withGrpcContext " waitMessages" . forever $ do
366366 -- NOTE: We have not observed the watch (subscription) fail even when peers
367367 -- leave and we end up on a minority cluster.
368368 biDiStreaming conn (rpc @ (Protobuf Watch " watch" )) $ \ send recv -> do
369+ revision <- getLastKnownRevision directory
370+ let startRevision = fromIntegral (revision + 1 )
371+ traceWith tracer WatchMessagesStartRevision {startRevision}
369372 -- NOTE: Request all keys starting with 'msg'. See also section KeyRanges
370373 -- in https://etcd.io/docs/v3.5/learning/api/#key-value-api
371374 let watchRequest =
@@ -374,34 +377,48 @@ waitMessages tracer conn directory NetworkCallback{deliver} = do
374377 & # rangeEnd .~ " msh" -- NOTE: g+1 to query prefixes
375378 & # startRevision .~ fromIntegral (revision + 1 )
376379 send . NextElem $ defMessage & # createRequest .~ watchRequest
377- whileNext_ recv process
380+ loop send recv
378381 -- Wait before re-trying
379382 threadDelay 1
380383 where
381- process res = do
382- let revision = fromIntegral $ res ^. # header . # revision
383- putLastKnownRevision directory revision
384- forM_ (res ^. # events) $ \ event -> do
385- let value = event ^. # kv . # value
386- case decodeFull' value of
387- Left err ->
388- traceWith
389- tracer
390- FailedToDecodeValue
391- { key = decodeUtf8 $ event ^. # kv . # key
392- , value = encodeBase16 value
393- , reason = show err
394- }
395- Right msg -> deliver msg
-- Consume watch responses from 'recv' one at a time until the stream ends
-- ('NoNextElem') or a response is flagged as canceled.
384+ loop send recv =
385+ recv >>= \ case
-- Stream finished: return so the caller can decide what to do next.
386+ NoNextElem -> pure ()
387+ NextElem res ->
388+ if res ^. # canceled
389+ then do
-- Canceled watch: trace the reported compact revision and persist the
-- revision just before it (clamped at 0) as the new last known revision.
-- NOTE(review): if a response can be canceled with a zero compact revision
-- (i.e. for a reason other than compaction), this stores 0 -- confirm that
-- this is intended.
390+ let compactRevision = res ^. # compactRevision
391+ traceWith tracer WatchMessagesFallbackTo {compactRevision}
392+ putLastKnownRevision directory . fromIntegral $ (compactRevision - 1 ) `max` 0
393+ -- Gracefully close watch stream
394+ send NoNextElem
395+ else do
-- Regular response: persist the header revision (clamped at 0) before
-- handing each contained event to 'process', then wait for the next one.
-- NOTE(review): indentation is lost in this rendering; the recursive call
-- below presumably belongs to this else branch only -- confirm.
396+ let revision = res ^. # header . # revision
397+ putLastKnownRevision directory . fromIntegral $ revision `max` 0
398+ forM_ (res ^. # events) process
399+ loop send recv
400+
-- Handle a single watch event: decode the value of its key-value pair and
-- pass the result to 'deliver'.
401+ process event = do
402+ let value = event ^. # kv . # value
403+ case decodeFull' value of
-- Decoding failures are only traced (key as text, value base16-encoded,
-- plus the shown error); nothing is delivered for such an event.
404+ Left err ->
405+ traceWith
406+ tracer
407+ FailedToDecodeValue
408+ { key = decodeUtf8 $ event ^. # kv . # key
409+ , value = encodeBase16 value
410+ , reason = show err
411+ }
412+ Right msg -> deliver msg
396413
-- | Read the revision persisted in the last-known-revision file inside the
-- given directory.
--
-- Falls back to a default when the file does not exist or its content yields
-- 'Nothing'; any other 'IOException' aborts via 'fail'.
397414getLastKnownRevision :: MonadIO m => FilePath -> m Natural
398415getLastKnownRevision directory = do
399416 liftIO $
400417 try (decodeFileStrict' $ directory </> " last-known-revision" ) >>= \ case
401418 Right rev -> do
-- The default changes from 1 (removed line) to 0 (added line) in this diff;
-- 'waitMessages' requests @revision + 1@ as the watch start revision.
402- pure $ fromMaybe 1 rev
419+ pure $ fromMaybe 0 rev
403420 Left (e :: IOException )
-- A missing file yields the same default as a 'Nothing' result above.
404- | isDoesNotExistError e -> pure 1
421+ | isDoesNotExistError e -> pure 0
405422 | otherwise -> do
406423 fail $ " Failed to load last known revision: " <> show e
407424
@@ -614,5 +631,7 @@ data EtcdLog
614631 | LowLeaseTTL { ttlRemaining :: Int64 }
615632 | NoKeepAliveResponse
616633 | MatchingProtocolVersion { version :: ProtocolVersion }
634+ | WatchMessagesStartRevision { startRevision :: Int64 }
635+ | WatchMessagesFallbackTo { compactRevision :: Int64 }
617636 deriving stock (Eq , Show , Generic )
618637 deriving anyclass (ToJSON , FromJSON )
0 commit comments