@@ -13,7 +13,7 @@ use std::{
13
13
convert:: { TryFrom , TryInto } ,
14
14
env, fmt,
15
15
future:: Future ,
16
- sync :: Arc ,
16
+ panic ,
17
17
} ;
18
18
use tokio:: io:: { AsyncRead , AsyncWrite } ;
19
19
use tokio_stream:: { Stream , StreamExt } ;
@@ -93,7 +93,7 @@ pub struct HandlerFn<F> {
93
93
impl < F , A , B , Error , Fut > Handler < A , B > for HandlerFn < F >
94
94
where
95
95
F : Fn ( A , Context ) -> Fut ,
96
- Fut : Future < Output = Result < B , Error > > + Send ,
96
+ Fut : Future < Output = Result < B , Error > > ,
97
97
Error : Into < Box < dyn std:: error:: Error + Send + Sync + ' static > > + fmt:: Display ,
98
98
{
99
99
type Error = Error ;
@@ -135,14 +135,13 @@ where
135
135
handler : F ,
136
136
) -> Result < ( ) , Error >
137
137
where
138
- F : Handler < A , B > + Send + Sync + ' static ,
139
- <F as Handler < A , B > >:: Fut : Future < Output = Result < B , <F as Handler < A , B > >:: Error > > + Send + ' static ,
140
- <F as Handler < A , B > >:: Error : fmt:: Display + Send + Sync + ' static ,
141
- A : for < ' de > Deserialize < ' de > + Send + Sync + ' static ,
142
- B : Serialize + Send + Sync + ' static ,
138
+ F : Handler < A , B > ,
139
+ <F as Handler < A , B > >:: Fut : Future < Output = Result < B , <F as Handler < A , B > >:: Error > > ,
140
+ <F as Handler < A , B > >:: Error : fmt:: Display ,
141
+ A : for < ' de > Deserialize < ' de > ,
142
+ B : Serialize ,
143
143
{
144
144
let client = & self . client ;
145
- let handler = Arc :: new ( handler) ;
146
145
tokio:: pin!( incoming) ;
147
146
while let Some ( event) = incoming. next ( ) . await {
148
147
trace ! ( "New event arrived (run loop)" ) ;
@@ -154,12 +153,10 @@ where
154
153
trace ! ( "{}" , std:: str :: from_utf8( & body) ?) ; // this may be very verbose
155
154
let body = serde_json:: from_slice ( & body) ?;
156
155
157
- let handler = Arc :: clone ( & handler) ;
158
156
let request_id = & ctx. request_id . clone ( ) ;
159
- #[ allow( clippy:: async_yields_async) ]
160
- let task = tokio:: spawn ( async move { handler. call ( body, ctx) } ) ;
157
+ let task = panic:: catch_unwind ( panic:: AssertUnwindSafe ( || handler. call ( body, ctx) ) ) ;
161
158
162
- let req = match task. await {
159
+ let req = match task {
163
160
Ok ( response) => match response. await {
164
161
Ok ( response) => {
165
162
trace ! ( "Ok response from handler (run loop)" ) ;
@@ -181,18 +178,21 @@ where
181
178
. into_req ( )
182
179
}
183
180
} ,
184
- Err ( err) if err . is_panic ( ) => {
181
+ Err ( err) => {
185
182
error ! ( "{:?}" , err) ; // inconsistent with other log record formats - to be reviewed
186
183
EventErrorRequest {
187
184
request_id,
188
185
diagnostic : Diagnostic {
189
186
error_type : type_name_of_val ( & err) . to_owned ( ) ,
190
- error_message : format ! ( "Lambda panicked: {}" , err) ,
187
+ error_message : if let Some ( msg) = err. downcast_ref :: < & str > ( ) {
188
+ format ! ( "Lambda panicked: {}" , msg)
189
+ } else {
190
+ format ! ( "Lambda panicked" )
191
+ } ,
191
192
} ,
192
193
}
193
194
. into_req ( )
194
195
}
195
- Err ( _) => unreachable ! ( "tokio::task should not be canceled" ) ,
196
196
} ;
197
197
let req = req?;
198
198
client. call ( req) . await . expect ( "Unable to send response to Runtime APIs" ) ;
@@ -291,11 +291,11 @@ where
291
291
/// ```
292
292
pub async fn run < A , B , F > ( handler : F ) -> Result < ( ) , Error >
293
293
where
294
- F : Handler < A , B > + Send + Sync + ' static ,
295
- <F as Handler < A , B > >:: Fut : Future < Output = Result < B , <F as Handler < A , B > >:: Error > > + Send + ' static ,
296
- <F as Handler < A , B > >:: Error : fmt:: Display + Send + Sync + ' static ,
297
- A : for < ' de > Deserialize < ' de > + Send + Sync + ' static ,
298
- B : Serialize + Send + Sync + ' static ,
294
+ F : Handler < A , B > ,
295
+ <F as Handler < A , B > >:: Fut : Future < Output = Result < B , <F as Handler < A , B > >:: Error > > ,
296
+ <F as Handler < A , B > >:: Error : fmt:: Display ,
297
+ A : for < ' de > Deserialize < ' de > ,
298
+ B : Serialize ,
299
299
{
300
300
trace ! ( "Loading config from env" ) ;
301
301
let config = Config :: from_env ( ) ?;
0 commit comments