Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,13 @@ target_include_directories(${PROJECT_NAME}

)
target_link_libraries(${PROJECT_NAME} PRIVATE Threads::Threads cluon-static)
if(UNIX)
if(NOT "${CMAKE_SYSTEM_NAME}" STREQUAL "Darwin")
target_link_libraries(${PROJECT_NAME} PRIVATE rt)
endif()
endif()
set_target_properties(${PROJECT_NAME} PROPERTIES OUTPUT_NAME _pycluon)
set_target_properties(${PROJECT_NAME} PROPERTIES CXX_STANDARD 14 CXX_STANDARD_REQUIRED true)
set_target_properties(${PROJECT_NAME} PROPERTIES CXX_STANDARD 17 CXX_STANDARD_REQUIRED true)

add_dependencies(${PROJECT_NAME} cluon-msc)
add_dependencies(${PROJECT_NAME} cluon-OD4toStdout)
Expand Down
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ So far, `pycluon` wraps the following concepts from libcluon:
* UDPReceiver
* TCPConnection
* TCPServer
* SharedMemory

It also bundles the following command-line applications:
* protoc
Expand All @@ -29,6 +30,7 @@ It also bundles the following command-line applications:
| 0.1.1 | 0.0.140 |
| 0.1.2 | 0.0.140 |
| 0.1.3 | 0.0.140 |
| 0.2.0 | 0.0.140 |

## Installation

Expand Down Expand Up @@ -83,4 +85,31 @@ while session.is_running():
time.sleep(0.01)
```

**Write to a shared memory area**
```python
from datetime import datetime
from pycluon import SharedMemory

sm = SharedMemory("frame.argb", 640*480)

sm.lock()
sm.timestamp = datetime.now()
sm.data = b"<bytes>"
sm.unlock()
sm.notify_all()
```

**Read from an existing shared memory area**
```python
from pycluon import SharedMemory

sm = SharedMemory("frame.argb")

sm.wait() # Wait for notification from writing process
sm.lock()
print(sm.timestamp)
print(sm.data)
sm.unlock()
```

See the [tests](tests/test_libcluon_wrappers.py#L87-L143) for usage of `UDPSender`, `UDPReceiver`, `TCPConnection` and `TCPServer`.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def read(fname):

setup(
name="pycluon",
version="0.1.3",
version="0.2.0",
license="Apache License 2.0",
description="A python wrapper around libcluon",
long_description=read("README.md"),
Expand Down
113 changes: 70 additions & 43 deletions src/pycluon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,36 @@
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>

#include <chrono>
#include <filesystem>
#include <string>
#include <thread>
#include <vector>

#include "cluon/Envelope.hpp"
#include "cluon/OD4Session.hpp"
#include "cluon/SharedMemory.hpp"
#include "cluon/TCPConnection.hpp"
#include "cluon/TCPServer.hpp"
#include "cluon/Time.hpp"
#include "cluon/UDPReceiver.hpp"
#include "cluon/UDPSender.hpp"
#include "cluon/cluonDataStructures.hpp"

namespace py = pybind11;
using namespace pybind11::literals;

std::chrono::system_clock::time_point to_timepoint(
const cluon::data::TimeStamp& ts) {
return std::chrono::system_clock::time_point(
std::chrono::microseconds(cluon::time::toMicroseconds(ts)));
}

cluon::data::TimeStamp from_timepoint(
const std::chrono::system_clock::time_point& tp) {
return cluon::time::convert(tp);
}

PYBIND11_MODULE(_pycluon, m) {
m.doc() = R"docs(
A python wrapper around libcluon
Expand All @@ -53,59 +70,31 @@ PYBIND11_MODULE(_pycluon, m) {
self.serializedData(serialized_data);
})
.def_property(
"sent",
"sent_at",
[](cluon::data::Envelope& envelope) {
double seconds = static_cast<double>(envelope.sent().seconds());
double microseconds =
static_cast<double>(envelope.sent().microseconds());
return seconds + microseconds / 1e6;
return to_timepoint(envelope.sent());
},
[](cluon::data::Envelope& envelope, double timestamp) {
size_t seconds = static_cast<size_t>(timestamp);
size_t microseconds = static_cast<size_t>(
(timestamp - static_cast<double>(seconds)) * 1e6);

cluon::data::TimeStamp tmp;
tmp.seconds(seconds);
tmp.microseconds(microseconds);
envelope.sent(std::move(tmp));
[](cluon::data::Envelope& envelope,
const std::chrono::system_clock::time_point& tp) {
envelope.sent(from_timepoint(tp));
})
.def_property(
"received",
"received_at",
[](cluon::data::Envelope& envelope) {
double seconds = static_cast<double>(envelope.received().seconds());
double microseconds =
static_cast<double>(envelope.received().microseconds());
return seconds + microseconds / 1e6;
return to_timepoint(envelope.received());
},
[](cluon::data::Envelope& envelope, double timestamp) {
size_t seconds = static_cast<size_t>(timestamp);
size_t microseconds = static_cast<size_t>(
(timestamp - static_cast<double>(seconds)) * 1e6);

cluon::data::TimeStamp tmp;
tmp.seconds(seconds);
tmp.microseconds(microseconds);
envelope.received(std::move(tmp));
[](cluon::data::Envelope& envelope,
const std::chrono::system_clock::time_point& tp) {
envelope.received(from_timepoint(tp));
})
.def_property(
"sampled",
"sampled_at",
[](cluon::data::Envelope& envelope) {
double seconds =
static_cast<double>(envelope.sampleTimeStamp().seconds());
double microseconds =
static_cast<double>(envelope.sampleTimeStamp().microseconds());
return seconds + microseconds / 1e6;
return to_timepoint(envelope.sampleTimeStamp());
},
[](cluon::data::Envelope& envelope, double timestamp) {
size_t seconds = static_cast<size_t>(timestamp);
size_t microseconds = static_cast<size_t>(
(timestamp - static_cast<double>(seconds)) * 1e6);

cluon::data::TimeStamp tmp;
tmp.seconds(seconds);
tmp.microseconds(microseconds);
envelope.sampleTimeStamp(std::move(tmp));
[](cluon::data::Envelope& envelope,
const std::chrono::system_clock::time_point& tp) {
envelope.sampleTimeStamp(from_timepoint(tp));
})
.def_property(
"sender_stamp",
Expand Down Expand Up @@ -165,4 +154,42 @@ PYBIND11_MODULE(_pycluon, m) {
std::shared_ptr<cluon::TCPConnection>)>>(),
"port"_a, "new_connection_delegate"_a)
.def("is_running", &cluon::TCPServer::isRunning);

// SharedMemory
py::class_<cluon::SharedMemory>(m, "SharedMemory")
.def(py::init<const std::string&, uint32_t>(), "name"_a, "size"_a = 0)
.def("is_locked", &cluon::SharedMemory::isLocked)
.def("lock", &cluon::SharedMemory::lock)
.def("unlock", &cluon::SharedMemory::unlock)
.def("wait", &cluon::SharedMemory::wait,
py::call_guard<py::gil_scoped_release>())
.def("notify_all", &cluon::SharedMemory::notifyAll)
.def("valid", &cluon::SharedMemory::valid)
.def("name", &cluon::SharedMemory::name)
.def_property(
"timestamp",
[](cluon::SharedMemory& self) {
auto [flag, ts] = self.getTimeStamp();
if (!flag) {
throw pybind11::buffer_error(
"The shared memory area is not locked!");
}
return to_timepoint(ts);
},
[](cluon::SharedMemory& self,
const std::chrono::system_clock::time_point& tp) {
if (!self.setTimeStamp(from_timepoint(tp))) {
throw pybind11::buffer_error(
"The shared memory area is not locked!");
}
})

.def_property(
"data",
[](cluon::SharedMemory& self) {
return py::bytes(std::string(self.data(), self.size()));
},
[](cluon::SharedMemory& self, py::bytes data) {
strcpy(self.data(), std::string(data).c_str());
});
}
86 changes: 77 additions & 9 deletions tests/test_libcluon_wrappers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import time
import gc
import sys
import threading
from datetime import datetime

import pytest

from pycluon._pycluon import (
Envelope,
Expand All @@ -8,6 +13,7 @@
UDPReceiver,
TCPConnection,
TCPServer,
SharedMemory,
)


Expand All @@ -22,17 +28,17 @@ def test_envelope_setters_getters():
e.serialized_data = b"muppet"
assert e.serialized_data == b"muppet"

assert e.sent == 0.0
e.sent = 347238.438274
assert e.sent == 347238.438274
assert isinstance(e.sent_at, datetime)
e.sent_at = datetime.fromtimestamp(347238.438274)
assert e.sent_at.timestamp() == 347238.438274

assert e.received == 0.0
e.received = 347238.438274
assert e.received == 347238.438274
assert isinstance(e.received_at, datetime)
e.received_at = datetime.fromtimestamp(347238.438274)
assert e.received_at.timestamp() == 347238.438274

assert e.sampled == 0.0
e.sampled = 347238.438274
assert e.sampled == 347238.438274
assert isinstance(e.sampled_at, datetime)
e.sampled_at = datetime.fromtimestamp(347238.438274)
assert e.sampled_at.timestamp() == 347238.438274

assert e.sender_stamp == 0
e.sender_stamp = 53
Expand Down Expand Up @@ -103,6 +109,7 @@ def receive_callback(data, sender, timestamp):
time.sleep(0.1)

assert received["data"] == "test"
assert isinstance(received["timestamp"], datetime)


def test_TCP_ping():
Expand Down Expand Up @@ -141,3 +148,64 @@ def on_message(data, timestamp):
gc.collect()

assert CALLED_ON_CONNECTION_LOST


@pytest.mark.skipif(
sys.platform == "win32" or sys.platform == "darwin",
reason="See issue https://github.com/MO-RISE/pycluon/issues/11",
)
def test_shared_memory():
sm = SharedMemory("trial.data", 10)
sm2 = SharedMemory("trial.data")

print(sm.name())

assert sm.valid()
assert sm2.valid()

assert len(sm.data) == 10
assert len(sm2.data) == 10

assert sm.data == b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"
assert sm2.data == b"\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"

assert not sm.is_locked()
assert not sm2.is_locked()

sm.lock()
assert sm.is_locked()
assert not sm2.is_locked()
sm.unlock()

sm2.lock()
assert sm2.is_locked()
assert not sm.is_locked()
sm2.unlock()

with pytest.raises(BufferError):
sm.timestamp = datetime.now()

sm.lock()
dt = sm.timestamp = datetime.now()

with pytest.raises(TypeError):
sm.data = "abcdefghij"

sm.data = b"abcdefghij" # len == 10
sm.unlock()

sm2.lock()
assert sm2.data == b"abcdefghij"
assert sm2.timestamp == dt
sm2.unlock()

t = threading.Thread(target=sm2.wait, daemon=True)
t.start()

time.sleep(0.1)
assert t.is_alive()

sm.notify_all()

time.sleep(0.1)
assert not t.is_alive()