Skip to content

Commit

Permalink
some cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
austindonisan committed Feb 25, 2024
1 parent 9f44109 commit d92c437
Showing 1 changed file with 35 additions and 35 deletions.
70 changes: 35 additions & 35 deletions 1brc.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

/*
* wait() for all child processes before exiting.
* If 0, instead "cheat" by returning immediately, leaving orphan processes.
* If false, "cheat" by returning immediately, leaving orphan processes.
*/
#define UNMAP 0
#define UNMAP false

/*
* Pin worker threads to a CPU.
Expand All @@ -27,12 +27,12 @@
* Each worker is assigned to the lowest unused CPU # we're scheduled for.
* For hyperthreaded CPUs, logical CPU numbers must not be interleaved.
*/
#define PIN_CPU 1
#define PIN_CPU true

/*
* Print timing information and city summary to stderr.
*/
#define DEBUG 0
#define DEBUG false

#define HASH_SHIFT 17 // 17 is a happy compromise between non-10k/10k: 16 is 1% faster/10% slower; 18 is 1% slower/3% faster
#define HASH_LONG_SHIFT 14 // 14 is requried to fit 10k entries and also fastest
Expand Down Expand Up @@ -74,7 +74,7 @@ typedef struct {
typedef struct {
const HashPointers p;
HashCounts counts;
} hash_t;
} Hash;

typedef struct {
long start;
Expand All @@ -88,7 +88,6 @@ typedef struct {
bool last;
} Worker;


typedef struct {
int64_t packedSumCount;
int32_t min;
Expand Down Expand Up @@ -140,17 +139,17 @@ typedef struct {
void prep_workers(Worker *workers, int numWorkers, bool warmup, int fd, struct stat *fileStat);
void process(int id, Worker *workers, int numWorkers, int fd, Results *out);
void start_worker(Worker *w, Results *out);
void process_chunk(const void * const restrict base, const unsigned int * offsets, hash_t * restrict h);
__m256i process_long(const void * const restrict start, hash_t * restrict h, int * restrict semicolonBytesOut);
void process_chunk(const void * const restrict base, const uint32_t * offsets, Hash * restrict h);
__m256i process_long(const void * const restrict start, Hash * restrict h, int * restrict semicolonBytesOut);
inline __m256i hash_cities(__m256i a, __m256i b, __m256i c, __m256i d, __m256i e, __m256i f, __m256i g, __m256i h);
inline int hash_city(__m256i str);
inline long insert_city(hash_t * restrict h, long hash, const __m256i maskedCity);
int insert_city_long1(hash_t * restrict h, int hash, __m256i seg0, __m256i seg1);
int insert_city_long2(hash_t * restrict h, int hash, __m256i seg0, __m256i seg1, __m256i seg2);
int insert_city_long3(hash_t * restrict h, int hash, __m256i seg0, __m256i seg1, __m256i seg2, __m256i seg3);
inline long insert_city(Hash * restrict h, long hash, const __m256i maskedCity);
int insert_city_long1(Hash * restrict h, int hash, __m256i seg0, __m256i seg1);
int insert_city_long2(Hash * restrict h, int hash, __m256i seg0, __m256i seg1, __m256i seg2);
int insert_city_long3(Hash * restrict h, int hash, __m256i seg0, __m256i seg1, __m256i seg2, __m256i seg3);
void merge(Results * restrict dst, Results * restrict src);
int sort_result(const void *a, const void *b, void *arg);
unsigned int find_next_row(const void *data, unsigned int offset);
uint32_t find_next_row(const void *data, uint32_t offset);
void print_results(Results *results);
void debug_results(Results *results);
inline __m256i city_from_long_hash(int hashValue);
Expand Down Expand Up @@ -413,14 +412,16 @@ void merge(Results * restrict dst, Results * restrict src) {
void process(int id, Worker * workers, int numWorkers, int fdOut, Results *out) {
TIMER_INIT();

int max_k = 8;
// processes at this point never process chunks, they only fork() child processes and then merge the results
// assign children process to chunks if there are fewer than max_k workers to create
// otherwise children process recurse to here and branch out again as necessary
const int max_k = 8;
const bool doWork = numWorkers <= max_k;
const int k = doWork ? numWorkers : (numWorkers + (max_k - 1)) / max_k;

int fd[k][2];
struct pollfd poll_fds[k];


void *tmp = mmap(NULL, RESULTS_MEMORY_SIZE * k, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, -1, 0);
Results *childResults[k];
for (int i = 0; i < k; i++) {
Expand Down Expand Up @@ -471,7 +472,6 @@ void process(int id, Worker * workers, int numWorkers, int fdOut, Results *out)
new_id += n;
}


int childrenFinished = 0;
while(childrenFinished < k) {
poll(poll_fds, k, -1);
Expand Down Expand Up @@ -527,7 +527,7 @@ void setup_results(Results *r) {

}

void convert_hash_to_results(hash_t * restrict hash, Results * restrict out) {
void convert_hash_to_results(Hash * restrict hash, Results * restrict out) {
out->numCities = hash->counts.numCities;
out->numLongCities = 0;

Expand Down Expand Up @@ -572,7 +572,6 @@ void convert_hash_to_results(hash_t * restrict hash, Results * restrict out) {
}
}


void start_worker(Worker *w, Results *out) {
TIMER_INIT();

Expand Down Expand Up @@ -600,7 +599,7 @@ void start_worker(Worker *w, Results *out) {
void * hashedCitiesLong = hashData;
hashData += HASHED_CITIES_LONG_SIZE;

hash_t hash = {{packedOffsets, hashedCities, hashedStorage, hashedCitiesLong}, {0, 0}};
Hash hash = {{packedOffsets, hashedCities, hashedStorage, hashedCitiesLong}, {0, 0}};

void * const data = mmap(NULL, MMAP_DATA_SIZE, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);

Expand All @@ -616,8 +615,8 @@ void start_worker(Worker *w, Results *out) {
bool first = w->first && start == w->start;
bool last = w->last && end == w->end;

unsigned int chunk_size = (unsigned int)(end - start);
unsigned int mapped_file_length = last ? PAGE_CEIL(chunk_size) : chunk_size + PAGE_SIZE;
uint32_t chunk_size = (uint32_t)(end - start);
uint32_t mapped_file_length = last ? PAGE_CEIL(chunk_size) : chunk_size + PAGE_SIZE;

mmap(data + DUMMY_SIZE, mapped_file_length, PROT_READ, MAP_PRIVATE | MAP_FIXED, w->fd, start);

Expand All @@ -633,7 +632,7 @@ void start_worker(Worker *w, Results *out) {
}


unsigned int offsets[STRIDE + 1];
uint32_t offsets[STRIDE + 1];
if (first) {
offsets[0] = DUMMY_SIZE;
}
Expand All @@ -651,10 +650,10 @@ void start_worker(Worker *w, Results *out) {
TIMER_MS_NUM("convert", w->workerId);
}

__attribute__((aligned(4096))) void process_chunk(const void * const restrict base, const unsigned int * offsets, hash_t * restrict hashOut) {
alignas(64) hash_t hash = *hashOut;
alignas(64) long nums[STRIDE];
alignas(32) unsigned int starts[STRIDE];
__attribute__((aligned(4096))) void process_chunk(const void * const restrict base, const uint32_t * offsets, Hash * restrict hashOut) {
alignas(64) Hash hash = *hashOut;
alignas(64) uint64_t nums[STRIDE];
alignas(32) uint32_t starts[STRIDE];
bool checkFinished;

__m256i starts_v = _mm256_loadu_si256((__m256i *)offsets);
Expand Down Expand Up @@ -842,7 +841,7 @@ __attribute__((aligned(4096))) void process_chunk(const void * const restrict ba
mulled = _mm256_srli_epi32(mulled, 22);
__m256i final = _mm256_sign_epi32(mulled, minus_mask);

// scale+offset hashs in the store/load to avoid register intermediates
// scale+offset the hashs in the store/load to avoid register intermediates
// long instead of int to advoid unecessary sign extends
long hash0 = insert_city(&hash, _mm256_extract_epi32(city_hashes, 0), maskedCity0);
__m128i vals0 = _mm_load_si128(hash.p.hashedStorage + 4*hash0 + 16*0);
Expand Down Expand Up @@ -922,10 +921,11 @@ __attribute__((aligned(4096))) void process_chunk(const void * const restrict ba
}

int hash_long(long x, long y) {
long seed = 0x9e3779b97f4a7c15;
long seed = 0x9e3779b97f4a7c15; // ~fxhash
return ((_lrotl(x * seed, 5) ^ y) * seed) & HASH_LONG_MASK;
}
__m256i process_long(const void * const restrict start, hash_t * restrict h, int * restrict semicolonBytesOut) {

__m256i process_long(const void * const restrict start, Hash * restrict h, int * restrict semicolonBytesOut) {
__m256i seg0 = _mm256_loadu_si256(start + 0);
__m256i seg1 = _mm256_loadu_si256(start + 32);
__m256i seg2 = _mm256_loadu_si256(start + 64);
Expand Down Expand Up @@ -994,7 +994,7 @@ __attribute__((always_inline)) inline int hash_city(__m256i str) {
return _mm256_extract_epi32(hash, 0);
}

__attribute__((always_inline)) inline long insert_city(hash_t * restrict h, long hash, const __m256i maskedCity) {
__attribute__((always_inline)) inline long insert_city(Hash * restrict h, long hash, const __m256i maskedCity) {

while (1) {
__m256i stored = _mm256_load_si256(h->p.hashedCities + hash);
Expand All @@ -1021,7 +1021,7 @@ __attribute__((always_inline)) inline long insert_city(hash_t * restrict h, long
}
}

int insert_city_long1(hash_t * restrict hash, int hash_value, __m256i seg0, __m256i seg1) {
int insert_city_long1(Hash * restrict hash, int hash_value, __m256i seg0, __m256i seg1) {
while (1) {
__m256i stored0 = _mm256_loadu_si256(hash->p.hashedCitiesLong + hash_value + 0);
__m256i stored1 = _mm256_loadu_si256(hash->p.hashedCitiesLong + hash_value + 32);
Expand All @@ -1043,7 +1043,7 @@ int insert_city_long1(hash_t * restrict hash, int hash_value, __m256i seg0, __m2
}
}

int insert_city_long2(hash_t * restrict hash, int hash_value, __m256i seg0, __m256i seg1, __m256i seg2) {
int insert_city_long2(Hash * restrict hash, int hash_value, __m256i seg0, __m256i seg1, __m256i seg2) {
while (1) {
__m256i stored0 = _mm256_loadu_si256(hash->p.hashedCitiesLong + hash_value + 0);
__m256i stored1 = _mm256_loadu_si256(hash->p.hashedCitiesLong + hash_value + 32);
Expand All @@ -1068,7 +1068,7 @@ int insert_city_long2(hash_t * restrict hash, int hash_value, __m256i seg0, __m2
}
}

int insert_city_long3(hash_t * restrict hash, int hash_value, __m256i seg0, __m256i seg1, __m256i seg2, __m256i seg3) {
int insert_city_long3(Hash * restrict hash, int hash_value, __m256i seg0, __m256i seg1, __m256i seg2, __m256i seg3) {
while (1) {
__m256i stored0 = _mm256_loadu_si256(hash->p.hashedCitiesLong + hash_value + 0);
__m256i stored1 = _mm256_loadu_si256(hash->p.hashedCitiesLong + hash_value + 32);
Expand Down Expand Up @@ -1151,10 +1151,10 @@ void print_results(Results *results) {
fputs(buffer, stdout);
}

unsigned int find_next_row(const void *data, unsigned int offset) {
uint32_t find_next_row(const void *data, uint32_t offset) {
__m256i newlines = _mm256_set1_epi8('\n');
__m256i chars = _mm256_loadu_si256(data + offset);
unsigned int bytes = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(chars, newlines)));
uint32_t bytes = _tzcnt_u32(_mm256_movemask_epi8(_mm256_cmpeq_epi8(chars, newlines)));
if (likely(bytes < 32)) {
return offset + bytes + 1;
}
Expand Down

0 comments on commit d92c437

Please sign in to comment.