Skip to content

Commit

Permalink
Merge pull request #55 from tezc/use-gettimeofday
Browse files Browse the repository at this point in the history
lower timestamp granularity ~3ms
  • Loading branch information
tezc authored May 16, 2021
2 parents 41a6f52 + ff65952 commit e220fed
Show file tree
Hide file tree
Showing 9 changed files with 29 additions and 84 deletions.
4 changes: 2 additions & 2 deletions jresql/src/test/java/resql/TimestampTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void tearDown() {
@Test
public void testTimestampIncrementFor10ms() throws InterruptedException {
ResultSet rs;
for (int i = 0; i < 10; i++) {
for (int i = 0; i < 100; i++) {
client.put("INSERT INTO ts_ms_table DEFAULT VALUES;");
rs = client.execute(false);
assert (rs.linesChanged() == 1);
Expand All @@ -75,6 +75,6 @@ public void testTimestampIncrementFor10ms() throws InterruptedException {

client.put("SELECT DISTINCT(ts_ms) FROM ts_ms_table;");
rs = client.execute(true);
assert (rs.rowCount() == 10);
assert (rs.rowCount() > 1);
}
}
12 changes: 6 additions & 6 deletions lib/sc/sc_time.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ uint64_t sc_time_ms()
return (dateTime.QuadPart / 10000);
#else
int rc;
struct timespec ts;
struct timeval t;

rc = clock_gettime(CLOCK_REALTIME, &ts);
rc = gettimeofday(&t, 0);
assert(rc == 0);
(void) rc;

return ts.tv_sec * 1000 + (uint64_t) (ts.tv_nsec / 10e6);
return t.tv_sec * 1000 + (uint64_t) (t.tv_usec / 1000);
#endif
}

Expand All @@ -80,13 +80,13 @@ uint64_t sc_time_ns()
return (dateTime.QuadPart * 100);
#else
int rc;
struct timespec ts;
struct timeval t;

rc = clock_gettime(CLOCK_REALTIME, &ts);
rc = gettimeofday(&t, 0);
assert(rc == 0);
(void) rc;

return ts.tv_sec * (uint64_t) 1000000000 + ts.tv_nsec;
return t.tv_sec * (uint64_t) 1000000000 + t.tv_usec * 1000;
#endif
}

Expand Down
11 changes: 6 additions & 5 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ static int server_create_entry(struct server *s, bool force, uint64_t seq,
bool ss_sending;
bool ss_running;
int rc;
uint64_t diff;
uint32_t size = sc_buf_size(buf);
void *data = sc_buf_rbuf(buf);

Expand Down Expand Up @@ -557,7 +558,8 @@ static int server_create_entry(struct server *s, bool force, uint64_t seq,
return force ? RS_FULL : RS_REJECT;
}

if (s->timestamp - s->last_ts > 5) {
diff = s->timestamp - s->last_ts;
if (diff > 3) {
s->last_ts = s->timestamp;

sc_buf_clear(&s->tmp);
Expand Down Expand Up @@ -777,7 +779,7 @@ int server_read_meta(struct server *s)
{
bool exist;
int rc;
struct state *state = &s->state;
struct state *st = &s->state;

rc = file_remove_path(s->meta_tmp_path);
if (rc != RS_OK) {
Expand All @@ -800,13 +802,12 @@ int server_read_meta(struct server *s)
return rc;
}

rc = store_init(&s->store, s->conf.node.dir, state->ss_term, state->ss_index);
rc = store_init(&s->store, s->conf.node.dir, st->term, st->index);
if (rc != RS_OK) {
return rc;
}

rc = snapshot_open(&s->ss, s->state.ss_path, s->state.ss_term,
s->state.ss_index);
rc = snapshot_open(&s->ss, s->state.ss_path, st->term, st->index);
if (rc != RS_OK) {
return rc;
}
Expand Down
2 changes: 0 additions & 2 deletions src/snapshot.c
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,6 @@ static void snapshot_compact(struct snapshot *ss, struct page *p)
}
}

state.ss_index = state.index;
state.ss_term = state.term;
state_close(&state);
file_remove_path(state.ss_path);

Expand Down
36 changes: 2 additions & 34 deletions src/state.c
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
#include <inttypes.h>

#define STATE_FILE "state.resql"
#define STATE_TMP_FILE "state.tmp.resql"
#define STATE_SS_FILE "snapshot.resql"
#define STATE_SS_TMP_FILE "snapshot.tmp.resql"

Expand Down Expand Up @@ -247,7 +246,6 @@ void state_init(struct state *st, struct state_cb cb, const char *path,

st->cb = cb;
st->path = sc_str_create_fmt("%s/%s", path, STATE_FILE);
st->tmp_path = sc_str_create_fmt("%s/%s", path, STATE_TMP_FILE);
st->ss_path = sc_str_create_fmt("%s/%s", path, STATE_SS_FILE);
st->ss_tmp_path = sc_str_create_fmt("%s/%s", path, STATE_SS_TMP_FILE);
st->max_page = UINT_MAX;
Expand All @@ -271,7 +269,6 @@ int state_term(struct state *st)

sc_buf_term(&st->tmp);
sc_str_destroy(&st->path);
sc_str_destroy(&st->tmp_path);
sc_str_destroy(&st->ss_path);
sc_str_destroy(&st->ss_tmp_path);

Expand Down Expand Up @@ -348,8 +345,6 @@ int state_write_vars(struct state *st, struct aux *aux)

sc_buf_put_64(&st->tmp, st->term);
sc_buf_put_64(&st->tmp, st->index);
sc_buf_put_64(&st->tmp, st->ss_term);
sc_buf_put_64(&st->tmp, st->ss_index);
sc_buf_put_64(&st->tmp, st->max_page);
sc_buf_put_64(&st->tmp, st->session_timeout);
meta_encode(&st->meta, &st->tmp);
Expand Down Expand Up @@ -389,8 +384,6 @@ int state_read_vars(struct state *st, struct aux *aux)

st->term = sc_buf_get_64(&st->tmp);
st->index = sc_buf_get_64(&st->tmp);
st->ss_term = sc_buf_get_64(&st->tmp);
st->ss_index = sc_buf_get_64(&st->tmp);
st->max_page = sc_buf_get_64(&st->tmp);
st->session_timeout = sc_buf_get_64(&st->tmp);
meta_decode(&st->meta, &st->tmp);
Expand Down Expand Up @@ -459,22 +452,12 @@ int state_read_snapshot(struct state *st, bool in_memory)
goto cleanup_aux;
}
} else {
b = file_exists_at(st->path);
sc_log_info("Found state file : %s \n", b ? "yes" : " no");

if (!b) {
rc = file_copy(st->path, st->ss_path);
if (rc != RS_OK) {
return rc;
}
}

rc = file_rename(st->tmp_path, st->path);
rc = file_copy(st->path, st->ss_path);
if (rc != RS_OK) {
return rc;
}

rc = aux_init(&st->aux, st->tmp_path, 0);
rc = aux_init(&st->aux, st->path, 0);
if (rc != RS_OK) {
return rc;
}
Expand All @@ -499,9 +482,6 @@ int state_read_for_snapshot(struct state *st)
bool b;
int rc;

st->snapshot = true;
st->in_memory = false;

b = file_exists_at(st->ss_path);
if (!b) {
rs_abort("Cannot find snapshot file at %s \n", st->ss_path);
Expand Down Expand Up @@ -531,11 +511,6 @@ int state_open(struct state *st, bool in_memory)
{
int rc;

st->in_memory = in_memory;
st->snapshot = false;

file_remove_path(st->tmp_path);

rc = file_remove_path(st->ss_tmp_path);
if (rc != RS_OK) {
return rc;
Expand Down Expand Up @@ -614,13 +589,6 @@ int state_close(struct state *st)
if (rc != RS_OK) {
ret = rc;
}

if (!st->snapshot && !st->in_memory) {
rc = file_rename(st->path, st->tmp_path);
if (rc != RS_OK) {
ret = rc;
}
}
}

return ret;
Expand Down
6 changes: 1 addition & 5 deletions src/state.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,10 +49,8 @@ struct state_cb {

struct state {
struct state_cb cb;
bool snapshot;
bool in_memory;

char *path;
char *tmp_path;
char *ss_path;
char *ss_tmp_path;
char *last_err;
Expand All @@ -71,8 +69,6 @@ struct state {
struct meta meta;
uint64_t term;
uint64_t index;
uint64_t ss_term;
uint64_t ss_index;

// time
uint64_t timestamp;
Expand Down
2 changes: 1 addition & 1 deletion src/store.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ static int store_read(struct store *s)
page_clear(s->pages[0], s->ss_index);
}

if (page_prev_index(s->pages[1]) != last) {
if (page_prev_index(s->pages[1]) != page_last_index(s->pages[0])) {
page_clear(s->pages[1], s->ss_index);
}

Expand Down
9 changes: 5 additions & 4 deletions test/snapshot_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -453,13 +453,14 @@ static void snapshot_two_disk()

int main(void)
{
test_execute(snapshot_two);
test_execute(snapshot_simple);
test_execute(snapshot_big);

test_execute(snapshot_two_disk);

test_execute(snapshot_simple_disk);
test_execute(snapshot_big_disk);
test_execute(snapshot_simple);
test_execute(snapshot_big);
test_execute(snapshot_two);
test_execute(snapshot_two_disk);

return 0;
}
31 changes: 6 additions & 25 deletions test/test_util.c
Original file line number Diff line number Diff line change
Expand Up @@ -422,28 +422,19 @@ resql *test_client_create()

int rc, try = 0;
bool found;
const char *url = urls[0];
resql *c;

retry:
for (int i = 0; i < 9; i++) {
if (cluster[i] != NULL) {
url = urls[i];
break;
}
}

struct resql_config conf = {
.urls = url,
.urls = node8,
.timeout_millis = 60000,
};

retry:
rc = resql_create(&c, &conf);
if (rc != RESQL_OK) {
try++;
if (try >= 10) {
printf("Failed rs : %d \n", rc);
abort();
rs_abort("%s \n", resql_errstr(c));
}

resql_shutdown(c);
Expand Down Expand Up @@ -471,28 +462,18 @@ resql *test_client_create_timeout(uint32_t timeout)

int rc, try = 0;
bool found;
const char *url = urls[0];
resql *c;

retry:
for (int i = 0; i < 9; i++) {
if (cluster[i] != NULL) {
url = urls[i];
break;
}
}

struct resql_config conf = {
.urls = url,
.urls = node8,
.timeout_millis = timeout,
};

retry:
rc = resql_create(&c, &conf);
if (rc != RESQL_OK) {
try++;
if (try >= 10) {
printf("Failed rs : %d \n", rc);
abort();
rs_abort("%s \n", resql_errstr(c));
}

resql_shutdown(c);
Expand Down

0 comments on commit e220fed

Please sign in to comment.