Skip to content

Commit 850c943

Browse files
refactor(rust): Take sync parameter in Writeable::close() (#25475)
1 parent c169eaa commit 850c943

File tree

7 files changed

+43
-65
lines changed

7 files changed

+43
-65
lines changed

crates/polars-io/src/utils/file.rs

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -128,20 +128,31 @@ impl Writeable {
128128
}
129129
}
130130

131-
pub fn sync_on_close(&mut self, sync_on_close: SyncOnCloseType) -> std::io::Result<()> {
131+
pub fn sync_all(&self) -> io::Result<()> {
132132
match self {
133-
Writeable::Dyn(d) => {
134-
crate::utils::sync_on_close::sync_on_close_dyn(sync_on_close, d.as_mut())
135-
},
136-
Writeable::Local(file) => {
137-
crate::utils::sync_on_close::sync_on_close(sync_on_close, file)
138-
},
133+
Self::Dyn(v) => v.sync_all(),
134+
Self::Local(v) => v.sync_all(),
139135
#[cfg(feature = "cloud")]
140-
Writeable::Cloud(_) => Ok(()),
136+
Self::Cloud(v) => v.sync_all(),
141137
}
142138
}
143139

144-
pub fn close(self) -> std::io::Result<()> {
140+
pub fn sync_data(&self) -> io::Result<()> {
141+
match self {
142+
Self::Dyn(v) => v.sync_data(),
143+
Self::Local(v) => v.sync_data(),
144+
#[cfg(feature = "cloud")]
145+
Self::Cloud(v) => v.sync_data(),
146+
}
147+
}
148+
149+
pub fn close(self, sync: SyncOnCloseType) -> std::io::Result<()> {
150+
match sync {
151+
SyncOnCloseType::All => self.sync_all()?,
152+
SyncOnCloseType::Data => self.sync_data()?,
153+
SyncOnCloseType::None => {},
154+
}
155+
145156
match self {
146157
Self::Dyn(mut v) => v.close(),
147158
Self::Local(v) => close_file(v),
@@ -236,22 +247,29 @@ mod async_writeable {
236247
Writeable::try_new(path, cloud_options).and_then(|x| x.try_into_async_writeable())
237248
}
238249

239-
pub async fn sync_on_close(
240-
&mut self,
241-
sync_on_close: SyncOnCloseType,
242-
) -> std::io::Result<()> {
250+
pub async fn sync_all(&mut self) -> io::Result<()> {
243251
match self {
244-
Self::Dyn(d) => task::block_in_place(|| {
245-
crate::utils::sync_on_close::sync_on_close_dyn(sync_on_close, d.0.as_mut())
246-
}),
247-
Self::Local(file) => {
248-
crate::utils::sync_on_close::tokio_sync_on_close(sync_on_close, file).await
249-
},
252+
Self::Dyn(v) => task::block_in_place(|| v.0.as_ref().sync_all()),
253+
Self::Local(v) => v.sync_all().await,
254+
Self::Cloud(_) => Ok(()),
255+
}
256+
}
257+
258+
pub async fn sync_data(&mut self) -> io::Result<()> {
259+
match self {
260+
Self::Dyn(v) => task::block_in_place(|| v.0.as_ref().sync_data()),
261+
Self::Local(v) => v.sync_data().await,
250262
Self::Cloud(_) => Ok(()),
251263
}
252264
}
253265

254-
pub async fn close(self) -> PolarsResult<()> {
266+
pub async fn close(mut self, sync: SyncOnCloseType) -> PolarsResult<()> {
267+
match sync {
268+
SyncOnCloseType::All => self.sync_all().await?,
269+
SyncOnCloseType::Data => self.sync_data().await?,
270+
SyncOnCloseType::None => {},
271+
}
272+
255273
match self {
256274
Self::Dyn(mut v) => {
257275
v.shutdown().await.map_err(PolarsError::from)?;
Lines changed: 0 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,3 @@
1-
use std::{fs, io};
2-
3-
use crate::utils::file::WriteableTrait;
4-
51
#[derive(Clone, Copy, PartialEq, Eq, Debug, Default, Hash)]
62
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
73
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
@@ -15,34 +11,3 @@ pub enum SyncOnCloseType {
1511
/// Synce the file contents and the metadata.
1612
All,
1713
}
18-
19-
pub fn sync_on_close(sync_on_close: SyncOnCloseType, file: &mut fs::File) -> io::Result<()> {
20-
match sync_on_close {
21-
SyncOnCloseType::None => Ok(()),
22-
SyncOnCloseType::Data => file.sync_data(),
23-
SyncOnCloseType::All => file.sync_all(),
24-
}
25-
}
26-
27-
pub fn sync_on_close_dyn(
28-
sync_on_close: SyncOnCloseType,
29-
file: &mut dyn WriteableTrait,
30-
) -> io::Result<()> {
31-
match sync_on_close {
32-
SyncOnCloseType::None => Ok(()),
33-
SyncOnCloseType::Data => file.sync_data(),
34-
SyncOnCloseType::All => file.sync_all(),
35-
}
36-
}
37-
38-
#[cfg(feature = "tokio")]
39-
pub async fn tokio_sync_on_close(
40-
sync_on_close: SyncOnCloseType,
41-
file: &mut tokio::fs::File,
42-
) -> io::Result<()> {
43-
match sync_on_close {
44-
SyncOnCloseType::None => Ok(()),
45-
SyncOnCloseType::Data => file.sync_data().await,
46-
SyncOnCloseType::All => file.sync_all().await,
47-
}
48-
}

crates/polars-mem-engine/src/planner/lp.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -372,8 +372,7 @@ fn create_physical_plan_impl(
372372
_ => panic!("enable filetype feature"),
373373
}
374374

375-
file.sync_on_close(unified_sink_args.sync_on_close)?;
376-
file.close()?;
375+
file.close(unified_sink_args.sync_on_close)?;
377376

378377
Ok(None)
379378
}),

crates/polars-stream/src/nodes/io_sinks/csv.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,8 +110,7 @@ impl SinkNode for CsvSinkNode {
110110
}
111111
}
112112

113-
file.sync_on_close(sink_options.sync_on_close).await?;
114-
file.close().await?;
113+
file.close(sink_options.sync_on_close).await?;
115114

116115
PolarsResult::Ok(())
117116
});

crates/polars-stream/src/nodes/io_sinks/ipc.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,8 +169,7 @@ impl SinkNode for IpcSinkNode {
169169
writer.finish()?;
170170
drop(writer);
171171

172-
file.sync_on_close(sink_options.sync_on_close)?;
173-
file.close()?;
172+
file.close(sink_options.sync_on_close)?;
174173

175174
PolarsResult::Ok(())
176175
});

crates/polars-stream/src/nodes/io_sinks/json.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,7 @@ impl SinkNode for NDJsonSinkNode {
8080
}
8181
}
8282

83-
file.sync_on_close(sink_options.sync_on_close).await?;
84-
file.close().await?;
83+
file.close(sink_options.sync_on_close).await?;
8584

8685
PolarsResult::Ok(())
8786
});

crates/polars-stream/src/nodes/io_sinks/parquet.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -163,8 +163,7 @@ impl SinkNode for ParquetSinkNode {
163163
let file_size = writer.finish()?;
164164
drop(writer);
165165

166-
file.sync_on_close(sink_options.sync_on_close)?;
167-
file.close()?;
166+
file.close(sink_options.sync_on_close)?;
168167

169168
output_file_size.store(file_size);
170169
PolarsResult::Ok(())

0 commit comments

Comments
 (0)