Data Archiving
Data archiving is the process of periodically moving stable records in fact tables from live batches to archive batches, and converting them to a compressed format (run-length encoding).
The user specifies an archiving delay (typically 24 to 48 hours) as part of the schema for each table, along with an archiving interval (for instance, every 2 hours) that determines how often the archiving process runs for the table.
gForceDB maintains a cutoff event time internally for each fact table; records with event time up to the cutoff should have been stored in (and should be retrieved for queries from) archive batches. This means that when a newly arrived upsert has an event time older than the cutoff, the record in the live store should not be queried (to avoid overcounting) until it has been merged/archived into archive batches.
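As a rough sketch of how the cutoff splits a query (a minimal Go illustration; the type and function names are made up, not gForceDB's query engine API), an event-time range is divided at the cutoff, with the part below it answered from archive batches and the rest from live batches:

```go
package main

import "fmt"

// timeRange is a half-open [From, To) event-time range in unix seconds.
type timeRange struct{ From, To int64 }

// empty reports whether the range contains no time at all.
func (r timeRange) empty() bool { return r.From >= r.To }

// splitAtCutoff splits a query's event-time range at the archiving cutoff:
// the part below the cutoff is answered from archive batches, the part at or
// above it from live batches. Requires Go 1.21+ for the built-in min/max.
func splitAtCutoff(q timeRange, cutoff int64) (archive, live timeRange) {
	archive = timeRange{From: q.From, To: min(q.To, cutoff)}
	live = timeRange{From: max(q.From, cutoff), To: q.To}
	return
}

func main() {
	archive, live := splitAtCutoff(timeRange{From: 1000, To: 5000}, 3000)
	fmt.Printf("archive part: %+v (empty=%v)\n", archive, archive.empty())
	fmt.Printf("live part:    %+v (empty=%v)\n", live, live.empty())
}
```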
Each time archiving runs, all records in live batches whose event time is older than the new cutoff (`now - archiving_delay`) are sorted, compressed and merged into archive batches. An archive batch is created or merged for each UTC day the records fall upon. When all records of a live batch have been processed, the space of that batch is reclaimed, and the corresponding entries in the primary key are reclaimed similarly.
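A minimal sketch of selecting and bucketing records for one archiving run, assuming simplified in-memory records rather than gForceDB's actual live batch layout (the `record` type and `recordsToArchive` function are illustrative names):

```go
package main

import (
	"fmt"
	"time"
)

// record is a simplified live-store row; EventTime is in unix seconds.
type record struct {
	EventTime int64
	CityID    int
}

// recordsToArchive picks live records older than the new cutoff
// (now - archivingDelay) and buckets them by the UTC day they fall on.
// An archiving run would then sort, compress and merge each bucket into
// that day's archive batch.
func recordsToArchive(live []record, now time.Time, archivingDelay time.Duration) (int64, map[string][]record) {
	cutoff := now.Add(-archivingDelay).Unix()
	buckets := make(map[string][]record)
	for _, r := range live {
		if r.EventTime >= cutoff {
			continue // still too fresh; the record stays in the live store
		}
		day := time.Unix(r.EventTime, 0).UTC().Format("20060102")
		buckets[day] = append(buckets[day], r)
	}
	return cutoff, buckets
}

func main() {
	now := time.Date(2019, 3, 2, 12, 0, 0, 0, time.UTC)
	live := []record{
		{EventTime: now.Add(-50 * time.Hour).Unix(), CityID: 1},
		{EventTime: now.Add(-30 * time.Hour).Unix(), CityID: 5},
		{EventTime: now.Add(-1 * time.Hour).Unix(), CityID: 18}, // newer than the cutoff
	}
	cutoff, buckets := recordsToArchive(live, now, 24*time.Hour)
	fmt.Println("new cutoff:", cutoff)
	for day, recs := range buckets {
		fmt.Printf("UTC day %s: %d record(s) to archive\n", day, len(recs))
	}
}
```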
Records to be archived are sorted in a user-specified column order. Columns early in this order should have lower cardinalities, for better overall compression efficiency. Prefixes of this column order are also used for common equality filter matching. For instance, it is better to put `city_id` as the first column in this order since most queries will have a `city_id = xx` filter, despite the fact that `status` has a lower cardinality. The column sort order should contain only columns where we expect compression to be effective; columns like `fare_total` with a very high cardinality will not be compressed and should not be included.
Here's an example of an archive batch before compression and merging (with a sort order of `[city_id, status, fx_rate]`):
| city_id | status | fx_rate | fare_total |
|---------|--------|---------|------------|
| 1       | 0      | 1.0     | 5.2        |
| 1       | 0      | 1.0     | 4.3        |
| 1       | 1      | NULL    | NULL       |
| 5       | 0      | 1.0     | 6.3        |
| 5       | 0      | 1.0     | 3.6        |
| 18      | 0      | 1.2     | 8.9        |
| 18      | 0      | 1.3     | 7.2        |
After sorting, run-length encoding is applied to each sorted column recursively. Here are the results generated from the above example:
- `city_id`

| value | null | count | accumulative_count |
|-------|------|-------|--------------------|
| 1     | 0    | 3     | 3                  |
| 5     | 0    | 2     | 5                  |
| 18    | 0    | 2     | 7                  |
- `status`

| value | null | count | accumulative_count |
|-------|------|-------|--------------------|
| 0     | 0    | 2     | 2                  |
| 1     | 0    | 1     | 3                  |
| 0     | 0    | 2     | 5                  |
| 0     | 0    | 2     | 7                  |
- `fx_rate`

| value | null | count | accumulative_count |
|-------|------|-------|--------------------|
| 1.0   | 0    | 2     | 2                  |
| 0.0   | 1    | 1     | 3                  |
| 1.0   | 0    | 2     | 5                  |
| 1.2   | 0    | 1     | 6                  |
| 1.3   | 0    | 1     | 7                  |
Notice that the 3rd and 4th rows in `status` have the same value, yet remain uncombined. This is because they are generated recursively from different values (5 and 18) of the previous column (`city_id`). The purpose of doing it this way is to enforce a many-to-one mapping when going from a less compressed column to a more compressed column (in reverse column sort order), so that a binary search for a single value can be performed along the reverse column sort order at query execution time.
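Below is a minimal sketch of this recursive run-length encoding, assuming plain Go slices of nullable cells instead of gForceDB's actual vector party layout (the `cell`, `run` and `encodeColumns` names are made up for the illustration). A column starts a new run whenever its own value changes or the previous column starts a new run, which is exactly what leaves the 3rd and 4th `status` runs uncombined:

```go
package main

import "fmt"

// cell is a nullable value in a sorted row.
type cell struct {
	Value float64
	Null  bool
}

// run is one compressed entry of a column: value, null flag, count and
// accumulative count (running total of counts up to and including this run).
type run struct {
	Value    float64
	Null     bool
	Count    int
	AccCount int
}

// encodeColumns applies run-length encoding to each column of the sorted
// rows. A new run is started whenever the value changes OR the previous
// column starts a new run, which enforces the many-to-one mapping between
// adjacent columns in the sort order.
func encodeColumns(rows [][]cell) [][]run {
	if len(rows) == 0 {
		return nil
	}
	numCols := len(rows[0])
	columns := make([][]run, numCols)
	// boundary[i] is true when row i starts a new run in the previous column.
	boundary := make([]bool, len(rows))
	for col := 0; col < numCols; col++ {
		var runs []run
		acc := 0
		for i, row := range rows {
			c := row[col]
			newRun := i == 0 || boundary[i] ||
				c.Null != rows[i-1][col].Null || c.Value != rows[i-1][col].Value
			if newRun {
				runs = append(runs, run{Value: c.Value, Null: c.Null})
			}
			last := &runs[len(runs)-1]
			last.Count++
			acc++
			last.AccCount = acc
			boundary[i] = newRun // becomes the boundary mask for the next column
		}
		columns[col] = runs
	}
	return columns
}

func main() {
	// Sorted example from the text: columns are city_id, status, fx_rate.
	// The NULL fx_rate is represented as {0, true}.
	rows := [][]cell{
		{{1, false}, {0, false}, {1.0, false}},
		{{1, false}, {0, false}, {1.0, false}},
		{{1, false}, {1, false}, {0, true}},
		{{5, false}, {0, false}, {1.0, false}},
		{{5, false}, {0, false}, {1.0, false}},
		{{18, false}, {0, false}, {1.2, false}},
		{{18, false}, {0, false}, {1.3, false}},
	}
	for col, runs := range encodeColumns(rows) {
		fmt.Println("column", col)
		for _, r := range runs {
			fmt.Printf("  value=%v null=%v count=%d acc=%d\n", r.Value, r.Null, r.Count, r.AccCount)
		}
	}
}
```

Running this on the sorted example reproduces the three compressed columns shown above.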
In archive batches we store accumulative counts instead of the original counts to enable binary search on the GPU for an accumulative count (`record_index + 1`) without decompression.
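As a CPU-side illustration of that lookup (the real search runs on the GPU over the compressed vectors), finding the run that contains a given record is a binary search of the accumulative counts for the first entry reaching `record_index + 1`, sketched here with Go's `sort.Search`:

```go
package main

import (
	"fmt"
	"sort"
)

// runForRecord returns the index of the compressed run that contains the
// record at recordIndex, by binary searching the accumulative counts for the
// first run with accCounts[i] >= recordIndex+1.
func runForRecord(accCounts []int, recordIndex int) int {
	return sort.Search(len(accCounts), func(i int) bool {
		return accCounts[i] >= recordIndex+1
	})
}

func main() {
	// Accumulative counts of the status column from the example: 2, 3, 5, 7.
	statusAcc := []int{2, 3, 5, 7}
	for _, rec := range []int{0, 2, 4, 6} {
		fmt.Printf("record %d -> run %d\n", rec, runForRecord(statusAcc, rec))
	}
}
```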
One sorted and compressed batch is generated in memory for each UTC day the records fall upon. Because archiving runs multiple times per day (to smooth the load), there may already be archive data on disk for the same UTC day, so the newly sorted and compressed data in memory must be merged with the existing data on disk. The merging is also recursive along the sort column order, to maintain the many-to-one property described above.
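Here is a heavily simplified sketch of that per-day merge, operating on plain uncompressed rows instead of compressed vector parties (the actual merge walks the compressed runs column by column to preserve the many-to-one property without full decompression); `lessBySortOrder` and `mergeSorted` are illustrative names:

```go
package main

import "fmt"

// lessBySortOrder compares two rows column by column along the sort order
// (e.g. [city_id, status, fx_rate]); rows here are plain slices of values.
func lessBySortOrder(a, b []float64) bool {
	for i := range a {
		if a[i] != b[i] {
			return a[i] < b[i]
		}
	}
	return false
}

// mergeSorted merges two row sets that are each already sorted by the sort
// column order, keeping the combined output sorted.
func mergeSorted(onDisk, inMemory [][]float64) [][]float64 {
	merged := make([][]float64, 0, len(onDisk)+len(inMemory))
	i, j := 0, 0
	for i < len(onDisk) && j < len(inMemory) {
		if lessBySortOrder(onDisk[i], inMemory[j]) {
			merged = append(merged, onDisk[i])
			i++
		} else {
			merged = append(merged, inMemory[j])
			j++
		}
	}
	merged = append(merged, onDisk[i:]...)
	merged = append(merged, inMemory[j:]...)
	return merged
}

func main() {
	onDisk := [][]float64{{1, 0, 1.0}, {5, 0, 1.0}, {18, 0, 1.2}}
	inMemory := [][]float64{{1, 1, 1.1}, {5, 0, 1.0}, {20, 0, 1.3}}
	for _, row := range mergeSorted(onDisk, inMemory) {
		fmt.Println(row)
	}
}
```

The merged rows would then be re-encoded as in the run-length encoding sketch above.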
An archive vector party file on disk for a column is named `YYYYMMDD-$cutoff`. After merging, a new file is created with the new cutoff for each column. Once new files have been created for all columns, the new cutoff is persisted on disk, and the in-memory cutoff value is updated to the new value for query splitting. Finally, the vector party files with the old cutoff are deleted. This ordering ensures data consistency before and after archiving and during recovery.
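A sketch of that swap sequence, using ordinary files and made-up names as stand-ins for gForceDB's disk store and metastore; the step ordering, not the file layout, is the point:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// swapArchiveFiles illustrates the ordering that keeps archiving crash-safe:
// 1) write the merged vector party file for each column under the NEW cutoff,
// 2) persist the new cutoff (here: a tiny marker file, a stand-in for the metastore),
// 3) only then delete the files carrying the OLD cutoff.
// A crash before step 2 leaves the old cutoff and old files intact; a crash
// after step 2 leaves the new files intact and the old ones as garbage to clean up.
func swapArchiveFiles(dir, day string, oldCutoff, newCutoff int64, columns map[string][]byte) error {
	// Step 1: write new files named <YYYYMMDD>-<newCutoff> per column.
	for col, data := range columns {
		name := filepath.Join(dir, fmt.Sprintf("%s-%d.%s", day, newCutoff, col))
		if err := os.WriteFile(name, data, 0o644); err != nil {
			return err
		}
	}
	// Step 2: persist the new cutoff; the in-memory cutoff used for query
	// splitting would be updated right after this succeeds.
	cutoffFile := filepath.Join(dir, "cutoff")
	if err := os.WriteFile(cutoffFile, []byte(fmt.Sprintf("%d", newCutoff)), 0o644); err != nil {
		return err
	}
	// Step 3: delete the files with the old cutoff.
	for col := range columns {
		old := filepath.Join(dir, fmt.Sprintf("%s-%d.%s", day, oldCutoff, col))
		if err := os.Remove(old); err != nil && !os.IsNotExist(err) {
			return err
		}
	}
	return nil
}

func main() {
	dir, _ := os.MkdirTemp("", "archive")
	defer os.RemoveAll(dir)
	cols := map[string][]byte{"city_id": []byte("..."), "status": []byte("...")}
	if err := swapArchiveFiles(dir, "20190301", 1551398400, 1551405600, cols); err != nil {
		fmt.Println("archiving swap failed:", err)
	}
	entries, _ := os.ReadDir(dir)
	for _, e := range entries {
		fmt.Println(e.Name())
	}
}
```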