@@ -899,7 +899,7 @@ handle_cast({write, CRef, MsgRef, MsgId, Flow},
899899 % % or the non-current files. If the message *is* in the
900900 % % current file then the cache entry will be removed by
901901 % % the normal logic for that in write_message/4 and
902- % % maybe_roll_to_new_file /2.
902+ % % flush_or_roll_to_new_file /2.
903903 case index_lookup (MsgId , State ) of
904904 [# msg_location { file = File }]
905905 when File == State # msstate .current_file ->
@@ -1208,26 +1208,102 @@ gc_candidate(File, State = #msstate{ gc_candidates = Candidates,
12081208gc_candidate (File , State = # msstate { gc_candidates = Candidates }) ->
12091209 State # msstate { gc_candidates = Candidates #{ File => true }}.
12101210
1211- write_message (MsgId , Msg ,
1212- State0 = # msstate { current_file_handle = CurHdl ,
1213- current_file = CurFile ,
1214- current_file_offset = CurOffset ,
1215- file_summary_ets = FileSummaryEts }) ->
1216- {MaybeFlush , TotalSize } = writer_append (CurHdl , MsgId , Msg ),
1217- State = case MaybeFlush of
1218- flush -> internal_sync (State0 );
1219- ok -> State0
1220- end ,
%% 4MB: messages whose serialised form is at least this size are
%% stored in dedicated files rather than the shared current file.
-define(LARGE_MESSAGE_THRESHOLD, 4194304). %% 4MB.

%% Serialise the message body and route it to the appropriate writer:
%% large payloads each get a file of their own, everything else is
%% appended to the current file.
write_message(MsgId, MsgBody, State) ->
    MsgBodyBin = term_to_binary(MsgBody),
    case byte_size(MsgBodyBin) of
        Size when Size >= ?LARGE_MESSAGE_THRESHOLD ->
            write_large_message(MsgId, MsgBodyBin, State);
        _SmallSize ->
            write_small_message(MsgId, MsgBodyBin, State)
    end.
1222+
%% Append a small (below the large-message threshold) serialised
%% message to the current file.
%%
%% The entry is handed to the (buffering) writer; its location is
%% recorded in the index, the per-file size counters in the file
%% summary table are bumped, and finally we either flush the writer
%% buffer or roll over to a fresh file, as required.
write_small_message(MsgId, MsgBodyBin,
                    State = #msstate{ current_file_handle = WriterHdl,
                                      current_file        = FileNum,
                                      current_file_offset = Offset,
                                      file_summary_ets    = SummaryEts }) ->
    {MaybeFlush, TotalSize} = writer_append(WriterHdl, MsgId, MsgBodyBin),
    ok = index_insert(
           #msg_location{ msg_id = MsgId, ref_count = 1, file = FileNum,
                          offset = Offset, total_size = TotalSize }, State),
    [_, _] = ets:update_counter(SummaryEts, FileNum,
                                [{#file_summary.valid_total_size, TotalSize},
                                 {#file_summary.file_size, TotalSize}]),
    NewOffset = Offset + TotalSize,
    flush_or_roll_to_new_file(NewOffset, MaybeFlush,
                              State#msstate{ current_file_offset = NewOffset }).
12301238
%% Decide what to do after appending an entry to the current file.
%%
%% First clause: the file has grown past the configured size limit, so
%% sync and close it, open the next file (registering it in the file
%% summary table) and purge written-to-disk entries from the
%% current-file cache. Second clause: the writer requested a flush, so
%% sync now. Otherwise there is nothing to do.
flush_or_roll_to_new_file(
        Offset, _MaybeFlush,
        State = #msstate{ dir                 = Dir,
                          current_file_handle = CurHdl,
                          current_file        = CurFile,
                          file_summary_ets    = FileSummaryEts,
                          cur_file_cache_ets  = CurFileCacheEts,
                          file_size_limit     = FileSizeLimit })
        when Offset >= FileSizeLimit ->
    SyncedState = internal_sync(State),
    ok = writer_close(CurHdl),
    NextFile = CurFile + 1,
    {ok, NextHdl} = writer_open(Dir, NextFile),
    true = ets:insert_new(FileSummaryEts,
                          #file_summary{ file             = NextFile,
                                         valid_total_size = 0,
                                         file_size        = 0,
                                         locked           = false }),
    %% Drop cache entries whose messages were written to disk
    %% (ref count 0): they now live in a non-current file.
    true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
    SyncedState#msstate{ current_file_handle = NextHdl,
                         current_file        = NextFile,
                         current_file_offset = 0 };
flush_or_roll_to_new_file(_Offset, flush, State) ->
    %% The writer asked us to flush its buffer; sync here.
    internal_sync(State);
flush_or_roll_to_new_file(_Offset, _MaybeFlush, State) ->
    State.
1267+
%% Write a large (at or above the large-message threshold) serialised
%% message into a dedicated file of its own.
%%
%% The current file is flushed and closed; the message is written
%% unbuffered to a fresh file; index and file-summary entries are
%% updated directly; then we roll over so subsequent small messages
%% start a new current file.
write_large_message(MsgId, MsgBodyBin,
                    State0 = #msstate{ dir = Dir,
                                       current_file_handle = CurHdl,
                                       current_file = CurFile,
                                       file_summary_ets = FileSummaryEts,
                                       cur_file_cache_ets = CurFileCacheEts }) ->
    %% Flush the current file and close it.
    ok = writer_flush(CurHdl),
    ok = writer_close(CurHdl),
    %% Open a new file, write the message directly and close it.
    LargeMsgFile = CurFile + 1,
    {ok, LargeMsgHdl} = writer_open(Dir, LargeMsgFile),
    TotalSize = writer_direct_write(LargeMsgHdl, MsgId, MsgBodyBin),
    %% BUG FIX: the original closed CurHdl a second time here (it was
    %% already closed above), leaking the fd of the large-message file.
    %% Close the large-message handle instead.
    ok = writer_close(LargeMsgHdl),
    %% Update ets with the new information directly.
    ok = index_insert(
           #msg_location{ msg_id = MsgId, ref_count = 1, file = LargeMsgFile,
                          offset = 0, total_size = TotalSize }, State0),
    true = ets:insert_new(FileSummaryEts, #file_summary{
                              file             = LargeMsgFile,
                              valid_total_size = TotalSize,
                              file_size        = TotalSize,
                              locked           = false }),
    %% Roll over to the next file.
    NextFile = LargeMsgFile + 1,
    {ok, NextHdl} = writer_open(Dir, NextFile),
    true = ets:insert_new(FileSummaryEts, #file_summary{
                              file             = NextFile,
                              valid_total_size = 0,
                              file_size        = 0,
                              locked           = false }),
    %% Delete messages from the cache that were written to disk.
    true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
    %% Process confirms (this won't flush; we already did) and continue.
    State = internal_sync(State0),
    State#msstate{ current_file_handle = NextHdl,
                   current_file        = NextFile,
                   current_file_offset = 0 }.
1306+
12311307contains_message (MsgId , From , State ) ->
12321308 MsgLocation = index_lookup_positive_ref_count (MsgId , State ),
12331309 gen_server2 :reply (From , MsgLocation =/= not_found ),
@@ -1325,8 +1401,7 @@ writer_recover(Dir, Num, Offset) ->
13251401 ok = file :truncate (Fd ),
13261402 {ok , # writer {fd = Fd , buffer = prim_buffer :new ()}}.
13271403
1328- writer_append (# writer {buffer = Buffer }, MsgId , MsgBody ) ->
1329- MsgBodyBin = term_to_binary (MsgBody ),
1404+ writer_append (# writer {buffer = Buffer }, MsgId , MsgBodyBin ) ->
13301405 MsgBodyBinSize = byte_size (MsgBodyBin ),
13311406 EntrySize = MsgBodyBinSize + 16 , % % Size of MsgId + MsgBodyBin.
13321407 % % We send an iovec to the buffer instead of building a binary.
@@ -1354,6 +1429,21 @@ writer_flush(#writer{fd = Fd, buffer = Buffer}) ->
13541429 file :write (Fd , prim_buffer :read_iovec (Buffer , Size ))
13551430 end .
13561431
%% For large messages we don't buffer anything. Large messages
%% are kept within their own files.
%%
%% This is basically the same as writer_append except no buffering.
%%
%% On-disk entry layout:
%%   <<EntrySize:64>> (8 bytes), MsgId (16 bytes), MsgBodyBin,
%%   <<255>> (1-byte OK marker).
%% Returns the total number of bytes written: EntrySize + 9.
writer_direct_write(#writer{fd = Fd}, MsgId, MsgBodyBin) ->
    MsgBodyBinSize = byte_size(MsgBodyBin),
    EntrySize = MsgBodyBinSize + 16, %% Size of MsgId + MsgBodyBin.
    %% BUG FIX: assert the write succeeded. The original ignored the
    %% return of file:write/2, so a failed disk write would have been
    %% silently recorded in the index as a stored message.
    ok = file:write(Fd, [
        <<EntrySize:64>>,
        MsgId,
        MsgBodyBin,
        <<255>> %% OK marker.
    ]),
    EntrySize + 9.
1446+
%% Close the file descriptor backing this writer. The caller is
%% responsible for flushing any buffered data beforehand.
writer_close(#writer{fd = FileDesc}) ->
    file:close(FileDesc).
13591449
@@ -1700,33 +1790,6 @@ rebuild_index(Gatherer, Files, State) ->
17001790% % garbage collection / compaction / aggregation -- internal
17011791% %----------------------------------------------------------------------------
17021792
1703- maybe_roll_to_new_file (
1704- Offset ,
1705- State = # msstate { dir = Dir ,
1706- current_file_handle = CurHdl ,
1707- current_file = CurFile ,
1708- file_summary_ets = FileSummaryEts ,
1709- cur_file_cache_ets = CurFileCacheEts ,
1710- file_size_limit = FileSizeLimit })
1711- when Offset >= FileSizeLimit ->
1712- State1 = internal_sync (State ),
1713- ok = writer_close (CurHdl ),
1714- NextFile = CurFile + 1 ,
1715- {ok , NextHdl } = writer_open (Dir , NextFile ),
1716- true = ets :insert_new (FileSummaryEts , # file_summary {
1717- file = NextFile ,
1718- valid_total_size = 0 ,
1719- file_size = 0 ,
1720- locked = false }),
1721- % % We only delete messages from the cache that were written to disk
1722- % % in the previous file.
1723- true = ets :match_delete (CurFileCacheEts , {'_' , '_' , 0 }),
1724- State1 # msstate { current_file_handle = NextHdl ,
1725- current_file = NextFile ,
1726- current_file_offset = 0 };
1727- maybe_roll_to_new_file (_ , State ) ->
1728- State .
1729-
17301793% % We keep track of files that have seen removes and
17311794% % check those periodically for compaction. We only
17321795% % compact files that have less than half valid data.
0 commit comments