Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 44 additions & 32 deletions src/execution/evaluator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,56 +202,61 @@ impl<'a> ScopeEntry<'a> {
fn get_local_key_field<'b>(
key_val: &'b value::KeyPart,
indices: &'_ [u32],
) -> &'b value::KeyPart {
if indices.is_empty() {
) -> Result<&'b value::KeyPart> {
let result = if indices.is_empty() {
key_val
} else if let value::KeyPart::Struct(fields) = key_val {
Self::get_local_key_field(&fields[indices[0] as usize], &indices[1..])
Self::get_local_key_field(&fields[indices[0] as usize], &indices[1..])?
} else {
panic!("Only struct can be accessed by sub field");
}
bail!("Only struct can be accessed by sub field");
};
Ok(result)
}

fn get_local_field<'b>(
val: &'b value::Value<ScopeValueBuilder>,
indices: &'_ [u32],
) -> &'b value::Value<ScopeValueBuilder> {
if indices.is_empty() {
) -> Result<&'b value::Value<ScopeValueBuilder>> {
let result = if indices.is_empty() {
val
} else if let value::Value::Null = val {
val
} else if let value::Value::Struct(fields) = val {
Self::get_local_field(&fields.fields[indices[0] as usize], &indices[1..])
Self::get_local_field(&fields.fields[indices[0] as usize], &indices[1..])?
} else {
panic!("Only struct can be accessed by sub field");
}
bail!("Only struct can be accessed by sub field");
};
Ok(result)
}

fn get_value_field_builder(
&self,
field_ref: &AnalyzedLocalFieldReference,
) -> &value::Value<ScopeValueBuilder> {
) -> Result<&value::Value<ScopeValueBuilder>> {
let first_index = field_ref.fields_idx[0] as usize;
let index_base = self.key.value_field_index_base();
let val = self.value.fields[(first_index - index_base) as usize]
.get()
.unwrap();
.ok_or_else(|| anyhow!("Field {} is not set", first_index))?;
Self::get_local_field(val, &field_ref.fields_idx[1..])
}

fn get_field(&self, field_ref: &AnalyzedLocalFieldReference) -> value::Value {
fn get_field(&self, field_ref: &AnalyzedLocalFieldReference) -> Result<value::Value> {
let first_index = field_ref.fields_idx[0] as usize;
let index_base = self.key.value_field_index_base();
if first_index < index_base {
let key_val = self.key.key().unwrap();
let result = if first_index < index_base {
let key_val = self.key.key().ok_or_else(|| anyhow!("Key is not set"))?;
let key_part =
Self::get_local_key_field(&key_val[first_index], &field_ref.fields_idx[1..]);
Self::get_local_key_field(&key_val[first_index], &field_ref.fields_idx[1..])?;
key_part.clone().into()
} else {
let val = self.value.fields[(first_index - index_base) as usize]
.get()
.unwrap();
let val_part = Self::get_local_field(val, &field_ref.fields_idx[1..]);
.ok_or_else(|| anyhow!("Field {} is not set", first_index))?;
let val_part = Self::get_local_field(val, &field_ref.fields_idx[1..])?;
value::Value::from_alternative_ref(val_part)
}
};
Ok(result)
}

fn get_field_schema(
Expand Down Expand Up @@ -289,28 +294,29 @@ impl<'a> ScopeEntry<'a> {
fn assemble_value(
value_mapping: &AnalyzedValueMapping,
scoped_entries: RefList<'_, &ScopeEntry<'_>>,
) -> value::Value {
match value_mapping {
) -> Result<value::Value> {
let result = match value_mapping {
AnalyzedValueMapping::Constant { value } => value.clone(),
AnalyzedValueMapping::Field(field_ref) => scoped_entries
.headn(field_ref.scope_up_level as usize)
.unwrap()
.get_field(&field_ref.local),
.ok_or_else(|| anyhow!("Invalid scope_up_level: {}", field_ref.scope_up_level))?
.get_field(&field_ref.local)?,
AnalyzedValueMapping::Struct(mapping) => {
let fields = mapping
.fields
.iter()
.map(|f| assemble_value(f, scoped_entries))
.collect();
.collect::<Result<Vec<_>>>()?;
value::Value::Struct(value::FieldValues { fields })
}
}
};
Ok(result)
}

fn assemble_input_values<'a>(
value_mappings: &'a [AnalyzedValueMapping],
scoped_entries: RefList<'a, &ScopeEntry<'a>>,
) -> impl Iterator<Item = value::Value> + 'a {
) -> impl Iterator<Item = Result<value::Value>> + 'a {
value_mappings
.iter()
.map(move |value_mapping| assemble_value(value_mapping, scoped_entries))
Expand Down Expand Up @@ -370,8 +376,9 @@ async fn evaluate_op_scope(
}

let mut input_values = Vec::with_capacity(op.inputs.len());
input_values
.extend(assemble_input_values(&op.inputs, scoped_entries).collect::<Vec<_>>());
for value in assemble_input_values(&op.inputs, scoped_entries) {
input_values.push(value?);
}

let result = if op.function_exec_info.enable_cache {
let output_value_cell = memory.get_cache_entry(
Expand Down Expand Up @@ -416,8 +423,9 @@ async fn evaluate_op_scope(
_ => bail!("Expect target field to be a table"),
};

let target_field = head_scope.get_value_field_builder(&op.local_field_ref);
let target_field = head_scope.get_value_field_builder(&op.local_field_ref)?;
let task_futs = match target_field {
value::Value::Null => vec![],
value::Value::UTable(v) => v
.iter()
.map(|item| {
Expand Down Expand Up @@ -489,7 +497,9 @@ async fn evaluate_op_scope(
let field_values_iter = assemble_input_values(&op.input.fields, scoped_entries);
if op.has_auto_uuid_field {
field_values.push(value::Value::Null);
field_values.extend(field_values_iter);
for value in field_values_iter {
field_values.push(value?);
}
let uuid = memory.next_uuid(
op.fingerprinter
.clone()
Expand All @@ -498,7 +508,9 @@ async fn evaluate_op_scope(
)?;
field_values[0] = value::Value::Basic(value::BasicValue::Uuid(uuid));
} else {
field_values.extend(field_values_iter);
for value in field_values_iter {
field_values.push(value?);
}
};
let collector_entry = scoped_entries
.headn(op.collector_ref.scope_up_level as usize)
Expand Down Expand Up @@ -639,6 +651,6 @@ pub async fn evaluate_transient_flow(
let output_value = assemble_value(
&flow.execution_plan.output_value,
RefList::Nil.prepend(&root_scope_entry),
);
)?;
Ok(output_value)
}