Skip to content
Merged

Dev #41

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,12 @@ libc = "0.2"
pdf-extract = "0.10.0"
lopdf = "0.34"

# DOCX processing
zip = "2.2"
roxmltree = "0.20"

# Random number generation
rand = "0.8"

# BM25 scoring
bm25 = { version = "2.3.2", features = ["parallelism"] }

# HTML parsing
scraper = "0.22"

# Python bindings
pyo3 = { version = "0.22", features = ["extension-module"] }

Expand Down
6 changes: 5 additions & 1 deletion examples/rust/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ async fn main() -> vectorless::Result<()> {
" Node: {} — {} keyword(s), top: {:?}",
node.title,
node.top_keywords.len(),
node.top_keywords.iter().take(3).map(|kw| &kw.keyword).collect::<Vec<_>>()
node.top_keywords
.iter()
.take(3)
.map(|kw| &kw.keyword)
.collect::<Vec<_>>()
);

// Show edges (connected documents)
Expand Down
21 changes: 12 additions & 9 deletions examples/rust/indexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@ async fn main() -> vectorless::Result<()> {

// 2. Index a single document with default options
println!("--- Single document (default mode) ---");
let result = engine
.index(IndexContext::from_path("./README.md"))
.await?;
let result = engine.index(IndexContext::from_path("./README.md")).await?;

for item in &result.items {
println!(" doc_id: {}", item.doc_id);
Expand Down Expand Up @@ -60,10 +58,7 @@ async fn main() -> vectorless::Result<()> {
// 3. Re-index with incremental mode — should detect no change
println!("\n--- Re-index (incremental, unchanged) ---");
let result2 = engine
.index(
IndexContext::from_path("./README.md")
.with_mode(IndexMode::Incremental),
)
.index(IndexContext::from_path("./README.md").with_mode(IndexMode::Incremental))
.await?;

for item in &result2.items {
Expand All @@ -86,8 +81,16 @@ async fn main() -> vectorless::Result<()> {
batch.failed.len()
);
for item in &batch.items {
let time = item.metrics.as_ref().map(|m| m.total_time_ms()).unwrap_or(0);
let nodes = item.metrics.as_ref().map(|m| m.nodes_processed).unwrap_or(0);
let time = item
.metrics
.as_ref()
.map(|m| m.total_time_ms())
.unwrap_or(0);
let nodes = item
.metrics
.as_ref()
.map(|m| m.nodes_processed)
.unwrap_or(0);
println!(" {} — {}ms, {} nodes", item.name, time, nodes);
}

Expand Down
5 changes: 4 additions & 1 deletion examples/rust/markdownflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
for query in queries {
println!(" Query: \"{}\"", query);

match client.query(QueryContext::new(query).with_doc_id(&doc_id)).await {
match client
.query(QueryContext::new(query).with_doc_id(&doc_id))
.await
{
Ok(result) => {
if let Some(item) = result.single() {
if item.content.is_empty() {
Expand Down
4 changes: 1 addition & 3 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,10 +171,8 @@ fn parse_format(format: &str) -> PyResult<DocumentFormat> {
match format.to_lowercase().as_str() {
"markdown" | "md" => Ok(DocumentFormat::Markdown),
"pdf" => Ok(DocumentFormat::Pdf),
"docx" | "doc" => Ok(DocumentFormat::Docx),
"html" | "htm" => Ok(DocumentFormat::Html),
_ => Err(PyErr::from(VectorlessError::new(
format!("Unknown format: {}. Supported: markdown, pdf, docx, html", format),
format!("Unknown format: {}. Supported: markdown, pdf", format),
"config",
))),
}
Expand Down
11 changes: 0 additions & 11 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,6 @@ path = "../examples/rust/basic.rs"
name = "advanced"
path = "../examples/rust/advanced.rs"

[[example]]
name = "cli_tool"
path = "../examples/rust/cli_tool.rs"

[[example]]
name = "custom_config"
path = "../examples/rust/custom_config.rs"
Expand Down Expand Up @@ -113,19 +109,12 @@ libc = { workspace = true }
pdf-extract = { workspace = true }
lopdf = { workspace = true }

# DOCX processing
zip = { workspace = true }
roxmltree = { workspace = true }

# Random number generation
rand = { workspace = true }

# BM25 scoring
bm25 = { workspace = true }

# HTML parsing
scraper = { workspace = true }

[dev-dependencies]
tempfile = { workspace = true }
tokio-test = { workspace = true }
Expand Down
121 changes: 76 additions & 45 deletions rust/src/client/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,14 @@ use crate::index::PipelineOptions;
use crate::index::incremental::{self, IndexAction};
use crate::retrieval::{PipelineRetriever, RetrieveEventReceiver};
use crate::storage::{PersistedDocument, Workspace};
use crate::utils::fingerprint::Fingerprint;
use crate::{DocumentTree, Error};

use super::events::EventEmitter;
use super::index_context::{IndexContext, IndexSource};
use super::indexer::IndexerClient;
use super::query_context::{QueryContext, QueryScope};
use super::retriever::RetrieverClient;
use super::types::{DocumentInfo, FailedItem, IndexItem, IndexMode, IndexResult, QueryResult, QueryResultItem};
use super::types::{DocumentInfo, FailedItem, IndexItem, IndexMode, IndexResult, QueryResult};
use super::workspace::WorkspaceClient;

/// The main Engine client.
Expand Down Expand Up @@ -160,7 +159,9 @@ impl Engine {
// Single source: no need for concurrency overhead
if ctx.sources.len() == 1 {
let source = &ctx.sources[0];
let (items, failed) = self.process_source(source, &ctx.options, ctx.name.as_deref()).await;
let (items, failed) = self
.process_source(source, &ctx.options, ctx.name.as_deref())
.await;
if items.is_empty() && !failed.is_empty() {
return Err(Error::Config(format!(
"All {} source(s) failed to index",
Expand All @@ -176,21 +177,26 @@ impl Engine {
}

// Multiple sources: parallel indexing
let concurrency = self.config.concurrency.max_concurrent_requests.min(ctx.sources.len());

let results: Vec<(Vec<IndexItem>, Vec<FailedItem>)> =
futures::stream::iter(&ctx.sources)
.map(|source| {
let options = ctx.options.clone();
let name = ctx.name.clone();
let engine = self.clone();
async move {
engine.process_source(source, &options, name.as_deref()).await
}
})
.buffer_unordered(concurrency)
.collect()
.await;
let concurrency = self
.config
.concurrency
.max_concurrent_requests
.min(ctx.sources.len());

let results: Vec<(Vec<IndexItem>, Vec<FailedItem>)> = futures::stream::iter(&ctx.sources)
.map(|source| {
let options = ctx.options.clone();
let name = ctx.name.clone();
let engine = self.clone();
async move {
engine
.process_source(source, &options, name.as_deref())
.await
}
})
.buffer_unordered(concurrency)
.collect()
.await;

let mut items = Vec::new();
let mut failed = Vec::new();
Expand Down Expand Up @@ -252,12 +258,18 @@ impl Engine {
doc.format.clone(),
doc.description.clone(),
doc.page_count,
).with_metrics_opt(metrics);
let persisted = self.indexer.to_persisted_with_options(doc, &pipeline_options);
)
.with_metrics_opt(metrics);
let persisted = self
.indexer
.to_persisted_with_options(doc, &pipeline_options);

if let Some(ref workspace) = self.workspace {
if let Err(e) = workspace.save(&persisted).await {
return (Vec::new(), vec![FailedItem::new(&source_label, e.to_string())]);
return (
Vec::new(),
vec![FailedItem::new(&source_label, e.to_string())],
);
}
// Clean up old document after successful save (atomic: save-first, then remove old)
if let Some(old_id) = &existing_id {
Expand All @@ -270,11 +282,17 @@ impl Engine {
}
Err(e) => {
tracing::warn!("Failed to index {}: {}", source_label, e);
(Vec::new(), vec![FailedItem::new(&source_label, e.to_string())])
(
Vec::new(),
vec![FailedItem::new(&source_label, e.to_string())],
)
}
}
}
Ok(IndexAction::IncrementalUpdate { old_tree, existing_id }) => {
Ok(IndexAction::IncrementalUpdate {
old_tree,
existing_id,
}) => {
info!("Incremental update for: {}", source_label);
match self
.indexer
Expand All @@ -291,13 +309,19 @@ impl Engine {
doc.format.clone(),
doc.description.clone(),
doc.page_count,
).with_metrics_opt(metrics);
let persisted = self.indexer.to_persisted_with_options(doc, &pipeline_options);
)
.with_metrics_opt(metrics);
let persisted = self
.indexer
.to_persisted_with_options(doc, &pipeline_options);

if let Some(ref workspace) = self.workspace {
// save() is atomic (write-lock + put), no need to remove first
if let Err(e) = workspace.save(&persisted).await {
return (Vec::new(), vec![FailedItem::new(&source_label, e.to_string())]);
return (
Vec::new(),
vec![FailedItem::new(&source_label, e.to_string())],
);
}
}

Expand All @@ -306,13 +330,19 @@ impl Engine {
}
Err(e) => {
tracing::warn!("Incremental update failed for {}: {}", source_label, e);
(Vec::new(), vec![FailedItem::new(&source_label, e.to_string())])
(
Vec::new(),
vec![FailedItem::new(&source_label, e.to_string())],
)
}
}
}
Err(e) => {
tracing::warn!("Failed to resolve action for {}: {}", source_label, e);
(Vec::new(), vec![FailedItem::new(&source_label, e.to_string())])
(
Vec::new(),
vec![FailedItem::new(&source_label, e.to_string())],
)
}
}
}
Expand Down Expand Up @@ -416,13 +446,20 @@ impl Engine {
pub async fn query_stream(&self, ctx: QueryContext) -> Result<RetrieveEventReceiver> {
let doc_id = match &ctx.scope {
QueryScope::Single(id) => id.clone(),
_ => return Err(Error::Config("query_stream requires a single doc_id".to_string())),
_ => {
return Err(Error::Config(
"query_stream requires a single doc_id".to_string(),
));
}
};

let tree = self.get_structure(&doc_id).await?;
let options = ctx.to_retrieve_options(&self.config);

let rx = self.retriever.query_stream(&tree, &ctx.query, &options).await?;
let rx = self
.retriever
.query_stream(&tree, &ctx.query, &options)
.await?;

Ok(rx)
}
Expand Down Expand Up @@ -524,15 +561,13 @@ impl Engine {
fn build_pipeline_options(
&self,
options: &super::types::IndexOptions,
format: crate::parser::DocumentFormat,
format: crate::index::parse::DocumentFormat,
) -> PipelineOptions {
use crate::index::SummaryStrategy;
PipelineOptions {
mode: match format {
crate::parser::DocumentFormat::Markdown => crate::index::IndexMode::Markdown,
crate::parser::DocumentFormat::Pdf => crate::index::IndexMode::Pdf,
crate::parser::DocumentFormat::Html => crate::index::IndexMode::Html,
crate::parser::DocumentFormat::Docx => crate::index::IndexMode::Docx,
crate::index::parse::DocumentFormat::Markdown => crate::index::IndexMode::Markdown,
crate::index::parse::DocumentFormat::Pdf => crate::index::IndexMode::Pdf,
},
generate_ids: options.generate_ids,
summary_strategy: if options.generate_summaries {
Expand Down Expand Up @@ -628,8 +663,8 @@ impl Engine {
return Ok(IndexAction::Skip(incremental::SkipInfo {
doc_id: existing_id,
name,
format: crate::parser::DocumentFormat::from_extension(&format_str)
.unwrap_or(crate::parser::DocumentFormat::Markdown),
format: crate::index::parse::DocumentFormat::from_extension(&format_str)
.unwrap_or(crate::index::parse::DocumentFormat::Markdown),
description: desc,
page_count: pages,
}));
Expand All @@ -646,17 +681,13 @@ impl Engine {
None => return Ok(IndexAction::FullIndex { existing_id: None }),
};

let format = crate::parser::DocumentFormat::from_extension(&stored_doc.meta.format)
.unwrap_or(crate::parser::DocumentFormat::Markdown);
let format = crate::index::parse::DocumentFormat::from_extension(&stored_doc.meta.format)
.unwrap_or(crate::index::parse::DocumentFormat::Markdown);
let pipeline_options = self.build_pipeline_options(options, format);

// If logic fingerprint changed, remove old doc before full reprocess
let action = incremental::resolve_action(
&current_bytes,
&stored_doc,
&pipeline_options,
format,
);
let action =
incremental::resolve_action(&current_bytes, &stored_doc, &pipeline_options, format);

// Note: if FullIndex, old doc cleanup happens in process_source()
// after successful save (save-first, then remove old).
Expand Down
6 changes: 3 additions & 3 deletions rust/src/client/index_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@

use std::path::PathBuf;

use crate::parser::DocumentFormat;
use crate::index::parse::DocumentFormat;

use super::types::{IndexMode, IndexOptions};

Expand Down Expand Up @@ -149,10 +149,10 @@ impl IndexContext {
/// Create from a directory path.
///
/// Indexes all supported files in the directory (non-recursive).
/// Supported extensions: `.md`, `.pdf`, `.docx`, `.html`, `.txt`.
/// Supported extensions: `.md`, `.pdf`, `.txt`.
pub fn from_dir(dir: impl Into<PathBuf>) -> Self {
let dir = dir.into();
let supported_extensions = ["md", "markdown", "pdf", "docx", "html", "htm", "txt"];
let supported_extensions = ["md", "markdown", "pdf", "txt"];

let mut sources = Vec::new();
if let Ok(entries) = std::fs::read_dir(&dir) {
Expand Down
Loading