
Commit c344843

add comments part 2
Parent: 1b0501c

1 file changed: +70 -31 lines

datafusion-examples/examples/embedding_parquet_indexes.rs

Lines changed: 70 additions & 31 deletions
@@ -104,10 +104,71 @@ use tempfile::TempDir;
 /// ```
 /// DistinctIndexTable is a custom TableProvider that reads Parquet files
 
+#[derive(Debug, Clone)]
+struct DistinctIndex {
+    inner: HashSet<String>,
+}
+
+impl DistinctIndex {
+    // Init from iterator of distinct values
+    pub fn new<I: IntoIterator<Item = String>>(iter: I) -> Self {
+        Self {
+            inner: iter.into_iter().collect(),
+        }
+    }
+
+    // serialize the distinct index to a writer
+    fn serialize<W: Write + Send>(
+        &self,
+        arrow_writer: &mut ArrowWriter<W>,
+    ) -> Result<()> {
+        let distinct: HashSet<_> = self.inner.iter().collect();
+        let serialized = distinct
+            .into_iter()
+            .map(|s| s.as_str())
+            .collect::<Vec<&str>>()
+            .join("\n");
+        let index_bytes = serialized.into_bytes();
+
+        // Set the offset for the index
+        let offset = arrow_writer.bytes_written();
+        let index_len = index_bytes.len() as u64;
+
+        println!("Writing custom index at offset: {offset}, length: {index_len}");
+        // Write the index magic and length to the file
+        arrow_writer.write_all(b"IDX1")?;
+        arrow_writer.write_all(&index_len.to_le_bytes())?;
+
+        // Write the index bytes
+        arrow_writer.write_all(&index_bytes)?;
+
+        // Append metadata about the index to the Parquet file footer
+        arrow_writer.append_key_value_metadata(KeyValue::new(
+            "distinct_index_offset".to_string(),
+            offset.to_string(),
+        ));
+        arrow_writer.append_key_value_metadata(KeyValue::new(
+            "distinct_index_length".to_string(),
+            index_bytes.len().to_string(),
+        ));
+        Ok(())
+    }
+
+    // create a new distinct index from the specified bytes
+    fn new_from_bytes(serialized: &[u8]) -> Result<Self> {
+        let s = String::from_utf8(serialized.to_vec())
+            .map_err(|e| ParquetError::General(e.to_string()))?;
+
+        Ok(Self {
+            inner: s.lines().map(|s| s.to_string()).collect(),
+        })
+    }
+}
+
 #[derive(Debug)]
 struct DistinctIndexTable {
     schema: SchemaRef,
-    index: HashMap<String, HashSet<String>>,
+    index: HashMap<String, DistinctIndex>,
     dir: PathBuf,
 }
 
@@ -154,10 +215,6 @@ fn write_file_with_index(path: &Path, values: &[&str]) -> Result<()> {
     let arr: ArrayRef = Arc::new(StringArray::from(values.to_vec()));
     let batch = RecordBatch::try_new(schema.clone(), vec![arr])?;
 
-    let distinct: HashSet<_> = values.iter().copied().collect();
-    let serialized = distinct.into_iter().collect::<Vec<_>>().join("\n");
-    let index_bytes = serialized.into_bytes();
-
     let file = File::create(path)?;
 
     let mut writer = IndexedParquetWriter::try_new(file, schema.clone())?;
@@ -167,35 +224,18 @@ fn write_file_with_index(path: &Path, values: &[&str]) -> Result<()> {
     // Close row group
     writer.writer.flush()?;
 
-    // Set the offset for the index
-    let offset = writer.writer.bytes_written();
-    let index_len = index_bytes.len() as u64;
+    let distinct_index: DistinctIndex =
+        DistinctIndex::new(values.iter().map(|s| s.to_string()));
 
-    println!("Writing custom index at offset: {offset}, length: {index_len}");
-    // Write the index magic and length to the file
-    writer.writer.write_all(b"IDX1")?;
-    writer.writer.write_all(&index_len.to_le_bytes())?;
-
-    // Write the index bytes
-    writer.writer.write_all(&index_bytes)?;
-
-    // Append metadata about the index to the Parquet file footer
-    writer.writer.append_key_value_metadata(KeyValue::new(
-        "distinct_index_offset".to_string(),
-        offset.to_string(),
-    ));
-    writer.writer.append_key_value_metadata(KeyValue::new(
-        "distinct_index_length".to_string(),
-        index_bytes.len().to_string(),
-    ));
+    distinct_index.serialize(&mut writer.writer)?;
 
     writer.writer.close()?;
 
     println!("Finished writing file to {}", path.display());
     Ok(())
 }
 
-fn read_distinct_index(path: &Path) -> Result<HashSet<String>, ParquetError> {
+fn read_distinct_index(path: &Path) -> Result<DistinctIndex, ParquetError> {
     let mut file = File::open(path)?;
 
     let file_size = file.metadata()?.len();
@@ -245,10 +285,9 @@ fn read_distinct_index(path: &Path) -> Result<HashSet<String>, ParquetError> {
     let mut index_buf = vec![0u8; length];
    file.read_exact(&mut index_buf)?;
 
-    let s =
-        String::from_utf8(index_buf).map_err(|e| ParquetError::General(e.to_string()))?;
-
-    Ok(s.lines().map(|s| s.to_string()).collect())
+    let index = DistinctIndex::new_from_bytes(&index_buf)
+        .map_err(|e| ParquetError::General(e.to_string()))?;
+    Ok(index)
 }
 
 /// Implement TableProvider for DistinctIndexTable, using the distinct index to prune files
@@ -295,7 +334,7 @@ impl TableProvider for DistinctIndexTable {
         let keep: Vec<String> = self
             .index
             .iter()
-            .filter(|(_f, set)| target.as_ref().is_none_or(|v| set.contains(v)))
+            .filter(|(_f, set)| target.as_ref().is_none_or(|v| set.inner.contains(v)))
             .map(|(f, _)| f.clone())
             .collect();

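For readers skimming the diff, here is a standalone sketch (not part of the commit) of the round-trip the new DistinctIndex type is built around: the distinct values are newline-joined when serialized after the row groups (preceded by the "IDX1" magic and a little-endian u64 length, with the location recorded in the footer via the distinct_index_offset / distinct_index_length key-value metadata), and split back out with lines() on read. The from_newline_bytes helper below is a hypothetical stand-in for the example's new_from_bytes, with the ParquetError plumbing dropped so the snippet compiles on its own.

use std::collections::HashSet;

#[derive(Debug, Clone)]
struct DistinctIndex {
    inner: HashSet<String>,
}

impl DistinctIndex {
    fn new<I: IntoIterator<Item = String>>(iter: I) -> Self {
        Self {
            inner: iter.into_iter().collect(),
        }
    }

    // Hypothetical stand-in for the example's `new_from_bytes`: the payload is
    // newline-separated UTF-8, one distinct value per line.
    fn from_newline_bytes(bytes: &[u8]) -> Option<Self> {
        let s = String::from_utf8(bytes.to_vec()).ok()?;
        Some(Self {
            inner: s.lines().map(str::to_string).collect(),
        })
    }
}

fn main() {
    let index = DistinctIndex::new(["a", "b", "c"].into_iter().map(String::from));

    // What the example writes after the "IDX1" magic and the u64 length prefix
    // is just the newline-joined distinct values.
    let payload = index.inner.iter().cloned().collect::<Vec<_>>().join("\n");

    // Parsing the payload back reconstructs the same set of distinct values.
    let restored = DistinctIndex::from_newline_bytes(payload.as_bytes()).unwrap();
    assert!(restored.inner.contains("b"));
    assert_eq!(restored.inner.len(), 3);
}

Membership checks against inner are what the scan() pruning in the last hunk relies on: a file is kept only when its index contains the requested value.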