Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions nixnet/_funcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,40 @@ def nx_read_signal_single_point(
return timestamp_buffer_ctypes, value_buffer_ctypes


def nx_read_signal_waveform(
session_ref, # type: int
timeout, # type: float
number_of_signals, # type: int
number_of_values, # type: int
):
# type: (...) -> typing.Tuple[int, float, typing.List[_ctypedefs.f64], int]
total_number_of_values = number_of_values * number_of_signals

session_ref_ctypes = _ctypedefs.nxSessionRef_t(session_ref)
timeout_ctypes = _ctypedefs.f64(timeout)
start_time_ctypes = _ctypedefs.nxTimestamp_t()
delta_time_ctypes = _ctypedefs.f64()
value_buffer_ctypes = (_ctypedefs.f64 * total_number_of_values)() # type: ignore
size_of_value_buffer_ctypes = _ctypedefs.u32(
_ctypedefs.f64.BYTES * total_number_of_values)
number_of_values_returned_ctypes = _ctypedefs.u32()
result = _cfuncs.lib.nx_read_signal_waveform(
session_ref_ctypes,
timeout_ctypes,
ctypes.pointer(start_time_ctypes),
ctypes.pointer(delta_time_ctypes),
value_buffer_ctypes,
size_of_value_buffer_ctypes,
ctypes.pointer(number_of_values_returned_ctypes),
)
_errors.check_for_error(result.value)
return (
start_time_ctypes.value,
delta_time_ctypes.value,
value_buffer_ctypes,
number_of_values_returned_ctypes.value)


def nx_read_state(
session_ref, # type: int
state_id, # type: _enums.ReadState
Expand Down
145 changes: 131 additions & 14 deletions nixnet/_session/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
from __future__ import division
from __future__ import print_function

import itertools
import typing # NOQA: F401

from nixnet import _funcs
from nixnet import _props
from nixnet import constants

from nixnet._session import collection

Expand All @@ -19,20 +21,6 @@ def __repr__(self):
def _create_item(self, handle, index, name):
return Signal(handle, index, name)

@property
def resamp_rate(self):
# type: () -> float
"""float: Rate used to resample frame data to/from signal data in waveforms.

The units are in Hertz (samples per second).
"""
return _props.get_session_resamp_rate(self._handle)

@resamp_rate.setter
def resamp_rate(self, value):
# type: (float) -> None
_props.set_session_resamp_rate(self._handle, value)


class SinglePointInSignals(Signals):
"""Writeable signals in a session."""
Expand Down Expand Up @@ -71,6 +59,135 @@ def write(
_funcs.nx_write_signal_single_point(self._handle, list(signals))


class WaveformInSignals(Signals):
"""Writeable signals in a session."""

def __repr__(self):
return 'Session.WaveformInSignals(handle={0})'.format(self._handle)

@property
def resamp_rate(self):
# type: () -> float
"""float: Rate used to resample frame data to/from signal data in waveforms.

The units are in Hertz (samples per second).
"""
return _props.get_session_resamp_rate(self._handle)

@resamp_rate.setter
def resamp_rate(self, value):
# type: (float) -> None
_props.set_session_resamp_rate(self._handle, value)

def read(
self,
num_values_per_signal,
timeout=constants.TIMEOUT_NONE):
# type: (int, float) -> typing.Tuple[int, float, typing.List[typing.List[float]]]
"""Read data from a Signal Input Waveform session.

Returns:
tuple of int, float, and list of list of float: t0, dt, and a list
of signal waveforms. A signal waveform is a list of signal
values.
"""
num_signals = len(self)
t0, dt, flattened_signals, num_values_returned = _funcs.nx_read_signal_waveform(
self._handle,
timeout,
num_signals,
num_values_per_signal)
signals = self._unflatten_signals(flattened_signals, num_values_returned, num_signals)
return t0, dt, signals

@staticmethod
def _unflatten_signals(flattened_signals, num_values_returned, num_signals):
ranges = (
(si * num_signals, si * num_signals + num_values_returned)
for si in range(num_signals)
)
signals = [
[
signal_ctype.value
for signal_ctype in flattened_signals[start:end]
]
for start, end in ranges
]
return signals


class WaveformOutSignals(Signals):
"""Writeable signals in a session."""

def __repr__(self):
return 'Session.WaveformOutSignals(handle={0})'.format(self._handle)

@property
def resamp_rate(self):
# type: () -> float
"""float: Rate used to resample frame data to/from signal data in waveforms.

The units are in Hertz (samples per second).
"""
return _props.get_session_resamp_rate(self._handle)

@resamp_rate.setter
def resamp_rate(self, value):
# type: (float) -> None
_props.set_session_resamp_rate(self._handle, value)

def write(
self,
signals,
timeout=10):
# type: (typing.List[typing.List[float]], float) -> None
"""Write data to a Signal Output Waveform session.

Args:
signals(list of list of float): A list of signal waveforms. A
signal waveform is a list of signal values (float)). Each
waveform must be the same length.

The data you write is queued for transmit on the network. Using
the default queue configuration for this mode, and assuming a
1000 Hz resample rate, you can safely write 64 elements if you
have a sufficiently long timeout. To write more data, refer to
the XNET Session Number of Values Unused property to determine
the actual amount of queue space available for writing.
timeout(float): The time in seconds to wait for the data to be
queued for transmit. The timeout does not wait for frames to be
transmitted on the network (see
:any:`nixnet._session.base.SessionBase.wait_for_transmit_complete`).

If 'timeout' is positive, this function waits up to that 'timeout'
for space to become available in queues. If the space is not
available prior to the 'timeout', a 'timeout' error is returned.

If 'timeout' is 'constants.TIMEOUT_INFINITE', this functions
waits indefinitely for space to become available in queues.

If 'timeout' is 'constants.TIMEOUT_NONE', this function does not
wait and immediately returns with a 'timeout' error if all data
cannot be queued. Regardless of the 'timeout' used, if a 'timeout'
error occurs, none of the data is queued, so you can attempt to
call this function again at a later time with the same data.
"""
flattened_signals = self._flatten_signals(signals)
_funcs.nx_write_signal_waveform(self._handle, timeout, flattened_signals)

@staticmethod
def _flatten_signals(signals):
"""Flatten even lists of signals.

>>> WaveformOutSignals._flatten_signals([])
[]
>>> WaveformOutSignals._flatten_signals([[1, 2], [3, 4]])
[1, 2, 3, 4]
"""
flattened_signals = list(itertools.chain.from_iterable(signals))
return flattened_signals


class Signal(collection.Item):
"""Signal configuration for a session."""

Expand Down
Loading