Skip to content

Commit

Permalink
feat: Make hotcache creation optional
Browse files Browse the repository at this point in the history
  • Loading branch information
ppodolsky committed Sep 14, 2023
1 parent d1bf9e1 commit 6493a4e
Show file tree
Hide file tree
Showing 14 changed files with 48 additions and 151 deletions.
3 changes: 3 additions & 0 deletions aiosumma/aiosumma/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ async def attach_index(
async def commit_index(
self,
index_name: str,
with_hotcache: bool,
request_id: Optional[str] = None,
session_id: Optional[str] = None,
) -> index_service_pb.CommitIndexResponse:
Expand All @@ -120,6 +121,7 @@ async def commit_index(
Args:
index_name: index name
with_hotcache: create hot cache after commit that improves startup performance but makes committing slower
request_id: request id
session_id: session id
Returns:
Expand All @@ -128,6 +130,7 @@ async def commit_index(
return await self.stubs['index_api'].commit_index(
index_service_pb.CommitIndexRequest(
index_name=index_name,
with_hotcache=with_hotcache,
),
metadata=setup_metadata(session_id, request_id),
)
Expand Down
55 changes: 2 additions & 53 deletions docs/apis/grpc-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ parent: APIs
- [MergePolicy](#summa-proto-MergePolicy)
- [MergeSegmentsRequest](#summa-proto-MergeSegmentsRequest)
- [MergeSegmentsResponse](#summa-proto-MergeSegmentsResponse)
- [PrimaryKey](#summa-proto-PrimaryKey)
- [RemoteEngineConfig](#summa-proto-RemoteEngineConfig)
- [RemoteEngineConfig.HeadersTemplateEntry](#summa-proto-RemoteEngineConfig-HeadersTemplateEntry)
- [SetIndexAliasRequest](#summa-proto-SetIndexAliasRequest)
Expand All @@ -94,7 +93,6 @@ parent: APIs
- [CollectorOutput](#summa-proto-CollectorOutput)
- [CountCollector](#summa-proto-CountCollector)
- [CountCollectorOutput](#summa-proto-CountCollectorOutput)
- [CustomOrder](#summa-proto-CustomOrder)
- [DisjunctionMaxQuery](#summa-proto-DisjunctionMaxQuery)
- [DocumentsCollectorOutput](#summa-proto-DocumentsCollectorOutput)
- [EmptyQuery](#summa-proto-EmptyQuery)
Expand All @@ -104,7 +102,6 @@ parent: APIs
- [FacetCollectorOutput](#summa-proto-FacetCollectorOutput)
- [FacetCollectorOutput.FacetCountsEntry](#summa-proto-FacetCollectorOutput-FacetCountsEntry)
- [Highlight](#summa-proto-Highlight)
- [Key](#summa-proto-Key)
- [MatchQuery](#summa-proto-MatchQuery)
- [MatchQueryBooleanShouldMode](#summa-proto-MatchQueryBooleanShouldMode)
- [MatchQueryDisjuctionMaxMode](#summa-proto-MatchQueryDisjuctionMaxMode)
Expand Down Expand Up @@ -466,6 +463,7 @@ Store the state of index to the storage
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| index_name | [string](#string) | | |
| with_hotcache | [bool](#bool) | | |



Expand Down Expand Up @@ -684,6 +682,7 @@ Request a stream of all documents from the index
| ----- | ---- | ----- | ----------- |
| index_name | [string](#string) | | |
| fields | [string](#string) | repeated | |
| query_filter | [Query](#summa-proto-Query) | optional | |



Expand Down Expand Up @@ -1058,22 +1057,6 @@ Merge policy that describes how to merge committed segments



<a name="summa-proto-PrimaryKey"></a>

### PrimaryKey
Possible primary keys


| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| str | [string](#string) | | |
| i64 | [int64](#int64) | | |






<a name="summa-proto-RemoteEngineConfig"></a>

### RemoteEngineConfig
Expand Down Expand Up @@ -1457,24 +1440,6 @@ Collectors and CollectorOutputs



<a name="summa-proto-CustomOrder"></a>

### CustomOrder



| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| key | [Empty](#summa-proto-Empty) | | |
| count | [Empty](#summa-proto-Empty) | | |
| sub_aggregation | [string](#string) | | |
| order | [Order](#summa-proto-Order) | | |






<a name="summa-proto-DisjunctionMaxQuery"></a>

### DisjunctionMaxQuery
Expand Down Expand Up @@ -1612,22 +1577,6 @@ Collectors and CollectorOutputs



<a name="summa-proto-Key"></a>

### Key



| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| str | [string](#string) | | |
| f64 | [double](#double) | | |






<a name="summa-proto-MatchQuery"></a>

### MatchQuery
Expand Down
1 change: 1 addition & 0 deletions examples/proto_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ async fn main() -> Result<(), tonic::Status> {
index_api_client
.commit_index(proto::CommitIndexRequest {
index_name: "test_index".to_string(),
with_hotcache: false,
})
.await?;
let search_response = search_api_client
Expand Down
23 changes: 10 additions & 13 deletions summa-core/src/components/default_tokenizers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -338,19 +338,16 @@ pub fn default_tokenizers() -> [(String, TextAnalyzer); 7] {
.build();
let summa_dict_tokenizer = TextAnalyzer::builder(DictTokenizer::new()).build();
let summa_html_tokenizer = TextAnalyzer::builder(HtmlTokenizer::new(
HashSet::from_iter(vec!["formula".to_string(), "figure".to_string(), "math".to_string(), "ref".to_string()].into_iter()),
HashSet::from_iter(
vec![
"sup".to_string(),
"sub".to_string(),
"i".to_string(),
"b".to_string(),
"u".to_string(),
"scp".to_string(),
"tt".to_string(),
]
.into_iter(),
),
HashSet::from_iter(vec!["formula".to_string(), "figure".to_string(), "math".to_string(), "ref".to_string()]),
HashSet::from_iter(vec![
"sup".to_string(),
"sub".to_string(),
"i".to_string(),
"b".to_string(),
"u".to_string(),
"scp".to_string(),
"tt".to_string(),
]),
))
.filter(RemoveLongFilter::limit(100))
.filter(LowerCaser)
Expand Down
20 changes: 9 additions & 11 deletions summa-core/src/components/index_holder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -525,11 +525,10 @@ impl IndexHolder {
span.in_scope(|| {
let store_reader = segment_reader.get_store_reader(1)?;
for document in store_reader.iter(segment_reader.alive_bitset()) {
let Ok(document) = document
else {
info!(action = "broken_document", document = ?document);
return Ok::<_, Error>(());
};
let Ok(document) = document else {
info!(action = "broken_document", document = ?document);
return Ok::<_, Error>(());
};
if let Some(document) = documents_modifier(document) {
if tx.blocking_send(document).is_err() {
info!(action = "documents_client_dropped");
Expand Down Expand Up @@ -565,11 +564,10 @@ impl IndexHolder {
span.in_scope(|| {
for doc_address in docs {
let document = searcher.doc(doc_address);
let Ok(document) = document
else {
info!(action = "broken_document", document = ?document);
return Ok::<_, Error>(());
};
let Ok(document) = document else {
info!(action = "broken_document", document = ?document);
return Ok::<_, Error>(());
};
if let Some(document) = documents_modifier(document) {
if tx.blocking_send(document).is_err() {
info!(action = "documents_client_dropped");
Expand Down Expand Up @@ -598,7 +596,7 @@ pub mod tests {
use serde_json::json;
use summa_proto::proto;
use summa_proto::proto::ConflictStrategy;
use tantivy::collector::Count;
use tantivy::collector::{Count, TopDocs};
use tantivy::query::{AllQuery, TermQuery};
use tantivy::schema::IndexRecordOption;
use tantivy::{doc, Document, IndexBuilder, Term};
Expand Down
25 changes: 1 addition & 24 deletions summa-core/src/components/index_writer_holder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ impl IndexWriterHolder {
}

/// Delete index by its unique fields
pub(super) fn resolve_conflicts(&self, document: &mut Document, conflict_strategy: proto::ConflictStrategy) -> SummaResult<Option<u64>> {
pub(super) fn resolve_conflicts(&self, document: &Document, conflict_strategy: proto::ConflictStrategy) -> SummaResult<Option<u64>> {
if self.unique_fields.is_empty() || matches!(conflict_strategy, proto::ConflictStrategy::DoNothing) {
return Ok(None);
}
Expand Down Expand Up @@ -461,27 +461,4 @@ impl IndexWriterHolder {
}
Ok(opstamp)
}

pub fn lock_files(&mut self, with_hotcache: bool) -> SummaResult<Vec<String>> {
let mut segment_files = vec![".managed.json".to_string(), "meta.json".to_string()];
let opstamp = self.commit_and_prepare(with_hotcache)?;
if with_hotcache {
segment_files.push(format!("hotcache.{}.bin", opstamp))
}
segment_files.extend(self.get_index_files()?);
Ok(segment_files)
}

/// Get segments
fn get_index_files(&self) -> SummaResult<impl Iterator<Item = String> + '_> {
Ok(self.index().searchable_segments()?.into_iter().flat_map(|segment| {
tantivy::SegmentComponent::iterator()
.filter_map(|segment_component| {
let filepath = segment.meta().relative_path(*segment_component);
let file_name = filepath.to_string_lossy().to_string();
self.index().directory().exists(&filepath).expect("cannot parse").then_some(file_name)
})
.collect::<Vec<_>>()
}))
}
}
14 changes: 3 additions & 11 deletions summa-core/src/components/query_parser/summa_ql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,7 @@ impl QueryParser {
}

if field_type.value_type() != Type::Json && !full_path.is_empty() {
return Err(QueryParserError::NonJsonFieldWithPath(format!(
"{}.{}",
field_entry.name().to_string(),
full_path
)));
return Err(QueryParserError::NonJsonFieldWithPath(format!("{}.{}", field_entry.name(), full_path)));
}

if !(field_type.is_indexed() || matches!(pre_term.as_rule(), Rule::range) && field_type.is_fast()) {
Expand Down Expand Up @@ -713,9 +709,7 @@ impl QueryParser {
let field_boost = self.query_parser_config.0.field_boosts.get(field_entry.name()).copied();
match field_entry.field_type() {
FieldType::Str(ref str_option) => {
let Some(option) = str_option.get_indexing_options() else {
return None
};
let option = str_option.get_indexing_options()?;
let terms = match self.parse_words(field, full_path, option, &top_level_phrase) {
Ok(terms) => terms,
Err(err) => return Some(Err(err)),
Expand All @@ -726,9 +720,7 @@ impl QueryParser {
})
}
FieldType::JsonObject(ref json_option) => {
let Some(option) = json_option.get_text_indexing_options() else {
return None
};
let option = str_option.get_indexing_options()?;
let terms = match self.parse_words(field, full_path, option, &top_level_phrase) {
Ok(terms) => terms,
Err(err) => return Some(Err(err)),
Expand Down
2 changes: 1 addition & 1 deletion summa-core/src/directories/caching_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl FileStats {

pub fn inc_gen(&self, path: &Path, new_len: Option<u64>) -> RwLockWriteGuard<'_, RawRwLock, HashMap<PathBuf, FileStat>> {
let mut write_lock = self.0.write();
write_lock.entry(path.to_path_buf()).or_insert_with(Default::default).inc_gen(new_len);
write_lock.entry(path.to_path_buf()).or_default().inc_gen(new_len);
write_lock
}

Expand Down
9 changes: 1 addition & 8 deletions summa-proto/proto/index_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,6 @@ service IndexApi {
rpc warmup_index (WarmupIndexRequest) returns (WarmupIndexResponse) {}
}

// Possible primary keys
message PrimaryKey {
oneof value {
string str = 1;
int64 i64 = 2;
}
}

// Merge policy that describes how to merge committed segments
message MergePolicy {
oneof merge_policy {
Expand Down Expand Up @@ -87,6 +79,7 @@ message AttachIndexResponse {
// Store the state of index to the storage
message CommitIndexRequest {
string index_name = 1;
bool with_hotcache = 2;
}

// Returned data from the commit command
Expand Down
18 changes: 0 additions & 18 deletions summa-proto/proto/query.proto
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
syntax = "proto3";
package summa.proto;

import "utils.proto";

message TermFieldMapperConfig {
repeated string fields = 1;
}
Expand Down Expand Up @@ -131,22 +129,6 @@ message ExistsQuery {
string field = 1;
}

message CustomOrder {
oneof order_target {
Empty key = 1;
Empty count = 2;
string sub_aggregation = 3;
};
Order order = 4;
}

message Key {
oneof key {
string str = 1;
double f64 = 2;
}
}

enum Occur {
should = 0;
must = 1;
Expand Down
7 changes: 5 additions & 2 deletions summa-server/src/apis/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,11 @@ impl proto::index_api_server::IndexApi for IndexApiImpl {

async fn commit_index(&self, proto_request: Request<proto::CommitIndexRequest>) -> Result<Response<proto::CommitIndexResponse>, Status> {
let now = Instant::now();
let index_holder = self.index_service.get_index_holder(&proto_request.into_inner().index_name).await?;
self.index_service.commit_and_restart_consumption(&index_holder).await?;
let proto_request = proto_request.into_inner();
let index_holder = self.index_service.get_index_holder(&proto_request.index_name).await?;
self.index_service
.commit_and_restart_consumption(&index_holder, proto_request.with_hotcache)
.await?;
Ok(Response::new(proto::CommitIndexResponse {
elapsed_secs: now.elapsed().as_secs_f64(),
}))
Expand Down
2 changes: 1 addition & 1 deletion summa-server/src/components/consumer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ impl ConsumerManager {
consumer_thread.stop().await?;
let stopped_consumption = StoppedConsumption { consumer_thread };
let mut index_writer_holder = index_holder.index_writer_holder()?.clone().write_owned().await;
tokio::task::spawn_blocking(move || index_writer_holder.commit_and_prepare(true)).await??;
tokio::task::spawn_blocking(move || index_writer_holder.commit_and_prepare(false)).await??;
stopped_consumption.commit_offsets().await?;
Ok(())
}))
Expand Down
2 changes: 2 additions & 0 deletions summa-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ mod tests {
index_api_client
.commit_index(proto::CommitIndexRequest {
index_name: "test_index".to_string(),
with_hotcache: false,
})
.await?;
index_api_client
Expand All @@ -218,6 +219,7 @@ mod tests {
index_api_client
.commit_index(proto::CommitIndexRequest {
index_name: "test_index".to_string(),
with_hotcache: false,
})
.await?;
Ok(r)
Expand Down
Loading

0 comments on commit 6493a4e

Please sign in to comment.