Skip to content

Commit a7cd3c5

Browse files
authored
Merge pull request #92 from neurogears/issue-76
AnalogIO refactoring and support for buffering and data type conversions
2 parents dcb94f6 + 2602312 commit a7cd3c5

File tree

9 files changed

+373
-142
lines changed

9 files changed

+373
-142
lines changed
Lines changed: 68 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,87 @@
11
using System;
22
using System.ComponentModel;
33
using System.Linq;
4+
using System.Reactive;
45
using System.Reactive.Linq;
6+
using System.Runtime.InteropServices;
57
using Bonsai;
8+
using OpenCV.Net;
69

710
namespace OpenEphys.Onix
811
{
9-
public class AnalogInput : Source<ManagedFrame<short>>
12+
public class AnalogInput : Source<AnalogInputDataFrame>
1013
{
1114
[TypeConverter(typeof(AnalogIO.NameConverter))]
1215
public string DeviceName { get; set; }
1316

14-
public override IObservable<ManagedFrame<short>> Generate()
17+
public int BufferSize { get; set; } = 100;
18+
19+
public AnalogIODataType DataType { get; set; } = AnalogIODataType.S16;
20+
21+
static Mat CreateVoltageScale(int bufferSize, float[] voltsPerDivision)
1522
{
23+
using var scaleHeader = Mat.CreateMatHeader(
24+
voltsPerDivision,
25+
rows: voltsPerDivision.Length,
26+
cols: 1,
27+
depth: Depth.F32,
28+
channels: 1);
29+
var voltageScale = new Mat(scaleHeader.Rows, bufferSize, scaleHeader.Depth, scaleHeader.Channels);
30+
CV.Repeat(scaleHeader, voltageScale);
31+
return voltageScale;
32+
}
33+
34+
public unsafe override IObservable<AnalogInputDataFrame> Generate()
35+
{
36+
var bufferSize = BufferSize;
37+
var dataType = DataType;
1638
return Observable.Using(
1739
() => DeviceManager.ReserveDevice(DeviceName),
1840
disposable => disposable.Subject.SelectMany(deviceInfo =>
19-
{
20-
var device = deviceInfo.GetDeviceContext(typeof(AnalogIO));
21-
return deviceInfo.Context.FrameReceived
22-
.Where(frame => frame.DeviceAddress == device.Address)
23-
.Select(frame => new ManagedFrame<short>(frame));
24-
}));
41+
Observable.Create<AnalogInputDataFrame>(observer =>
42+
{
43+
var device = deviceInfo.GetDeviceContext(typeof(AnalogIO));
44+
var ioDeviceInfo = (AnalogIODeviceInfo)deviceInfo;
45+
46+
var sampleIndex = 0;
47+
var voltageScale = dataType == AnalogIODataType.Volts
48+
? CreateVoltageScale(bufferSize, ioDeviceInfo.VoltsPerDivision)
49+
: null;
50+
var transposeBuffer = voltageScale != null
51+
? new Mat(AnalogIO.ChannelCount, bufferSize, Depth.S16, 1)
52+
: null;
53+
var analogDataBuffer = new short[AnalogIO.ChannelCount * bufferSize];
54+
var hubSyncCounterBuffer = new ulong[bufferSize];
55+
var clockBuffer = new ulong[bufferSize];
56+
57+
var frameObserver = Observer.Create<oni.Frame>(
58+
frame =>
59+
{
60+
var payload = (AnalogInputPayload*)frame.Data.ToPointer();
61+
Marshal.Copy(new IntPtr(payload->AnalogData), analogDataBuffer, sampleIndex * AnalogIO.ChannelCount, AnalogIO.ChannelCount);
62+
hubSyncCounterBuffer[sampleIndex] = payload->HubSyncCounter;
63+
clockBuffer[sampleIndex] = frame.Clock;
64+
if (++sampleIndex >= bufferSize)
65+
{
66+
var analogData = BufferHelper.CopyTranspose(
67+
analogDataBuffer,
68+
bufferSize,
69+
AnalogIO.ChannelCount,
70+
Depth.S16,
71+
voltageScale,
72+
transposeBuffer);
73+
observer.OnNext(new AnalogInputDataFrame(clockBuffer, hubSyncCounterBuffer, analogData));
74+
hubSyncCounterBuffer = new ulong[bufferSize];
75+
clockBuffer = new ulong[bufferSize];
76+
sampleIndex = 0;
77+
}
78+
},
79+
observer.OnError,
80+
observer.OnCompleted);
81+
return deviceInfo.Context.FrameReceived
82+
.Where(frame => frame.DeviceAddress == device.Address)
83+
.SubscribeSafe(frameObserver);
84+
})));
2585
}
2686
}
2787
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using System.Runtime.InteropServices;
2+
using OpenCV.Net;
3+
4+
namespace OpenEphys.Onix
5+
{
6+
public class AnalogInputDataFrame
7+
{
8+
public AnalogInputDataFrame(ulong[] clock, ulong[] hubSyncCounter, Mat analogData)
9+
{
10+
Clock = clock;
11+
HubSyncCounter = hubSyncCounter;
12+
AnalogData = analogData;
13+
}
14+
15+
public ulong[] Clock { get; }
16+
17+
public ulong[] HubSyncCounter { get; }
18+
19+
public Mat AnalogData { get; }
20+
}
21+
22+
[StructLayout(LayoutKind.Sequential, Pack = 1)]
23+
unsafe struct AnalogInputPayload
24+
{
25+
public ulong HubSyncCounter;
26+
public fixed short AnalogData[AnalogIO.ChannelCount];
27+
}
28+
}

OpenEphys.Onix/OpenEphys.Onix/AnalogOutput.cs

Lines changed: 114 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,134 @@
33
using System.Linq;
44
using System.Reactive.Linq;
55
using Bonsai;
6+
using OpenCV.Net;
67

78
namespace OpenEphys.Onix
89
{
9-
public class AnalogOutput : Sink<ushort[]>
10+
public class AnalogOutput : Sink<Mat>
1011
{
12+
const AnalogIOVoltageRange OutputRange = AnalogIOVoltageRange.TenVolts;
13+
1114
[TypeConverter(typeof(AnalogIO.NameConverter))]
1215
public string DeviceName { get; set; }
1316

14-
public override IObservable<ushort[]> Process(IObservable<ushort[]> source)
17+
public AnalogIODataType DataType { get; set; } = AnalogIODataType.S16;
18+
19+
public override IObservable<Mat> Process(IObservable<Mat> source)
1520
{
21+
var dataType = DataType;
1622
return Observable.Using(
1723
() => DeviceManager.ReserveDevice(DeviceName),
1824
disposable => disposable.Subject.SelectMany(deviceInfo =>
1925
{
26+
var bufferSize = 0;
27+
var scaleBuffer = default(Mat);
28+
var transposeBuffer = default(Mat);
29+
var sampleScale = dataType == AnalogIODataType.Volts
30+
? 1 / AnalogIODeviceInfo.GetVoltsPerDivision(OutputRange)
31+
: 1;
2032
var device = deviceInfo.GetDeviceContext(typeof(AnalogIO));
21-
return source.Do(device.Write);
33+
return source.Do(data =>
34+
{
35+
if (dataType == AnalogIODataType.S16 && data.Depth != Depth.S16 ||
36+
dataType == AnalogIODataType.Volts && data.Depth != Depth.F32)
37+
{
38+
ThrowDataTypeException(data.Depth);
39+
}
40+
41+
AssertChannelCount(data.Rows);
42+
if (bufferSize != data.Cols)
43+
{
44+
bufferSize = data.Cols;
45+
transposeBuffer = bufferSize > 1
46+
? new Mat(data.Cols, data.Rows, data.Depth, 1)
47+
: null;
48+
if (sampleScale != 1)
49+
{
50+
scaleBuffer = transposeBuffer != null
51+
? new Mat(data.Cols, data.Rows, Depth.S16, 1)
52+
: new Mat(data.Rows, data.Cols, Depth.S16, 1);
53+
}
54+
else scaleBuffer = null;
55+
}
56+
57+
var outputBuffer = data;
58+
if (transposeBuffer != null)
59+
{
60+
CV.Transpose(outputBuffer, transposeBuffer);
61+
outputBuffer = transposeBuffer;
62+
}
63+
64+
if (scaleBuffer != null)
65+
{
66+
CV.ConvertScale(outputBuffer, scaleBuffer, sampleScale);
67+
outputBuffer = scaleBuffer;
68+
}
69+
70+
var dataSize = outputBuffer.Step * outputBuffer.Rows;
71+
device.Write(outputBuffer.Data, dataSize);
72+
});
2273
}));
2374
}
75+
76+
public IObservable<short[]> Process(IObservable<short[]> source)
77+
{
78+
if (DataType != AnalogIODataType.S16)
79+
ThrowDataTypeException(Depth.S16);
80+
81+
return Observable.Using(
82+
() => DeviceManager.ReserveDevice(DeviceName),
83+
disposable => disposable.Subject.SelectMany(deviceInfo =>
84+
{
85+
var device = deviceInfo.GetDeviceContext(typeof(AnalogIO));
86+
return source.Do(data =>
87+
{
88+
AssertChannelCount(data.Length);
89+
device.Write(data);
90+
});
91+
}));
92+
}
93+
94+
public IObservable<float[]> Process(IObservable<float[]> source)
95+
{
96+
if (DataType != AnalogIODataType.Volts)
97+
ThrowDataTypeException(Depth.F32);
98+
99+
return Observable.Using(
100+
() => DeviceManager.ReserveDevice(DeviceName),
101+
disposable => disposable.Subject.SelectMany(deviceInfo =>
102+
{
103+
var device = deviceInfo.GetDeviceContext(typeof(AnalogIO));
104+
var divisionsPerVolt = 1 / AnalogIODeviceInfo.GetVoltsPerDivision(OutputRange);
105+
return source.Do(data =>
106+
{
107+
AssertChannelCount(data.Length);
108+
var samples = new short[data.Length];
109+
for (int i = 0; i < samples.Length; i++)
110+
{
111+
samples[i] = (short)(data[i] * divisionsPerVolt);
112+
}
113+
114+
device.Write(samples);
115+
});
116+
}));
117+
}
118+
119+
static void AssertChannelCount(int channels)
120+
{
121+
if (channels != AnalogIO.ChannelCount)
122+
{
123+
throw new InvalidOperationException(
124+
$"The input data must have exactly {AnalogIO.ChannelCount} channels."
125+
);
126+
}
127+
}
128+
129+
static void ThrowDataTypeException(Depth depth)
130+
{
131+
throw new InvalidOperationException(
132+
$"Invalid input data type '{depth}' for the specified analog IO configuration."
133+
);
134+
}
24135
}
25136
}

OpenEphys.Onix/OpenEphys.Onix/BufferHelper.cs

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ namespace OpenEphys.Onix
44
{
55
static class BufferHelper
66
{
7-
public static Mat CopyBuffer<TBuffer>(
7+
public static Mat CopyTranspose<TBuffer>(
88
TBuffer[] buffer,
99
int sampleCount,
1010
int channelCount,
@@ -17,9 +17,35 @@ public static Mat CopyBuffer<TBuffer>(
1717
channelCount,
1818
depth,
1919
channels: 1);
20-
var amplifierData = new Mat(bufferHeader.Cols, bufferHeader.Rows, bufferHeader.Depth, 1);
21-
CV.Transpose(bufferHeader, amplifierData);
22-
return amplifierData;
20+
var data = new Mat(bufferHeader.Cols, bufferHeader.Rows, depth, 1);
21+
CV.Transpose(bufferHeader, data);
22+
return data;
23+
}
24+
25+
public static Mat CopyTranspose<TBuffer>(
26+
TBuffer[] buffer,
27+
int sampleCount,
28+
int channelCount,
29+
Depth depth,
30+
Mat scale,
31+
Mat transposeBuffer)
32+
where TBuffer : unmanaged
33+
{
34+
if (scale == null)
35+
{
36+
return CopyTranspose(buffer, sampleCount, channelCount, depth);
37+
}
38+
39+
using var bufferHeader = Mat.CreateMatHeader(
40+
buffer,
41+
sampleCount,
42+
channelCount,
43+
depth,
44+
channels: 1);
45+
var data = new Mat(bufferHeader.Cols, bufferHeader.Rows, scale.Depth, 1);
46+
CV.Transpose(bufferHeader, transposeBuffer);
47+
CV.Mul(transposeBuffer, scale, data);
48+
return data;
2349
}
2450
}
2551
}

0 commit comments

Comments
 (0)