Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion BinaryOptionsToolsV2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ mod validator;

// use config::PyConfig;
use logs::{LogBuilder, Logger, StreamLogsIterator, StreamLogsLayer, start_tracing};
use pocketoption::{RawPocketOption, RawStreamIterator, StreamIterator};
use pocketoption::{RawPocketOption, RawStreamIterator, StreamIterator, RawHandle, RawHandler};
use pyo3::prelude::*;
use validator::RawValidator;

Expand All @@ -25,6 +25,8 @@ fn BinaryOptionsTools(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<StreamIterator>()?;
m.add_class::<RawStreamIterator>()?;
m.add_class::<RawValidator>()?;
m.add_class::<RawHandle>()?;
m.add_class::<RawHandler>()?;
// m.add_class::<PyConfig>()?;

m.add_function(wrap_pyfunction!(start_tracing, m)?)?;
Expand Down
164 changes: 164 additions & 0 deletions BinaryOptionsToolsV2/src/pocketoption.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,17 @@ pub struct RawStreamIterator {
stream: Arc<Mutex<Fuse<BoxStream<'static, PocketResult<String>>>>>,
}

#[pyclass]
#[derive(Clone)]
pub struct RawHandle {
handle: binary_options_tools::pocketoption::modules::raw::RawHandle,
}

#[pyclass]
pub struct RawHandler {
handler: Arc<Mutex<binary_options_tools::pocketoption::modules::raw::RawHandler>>,
}

#[pymethods]
impl RawPocketOption {
#[new]
Expand Down Expand Up @@ -559,6 +570,39 @@ impl RawPocketOption {
async move { Ok(client.server_time().await.timestamp()) },
)
}

/// Get a handle to the Raw module for custom message processing.
/// Returns a RawHandle that can be used to create RawHandlers.
pub fn raw_handle<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let client = self.client.clone();
future_into_py(py, async move {
let handle = client.raw_handle().await.map_err(BinaryErrorPy::from)?;
Python::with_gil(|py| RawHandle { handle }.into_py_any(py))
})
}

/// Create a RawHandler bound to a validator.
/// Returns a RawHandler that can send and receive messages.
pub fn create_raw_handler<'py>(
&self,
py: Python<'py>,
validator: Bound<'py, RawValidator>,
keep_alive_message: Option<String>,
) -> PyResult<Bound<'py, PyAny>> {
let client = self.client.clone();
let validator = validator.get().clone();
future_into_py(py, async move {
let crate_validator: CrateValidator = validator.into();
let keep_alive = keep_alive_message.map(|msg| {
binary_options_tools::pocketoption::modules::raw::Outgoing::Text(msg)
});
let handler = client
.create_raw_handler(crate_validator, keep_alive)
.await
.map_err(BinaryErrorPy::from)?;
Python::with_gil(|py| RawHandler { handler: Arc::new(Mutex::new(handler)) }.into_py_any(py))
})
}
}

#[pymethods]
Expand Down Expand Up @@ -616,3 +660,123 @@ impl RawStreamIterator {
})
}
}

#[pymethods]
impl RawHandle {
/// Create a new RawHandler bound to the given validator
pub fn create<'py>(
&self,
py: Python<'py>,
validator: Bound<'py, RawValidator>,
keep_alive_message: Option<String>,
) -> PyResult<Bound<'py, PyAny>> {
let handle = self.handle.clone();
let validator = validator.get().clone();
future_into_py(py, async move {
let crate_validator: CrateValidator = validator.into();
let keep_alive = keep_alive_message.map(|msg| {
binary_options_tools::pocketoption::modules::raw::Outgoing::Text(msg)
});
let handler = handle
.create(crate_validator, keep_alive)
.await
.map_err(BinaryErrorPy::from)?;
Python::with_gil(|py| RawHandler { handler: Arc::new(Mutex::new(handler)) }.into_py_any(py))
})
}

/// Remove an existing handler by ID
pub fn remove<'py>(&self, py: Python<'py>, id: String) -> PyResult<Bound<'py, PyAny>> {
let handle = self.handle.clone();
future_into_py(py, async move {
let uuid = Uuid::parse_str(&id).map_err(BinaryErrorPy::from)?;
let existed = handle.remove(uuid).await.map_err(BinaryErrorPy::from)?;
Ok(existed)
})
}
}

#[pymethods]
impl RawHandler {
/// Get the handler's ID
pub fn id(&self) -> String {
let handler = self.handler.blocking_lock();
handler.id().to_string()
}

/// Send a text message
pub fn send_text<'py>(
&self,
py: Python<'py>,
text: String,
) -> PyResult<Bound<'py, PyAny>> {
let handler = self.handler.clone();
future_into_py(py, async move {
let handler = handler.lock().await;
handler.send_text(text).await.map_err(BinaryErrorPy::from)?;
Ok(())
})
}

/// Send a binary message
pub fn send_binary<'py>(
&self,
py: Python<'py>,
data: Vec<u8>,
) -> PyResult<Bound<'py, PyAny>> {
let handler = self.handler.clone();
future_into_py(py, async move {
let handler = handler.lock().await;
handler.send_binary(data).await.map_err(BinaryErrorPy::from)?;
Ok(())
})
}

/// Send a message and wait for the next matching response
pub fn send_and_wait<'py>(
&self,
py: Python<'py>,
message: String,
) -> PyResult<Bound<'py, PyAny>> {
let handler = self.handler.clone();
future_into_py(py, async move {
let handler = handler.lock().await;
let msg = binary_options_tools::pocketoption::modules::raw::Outgoing::Text(message);
let response = handler
.send_and_wait(msg)
.await
.map_err(BinaryErrorPy::from)?;
Ok(arc_message_to_string(&response))
})
}

/// Wait for the next message that matches this handler's validator
pub fn wait_next<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let handler = self.handler.clone();
future_into_py(py, async move {
let handler = handler.lock().await;
let response = handler.wait_next().await.map_err(BinaryErrorPy::from)?;
Ok(arc_message_to_string(&response))
})
}

/// Subscribe to messages matching this handler's validator
/// Returns an iterator that yields matching messages
pub fn subscribe<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let handler = self.handler.blocking_lock();
let receiver = handler.subscribe();

// Create a boxed stream that yields String values
let boxed_stream = async_stream::stream! {
while let Ok(msg) = receiver.recv().await {
let msg_str = arc_message_to_string(&msg);
yield Ok(msg_str);
}
}
.boxed()
.fuse();

let stream = Arc::new(Mutex::new(boxed_stream));
RawStreamIterator { stream }.into_bound_py_any(py)
}
}
Loading