diff --git a/1brc.c b/1brc.c index 0aff5b4..a05dda7 100644 --- a/1brc.c +++ b/1brc.c @@ -16,9 +16,9 @@ /* * wait() for all child processes before exiting. - * If 0, instead "cheat" by returning immediately, leaving orphan processes. + * If false, "cheat" by returning immediately, leaving orphan processes. */ -#define UNMAP 0 +#define UNMAP false /* * Pin worker threads to a CPU. @@ -27,12 +27,12 @@ * Each worker is assigned to the lowest unused CPU # we're scheduled for. * For hyperthreaded CPUs, logical CPU numbers must not be interleaved. */ -#define PIN_CPU 1 +#define PIN_CPU true /* * Print timing information and city summary to stderr. */ -#define DEBUG 0 +#define DEBUG false #define HASH_SHIFT 17 // 17 is a happy compromise between non-10k/10k: 16 is 1% faster/10% slower; 18 is 1% slower/3% faster #define HASH_LONG_SHIFT 14 // 14 is requried to fit 10k entries and also fastest @@ -74,7 +74,7 @@ typedef struct { typedef struct { const HashPointers p; HashCounts counts; -} hash_t; +} Hash; typedef struct { long start; @@ -88,7 +88,6 @@ typedef struct { bool last; } Worker; - typedef struct { int64_t packedSumCount; int32_t min; @@ -140,17 +139,17 @@ typedef struct { void prep_workers(Worker *workers, int numWorkers, bool warmup, int fd, struct stat *fileStat); void process(int id, Worker *workers, int numWorkers, int fd, Results *out); void start_worker(Worker *w, Results *out); -void process_chunk(const void * const restrict base, const unsigned int * offsets, hash_t * restrict h); -__m256i process_long(const void * const restrict start, hash_t * restrict h, int * restrict semicolonBytesOut); +void process_chunk(const void * const restrict base, const uint32_t * offsets, Hash * restrict h); +__m256i process_long(const void * const restrict start, Hash * restrict h, int * restrict semicolonBytesOut); inline __m256i hash_cities(__m256i a, __m256i b, __m256i c, __m256i d, __m256i e, __m256i f, __m256i g, __m256i h); inline int hash_city(__m256i str); -inline long insert_city(hash_t * restrict h, long hash, const __m256i maskedCity); -int insert_city_long1(hash_t * restrict h, int hash, __m256i seg0, __m256i seg1); -int insert_city_long2(hash_t * restrict h, int hash, __m256i seg0, __m256i seg1, __m256i seg2); -int insert_city_long3(hash_t * restrict h, int hash, __m256i seg0, __m256i seg1, __m256i seg2, __m256i seg3); +inline long insert_city(Hash * restrict h, long hash, const __m256i maskedCity); +int insert_city_long1(Hash * restrict h, int hash, __m256i seg0, __m256i seg1); +int insert_city_long2(Hash * restrict h, int hash, __m256i seg0, __m256i seg1, __m256i seg2); +int insert_city_long3(Hash * restrict h, int hash, __m256i seg0, __m256i seg1, __m256i seg2, __m256i seg3); void merge(Results * restrict dst, Results * restrict src); int sort_result(const void *a, const void *b, void *arg); -unsigned int find_next_row(const void *data, unsigned int offset); +uint32_t find_next_row(const void *data, uint32_t offset); void print_results(Results *results); void debug_results(Results *results); inline __m256i city_from_long_hash(int hashValue); @@ -413,14 +412,16 @@ void merge(Results * restrict dst, Results * restrict src) { void process(int id, Worker * workers, int numWorkers, int fdOut, Results *out) { TIMER_INIT(); - int max_k = 8; + // processes at this point never process chunks, they only fork() child processes and then merge the results + // assign children process to chunks if there are fewer than max_k workers to create + // otherwise children process recurse to here and branch out again as necessary + const int max_k = 8; const bool doWork = numWorkers <= max_k; const int k = doWork ? numWorkers : (numWorkers + (max_k - 1)) / max_k; int fd[k][2]; struct pollfd poll_fds[k]; - void *tmp = mmap(NULL, RESULTS_MEMORY_SIZE * k, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0); Results *childResults[k]; for (int i = 0; i < k; i++) { @@ -471,7 +472,6 @@ void process(int id, Worker * workers, int numWorkers, int fdOut, Results *out) new_id += n; } - int childrenFinished = 0; while(childrenFinished < k) { poll(poll_fds, k, -1); @@ -527,7 +527,7 @@ void setup_results(Results *r) { } -void convert_hash_to_results(hash_t * restrict hash, Results * restrict out) { +void convert_hash_to_results(Hash * restrict hash, Results * restrict out) { out->numCities = hash->counts.numCities; out->numLongCities = 0; @@ -572,7 +572,6 @@ void convert_hash_to_results(hash_t * restrict hash, Results * restrict out) { } } - void start_worker(Worker *w, Results *out) { TIMER_INIT(); @@ -600,7 +599,7 @@ void start_worker(Worker *w, Results *out) { void * hashedCitiesLong = hashData; hashData += HASHED_CITIES_LONG_SIZE; - hash_t hash = {{packedOffsets, hashedCities, hashedStorage, hashedCitiesLong}, {0, 0}}; + Hash hash = {{packedOffsets, hashedCities, hashedStorage, hashedCitiesLong}, {0, 0}}; void * const data = mmap(NULL, MMAP_DATA_SIZE, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); @@ -616,8 +615,8 @@ void start_worker(Worker *w, Results *out) { bool first = w->first && start == w->start; bool last = w->last && end == w->end; - unsigned int chunk_size = (unsigned int)(end - start); - unsigned int mapped_file_length = last ? PAGE_CEIL(chunk_size) : chunk_size + PAGE_SIZE; + uint32_t chunk_size = (uint32_t)(end - start); + uint32_t mapped_file_length = last ? PAGE_CEIL(chunk_size) : chunk_size + PAGE_SIZE; mmap(data + DUMMY_SIZE, mapped_file_length, PROT_READ, MAP_PRIVATE | MAP_FIXED, w->fd, start); @@ -633,7 +632,7 @@ void start_worker(Worker *w, Results *out) { } - unsigned int offsets[STRIDE + 1]; + uint32_t offsets[STRIDE + 1]; if (first) { offsets[0] = DUMMY_SIZE; } @@ -651,10 +650,10 @@ void start_worker(Worker *w, Results *out) { TIMER_MS_NUM("convert", w->workerId); } -__attribute__((aligned(4096))) void process_chunk(const void * const restrict base, const unsigned int * offsets, hash_t * restrict hashOut) { - alignas(64) hash_t hash = *hashOut; - alignas(64) long nums[STRIDE]; - alignas(32) unsigned int starts[STRIDE]; +__attribute__((aligned(4096))) void process_chunk(const void * const restrict base, const uint32_t * offsets, Hash * restrict hashOut) { + alignas(64) Hash hash = *hashOut; + alignas(64) uint64_t nums[STRIDE]; + alignas(32) uint32_t starts[STRIDE]; bool checkFinished; __m256i starts_v = _mm256_loadu_si256((__m256i *)offsets); @@ -842,7 +841,7 @@ __attribute__((aligned(4096))) void process_chunk(const void * const restrict ba mulled = _mm256_srli_epi32(mulled, 22); __m256i final = _mm256_sign_epi32(mulled, minus_mask); - // scale+offset hashs in the store/load to avoid register intermediates + // scale+offset the hashs in the store/load to avoid register intermediates // long instead of int to advoid unecessary sign extends long hash0 = insert_city(&hash, _mm256_extract_epi32(city_hashes, 0), maskedCity0); __m128i vals0 = _mm_load_si128(hash.p.hashedStorage + 4*hash0 + 16*0); @@ -922,10 +921,11 @@ __attribute__((aligned(4096))) void process_chunk(const void * const restrict ba } int hash_long(long x, long y) { - long seed = 0x9e3779b97f4a7c15; + long seed = 0x9e3779b97f4a7c15; // ~fxhash return ((_lrotl(x * seed, 5) ^ y) * seed) & HASH_LONG_MASK; } -__m256i process_long(const void * const restrict start, hash_t * restrict h, int * restrict semicolonBytesOut) { + +__m256i process_long(const void * const restrict start, Hash * restrict h, int * restrict semicolonBytesOut) { __m256i seg0 = _mm256_loadu_si256(start + 0); __m256i seg1 = _mm256_loadu_si256(start + 32); __m256i seg2 = _mm256_loadu_si256(start + 64); @@ -994,7 +994,7 @@ __attribute__((always_inline)) inline int hash_city(__m256i str) { return _mm256_extract_epi32(hash, 0); } -__attribute__((always_inline)) inline long insert_city(hash_t * restrict h, long hash, const __m256i maskedCity) { +__attribute__((always_inline)) inline long insert_city(Hash * restrict h, long hash, const __m256i maskedCity) { while (1) { __m256i stored = _mm256_load_si256(h->p.hashedCities + hash); @@ -1021,7 +1021,7 @@ __attribute__((always_inline)) inline long insert_city(hash_t * restrict h, long } } -int insert_city_long1(hash_t * restrict hash, int hash_value, __m256i seg0, __m256i seg1) { +int insert_city_long1(Hash * restrict hash, int hash_value, __m256i seg0, __m256i seg1) { while (1) { __m256i stored0 = _mm256_loadu_si256(hash->p.hashedCitiesLong + hash_value + 0); __m256i stored1 = _mm256_loadu_si256(hash->p.hashedCitiesLong + hash_value + 32); @@ -1043,7 +1043,7 @@ int insert_city_long1(hash_t * restrict hash, int hash_value, __m256i seg0, __m2 } } -int insert_city_long2(hash_t * restrict hash, int hash_value, __m256i seg0, __m256i seg1, __m256i seg2) { +int insert_city_long2(Hash * restrict hash, int hash_value, __m256i seg0, __m256i seg1, __m256i seg2) { while (1) { __m256i stored0 = _mm256_loadu_si256(hash->p.hashedCitiesLong + hash_value + 0); __m256i stored1 = _mm256_loadu_si256(hash->p.hashedCitiesLong + hash_value + 32); @@ -1068,7 +1068,7 @@ int insert_city_long2(hash_t * restrict hash, int hash_value, __m256i seg0, __m2 } } -int insert_city_long3(hash_t * restrict hash, int hash_value, __m256i seg0, __m256i seg1, __m256i seg2, __m256i seg3) { +int insert_city_long3(Hash * restrict hash, int hash_value, __m256i seg0, __m256i seg1, __m256i seg2, __m256i seg3) { while (1) { __m256i stored0 = _mm256_loadu_si256(hash->p.hashedCitiesLong + hash_value + 0); __m256i stored1 = _mm256_loadu_si256(hash->p.hashedCitiesLong + hash_value + 32); @@ -1151,10 +1151,10 @@ void print_results(Results *results) { fputs(buffer, stdout); } -unsigned int find_next_row(const void *data, unsigned int offset) { +uint32_t find_next_row(const void *data, uint32_t offset) { __m256i newlines = _mm256_set1_epi8('\n'); __m256i chars = _mm256_loadu_si256(data + offset); - unsigned int bytes = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(chars, newlines))); + uint32_t bytes = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(chars, newlines))); if (likely(bytes < 32)) { return offset + bytes + 1; }