Skip to content

Commit

Permalink
a bit of readability, switch to c23
Browse files Browse the repository at this point in the history
  • Loading branch information
austindonisan committed Feb 23, 2024
1 parent cabb23e commit 733fb4f
Showing 1 changed file with 82 additions and 88 deletions.
170 changes: 82 additions & 88 deletions 1brc.c
Original file line number Diff line number Diff line change
@@ -1,23 +1,15 @@
#define _GNU_SOURCE
#include <fcntl.h>
#include <features.h>
#include <immintrin.h>
#include <limits.h>
#include <math.h>
#include <poll.h>
#include <sched.h>
#include <stdalign.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>
#include <x86intrin.h>

/*
Expand All @@ -38,8 +30,11 @@
/*
* Print timing information and city summary to stderr.
*/
#define DEBUG 0
#define DEBUG 1

#define HASH_SHIFT 17 // 17 is a happy compromise between non-10k/10k: 16 is 1% faster/10% slower; 18 is 1% slower/3% faster
#define HASH_LONG_SHIFT 14 // 14 is requried to fit 10k entries and also fastest
#define HASH_RESULT_SHIFT 14 // 14 is required to fit 10k entries and also fastest

#define MAX_CITIES 10000
#define MAX_TEMP 999
Expand All @@ -48,21 +43,19 @@
#define SHORT_CITY_LENGTH 32
#define LONG_CITY_LENGTH 128

#define HASH_SHIFT 17 // 16 is 1% faster for non-10k, 17 is 10% faster for 10k
#define HASH_LONG_SHIFT 14 // 14 is requried to fit 10k entries
#define HASH_RESULT_SHIFT 14

// wrapping and fitting nicely in pages is better than extra buffer at the end
#define HASH_ENTRIES (1 << HASH_SHIFT)
#define HASH_LONG_ENTRIES (1 << HASH_LONG_SHIFT)

// 32 byte AVX2 registers can fit 8 values at once
// going up/down to 64/16 byte AXV512/SSE requires code changes, too
#define STRIDE 8

typedef struct {
int64_t packed_sum;
int64_t packedSum;
int32_t min;
int32_t max;
} hash_entry_t;
} HashEntry;

typedef struct {
int * const restrict packedOffsets;
Expand All @@ -85,7 +78,7 @@ typedef struct {
long start;
long end;
int fd;
int worker_id;
int workerId;
int cpuId;
bool fork;
bool warmup;
Expand Down Expand Up @@ -142,8 +135,8 @@ typedef struct {
LongCity * restrict longCities;
} Results;

void prep_workers(worker_t *workers, int num_workers, bool warmup, int fd, struct stat *fileStat);
void process(int id, worker_t * workers, int num_workers, int fd, Results *out);
void prep_workers(worker_t *workers, int numWorkers, bool warmup, int fd, struct stat *fileStat);
void process(int id, worker_t * workers, int numWorkers, int fd, Results *out);
void start_worker(worker_t *w, Results *out);
void process_chunk(const void * const restrict base, const unsigned int * offsets, hash_t * restrict h);
__m256i process_long(const void * const restrict start, hash_t * restrict h, int * restrict semicolonBytesOut);
Expand All @@ -158,9 +151,7 @@ int sort_result(const void *a, const void *b, void *arg);
unsigned int find_next_row(const void *data, unsigned int offset);
void print_results(Results *results);
void debug_results(Results *results);
inline int hash_to_offset(int hash, int streamIdx);
inline __m256i city_from_long_hash(int hashValue);
inline int long_hash_from_city(__m256i city);
inline void setup_results(Results *r);
inline bool city_is_long(PackedCity city);
void print256(__m256i var);
Expand Down Expand Up @@ -207,7 +198,7 @@ void print256(__m256i var);
#define LINE_TRUNC(v) ((v) & (LINE_MASK))
#define LINE_CEIL(v) (LINE_TRUNC(v + LINE_SIZE - 1))

#define HASH_ENTRY_SIZE ((int)(STRIDE * sizeof(hash_entry_t)))
#define HASH_ENTRY_SIZE ((int)(STRIDE * sizeof(HashEntry)))

#define HASH_DATA_OFFSET 5 // log2(HASH_DATA_ENTRY_WIDTH)
#define HASH_CITY_OFFSET 5 // log2(SHORT_CITY_LENGTH)
Expand Down Expand Up @@ -249,29 +240,19 @@ void print256(__m256i var);

#define LONG_CITY_SENTINEL 0xFACADE00

alignas(32) const void * const MASKED_DUMMY = (char []){
0 ,'A','D', 0 , 0 , 0 , 0 , 0 ,
0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ,
0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ,
0 , 0 , 0 , 0 , 0 , 0 , 0 , 0 ,
alignas(32) const void * const MASKED_DUMMY = (long []){
'A' << 8 | 'D' << 16, 0, 0, 0
};

alignas(64) const void * const CITY_MASK = (char []){
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0,
};
alignas(64) const void * const CITY_MASK = (long []){
-1, -1, -1, -1, 0, 0, 0, 0,
};

int main(int argc, char** argv) {
TIMER_INIT();

if (argc < 3 || argc > 4) {
fprintf(stderr, "Usage: good file workers [warmup]\n");
fprintf(stderr, "Usage: 1brc file workers [warmup]\n");
return EXIT_FAILURE;
}

Expand All @@ -289,34 +270,34 @@ int main(int argc, char** argv) {
return EXIT_FAILURE;
}

int num_workers = atoi(argv[2]);
if (num_workers < 1 || num_workers > 256) {
int numWorkers = atoi(argv[2]);
if (numWorkers < 1 || numWorkers > 256) {
fprintf(stderr, "workers must be between 1 and 256\n");
return EXIT_FAILURE;
}

const bool warmup = DEBUG && (argc < 4 ? false : atoi(argv[3]) != 0);

if ((fileStat.st_size - 1) / PAGE_SIZE < num_workers) {
D(fprintf(stderr, "decreasing num_workers to %ld\n", fileStat.st_size / PAGE_SIZE + 1);)
num_workers = (int) (fileStat.st_size / PAGE_SIZE) + 1;
if ((fileStat.st_size - 1) / PAGE_SIZE < numWorkers) {
D(fprintf(stderr, "decreasing numWorkers to %ld\n", fileStat.st_size / PAGE_SIZE + 1);)
numWorkers = (int) (fileStat.st_size / PAGE_SIZE) + 1;
}

void *mem = mmap(NULL, RESULTS_MEMORY_SIZE + sizeof(worker_t) * num_workers, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
void *mem = mmap(NULL, RESULTS_MEMORY_SIZE + sizeof(worker_t) * numWorkers, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
Results *results = mem;
setup_results(results);

mem += RESULTS_MEMORY_SIZE;

worker_t *workers = mem;
prep_workers(workers, num_workers, warmup, fd, &fileStat);
prep_workers(workers, numWorkers, warmup, fd, &fileStat);

TIMER_RESET();
if (UNMAP && num_workers == 1) {
if (UNMAP && numWorkers == 1) {
start_worker(workers, results);
}
else {
process(0, workers, num_workers, -1, results);
process(0, workers, numWorkers, -1, results);
}
TIMER_MS("process");

Expand All @@ -332,33 +313,33 @@ int main(int argc, char** argv) {
return 0;
}

void prep_workers(worker_t *workers, int num_workers, bool warmup, int fd, struct stat *fileStat) {
void prep_workers(worker_t *workers, int numWorkers, bool warmup, int fd, struct stat *fileStat) {
cpu_set_t cpuset;
CPU_ZERO(&cpuset);
sched_getaffinity(0, sizeof(cpu_set_t), &cpuset);
int numCpus = CPU_COUNT(&cpuset);

if (numCpus < num_workers) {
fprintf(stderr, "%d threads is less than %d available CPUS\n", num_workers, numCpus);
if (numCpus < numWorkers) {
fprintf(stderr, "%d threads is less than %d available CPUS\n", numWorkers, numCpus);
exit(1);
}

long cpu = 0;
long start = 0;
long delta = PAGE_TRUNC(fileStat->st_size / num_workers);
for (int i = 0; i < num_workers; i++) {
long delta = PAGE_TRUNC(fileStat->st_size / numWorkers);
for (int i = 0; i < numWorkers; i++) {
while (!CPU_ISSET(cpu, &cpuset)) {
cpu++;
}

worker_t *w = workers + i;
w->worker_id = i;
w->workerId = i;
w->cpuId = cpu++;
w->fd = fd;
w->start = start;
w->end = (start += delta);
w->first = i == 0;
w->last = i == num_workers - 1;
w->last = i == numWorkers - 1;
if (w->last) {
w->end = fileStat->st_size;
}
Expand Down Expand Up @@ -427,12 +408,12 @@ void merge(Results * restrict dst, Results * restrict src) {
}
}

void process(int id, worker_t * workers, int num_workers, int fdOut, Results *out) {
void process(int id, worker_t * workers, int numWorkers, int fdOut, Results *out) {
TIMER_INIT();

int max_k = 8;
const bool doWork = num_workers <= max_k;
const int k = doWork ? num_workers : (num_workers + (max_k - 1)) / max_k;
const bool doWork = numWorkers <= max_k;
const int k = doWork ? numWorkers : (numWorkers + (max_k - 1)) / max_k;

int fd[k][2];
struct pollfd poll_fds[k];
Expand All @@ -447,12 +428,15 @@ void process(int id, worker_t * workers, int num_workers, int fdOut, Results *ou

int new_id = id;
for (int i = 0; i < k; i++) {
pipe(fd[i]);
if (pipe(fd[i])) {
perror("pipe");
exit(1);
}
poll_fds[i].fd = fd[i][0];
poll_fds[i].events = POLLIN;

int n = (num_workers + ((k-i)/2)) / (k - i);
num_workers -= n;
int n = (numWorkers + ((k-i)/2)) / (k - i);
numWorkers -= n;

if (fork() == 0) {
close(fd[i][0]);
Expand All @@ -468,7 +452,11 @@ void process(int id, worker_t * workers, int num_workers, int fdOut, Results *ou
}

start_worker(workers + new_id, childResults[i]);
write(fd[i][1], "0", 1);
if (write(fd[i][1], "0", 1) < 0) {
perror("write");
exit(1);
}

exit(0);
}

Expand Down Expand Up @@ -502,7 +490,10 @@ void process(int id, worker_t * workers, int num_workers, int fdOut, Results *ou


if (fdOut != -1) {
write(fdOut, "0", 1);
if (write(fdOut, "0", 1) < 0) {
perror("parrent write");
exit(1);
}
if (UNMAP) {
while(wait(NULL) != -1) {}
}
Expand Down Expand Up @@ -593,7 +584,7 @@ void start_worker(worker_t *w, Results *out) {

madvise(hashData + PACKED_OFFSETS_SIZE, HASHED_CITIES_SIZE + HASHED_DATA_SIZE + HASHED_CITIES_LONG_SIZE, MADV_HUGEPAGE);
madvise(hashData, HASH_MEMORY_SIZE, MADV_POPULATE_WRITE);
TIMER_MS_NUM("mmap", w->worker_id);
TIMER_MS_NUM("mmap", w->workerId);

int * packedOffsets = hashData;
hashData += PACKED_OFFSETS_SIZE;
Expand Down Expand Up @@ -635,7 +626,7 @@ void start_worker(worker_t *w, Results *out) {
}
volatile long dummy2 = dummy;
(void)dummy2;
TIMER_MS_NUM("warmup", w->worker_id);
TIMER_MS_NUM("warmup", w->workerId);
TIMER_RESET();
}

Expand All @@ -650,12 +641,12 @@ void start_worker(worker_t *w, Results *out) {
offsets[STRIDE] = last ? chunk_size + DUMMY_SIZE : find_next_row(data, chunk_size + DUMMY_SIZE);

process_chunk(data, offsets, &hash);
TIMER_MS_NUM("chunk", w->worker_id);
TIMER_MS_NUM("chunk", w->workerId);
}

TIMER_RESET();
convert_hash_to_results(&hash, out);
TIMER_MS_NUM("convert", w->worker_id);
TIMER_MS_NUM("convert", w->workerId);
}

__attribute__((aligned(4096))) void process_chunk(const void * const restrict base, const unsigned int * offsets, hash_t * restrict hashOut) {
Expand Down Expand Up @@ -702,14 +693,15 @@ __attribute__((aligned(4096))) void process_chunk(const void * const restrict ba
__m256i rawCity6 = _mm256_loadu_si256(base + starts[6]);
__m256i rawCity7 = _mm256_loadu_si256(base + starts[7]);

int semicolonBytes0 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity0, _mm256_set1_epi8(';'))));
int semicolonBytes1 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity1, _mm256_set1_epi8(';'))));
int semicolonBytes2 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity2, _mm256_set1_epi8(';'))));
int semicolonBytes3 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity3, _mm256_set1_epi8(';'))));
int semicolonBytes4 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity4, _mm256_set1_epi8(';'))));
int semicolonBytes5 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity5, _mm256_set1_epi8(';'))));
int semicolonBytes6 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity6, _mm256_set1_epi8(';'))));
int semicolonBytes7 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity7, _mm256_set1_epi8(';'))));
__m256i semicolons = _mm256_set1_epi8(';');
int semicolonBytes0 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity0, semicolons)));
int semicolonBytes1 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity1, semicolons)));
int semicolonBytes2 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity2, semicolons)));
int semicolonBytes3 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity3, semicolons)));
int semicolonBytes4 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity4, semicolons)));
int semicolonBytes5 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity5, semicolons)));
int semicolonBytes6 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity6, semicolons)));
int semicolonBytes7 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(rawCity7, semicolons)));

// 127 keeps the the opcode small
_mm_prefetch(base + starts[0] + semicolonBytes0 + 127, _MM_HINT_NTA);
Expand Down Expand Up @@ -936,9 +928,11 @@ __m256i process_long(const void * const restrict start, hash_t * restrict h, int
__m256i seg1 = _mm256_loadu_si256(start + 32);
__m256i seg2 = _mm256_loadu_si256(start + 64);
__m256i seg3 = _mm256_loadu_si256(start + 96);
int semicolonBytes1 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(seg1, _mm256_set1_epi8(';'))));
int semicolonBytes2 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(seg2, _mm256_set1_epi8(';'))));
int semicolonBytes3 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(seg3, _mm256_set1_epi8(';'))));

__m256i semicolons = _mm256_set1_epi8(';');
int semicolonBytes1 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(seg1, semicolons)));
int semicolonBytes2 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(seg2, semicolons)));
int semicolonBytes3 = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(seg3, semicolons)));

int hash = hash_long(*(long *)start, *((long *)start + 1));

Expand Down Expand Up @@ -1197,22 +1191,22 @@ __attribute__((always_inline)) inline __m256i city_from_long_hash(int hashValue)
return _mm256_set_epi32(0, 0, 0, 0, 0, 0, hashValue, LONG_CITY_SENTINEL);
}

__attribute__((always_inline)) inline int long_hash_from_city(__m256i city) {
if (_mm256_extract_epi64(city, 0) == 0xDEADBEEF00000000) {
return _mm256_extract_epi32(city, 2);
}
return -1;
}

__attribute__((always_inline)) inline int hash_to_offset(int hash, int streamIdx) {
return hash * (int)(HASH_ENTRY_SIZE / SHORT_CITY_LENGTH) + streamIdx * (int)sizeof(hash_entry_t);
}

__attribute__((always_inline)) inline bool city_is_long(PackedCity city) {
return city.longRef.sentinel == LONG_CITY_SENTINEL;
}

void print256(__m256i var) {
char val[32];
memcpy(val, &var, sizeof(val));
fprintf(stderr, "%02x %02x %02x %02x %02x %02x %02x %02x | %02x %02x %02x %02x %02x %02x %02x %02x | %02x %02x %02x %02x %02x %02x %02x %02x | %02x %02x %02x %02x %02x %02x %02x %02x\n", 0xFF & val[0], 0xFF & val[1], 0xFF & val[2], 0xFF & val[3], 0xFF & val[4], 0xFF & val[5], 0xFF & val[6], 0xFF & val[7], 0xFF & val[8], 0xFF & val[9], 0xFF & val[10], 0xFF & val[11], 0xFF & val[12], 0xFF & val[13], 0xFF & val[14], 0xFF & val[15], 0xFF & val[16], 0xFF & val[17], 0xFF & val[18], 0xFF & val[19], 0xFF & val[20], 0xFF & val[21], 0xFF & val[22], 0xFF & val[23], 0xFF & val[24], 0xFF & val[25], 0xFF & val[26], 0xFF & val[27], 0xFF & val[28], 0xFF & val[29], 0xFF & val[30], 0xFF & val[31]);
fprintf(
stderr,
"%02x %02x %02x %02x %02x %02x %02x %02x | %02x %02x %02x %02x %02x %02x %02x %02x | %02x %02x %02x %02x %02x %02x %02x %02x | %02x %02x %02x %02x %02x %02x %02x %02x\n",
0xFF & val[0], 0xFF & val[1], 0xFF & val[2], 0xFF & val[3],
0xFF & val[4], 0xFF & val[5], 0xFF & val[6], 0xFF & val[7],
0xFF & val[8], 0xFF & val[9], 0xFF & val[10], 0xFF & val[11],
0xFF & val[12], 0xFF & val[13], 0xFF & val[14], 0xFF & val[15],
0xFF & val[16], 0xFF & val[17], 0xFF & val[18], 0xFF & val[19],
0xFF & val[20], 0xFF & val[21], 0xFF & val[22], 0xFF & val[23],
0xFF & val[24], 0xFF & val[25], 0xFF & val[26], 0xFF & val[27],
0xFF & val[28], 0xFF & val[29], 0xFF & val[30], 0xFF & val[31]);
}

0 comments on commit 733fb4f

Please sign in to comment.