2222--  only deliver messages that were not seen before. In case we are not connected 
2323--  to our 'etcd' instance or not enough peers (= on a minority cluster), we 
2424--  retry sending, but also store messages to broadcast in a 'PersistentQueue', 
25- --  which makes the node resilient against crashes while sending. TODO: Is this 
26- --  needed? performance limitation? 
25+ --  which makes the node resilient against crashes while sending. 
2726-- 
2827--  Connectivity and compatibility with other nodes on the cluster is tracked 
2928--  using the key-value service as well: 
@@ -92,7 +91,6 @@ import Network.GRPC.Client (
9291 )
9392import  Network.GRPC.Client.StreamType.IO  (biDiStreaming , nonStreaming )
9493import  Network.GRPC.Common  (GrpcError  (.. ), GrpcException  (.. ), HTTP2Settings  (.. ), NextElem  (.. ), def , defaultHTTP2Settings )
95- import  Network.GRPC.Common.NextElem  (whileNext_ )
9694import  Network.GRPC.Common.Protobuf  (Proto  (.. ), Protobuf , defMessage , (.~) )
9795import  Network.GRPC.Etcd  (
9896  Compare'CompareResult  (.. ),
@@ -101,6 +99,7 @@ import Network.GRPC.Etcd (
10199  Lease ,
102100  Watch ,
103101 )
102+ import  Network.Socket  (PortNumber )
104103import  System.Directory  (createDirectoryIfMissing , listDirectory , removeFile )
105104import  System.Environment.Blank  (getEnvironment )
106105import  System.FilePath  (takeDirectory , (</>) )
@@ -175,7 +174,7 @@ withEtcdNetwork tracer protocolVersion config callback action = do
175174        traceWith tracer Reconnecting 
176175        pure  $  reconnectPolicy doneVar
177176
178-   clientHost =  Host {hostname =  " 127.0.0.1"  , port =  clientPort }
177+   clientHost =  Host {hostname =  " 127.0.0.1"  , port =  getClientPort config }
179178
180179  grpcServer = 
181180    ServerInsecure  $ 
@@ -185,11 +184,6 @@ withEtcdNetwork tracer protocolVersion config callback action = do
185184        , addressAuthority =  Nothing 
186185        }
187186
188-   --  NOTE: Offset client port by the same amount as configured 'port' is offset
189-   --  from the default '5001'. This will result in the default client port 2379
190-   --  be used by default still.
191-   clientPort =  2379  +  port listen -  5001 
192- 
193187  traceStderr p NetworkCallback {onConnectivity} = 
194188    forever $  do 
195189      bs <-  BS. hGetLine (getStderr p)
@@ -249,6 +243,14 @@ withEtcdNetwork tracer protocolVersion config callback action = do
249243
250244  NetworkConfiguration {persistenceDir, listen, advertise, peers, whichEtcd} =  config
251245
246+ --  |  Get the client port corresponding to a listen address. 
247+ -- 
248+ --  The client port used by the started etcd port is offset by the same amount as 
249+ --  the listen address is offset by the default port 5001. This will result in 
250+ --  the default client port 2379 be used by default still. 
251+ getClientPort  ::  NetworkConfiguration  ->  PortNumber 
252+ getClientPort NetworkConfiguration {listen} =  2379  +  port listen -  5001 
253+ 
252254--  |  Return the path of the etcd binary. Will either install it first, or just 
253255--  assume there is one available on the system path. 
254256getEtcdBinary  ::  FilePath   ->  WhichEtcd  ->  IO   FilePath 
@@ -297,8 +299,7 @@ checkVersion tracer conn ourVersion NetworkCallback{onConnectivity} = do
297299        Right   theirVersion -> 
298300          unless (theirVersion ==  ourVersion) $ 
299301            onConnectivity VersionMismatch {ourVersion, theirVersion =  Just  theirVersion}
300-     else 
301-       traceWith tracer $  MatchingProtocolVersion {version =  ourVersion}
302+     else  traceWith tracer $  MatchingProtocolVersion {version =  ourVersion}
302303 where 
303304  versionKey =  " version" 
304305
@@ -371,11 +372,13 @@ waitMessages ::
371372  NetworkCallback  msg  IO   -> 
372373  IO   () 
373374waitMessages tracer conn directory NetworkCallback {deliver} =  do 
374-   revision <-  getLastKnownRevision directory
375375  withGrpcContext " waitMessages"   .  forever $  do 
376376    --  NOTE: We have not observed the watch (subscription) fail even when peers
377377    --  leave and we end up on a minority cluster.
378378    biDiStreaming conn (rpc @ (Protobuf  Watch  " watch"  )) $  \ send recv ->  do 
379+       revision <-  getLastKnownRevision directory
380+       let  startRevision =  fromIntegral  (revision +  1 )
381+       traceWith tracer WatchMessagesStartRevision {startRevision}
379382      --  NOTE: Request all keys starting with 'msg'. See also section KeyRanges
380383      --  in https://etcd.io/docs/v3.5/learning/api/#key-value-api
381384      let  watchRequest = 
@@ -384,34 +387,48 @@ waitMessages tracer conn directory NetworkCallback{deliver} = do
384387              &  # rangeEnd .~  " msh"   --  NOTE: g+1 to query prefixes
385388              &  # startRevision .~  fromIntegral  (revision +  1 )
386389      send .  NextElem  $  defMessage &  # createRequest .~  watchRequest
387-       whileNext_ recv process 
390+       loop send recv 
388391    --  Wait before re-trying
389392    threadDelay 1 
390393 where 
391-   process res =  do 
392-     let  revision =  fromIntegral  $  res ^.  # header .  # revision
393-     putLastKnownRevision directory revision
394-     forM_ (res ^.  # events) $  \ event ->  do 
395-       let  value =  event ^.  # kv .  # value
396-       case  decodeFull' value of 
397-         Left   err -> 
398-           traceWith
399-             tracer
400-             FailedToDecodeValue 
401-               { key =  decodeUtf8 $  event ^.  # kv .  # key
402-               , value =  encodeBase16 value
403-               , reason =  show  err
404-               }
405-         Right   msg ->  deliver msg
394+   loop send recv = 
395+     recv >>=  \ case 
396+       NoNextElem  ->  pure  () 
397+       NextElem  res -> 
398+         if  res ^.  # canceled
399+           then  do 
400+             let  compactRevision =  res ^.  # compactRevision
401+             traceWith tracer WatchMessagesFallbackTo {compactRevision}
402+             putLastKnownRevision directory .  fromIntegral  $  (compactRevision -  1 ) `max`  0 
403+             --  Gracefully close watch stream
404+             send NoNextElem 
405+           else  do 
406+             let  revision =  res ^.  # header .  # revision
407+             putLastKnownRevision directory .  fromIntegral  $  revision `max`  0 
408+             forM_ (res ^.  # events) process
409+             loop send recv
410+ 
411+   process event =  do 
412+     let  value =  event ^.  # kv .  # value
413+     case  decodeFull' value of 
414+       Left   err -> 
415+         traceWith
416+           tracer
417+           FailedToDecodeValue 
418+             { key =  decodeUtf8 $  event ^.  # kv .  # key
419+             , value =  encodeBase16 value
420+             , reason =  show  err
421+             }
422+       Right   msg ->  deliver msg
406423
407424getLastKnownRevision  ::  MonadIO  m  =>  FilePath   ->  m  Natural 
408425getLastKnownRevision directory =  do 
409426  liftIO $ 
410427    try (decodeFileStrict' $  directory </>  " last-known-revision"  ) >>=  \ case 
411428      Right   rev ->  do 
412-         pure  $  fromMaybe 1  rev
429+         pure  $  fromMaybe 0  rev
413430      Left   (e ::  IOException )
414-         |  isDoesNotExistError e ->  pure  1 
431+         |  isDoesNotExistError e ->  pure  0 
415432        |  otherwise  ->  do 
416433            fail  $  " Failed to load last known revision: "   <>  show  e
417434
@@ -614,5 +631,7 @@ data EtcdLog
614631  | LowLeaseTTL  { ttlRemaining  ::  DiffTime } 
615632  | NoKeepAliveResponse 
616633  | MatchingProtocolVersion  { version  ::  ProtocolVersion } 
634+   | WatchMessagesStartRevision  { startRevision  ::  Int64 } 
635+   | WatchMessagesFallbackTo  { compactRevision  ::  Int64 } 
617636  deriving  stock  (Eq , Show , Generic )
618637  deriving  anyclass  (ToJSON )
0 commit comments