Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions docs/docusaurus.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,6 @@ const config = {
position: "right",
label: "Examples",
},
{
href: "https://prospective.co/blog",
label: "Blog",
position: "right",
},
{
href: "https://github.com/finos/perspective",
label: "GitHub",
Expand Down
26 changes: 26 additions & 0 deletions docs/md/how_to/python/multithreading.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,29 @@ app = Application(
]
)
```

## `on_poll_request`

`on_poll_request` is an optional keyword argument for `Server()`, which which
can be applied in cases where overlapping `Table.update` calls can be safely
deferred.

When providing a callback function to `on_poll_request`, the `Server` will
invoke your callback when there are updates that need to be flushed, after which
you must _eventually_ call `Server.poll` (or else no updates will be processed).

The exact implementation of `on_poll_request` will depend on the context. A
simple example which batches calls via `threading.Lock`:

```python
lock = threading.Lock()

def on_poll_request(perspective_server):
if lock.acquire(blocking=False):
try:
perspective_server.poll()
finally:
lock.release()

server = Server(on_poll_request=on_poll_request)
```
6 changes: 6 additions & 0 deletions examples/python-tornado-streaming/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ def poll_target(lock, perspective_server):
lock.release()


# `on_poll_request` is an optional keyword argument for `Server()`, which
# enables an optimization which can be applied in cases when deferring
# overlapping `Table.update` calls can be done safely.
#
# In This case, `Table.update` is always called every `TICK_RATE`, so it is safe
# to defer these updates as they'll get applied when the next tick occurs.
def on_poll_request(lock, executor, perspective_server):
if lock.acquire(blocking=False):
executor.submit(poll_target, lock, perspective_server)
Expand Down
35 changes: 29 additions & 6 deletions rust/perspective-python/src/server/server_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::sync::Arc;

use futures::future::BoxFuture;
#[cfg(doc)]
use perspective_client::Table;
use perspective_client::{Client, Table};
use perspective_server::ServerResult;
use pollster::FutureExt;
use pyo3::IntoPyObjectExt;
Expand All @@ -27,6 +27,13 @@ use crate::client::client_async::AsyncClient;

/// An instance of a Perspective server. Each [`Server`] instance is separate,
/// and does not share [`Table`] (or other) data with other [`Server`]s.
///
/// # Arguments
///
/// - `on_poll_request` A callback function which the `Server` will invoke when
/// there are updates that need to be flushed, after which you must
/// _eventually_ call [`Server::poll`] (or else no updates will be processed).
/// This optimization allows batching updates, depending on context.
#[pyclass(subclass, module = "perspective")]
#[derive(Clone)]
pub struct Server {
Expand Down Expand Up @@ -72,10 +79,17 @@ impl Server {
Ok(client)
}

/// Create a new [`Session`] bound to this [`Server`].
/// Create a [`Session`] for this [`Server`], suitable for exactly one
/// [`Client`] (not necessarily in this process). A [`Session`] represents
/// the server-side state of a single client-to-server connection.
///
/// [`Server::new_session`] only needs to be called if you've implemented
/// a custom Perspective ['Client`]/[`Server`] transport.
/// # Arguments
///
/// - `session_handler` - An implementor of [`SessionHandler`] which will be
/// invoked by the [`Server`] when a response message needs to be sent to
/// the [`Client`]. The response itself should be passed to
/// [`Client::handle_response`] eventually, though it may-or-may-not be in
/// the same process.
pub fn new_session(&self, _py: Python, response_cb: Py<PyAny>) -> PySession {
let session = self
.server
Expand All @@ -90,8 +104,17 @@ impl Server {
/// [`View::on_update`] callbacks.
///
/// [`Server::poll`] only needs to be called if you've implemented
/// a custom Perspective ['Client`]/[`Server`] transport and provided the
/// `on_poll_request` constructor keyword argument.
/// a custom Perspective [`Server`] and provided the `on_poll_request`
/// constructor keyword argument.
///
/// Calling [`Session::poll`] may result in the `send_response` parameter
/// which was used to construct this (or other) [`Session`] to fire.
/// Whenever a [`Session::handle_request`] method is invoked for a
/// `perspective_server::Server`, at least one [`Session::poll`] should be
/// scheduled to clear other clients message queues.
///
/// `poll()` _must_ be called after [`Table::update`] or [`Table::remove`]
/// and `on_poll_request` is notified, or the changes will not be applied.
pub fn poll(&self, py: Python<'_>) -> PyResult<()> {
py.allow_threads(|| {
self.server
Expand Down
17 changes: 15 additions & 2 deletions rust/perspective-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ pub trait SessionHandler: Send + Sync {
pub struct Server {
pub(crate) server: Arc<ffi::Server>,
pub(crate) callbacks: Arc<RwLock<HashMap<u32, SessionCallback>>>,

pub(crate) on_poll_request: Option<OnPollRequestCallback>,
}

Expand All @@ -66,6 +65,15 @@ impl std::fmt::Debug for Server {
}

impl Server {
/// Create a new [`Server`].
///
/// # Arguments
///
/// - `on_poll_request` A callback function which the `Server` will invoke
/// when there are updates that need to be flushed, after which you must
/// _eventually_ call [`Server::poll`] (or else no updates will be
/// processed). This optimization allows batching updates, depending on
/// context.
pub fn new(on_poll_request: Option<OnPollRequestCallback>) -> Self {
let server = Arc::new(ffi::Server::new(on_poll_request.is_some()));
let callbacks = Arc::default();
Expand Down Expand Up @@ -126,21 +134,26 @@ impl Server {
.await
}

/// Create a new [`Client`] instance bound to this [`Server`] directly.
pub fn new_local_client(&self) -> LocalClient {
LocalClient::new(self)
}

/// Flush any pending messages which may have resulted from previous
/// [`Session::handle_request`] calls.
///
/// [`Server::poll`] only needs to be called if you've implemented
/// a custom Perspective [`Server`] and provided the `on_poll_request`
/// constructor keyword argument.
///
/// Calling [`Session::poll`] may result in the `send_response` parameter
/// which was used to construct this (or other) [`Session`] to fire.
/// Whenever a [`Session::handle_request`] method is invoked for a
/// `perspective_server::Server`, at least one [`Session::poll`] should be
/// scheduled to clear other clients message queues.
///
/// `poll()` _must_ be called after [`Table::update`] or [`Table::remove`]
/// and `on_poll_request` set, or these changes will not be applied.
/// and `on_poll_request` is notified, or the changes will not be applied.
pub async fn poll(&self) -> Result<(), ServerError> {
let responses = self.server.poll();
let mut results = Vec::with_capacity(responses.size());
Expand Down
Loading