@@ -1515,28 +1515,38 @@ scan_data(<<Size:64, MsgIdAndMsg:Size/binary, 255, Rest/bits>> = Data,
15151515 % % a remnant from a previous compaction, but it might
15161516 % % simply be a coincidence. Try the next byte.
15171517 #{MsgIdInt := true } ->
1518- <<_ , Rest2 /bits >> = Data ,
1519- scan_data (Rest2 , Fd , Fun , Offset + 1 , FileSize , MsgIdsFound , Acc );
1518+ scan_next_byte (Data , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc );
15201519 % % Data looks to be a message.
15211520 _ ->
1522- % % Avoid sub-binary construction.
1523- MsgId = <<MsgIdInt :128 >>,
15241521 TotalSize = Size + 9 ,
1525- case Fun ({MsgId , TotalSize , Offset }) of
1526- % % Confirmed to be a message by the provided fun.
1527- {valid , Entry } ->
1522+ case check_msg (Fun , MsgIdInt , TotalSize , Offset , Acc ) of
1523+ {continue , NewAcc } ->
15281524 scan_data (Rest , Fd , Fun , Offset + TotalSize , FileSize ,
1529- MsgIdsFound #{MsgIdInt => true }, [Entry |Acc ]);
1530- % % Confirmed to be a message but we don't need it anymore.
1531- previously_valid ->
1532- scan_data (Rest , Fd , Fun , Offset + TotalSize , FileSize ,
1533- MsgIdsFound #{MsgIdInt => true }, Acc );
1534- % % Not a message, try the next byte.
1535- invalid ->
1536- <<_ , Rest2 /bits >> = Data ,
1537- scan_data (Rest2 , Fd , Fun , Offset + 1 , FileSize , MsgIdsFound , Acc )
1525+ MsgIdsFound #{MsgIdInt => true }, NewAcc );
1526+ try_next_byte ->
1527+ scan_next_byte (Data , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc )
15381528 end
15391529 end ;
1530+ % % Large message alone in its own file
1531+ scan_data (<<Size :64 , MsgIdInt :128 , _Rest /bits >> = Data , Fd , Fun , Offset , FileSize , _MsgIdsFound , _Acc )
1532+ when Offset == 0 ,
1533+ FileSize == Size + 9 ->
1534+ {ok , CurrentPos } = file :position (Fd , cur ),
1535+ case file :pread (Fd , FileSize - 1 , 1 ) of
1536+ {ok , <<255 >>} ->
1537+ TotalSize = FileSize ,
1538+ case check_msg (Fun , MsgIdInt , TotalSize , Offset , []) of
1539+ {continue , NewAcc } ->
1540+ NewAcc ;
1541+ try_next_byte ->
1542+ {ok , _ } = file :position (Fd , CurrentPos ),
1543+ scan_next_byte (Data , Fd , Fun , Offset , FileSize , #{}, [])
1544+ end ;
1545+ _ ->
1546+ % % Wrong end marker
1547+ {ok , _ } = file :position (Fd , CurrentPos ),
1548+ scan_next_byte (Data , Fd , Fun , Offset , FileSize , #{}, [])
1549+ end ;
15401550% % This might be the start of a message.
15411551scan_data (<<Size :64 , Rest /bits >> = Data , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc )
15421552 when byte_size (Rest ) < Size + 1 , Size < FileSize - Offset ->
@@ -1545,9 +1555,27 @@ scan_data(Data, Fd, Fun, Offset, FileSize, MsgIdsFound, Acc)
15451555 when byte_size (Data ) < 8 ->
15461556 scan (Data , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc );
15471557% % This is definitely not a message. Try the next byte.
1548- scan_data (<<_ , Rest /bits >>, Fd , Fun , Offset , FileSize , MsgIdsFound , Acc ) ->
1558+ scan_data (Data , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc ) ->
1559+ scan_next_byte (Data , Fd , Fun , Offset , FileSize , MsgIdsFound , Acc ).
1560+
1561+ scan_next_byte (<<_ , Rest /bits >>, Fd , Fun , Offset , FileSize , MsgIdsFound , Acc ) ->
15491562 scan_data (Rest , Fd , Fun , Offset + 1 , FileSize , MsgIdsFound , Acc ).
15501563
1564+ check_msg (Fun , MsgIdInt , TotalSize , Offset , Acc ) ->
1565+ % % Avoid sub-binary construction.
1566+ MsgId = <<MsgIdInt :128 >>,
1567+ case Fun ({MsgId , TotalSize , Offset }) of
1568+ % % Confirmed to be a message by the provided fun.
1569+ {valid , Entry } ->
1570+ {continue , [Entry |Acc ]};
1571+ % % Confirmed to be a message but we don't need it anymore.
1572+ previously_valid ->
1573+ {continue , Acc };
1574+ % % Not a message, try the next byte.
1575+ invalid ->
1576+ try_next_byte
1577+ end .
1578+
15511579% %----------------------------------------------------------------------------
15521580% % Ets index
15531581% %----------------------------------------------------------------------------
0 commit comments