File tree Expand file tree Collapse file tree 3 files changed +44
-5
lines changed
Expand file tree Collapse file tree 3 files changed +44
-5
lines changed Original file line number Diff line number Diff line change @@ -62,9 +62,15 @@ impl Connection for SqliteConnection {
6262
6363 type Options = SqliteConnectOptions ;
6464
65- fn close ( self ) -> BoxFuture < ' static , Result < ( ) , Error > > {
66- // nothing explicit to do; connection will close in drop
67- Box :: pin ( future:: ok ( ( ) ) )
65+ fn close ( mut self ) -> BoxFuture < ' static , Result < ( ) , Error > > {
66+ Box :: pin ( async move {
67+ let shutdown = self . worker . shutdown ( ) ;
68+ // Drop the statement worker and any outstanding statements, which should
69+ // cover all references to the connection handle outside of the worker thread
70+ drop ( self ) ;
71+ // Ensure the worker thread has terminated
72+ shutdown. await
73+ } )
6874 }
6975
7076 fn ping ( & mut self ) -> BoxFuture < ' _ , Result < ( ) , Error > > {
Original file line number Diff line number Diff line change @@ -30,6 +30,9 @@ enum StatementWorkerCommand {
3030 statement : Weak < StatementHandle > ,
3131 tx : oneshot:: Sender < ( ) > ,
3232 } ,
33+ Shutdown {
34+ tx : oneshot:: Sender < ( ) > ,
35+ } ,
3336}
3437
3538impl StatementWorker {
@@ -72,6 +75,13 @@ impl StatementWorker {
7275 let _ = tx. send ( ( ) ) ;
7376 }
7477 }
78+ StatementWorkerCommand :: Shutdown { tx } => {
79+ // drop the connection reference before sending confirmation
80+ // and ending the command loop
81+ drop ( conn) ;
82+ let _ = tx. send ( ( ) ) ;
83+ return ;
84+ }
7585 }
7686 }
7787
@@ -127,4 +137,25 @@ impl StatementWorker {
127137 rx. await . map_err ( |_| Error :: WorkerCrashed )
128138 }
129139 }
140+
141+ /// Send a command to the worker to shut down the processing thread.
142+ ///
143+ /// A `WorkerCrashed` error may be returned if the thread has already stopped.
144+ /// Subsequent calls to `step()`, `reset()`, or this method will fail with
145+ /// `WorkerCrashed`. Ensure that any associated statements are dropped first.
146+ pub ( crate ) fn shutdown ( & mut self ) -> impl Future < Output = Result < ( ) , Error > > {
147+ let ( tx, rx) = oneshot:: channel ( ) ;
148+
149+ let send_res = self
150+ . tx
151+ . send ( StatementWorkerCommand :: Shutdown { tx } )
152+ . map_err ( |_| Error :: WorkerCrashed ) ;
153+
154+ async move {
155+ send_res?;
156+
157+ // wait for the response
158+ rx. await . map_err ( |_| Error :: WorkerCrashed )
159+ }
160+ }
130161}
Original file line number Diff line number Diff line change @@ -206,7 +206,8 @@ async fn it_executes_with_pool() -> anyhow::Result<()> {
206206async fn it_opens_in_memory ( ) -> anyhow:: Result < ( ) > {
207207 // If the filename is ":memory:", then a private, temporary in-memory database
208208 // is created for the connection.
209- let _ = SqliteConnection :: connect ( ":memory:" ) . await ?;
209+ let conn = SqliteConnection :: connect ( ":memory:" ) . await ?;
210+ conn. close ( ) . await ?;
210211
211212 Ok ( ( ) )
212213}
@@ -215,7 +216,8 @@ async fn it_opens_in_memory() -> anyhow::Result<()> {
215216async fn it_opens_temp_on_disk ( ) -> anyhow:: Result < ( ) > {
216217 // If the filename is an empty string, then a private, temporary on-disk database will
217218 // be created.
218- let _ = SqliteConnection :: connect ( "" ) . await ?;
219+ let conn = SqliteConnection :: connect ( "" ) . await ?;
220+ conn. close ( ) . await ?;
219221
220222 Ok ( ( ) )
221223}
You can’t perform that action at this time.
0 commit comments