Skip to content

Commit d9c3d98

Browse files
committed
fixed framing issues
1 parent 486f5a3 commit d9c3d98

File tree

1 file changed

+40
-29
lines changed

1 file changed

+40
-29
lines changed

src/core/Akka.Streams/Dsl/Framing.cs

Lines changed: 40 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -169,39 +169,45 @@ public FramingException(string message) : base(message)
169169
protected FramingException(SerializationInfo info, StreamingContext context) : base(info, context) { }
170170
}
171171

172-
private static readonly Func<IEnumerator<byte>, int, int> BigEndianDecoder = (enumerator, length) =>
172+
private static int BigEndianDecoder(ref ReadOnlySpan<byte> enumerator, int length)
173173
{
174-
var count = length;
175-
var decoded = 0;
176-
while (count > 0)
174+
if (length > enumerator.Length)
175+
{
176+
throw new ArgumentException("Length exceeds the size of the enumerator.");
177+
}
178+
179+
var result = 0;
180+
181+
// Assuming 'length' is 4 for a 32-bit integer.
182+
// Adjust the loop and shift amounts if dealing with different sizes.
183+
for (var i = 0; i < length; i++)
177184
{
178-
decoded <<= 8;
179-
if (!enumerator.MoveNext()) throw new IndexOutOfRangeException("LittleEndianDecoder reached end of byte string");
180-
decoded |= enumerator.Current & 0xFF;
181-
count--;
185+
result |= enumerator[i] << ((length - 1 - i) * 8);
182186
}
183187

184-
return decoded;
185-
};
188+
return result;
189+
}
186190

187-
private static readonly Func<IEnumerator<byte>, int, int> LittleEndianDecoder = (enumerator, length) =>
191+
private static int LittleEndianDecoder(ref ReadOnlySpan<byte> span, int length)
188192
{
189-
var highestOcted = (length - 1) << 3;
190-
var mask = (int) (1L << (length << 3)) - 1;
191-
var count = length;
193+
if (length > span.Length)
194+
{
195+
throw new IndexOutOfRangeException("LittleEndianDecoder reached end of byte span");
196+
}
197+
192198
var decoded = 0;
199+
var highestOctetShift = (length - 1) << 3;
200+
var mask = (int)(1L << (length << 3)) - 1;
193201

194-
while (count > 0)
202+
for (var i = 0; i < length; i++)
195203
{
196-
// decoded >>>= 8 on the jvm
197-
decoded = (int) ((uint) decoded >> 8);
198-
if (!enumerator.MoveNext()) throw new IndexOutOfRangeException("LittleEndianDecoder reached end of byte string");
199-
decoded += (enumerator.Current & 0xFF) << highestOcted;
200-
count--;
204+
// Shift and add the ith byte to 'decoded'. No need for >>>= as in JVM; just shift appropriately.
205+
var shiftAmount = highestOctetShift - (i << 3);
206+
decoded |= (span[i] & 0xFF) << shiftAmount;
201207
}
202208

203209
return decoded & mask;
204-
};
210+
}
205211

206212
private sealed class SimpleFramingProtocolEncoderStage : SimpleLinearGraphStage<ByteString>
207213
{
@@ -342,8 +348,8 @@ private void DoParse()
342348
else if (_buffer.HasSubstring(_stage._separatorBytes, possibleMatchPosition))
343349
{
344350
// Found a match
345-
var parsedFrame = _buffer.Slice(0, possibleMatchPosition).Compact();
346-
_buffer = _buffer.Slice(possibleMatchPosition + _stage._separatorBytes.Count).Compact();
351+
var parsedFrame = _buffer.Slice(0, possibleMatchPosition);
352+
_buffer = _buffer.Slice(possibleMatchPosition + _stage._separatorBytes.Count);
347353
_nextPossibleMatch = 0;
348354
Push(_stage.Outlet, parsedFrame);
349355

@@ -422,7 +428,7 @@ public override void OnUpstreamFinish()
422428
/// </summary>
423429
private void PushFrame()
424430
{
425-
var emit = _buffer.Slice(0, _frameSize).Compact();
431+
var emit = _buffer.Slice(0, _frameSize);
426432
_buffer = _buffer.Slice(_frameSize);
427433
_frameSize = int.MaxValue;
428434
Push(_stage.Outlet, emit);
@@ -440,9 +446,14 @@ private void TryPushFrame()
440446
PushFrame();
441447
else if (bufferSize >= _stage._minimumChunkSize)
442448
{
443-
var iterator = _buffer.Slice(_stage._lengthFieldOffset).GetEnumerator();
444-
var parsedLength = _stage._intDecoder(iterator, _stage._lengthFieldLength);
445-
449+
var iterator = _buffer.Memory.Span.Slice(_stage._lengthFieldOffset);
450+
var parsedLength = _stage._byteOrder switch {
451+
ByteOrder.BigEndian => BigEndianDecoder(ref iterator, _stage._lengthFieldLength),
452+
ByteOrder.LittleEndian => LittleEndianDecoder(ref iterator, _stage._lengthFieldLength),
453+
_ => throw new NotSupportedException($"ByteOrder {_stage._byteOrder} is not supported")
454+
};
455+
456+
// TODO: AVOID ARRAY COPYING AGAIN HERE
446457
_frameSize = _stage._computeFrameSize.HasValue
447458
? _stage._computeFrameSize.Value(_buffer.Slice(0, _stage._lengthFieldOffset).ToArray(), parsedLength)
448459
: parsedLength + _stage._minimumChunkSize;
@@ -480,7 +491,7 @@ private void TryPull()
480491
private readonly int _maximumFramelength;
481492
private readonly int _lengthFieldOffset;
482493
private readonly int _minimumChunkSize;
483-
private readonly Func<IEnumerator<byte>, int, int> _intDecoder;
494+
private readonly ByteOrder _byteOrder;
484495
private readonly Option<Func<IReadOnlyList<byte>, int, int>> _computeFrameSize;
485496

486497
// For the sake of binary compatibility
@@ -500,7 +511,7 @@ public LengthFieldFramingStage(
500511
_lengthFieldOffset = lengthFieldOffset;
501512
_minimumChunkSize = lengthFieldOffset + lengthFieldLength;
502513
_computeFrameSize = computeFrameSize;
503-
_intDecoder = byteOrder == ByteOrder.BigEndian ? BigEndianDecoder : LittleEndianDecoder;
514+
_byteOrder = byteOrder;
504515
}
505516

506517
protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new Logic(this);

0 commit comments

Comments
 (0)