Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 28 additions & 44 deletions implants/imix/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub struct AsyncTask {
pub start_time: DateTime<Utc>,
pub grpc_task: Task,
pub print_reciever: Receiver<String>,
pub error_reciever: Receiver<String>,
}

async fn handle_exec_tome(
Expand Down Expand Up @@ -54,6 +55,7 @@ async fn handle_exec_tome(
pub async fn handle_exec_timeout_and_response(
task: Task,
print_channel_sender: Sender<String>,
error_channel_sender: Sender<String>,
timeout: Option<Duration>,
) -> Result<(), Error> {
// Tasks will be forcibly stopped after 1 week.
Expand All @@ -80,15 +82,18 @@ pub async fn handle_exec_timeout_and_response(
print_channel_sender
.clone()
.send(format!("---[RESULT]----\n{}\n---------", tome_result.0))?;
print_channel_sender
print_channel_sender // Temporary - pending UI updates
.clone()
.send(format!("---[ERROR]----\n{}\n--------", tome_result.1))?;
error_channel_sender.clone().send(tome_result.1)?;
Ok(())
}

#[cfg(test)]
mod tests {
use super::handle_exec_tome;
use crate::tasks::drain_sender;

use super::{handle_exec_timeout_and_response, handle_exec_tome};
use anyhow::Result;
use c2::pb::Task;
use std::collections::HashMap;
Expand Down Expand Up @@ -125,51 +130,30 @@ print(sys.shell(input_params["cmd"])["stdout"])
Ok(())
}

#[test]
fn imix_handle_exec_tome_error() -> Result<()> {
let test_tome_input = Task {
id: 123,
eldritch: r#"
aoeu
#[tokio::test]
async fn imix_handle_exec_tome_error() -> Result<()> {
let (print_sender, print_reciever) = channel::<String>();
let (error_sender, error_reciever) = channel::<String>();
let _res = handle_exec_timeout_and_response(
Task {
id: 123,
eldritch: r#"print(no_var)
"#
.to_string(),
parameters: HashMap::new(),
file_names: Vec::new(),
quest_name: "test_quest".to_string(),
};

let runtime: tokio::runtime::Runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

let (sender, receiver) = channel::<String>();
.to_string(),
parameters: HashMap::from([]),
file_names: Vec::from([]),
quest_name: "Poggers".to_string(),
},
print_sender,
error_sender,
None,
)
.await?;

let exec_future = handle_exec_tome(test_tome_input, sender.clone());
let (eld_output, eld_error) = runtime.block_on(exec_future)?;
let task_channel_error = drain_sender(&error_reciever)?;
let _task_channel_output = drain_sender(&print_reciever)?;

let mut index = 0;
loop {
let cmd_output = match receiver.recv_timeout(Duration::from_millis(500)) {
Ok(local_res_string) => local_res_string,
Err(local_err) => {
match local_err.to_string().as_str() {
"channel is empty and sending half is closed" => {
break;
}
"timed out waiting on channel" => break,
_ => eprint!("Error: {}", local_err),
}
break;
}
};
assert_eq!(cmd_output, "".to_string());

index = index + 1;
}

assert_eq!(eld_output, "".to_string());
assert_eq!(eld_error, "[eldritch] Eldritch eval_module failed:\nerror: Variable `aoeu` not found\n --> 123:2:1\n |\n2 | aoeu\n | ^^^^\n |\n".to_string());
assert!(task_channel_error.contains(&"Variable `no_var` not found".to_string()));
Ok(())
}

Expand Down
92 changes: 52 additions & 40 deletions implants/imix/src/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,37 @@ use anyhow::{Context, Result};
use c2::pb::c2_manual_client::TavernClient;
use c2::pb::{
Agent, Beacon, ClaimTasksRequest, Host, ReportTaskOutputRequest, ReportTaskOutputResponse,
Task, TaskOutput,
Task, TaskError, TaskOutput,
};
use chrono::Utc;
use std::sync::mpsc::channel;
use std::sync::mpsc::{channel, Receiver};
use tokio::task;
use tonic::Status;

/// Drain every message currently queued on `reciever` and return them
/// concatenated into one `String` (no separator is inserted between
/// messages — joining with `""` matches the original behavior).
///
/// Each receive waits up to 100ms; the loop ends when the channel is
/// momentarily empty (`Timeout`) or every sender has been dropped
/// (`Disconnected`).
///
/// # Errors
/// Currently always returns `Ok`; the `Result` wrapper keeps the
/// signature usable with `?` at call sites.
pub fn drain_sender(reciever: &Receiver<String>) -> Result<String> {
    let mut channel_res: Vec<String> = Vec::new();
    loop {
        // Match the typed `RecvTimeoutError` variants rather than comparing
        // `err.to_string()` against hard-coded message text, which silently
        // breaks if the std error wording ever changes. Both variants mean
        // there is nothing left to read, so both end the drain loop.
        match reciever.recv_timeout(Duration::from_millis(100)) {
            Ok(line) => channel_res.push(line),
            Err(std::sync::mpsc::RecvTimeoutError::Timeout)
            | Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
        }
    }
    Ok(channel_res.join(""))
}

pub async fn get_new_tasks(
agent_properties: AgentProperties,
imix_config: Config,
Expand Down Expand Up @@ -63,8 +87,13 @@ pub async fn start_new_tasks(
eprintln!("Launching:\n{:?}", task.clone().eldritch);

let (sender, receiver) = channel::<String>();
let exec_with_timeout =
handle_exec_timeout_and_response(task.clone(), sender.clone(), None);
let (error_sender, error_receiver) = channel::<String>();
let exec_with_timeout = handle_exec_timeout_and_response(
task.clone(),
sender.clone(),
error_sender.clone(),
None,
);

#[cfg(debug_assertions)]
eprintln!(
Expand All @@ -80,6 +109,7 @@ pub async fn start_new_tasks(
start_time: Utc::now(),
grpc_task: task.clone(),
print_reciever: receiver,
error_reciever: error_receiver,
},
) {
Some(_old_task) => {
Expand All @@ -106,38 +136,9 @@ fn queue_task_output(
async_task: &AsyncTask,
task_id: TaskID,
running_task_res_map: &mut HashMap<TaskID, Vec<TaskOutput>>,
loop_start_time: Instant,
) {
let mut task_channel_output: Vec<String> = Vec::new();

loop {
#[cfg(debug_assertions)]
eprintln!(
"[{}]: Task # {} recieving output",
(Instant::now() - loop_start_time).as_millis(),
task_id
);
let new_res_line = match async_task
.print_reciever
.recv_timeout(Duration::from_millis(100))
{
Ok(local_res_string) => local_res_string,
Err(local_err) => {
match local_err.to_string().as_str() {
"channel is empty and sending half is closed" => {
break;
}
"timed out waiting on channel" => {
break;
}
_ => eprint!("Error: {}", local_err),
}
break;
}
};
// let appended_line = format!("{}{}", res.to_owned(), new_res_line);
task_channel_output.push(new_res_line);
}
) -> Result<()> {
let task_channel_output = drain_sender(&async_task.print_reciever)?;
let task_channel_error = drain_sender(&async_task.error_reciever)?;

let task_is_finished = async_task.future_join_handle.is_finished();
let task_response_exec_finished_at = match task_is_finished {
Expand All @@ -147,6 +148,14 @@ fn queue_task_output(

// If the task is finished or there's new data queue a new task result.
if task_is_finished || task_channel_output.len() > 0 {
let task_error = if task_channel_error.len() > 0 {
Some(TaskError {
msg: task_channel_error,
})
} else {
None
};

let task_response = TaskOutput {
id: async_task.grpc_task.id.clone(),
exec_started_at: Some(prost_types::Timestamp {
Expand All @@ -160,15 +169,18 @@ fn queue_task_output(
}),
None => None,
},
output: task_channel_output.join(""),
error: None,
output: task_channel_output,
error: task_error,
};

running_task_res_map
.entry(task_id)
.and_modify(|cur_list| cur_list.push(task_response.clone()))
.and_modify(|cur_list| {
cur_list.push(task_response.clone());
})
.or_insert(vec![task_response]);
}
Ok(())
}

pub async fn submit_task_output(
Expand All @@ -189,7 +201,7 @@ pub async fn submit_task_output(
);

// Loop over each line of output from the task and append it the the channel output.
queue_task_output(async_task, *task_id, running_task_res_map, loop_start_time);
queue_task_output(async_task, *task_id, running_task_res_map)?;
}

// Iterate over queued task results and send them back to the server
Expand Down