Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AnalogIO refactoring and support for buffering and data type conversions #92

Merged
merged 19 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 68 additions & 8 deletions OpenEphys.Onix/OpenEphys.Onix/AnalogInput.cs
Original file line number Diff line number Diff line change
@@ -1,27 +1,87 @@
using System;
using System.ComponentModel;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Runtime.InteropServices;
using Bonsai;
using OpenCV.Net;

namespace OpenEphys.Onix
{
public class AnalogInput : Source<ManagedFrame<short>>
public class AnalogInput : Source<AnalogInputDataFrame>
{
[TypeConverter(typeof(AnalogIO.NameConverter))]
public string DeviceName { get; set; }

public override IObservable<ManagedFrame<short>> Generate()
public int BufferSize { get; set; } = 100;

public AnalogIODataType DataType { get; set; } = AnalogIODataType.S16;

static Mat CreateVoltageScale(int bufferSize, float[] voltsPerDivision)
{
using var scaleHeader = Mat.CreateMatHeader(
voltsPerDivision,
rows: voltsPerDivision.Length,
cols: 1,
depth: Depth.F32,
channels: 1);
var voltageScale = new Mat(scaleHeader.Rows, bufferSize, scaleHeader.Depth, scaleHeader.Channels);
CV.Repeat(scaleHeader, voltageScale);
return voltageScale;
}

public unsafe override IObservable<AnalogInputDataFrame> Generate()
{
var bufferSize = BufferSize;
var dataType = DataType;
return Observable.Using(
() => DeviceManager.ReserveDevice(DeviceName),
disposable => disposable.Subject.SelectMany(deviceInfo =>
{
var device = deviceInfo.GetDeviceContext(typeof(AnalogIO));
return deviceInfo.Context.FrameReceived
.Where(frame => frame.DeviceAddress == device.Address)
.Select(frame => new ManagedFrame<short>(frame));
}));
Observable.Create<AnalogInputDataFrame>(observer =>
{
var device = deviceInfo.GetDeviceContext(typeof(AnalogIO));
var ioDeviceInfo = (AnalogIODeviceInfo)deviceInfo;

var sampleIndex = 0;
var voltageScale = dataType == AnalogIODataType.Volts
? CreateVoltageScale(bufferSize, ioDeviceInfo.VoltsPerDivision)
: null;
var transposeBuffer = voltageScale != null
? new Mat(AnalogIO.ChannelCount, bufferSize, Depth.S16, 1)
: null;
var analogDataBuffer = new short[AnalogIO.ChannelCount * bufferSize];
var hubSyncCounterBuffer = new ulong[bufferSize];
var clockBuffer = new ulong[bufferSize];

var frameObserver = Observer.Create<oni.Frame>(
frame =>
{
var payload = (AnalogInputPayload*)frame.Data.ToPointer();
Marshal.Copy(new IntPtr(payload->AnalogData), analogDataBuffer, sampleIndex * AnalogIO.ChannelCount, AnalogIO.ChannelCount);
glopesdev marked this conversation as resolved.
Show resolved Hide resolved
hubSyncCounterBuffer[sampleIndex] = payload->HubSyncCounter;
clockBuffer[sampleIndex] = frame.Clock;
if (++sampleIndex >= bufferSize)
{
var analogData = BufferHelper.CopyTranspose(
analogDataBuffer,
bufferSize,
AnalogIO.ChannelCount,
Depth.S16,
voltageScale,
transposeBuffer);
observer.OnNext(new AnalogInputDataFrame(clockBuffer, hubSyncCounterBuffer, analogData));
hubSyncCounterBuffer = new ulong[bufferSize];
clockBuffer = new ulong[bufferSize];
sampleIndex = 0;
}
},
observer.OnError,
observer.OnCompleted);
return deviceInfo.Context.FrameReceived
.Where(frame => frame.DeviceAddress == device.Address)
.SubscribeSafe(frameObserver);
})));
}
}
}
28 changes: 28 additions & 0 deletions OpenEphys.Onix/OpenEphys.Onix/AnalogInputDataFrame.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
using System.Runtime.InteropServices;
using OpenCV.Net;

namespace OpenEphys.Onix
{
public class AnalogInputDataFrame
{
public AnalogInputDataFrame(ulong[] clock, ulong[] hubSyncCounter, Mat analogData)
{
Clock = clock;
HubSyncCounter = hubSyncCounter;
AnalogData = analogData;
}

public ulong[] Clock { get; }

public ulong[] HubSyncCounter { get; }

public Mat AnalogData { get; }
}

[StructLayout(LayoutKind.Sequential, Pack = 1)]
unsafe struct AnalogInputPayload
{
public ulong HubSyncCounter;
public fixed short AnalogData[AnalogIO.ChannelCount];
}
}
117 changes: 114 additions & 3 deletions OpenEphys.Onix/OpenEphys.Onix/AnalogOutput.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,134 @@
using System.Linq;
using System.Reactive.Linq;
using Bonsai;
using OpenCV.Net;

namespace OpenEphys.Onix
{
public class AnalogOutput : Sink<ushort[]>
public class AnalogOutput : Sink<Mat>
{
const AnalogIOVoltageRange OutputRange = AnalogIOVoltageRange.TenVolts;

[TypeConverter(typeof(AnalogIO.NameConverter))]
public string DeviceName { get; set; }

public override IObservable<ushort[]> Process(IObservable<ushort[]> source)
public AnalogIODataType DataType { get; set; } = AnalogIODataType.S16;

public override IObservable<Mat> Process(IObservable<Mat> source)
{
var dataType = DataType;
return Observable.Using(
() => DeviceManager.ReserveDevice(DeviceName),
disposable => disposable.Subject.SelectMany(deviceInfo =>
{
var bufferSize = 0;
var scaleBuffer = default(Mat);
var transposeBuffer = default(Mat);
var sampleScale = dataType == AnalogIODataType.Volts
? 1 / AnalogIODeviceInfo.GetVoltsPerDivision(OutputRange)
: 1;
var device = deviceInfo.GetDeviceContext(typeof(AnalogIO));
return source.Do(device.Write);
return source.Do(data =>
{
if (dataType == AnalogIODataType.S16 && data.Depth != Depth.S16 ||
dataType == AnalogIODataType.Volts && data.Depth != Depth.F32)
{
ThrowDataTypeException(data.Depth);
}

AssertChannelCount(data.Rows);
if (bufferSize != data.Cols)
{
bufferSize = data.Cols;
transposeBuffer = bufferSize > 1
? new Mat(data.Cols, data.Rows, data.Depth, 1)
: null;
if (sampleScale != 1)
{
scaleBuffer = transposeBuffer != null
? new Mat(data.Cols, data.Rows, Depth.S16, 1)
: new Mat(data.Rows, data.Cols, Depth.S16, 1);
}
else scaleBuffer = null;
}

var outputBuffer = data;
if (transposeBuffer != null)
{
CV.Transpose(outputBuffer, transposeBuffer);
outputBuffer = transposeBuffer;
}

if (scaleBuffer != null)
{
CV.ConvertScale(outputBuffer, scaleBuffer, sampleScale);
outputBuffer = scaleBuffer;
}

var dataSize = outputBuffer.Step * outputBuffer.Rows;
device.Write(outputBuffer.Data, dataSize);
});
}));
}

public IObservable<short[]> Process(IObservable<short[]> source)
{
if (DataType != AnalogIODataType.S16)
ThrowDataTypeException(Depth.S16);

return Observable.Using(
() => DeviceManager.ReserveDevice(DeviceName),
disposable => disposable.Subject.SelectMany(deviceInfo =>
{
var device = deviceInfo.GetDeviceContext(typeof(AnalogIO));
return source.Do(data =>
{
AssertChannelCount(data.Length);
device.Write(data);
});
}));
}

public IObservable<float[]> Process(IObservable<float[]> source)
{
if (DataType != AnalogIODataType.Volts)
ThrowDataTypeException(Depth.F32);

return Observable.Using(
() => DeviceManager.ReserveDevice(DeviceName),
disposable => disposable.Subject.SelectMany(deviceInfo =>
{
var device = deviceInfo.GetDeviceContext(typeof(AnalogIO));
var divisionsPerVolt = 1 / AnalogIODeviceInfo.GetVoltsPerDivision(OutputRange);
return source.Do(data =>
{
AssertChannelCount(data.Length);
var samples = new short[data.Length];
for (int i = 0; i < samples.Length; i++)
{
samples[i] = (short)(data[i] * divisionsPerVolt);
}

device.Write(samples);
});
}));
}

static void AssertChannelCount(int channels)
{
if (channels != AnalogIO.ChannelCount)
{
throw new InvalidOperationException(
$"The input data must have exactly {AnalogIO.ChannelCount} channels."
);
}
}

static void ThrowDataTypeException(Depth depth)
{
throw new InvalidOperationException(
$"Invalid input data type '{depth}' for the specified analog IO configuration."
);
}
}
}
34 changes: 30 additions & 4 deletions OpenEphys.Onix/OpenEphys.Onix/BufferHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace OpenEphys.Onix
{
static class BufferHelper
{
public static Mat CopyBuffer<TBuffer>(
public static Mat CopyTranspose<TBuffer>(
TBuffer[] buffer,
int sampleCount,
int channelCount,
Expand All @@ -17,9 +17,35 @@ public static Mat CopyBuffer<TBuffer>(
channelCount,
depth,
channels: 1);
var amplifierData = new Mat(bufferHeader.Cols, bufferHeader.Rows, bufferHeader.Depth, 1);
CV.Transpose(bufferHeader, amplifierData);
return amplifierData;
var data = new Mat(bufferHeader.Cols, bufferHeader.Rows, depth, 1);
CV.Transpose(bufferHeader, data);
return data;
}

public static Mat CopyTranspose<TBuffer>(
TBuffer[] buffer,
int sampleCount,
int channelCount,
Depth depth,
Mat scale,
Mat transposeBuffer)
where TBuffer : unmanaged
{
if (scale == null)
{
return CopyTranspose(buffer, sampleCount, channelCount, depth);
}

using var bufferHeader = Mat.CreateMatHeader(
buffer,
sampleCount,
channelCount,
depth,
glopesdev marked this conversation as resolved.
Show resolved Hide resolved
channels: 1);
var data = new Mat(bufferHeader.Cols, bufferHeader.Rows, scale.Depth, 1);
CV.Transpose(bufferHeader, transposeBuffer);
CV.Mul(transposeBuffer, scale, data);
return data;
}
}
}
Loading