From 733fb4f1b1548b77c61ea9437d646465553e6cd6 Mon Sep 17 00:00:00 2001 From: Austin Donisan Date: Thu, 22 Feb 2024 23:35:51 -0800 Subject: [PATCH] a bit of readability, switch to c23 --- 1brc.c | 170 ++++++++++++++++++++++++++++----------------------------- 1 file changed, 82 insertions(+), 88 deletions(-) diff --git a/1brc.c b/1brc.c index 53f7cd2..3699203 100644 --- a/1brc.c +++ b/1brc.c @@ -1,23 +1,15 @@ #define _GNU_SOURCE #include -#include -#include -#include #include #include #include -#include -#include #include #include -#include #include #include #include -#include #include #include -#include #include /* @@ -38,8 +30,11 @@ /* * Print timing information and city summary to stderr. */ -#define DEBUG 0 +#define DEBUG 1 +#define HASH_SHIFT 17 // 17 is a happy compromise between non-10k/10k: 16 is 1% faster/10% slower; 18 is 1% slower/3% faster +#define HASH_LONG_SHIFT 14 // 14 is requried to fit 10k entries and also fastest +#define HASH_RESULT_SHIFT 14 // 14 is required to fit 10k entries and also fastest #define MAX_CITIES 10000 #define MAX_TEMP 999 @@ -48,21 +43,19 @@ #define SHORT_CITY_LENGTH 32 #define LONG_CITY_LENGTH 128 -#define HASH_SHIFT 17 // 16 is 1% faster for non-10k, 17 is 10% faster for 10k -#define HASH_LONG_SHIFT 14 // 14 is requried to fit 10k entries -#define HASH_RESULT_SHIFT 14 - // wrapping and fitting nicely in pages is better than extra buffer at the end #define HASH_ENTRIES (1 << HASH_SHIFT) #define HASH_LONG_ENTRIES (1 << HASH_LONG_SHIFT) +// 32 byte AVX2 registers can fit 8 values at once +// going up/down to 64/16 byte AXV512/SSE requires code changes, too #define STRIDE 8 typedef struct { - int64_t packed_sum; + int64_t packedSum; int32_t min; int32_t max; -} hash_entry_t; +} HashEntry; typedef struct { int * const restrict packedOffsets; @@ -85,7 +78,7 @@ typedef struct { long start; long end; int fd; - int worker_id; + int workerId; int cpuId; bool fork; bool warmup; @@ -142,8 +135,8 @@ typedef struct { LongCity * restrict longCities; } Results; -void prep_workers(worker_t *workers, int num_workers, bool warmup, int fd, struct stat *fileStat); -void process(int id, worker_t * workers, int num_workers, int fd, Results *out); +void prep_workers(worker_t *workers, int numWorkers, bool warmup, int fd, struct stat *fileStat); +void process(int id, worker_t * workers, int numWorkers, int fd, Results *out); void start_worker(worker_t *w, Results *out); void process_chunk(const void * const restrict base, const unsigned int * offsets, hash_t * restrict h); __m256i process_long(const void * const restrict start, hash_t * restrict h, int * restrict semicolonBytesOut); @@ -158,9 +151,7 @@ int sort_result(const void *a, const void *b, void *arg); unsigned int find_next_row(const void *data, unsigned int offset); void print_results(Results *results); void debug_results(Results *results); -inline int hash_to_offset(int hash, int streamIdx); inline __m256i city_from_long_hash(int hashValue); -inline int long_hash_from_city(__m256i city); inline void setup_results(Results *r); inline bool city_is_long(PackedCity city); void print256(__m256i var); @@ -207,7 +198,7 @@ void print256(__m256i var); #define LINE_TRUNC(v) ((v) & (LINE_MASK)) #define LINE_CEIL(v) (LINE_TRUNC(v + LINE_SIZE - 1)) -#define HASH_ENTRY_SIZE ((int)(STRIDE * sizeof(hash_entry_t))) +#define HASH_ENTRY_SIZE ((int)(STRIDE * sizeof(HashEntry))) #define HASH_DATA_OFFSET 5 // log2(HASH_DATA_ENTRY_WIDTH) #define HASH_CITY_OFFSET 5 // log2(SHORT_CITY_LENGTH) @@ -249,29 +240,19 @@ void print256(__m256i var); #define LONG_CITY_SENTINEL 0xFACADE00 -alignas(32) const void * const MASKED_DUMMY = (char []){ - 0 ,'A','D', 0 , 0 , 0 , 0 , 0 , - 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , - 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , - 0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 , +alignas(32) const void * const MASKED_DUMMY = (long []){ + 'A' << 8 | 'D' << 16, 0, 0, 0 }; -alignas(64) const void * const CITY_MASK = (char []){ - 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, - 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, - 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, - 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, - 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, - }; +alignas(64) const void * const CITY_MASK = (long []){ + -1, -1, -1, -1, 0, 0, 0, 0, +}; int main(int argc, char** argv) { TIMER_INIT(); if (argc < 3 || argc > 4) { - fprintf(stderr, "Usage: good file workers [warmup]\n"); + fprintf(stderr, "Usage: 1brc file workers [warmup]\n"); return EXIT_FAILURE; } @@ -289,34 +270,34 @@ int main(int argc, char** argv) { return EXIT_FAILURE; } - int num_workers = atoi(argv[2]); - if (num_workers < 1 || num_workers > 256) { + int numWorkers = atoi(argv[2]); + if (numWorkers < 1 || numWorkers > 256) { fprintf(stderr, "workers must be between 1 and 256\n"); return EXIT_FAILURE; } const bool warmup = DEBUG && (argc < 4 ? false : atoi(argv[3]) != 0); - if ((fileStat.st_size - 1) / PAGE_SIZE < num_workers) { - D(fprintf(stderr, "decreasing num_workers to %ld\n", fileStat.st_size / PAGE_SIZE + 1);) - num_workers = (int) (fileStat.st_size / PAGE_SIZE) + 1; + if ((fileStat.st_size - 1) / PAGE_SIZE < numWorkers) { + D(fprintf(stderr, "decreasing numWorkers to %ld\n", fileStat.st_size / PAGE_SIZE + 1);) + numWorkers = (int) (fileStat.st_size / PAGE_SIZE) + 1; } - void *mem = mmap(NULL, RESULTS_MEMORY_SIZE + sizeof(worker_t) * num_workers, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); + void *mem = mmap(NULL, RESULTS_MEMORY_SIZE + sizeof(worker_t) * numWorkers, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0); Results *results = mem; setup_results(results); mem += RESULTS_MEMORY_SIZE; worker_t *workers = mem; - prep_workers(workers, num_workers, warmup, fd, &fileStat); + prep_workers(workers, numWorkers, warmup, fd, &fileStat); TIMER_RESET(); - if (UNMAP && num_workers == 1) { + if (UNMAP && numWorkers == 1) { start_worker(workers, results); } else { - process(0, workers, num_workers, -1, results); + process(0, workers, numWorkers, -1, results); } TIMER_MS("process"); @@ -332,33 +313,33 @@ int main(int argc, char** argv) { return 0; } -void prep_workers(worker_t *workers, int num_workers, bool warmup, int fd, struct stat *fileStat) { +void prep_workers(worker_t *workers, int numWorkers, bool warmup, int fd, struct stat *fileStat) { cpu_set_t cpuset; CPU_ZERO(&cpuset); sched_getaffinity(0, sizeof(cpu_set_t), &cpuset); int numCpus = CPU_COUNT(&cpuset); - if (numCpus < num_workers) { - fprintf(stderr, "%d threads is less than %d available CPUS\n", num_workers, numCpus); + if (numCpus < numWorkers) { + fprintf(stderr, "%d threads is less than %d available CPUS\n", numWorkers, numCpus); exit(1); } long cpu = 0; long start = 0; - long delta = PAGE_TRUNC(fileStat->st_size / num_workers); - for (int i = 0; i < num_workers; i++) { + long delta = PAGE_TRUNC(fileStat->st_size / numWorkers); + for (int i = 0; i < numWorkers; i++) { while (!CPU_ISSET(cpu, &cpuset)) { cpu++; } worker_t *w = workers + i; - w->worker_id = i; + w->workerId = i; w->cpuId = cpu++; w->fd = fd; w->start = start; w->end = (start += delta); w->first = i == 0; - w->last = i == num_workers - 1; + w->last = i == numWorkers - 1; if (w->last) { w->end = fileStat->st_size; } @@ -427,12 +408,12 @@ void merge(Results * restrict dst, Results * restrict src) { } } -void process(int id, worker_t * workers, int num_workers, int fdOut, Results *out) { +void process(int id, worker_t * workers, int numWorkers, int fdOut, Results *out) { TIMER_INIT(); int max_k = 8; - const bool doWork = num_workers <= max_k; - const int k = doWork ? num_workers : (num_workers + (max_k - 1)) / max_k; + const bool doWork = numWorkers <= max_k; + const int k = doWork ? numWorkers : (numWorkers + (max_k - 1)) / max_k; int fd[k][2]; struct pollfd poll_fds[k]; @@ -447,12 +428,15 @@ void process(int id, worker_t * workers, int num_workers, int fdOut, Results *ou int new_id = id; for (int i = 0; i < k; i++) { - pipe(fd[i]); + if (pipe(fd[i])) { + perror("pipe"); + exit(1); + } poll_fds[i].fd = fd[i][0]; poll_fds[i].events = POLLIN; - int n = (num_workers + ((k-i)/2)) / (k - i); - num_workers -= n; + int n = (numWorkers + ((k-i)/2)) / (k - i); + numWorkers -= n; if (fork() == 0) { close(fd[i][0]); @@ -468,7 +452,11 @@ void process(int id, worker_t * workers, int num_workers, int fdOut, Results *ou } start_worker(workers + new_id, childResults[i]); - write(fd[i][1], "0", 1); + if (write(fd[i][1], "0", 1) < 0) { + perror("write"); + exit(1); + } + exit(0); } @@ -502,7 +490,10 @@ void process(int id, worker_t * workers, int num_workers, int fdOut, Results *ou if (fdOut != -1) { - write(fdOut, "0", 1); + if (write(fdOut, "0", 1) < 0) { + perror("parrent write"); + exit(1); + } if (UNMAP) { while(wait(NULL) != -1) {} } @@ -593,7 +584,7 @@ void start_worker(worker_t *w, Results *out) { madvise(hashData + PACKED_OFFSETS_SIZE, HASHED_CITIES_SIZE + HASHED_DATA_SIZE + HASHED_CITIES_LONG_SIZE, MADV_HUGEPAGE); madvise(hashData, HASH_MEMORY_SIZE, MADV_POPULATE_WRITE); - TIMER_MS_NUM("mmap", w->worker_id); + TIMER_MS_NUM("mmap", w->workerId); int * packedOffsets = hashData; hashData += PACKED_OFFSETS_SIZE; @@ -635,7 +626,7 @@ void start_worker(worker_t *w, Results *out) { } volatile long dummy2 = dummy; (void)dummy2; - TIMER_MS_NUM("warmup", w->worker_id); + TIMER_MS_NUM("warmup", w->workerId); TIMER_RESET(); } @@ -650,12 +641,12 @@ void start_worker(worker_t *w, Results *out) { offsets[STRIDE] = last ? chunk_size + DUMMY_SIZE : find_next_row(data, chunk_size + DUMMY_SIZE); process_chunk(data, offsets, &hash); - TIMER_MS_NUM("chunk", w->worker_id); + TIMER_MS_NUM("chunk", w->workerId); } TIMER_RESET(); convert_hash_to_results(&hash, out); - TIMER_MS_NUM("convert", w->worker_id); + TIMER_MS_NUM("convert", w->workerId); } __attribute__((aligned(4096))) void process_chunk(const void * const restrict base, const unsigned int * offsets, hash_t * restrict hashOut) { @@ -702,14 +693,15 @@ __attribute__((aligned(4096))) void process_chunk(const void * const restrict ba __m256i rawCity6 = _mm256_loadu_si256(base + starts[6]); __m256i rawCity7 = _mm256_loadu_si256(base + starts[7]); - int semicolonBytes0 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity0, _mm256_set1_epi8(';')))); - int semicolonBytes1 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity1, _mm256_set1_epi8(';')))); - int semicolonBytes2 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity2, _mm256_set1_epi8(';')))); - int semicolonBytes3 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity3, _mm256_set1_epi8(';')))); - int semicolonBytes4 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity4, _mm256_set1_epi8(';')))); - int semicolonBytes5 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity5, _mm256_set1_epi8(';')))); - int semicolonBytes6 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity6, _mm256_set1_epi8(';')))); - int semicolonBytes7 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity7, _mm256_set1_epi8(';')))); + __m256i semicolons = _mm256_set1_epi8(';'); + int semicolonBytes0 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity0, semicolons))); + int semicolonBytes1 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity1, semicolons))); + int semicolonBytes2 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity2, semicolons))); + int semicolonBytes3 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity3, semicolons))); + int semicolonBytes4 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity4, semicolons))); + int semicolonBytes5 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity5, semicolons))); + int semicolonBytes6 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity6, semicolons))); + int semicolonBytes7 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity7, semicolons))); // 127 keeps the the opcode small _mm_prefetch(base + starts[0] + semicolonBytes0 + 127, _MM_HINT_NTA); @@ -936,9 +928,11 @@ __m256i process_long(const void * const restrict start, hash_t * restrict h, int __m256i seg1 = _mm256_loadu_si256(start + 32); __m256i seg2 = _mm256_loadu_si256(start + 64); __m256i seg3 = _mm256_loadu_si256(start + 96); - int semicolonBytes1 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(seg1, _mm256_set1_epi8(';')))); - int semicolonBytes2 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(seg2, _mm256_set1_epi8(';')))); - int semicolonBytes3 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(seg3, _mm256_set1_epi8(';')))); + + __m256i semicolons = _mm256_set1_epi8(';'); + int semicolonBytes1 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(seg1, semicolons))); + int semicolonBytes2 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(seg2, semicolons))); + int semicolonBytes3 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(seg3, semicolons))); int hash = hash_long(*(long *)start, *((long *)start + 1)); @@ -1197,22 +1191,22 @@ __attribute__((always_inline)) inline __m256i city_from_long_hash(int hashValue) return _mm256_set_epi32(0, 0, 0, 0, 0, 0, hashValue, LONG_CITY_SENTINEL); } -__attribute__((always_inline)) inline int long_hash_from_city(__m256i city) { - if (_mm256_extract_epi64(city, 0) == 0xDEADBEEF00000000) { - return _mm256_extract_epi32(city, 2); - } - return -1; -} - -__attribute__((always_inline)) inline int hash_to_offset(int hash, int streamIdx) { - return hash * (int)(HASH_ENTRY_SIZE / SHORT_CITY_LENGTH) + streamIdx * (int)sizeof(hash_entry_t); -} - __attribute__((always_inline)) inline bool city_is_long(PackedCity city) { return city.longRef.sentinel == LONG_CITY_SENTINEL; } + void print256(__m256i var) { char val[32]; memcpy(val, &var, sizeof(val)); - fprintf(stderr, "%02x %02x %02x %02x %02x %02x %02x %02x | %02x %02x %02x %02x %02x %02x %02x %02x | %02x %02x %02x %02x %02x %02x %02x %02x | %02x %02x %02x %02x %02x %02x %02x %02x\n", 0xFF & val[0], 0xFF & val[1], 0xFF & val[2], 0xFF & val[3], 0xFF & val[4], 0xFF & val[5], 0xFF & val[6], 0xFF & val[7], 0xFF & val[8], 0xFF & val[9], 0xFF & val[10], 0xFF & val[11], 0xFF & val[12], 0xFF & val[13], 0xFF & val[14], 0xFF & val[15], 0xFF & val[16], 0xFF & val[17], 0xFF & val[18], 0xFF & val[19], 0xFF & val[20], 0xFF & val[21], 0xFF & val[22], 0xFF & val[23], 0xFF & val[24], 0xFF & val[25], 0xFF & val[26], 0xFF & val[27], 0xFF & val[28], 0xFF & val[29], 0xFF & val[30], 0xFF & val[31]); + fprintf( + stderr, + "%02x %02x %02x %02x %02x %02x %02x %02x | %02x %02x %02x %02x %02x %02x %02x %02x | %02x %02x %02x %02x %02x %02x %02x %02x | %02x %02x %02x %02x %02x %02x %02x %02x\n", + 0xFF & val[0], 0xFF & val[1], 0xFF & val[2], 0xFF & val[3], + 0xFF & val[4], 0xFF & val[5], 0xFF & val[6], 0xFF & val[7], + 0xFF & val[8], 0xFF & val[9], 0xFF & val[10], 0xFF & val[11], + 0xFF & val[12], 0xFF & val[13], 0xFF & val[14], 0xFF & val[15], + 0xFF & val[16], 0xFF & val[17], 0xFF & val[18], 0xFF & val[19], + 0xFF & val[20], 0xFF & val[21], 0xFF & val[22], 0xFF & val[23], + 0xFF & val[24], 0xFF & val[25], 0xFF & val[26], 0xFF & val[27], + 0xFF & val[28], 0xFF & val[29], 0xFF & val[30], 0xFF & val[31]); }