Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 114 additions & 62 deletions src/main.cu
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,18 @@
#include "getopt.h"
#include "log.h"

#define MAX_MESSAGE_SIZE (2048 * 1024)
#define BUFFER_SIZE (MAX_MESSAGE_SIZE * chain_nums)

typedef struct {
uv_mutex_t lock;
uint8_t buffer[BUFFER_SIZE];
ssize_t length;
} message_buffer_t;

message_buffer_t message_buffer;
uv_async_t message_handler;

std::atomic<uint32_t> found_solutions{0};

typedef std::chrono::high_resolution_clock Time;
Expand Down Expand Up @@ -56,17 +68,25 @@ void on_write_end(uv_write_t *req, int status)
free(req);
}

void print_block(job_t *job, uint8_t *hash)
{
char *hex_string = bytes_to_hex(hash, 32);
LOG("new block: %d -> %d, height: %d, hash: %s\n", job->from_group, job->to_group, job->height, hex_string);
free(hex_string);
}

std::mutex write_mutex;
uint8_t write_buffer[4096 * 1024];
void submit_new_block(mining_worker_t *worker)
{
expire_template_for_new_block(load_worker__template(worker));
mining_template_t *template_ptr = load_worker__template(worker);
print_block(template_ptr->job, (uint8_t *) hasher_hash(worker, true));
expire_template_for_new_block(template_ptr);

const std::lock_guard<std::mutex> lock(write_mutex);

ssize_t buf_size = write_new_block(worker, write_buffer);
uv_buf_t buf = uv_buf_init((char *)write_buffer, buf_size);
print_hex("new solution", (uint8_t *) hasher_buf(worker, true), 32);

uv_write_t *write_req = (uv_write_t *)malloc(sizeof(uv_write_t));
uint32_t buf_count = 1;
Expand Down Expand Up @@ -199,46 +219,10 @@ void log_hashrate(uv_timer_t *timer)
}
}

uint8_t read_buf[2048 * 1024 * chain_nums];
blob_t read_blob = {read_buf, 0};
server_message_t *decode_buf(const uv_buf_t *buf, ssize_t nread)
{
if (read_blob.len == 0)
{
read_blob.blob = (uint8_t *)buf->base;
read_blob.len = nread;
server_message_t *message = decode_server_message(&read_blob);
if (message)
{
// some bytes left
if (read_blob.len > 0)
{
memcpy(read_buf, read_blob.blob, read_blob.len);
read_blob.blob = read_buf;
}
return message;
}
else
{ // no bytes consumed
memcpy(read_buf, buf->base, nread);
read_blob.blob = read_buf;
read_blob.len = nread;
return NULL;
}
}
else
{
assert(read_blob.blob == read_buf);
memcpy(read_buf + read_blob.len, buf->base, nread);
read_blob.len += nread;
return decode_server_message(&read_blob);
}
}

void connect_to_broker();

void try_to_reconnect(uv_timer_t *timer){
read_blob.len = 0;
message_buffer.length = 0;
free(uv_socket);
free(uv_connect);
connect_to_broker();
Expand All @@ -259,36 +243,101 @@ void on_read(uv_stream_t *server, ssize_t nread, const uv_buf_t *buf)
return;
}

server_message_t *message = decode_buf(buf, nread);
if (message)
uv_mutex_lock(&message_buffer.lock);
assert(message_buffer.length + nread <= BUFFER_SIZE);
memcpy(message_buffer.buffer + message_buffer.length, buf->base, nread);
message_buffer.length += nread;
uv_mutex_unlock(&message_buffer.lock);

uv_async_send(&message_handler);
free(buf->base);
}

uint8_t latest_job[MAX_MESSAGE_SIZE];

void process_message(uv_async_t* handle) {
bool found_latest_job = false;
ssize_t latest_job_offset = 0;
ssize_t latest_job_len = 0;
ssize_t offset = 0;

uv_mutex_lock(&message_buffer.lock);
uint8_t *buf = message_buffer.buffer;
ssize_t len = message_buffer.length;

// try to read the latest jobs message from the buffer.
while (len - offset >= 4)
{
switch (message->kind)
ssize_t message_size = decode_size(buf + offset);
ssize_t total_message_size = 4 + message_size;
// ignore the submit result message for simplicity
if (total_message_size == 47)
{
offset += total_message_size;
continue;
}
if (len - offset >= total_message_size)
{
latest_job_offset = offset + 4;
latest_job_len = message_size;
offset += total_message_size;
found_latest_job = true;
} else
{
case JOBS:
for (int i = 0; i < message->jobs->len; i++)
{
update_templates(message->jobs->jobs[i]);
}
start_mining_if_needed();
break;
}
}

case SUBMIT_RESULT:
char *block_hash_hex = bytes_to_hex(message->submit_result->block_hash, 32);
LOG(
"submitted: %d -> %d, %s: %d \n",
message->submit_result->from_group,
message->submit_result->to_group,
block_hash_hex,
message->submit_result->status
);
free(block_hash_hex);
break;
if (found_latest_job)
{
memcpy(latest_job, buf + latest_job_offset, latest_job_len);
}

if (offset > 0)
{
if (offset == len)
{
message_buffer.length = 0;
}
else
{
ssize_t remain = len - offset;
memmove(message_buffer.buffer, buf + offset, remain);
message_buffer.length = remain;
}
free_server_message_except_jobs(message);
}
uv_mutex_unlock(&message_buffer.lock);

free(buf->base);
// uv_close((uv_handle_t *) server, free_close_cb);
if (found_latest_job)
{
server_message_t *message = decode_server_message(latest_job, latest_job_len);
if (message)
{
switch (message->kind)
{
case JOBS:
for (int i = 0; i < message->jobs->len; i++)
{
update_templates(message->jobs->jobs[i]);
}
start_mining_if_needed();
break;

case SUBMIT_RESULT:
char *block_hash_hex = bytes_to_hex(message->submit_result->block_hash, 32);
LOG(
"submitted: %d -> %d, %s: %d \n",
message->submit_result->from_group,
message->submit_result->to_group,
block_hash_hex,
message->submit_result->status
);
free(block_hash_hex);
break;
}
free_server_message_except_jobs(message);
}
}
}

void on_connect(uv_connect_t *req, int status)
Expand Down Expand Up @@ -426,6 +475,9 @@ int main(int argc, char **argv)
setup_gpu_worker_count(gpu_count, gpu_count * parallel_mining_works_per_gpu);

loop = uv_default_loop();
uv_mutex_init(&message_buffer.lock);
message_buffer.length = 0;
uv_async_init(loop, &message_handler, process_message);
uv_timer_init(loop, &reconnect_timer);
connect_to_broker();

Expand Down
26 changes: 1 addition & 25 deletions src/messages.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,24 +252,9 @@ void extract_submit_result(uint8_t **bytes, submit_result_t *result)
result->status = extract_bool(bytes);
}

server_message_t *decode_server_message(blob_t *blob)
server_message_t *decode_server_message(uint8_t *bytes, ssize_t len)
{
uint8_t *bytes = blob->blob;
ssize_t len = blob->len;

if (len <= 4) {
return NULL; // not enough bytes for decoding
}

uint8_t *pos = bytes;
ssize_t message_size = extract_size(&pos);
assert(pos == bytes + 4);

ssize_t message_byte_size = message_size + 4;
if (len < message_byte_size) {
return NULL; // not enough bytes for decoding
}

uint8_t version = extract_byte(&pos);
if (version != mining_protocol_version) {
LOG("Invalid protocol version %d, expect %d\n", version, mining_protocol_version);
Expand Down Expand Up @@ -297,15 +282,6 @@ server_message_t *decode_server_message(blob_t *blob)
LOGERR("Invalid server message kind\n");
exit(1);
}

assert(pos == (bytes + message_byte_size));
if (message_byte_size < len) {
blob->len = len - message_byte_size;
memmove(blob->blob, pos, blob->len);
} else {
blob->len = 0;
}

return server_message;
}

Expand Down
Loading