@@ -17,7 +17,7 @@ use datatypes::data_types;
1717use datatypes:: { ARKANSAS_PORT , CONGO_PORT , DANUBE_PORT , PARANA_PORT , TAGUS_PORT } ;
1818use futures:: prelude:: * ;
1919use futures:: select;
20- use prost:: Message ;
20+ use prost:: Message as _ ;
2121use rand:: random;
2222use std:: sync:: Arc ;
2323use zenoh_flow:: prelude:: * ;
@@ -86,36 +86,29 @@ impl Node for Geneva {
8686 async fn iteration ( & self ) -> Result < ( ) > {
8787 select ! {
8888 msg = self . input_danube. recv( ) . fuse( ) => {
89- if let Ok ( ( msg, _ts) ) = msg {
90- if let zenoh_flow:: prelude:: Message :: Data ( inner_data) = msg {
91- self . state. lock( ) . await . danube_last_val = ( * inner_data) . clone( ) ;
92- }
89+ if let Ok ( ( Message :: Data ( inner_data) , _ts) ) = msg {
90+ self . state. lock( ) . await . danube_last_val = ( * inner_data) . clone( ) ;
9391 }
9492 } ,
9593 msg = self . input_tagus. recv( ) . fuse( ) => {
96- if let Ok ( ( msg, _ts) ) = msg {
97- if let zenoh_flow:: prelude:: Message :: Data ( inner_data) = msg {
94+ if let Ok ( ( Message :: Data ( inner_data) , _ts) ) = msg {
9895 self . state. lock( ) . await . tagus_last_val = ( * inner_data) . clone( ) ;
9996 }
100- }
10197 } ,
10298 msg = self . input_congo. recv( ) . fuse( ) => {
103- if let Ok ( ( msg, _ts) ) = msg {
104- if let zenoh_flow:: prelude:: Message :: Data ( inner_data) = msg {
99+ if let Ok ( ( Message :: Data ( inner_data) , _ts) ) = msg {
105100 self . state. lock( ) . await . congo_last_val = ( * inner_data) . clone( ) ;
106- } }
101+ }
107102 } ,
108103 msg = self . input_parana. recv( ) . fuse( ) => {
109- if let Ok ( ( msg, _ts) ) = msg {
110- if let zenoh_flow:: prelude:: Message :: Data ( inner_data) = msg {
104+ if let Ok ( ( Message :: Data ( inner_data) , _ts) ) = msg {
111105 let value = data_types:: String {
112106 value: format!( "geneva/arkansas:{}" , inner_data. value) ,
113107 } ;
114108
115109 self . output_arkansas. send( value, None ) . await ?;
116110 }
117111 }
118- }
119112 }
120113 Ok ( ( ) )
121114 }
0 commit comments