Skip to content

Commit 37bcb32

Browse files
committed
Merge remote-tracking branch 'origin/develop'
2 parents 647f0ed + f7f3f29 commit 37bcb32

File tree

7 files changed

+198
-66
lines changed

7 files changed

+198
-66
lines changed

include/kllvm/binary/deserializer.h

Lines changed: 141 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -210,68 +210,172 @@ class proof_trace_ringbuffer : public proof_trace_buffer {
210210
shm_ringbuffer *shm_buffer_;
211211
sem_t *data_avail_;
212212
sem_t *space_avail_;
213+
213214
std::deque<uint8_t> peek_data_;
215+
std::array<uint8_t, shm_ringbuffer::buffered_access_sz> buffer_;
216+
size_t buffer_data_size_{0};
217+
size_t buffer_data_start_{0};
218+
214219
bool peek_eof_{false};
220+
bool buffered_eof_{false};
215221
bool read_eof_{false};
216222

217-
bool read(uint8_t *ptr, size_t len = 1) {
218-
for (size_t i = 0; i < len; i++) {
219-
if (!peek_data_.empty()) {
220-
ptr[i] = peek_data_.front();
221-
peek_data_.pop_front();
222-
continue;
223-
}
223+
// Helper function that reads from the shared memory ringbuffer into the
224+
// buffer_. It tries to read shm_ringbuffer::buffered_access_sz bytes of data.
225+
// It assumes that buffer_ is empty.
226+
void read_from_shared_memory() {
227+
assert(buffer_data_size_ == 0);
228+
assert(buffer_data_start_ == 0);
224229

225-
if (peek_eof_) {
226-
read_eof_ = true;
227-
return false;
228-
}
230+
if (buffered_eof_) {
231+
return;
232+
}
229233

230-
if (read_eof_) {
231-
return false;
234+
sem_wait(data_avail_);
235+
236+
if (shm_buffer_->eof()) {
237+
// EOF has been written to the ringbuffer. Check if this is the last chunk
238+
// of data to be read.
239+
size_t remaining_data_sz = shm_buffer_->data_size();
240+
if (remaining_data_sz < shm_ringbuffer::buffered_access_sz) {
241+
// This is the last chunk of data to be read from the ringbuffer.
242+
shm_buffer_->get(buffer_.data(), remaining_data_sz);
243+
sem_post(space_avail_);
244+
245+
buffer_data_size_ = remaining_data_sz;
246+
buffered_eof_ = true;
247+
248+
return;
232249
}
250+
}
251+
252+
// Else, either EOF has not been written or EOF has been written but there
253+
// are remaining full chunks to be read. In any case, we can read a full
254+
// chunk.
255+
shm_buffer_->get(buffer_.data());
256+
sem_post(space_avail_);
257+
258+
buffer_data_size_ = shm_ringbuffer::buffered_access_sz;
259+
}
260+
261+
bool read(uint8_t *ptr, size_t len = 1) {
262+
// Check if we have read EOF already.
263+
if (read_eof_) {
264+
return false;
265+
}
266+
267+
// Consume peeked data.
268+
while (len > 0 && !peek_data_.empty()) {
269+
*ptr = peek_data_.front();
270+
peek_data_.pop_front();
271+
ptr++;
272+
len--;
273+
}
233274

234-
while (int wait_status = sem_trywait(data_avail_)) {
235-
assert(wait_status == -1 && errno == EAGAIN);
236-
if (shm_buffer_->eof()) {
275+
// Consume peeked EOF.
276+
if (len > 0 && peek_eof_) {
277+
read_eof_ = true;
278+
return false;
279+
}
280+
281+
// Peeked data has been fully consumed. If more data is requested, we need
282+
// to read from buffer_ and/or shared memory.
283+
while (len > 0) {
284+
// If buffer_ is empty, try to fetch more data from the shared memory
285+
// ringbuffer.
286+
if (buffer_data_size_ == 0) {
287+
// Check for and conusme buffered EOF.
288+
if (buffered_eof_) {
237289
read_eof_ = true;
238290
return false;
239291
}
292+
293+
assert(buffer_data_start_ == 0);
294+
read_from_shared_memory();
295+
}
296+
297+
// Read available data from the buffer_.
298+
assert(buffer_data_start_ < shm_ringbuffer::buffered_access_sz);
299+
assert(
300+
buffer_data_start_ + buffer_data_size_
301+
<= shm_ringbuffer::buffered_access_sz);
302+
size_t size_to_read_from_buffer = std::min(len, buffer_data_size_);
303+
memcpy(
304+
ptr, buffer_.data() + buffer_data_start_, size_to_read_from_buffer);
305+
ptr += size_to_read_from_buffer;
306+
len -= size_to_read_from_buffer;
307+
308+
buffer_data_start_ += size_to_read_from_buffer;
309+
buffer_data_size_ -= size_to_read_from_buffer;
310+
if (buffer_data_start_ == shm_ringbuffer::buffered_access_sz) {
311+
assert(buffer_data_size_ == 0);
312+
buffer_data_start_ = 0;
240313
}
241-
shm_buffer_->get(&ptr[i]);
242-
sem_post(space_avail_);
243314
}
244315

316+
assert(len == 0);
245317
return true;
246318
}
247319

248320
bool peek(uint8_t *ptr, size_t len = 1) {
249-
for (size_t i = 0; i < len; i++) {
250-
if (i < peek_data_.size()) {
251-
ptr[i] = peek_data_[i];
252-
continue;
253-
}
321+
// Check if we have read EOF already.
322+
if (read_eof_) {
323+
return false;
324+
}
254325

255-
if (peek_eof_) {
256-
return false;
257-
}
326+
// Copy already peeked data.
327+
size_t i = 0;
328+
while (len > 0 && i < peek_data_.size()) {
329+
*ptr = peek_data_[i];
330+
ptr++;
331+
i++;
332+
len--;
333+
}
258334

259-
if (read_eof_) {
260-
return false;
261-
}
335+
// Check for already peeked EOF.
336+
if (len > 0 && peek_eof_) {
337+
return false;
338+
}
262339

263-
while (int wait_status = sem_trywait(data_avail_)) {
264-
assert(wait_status == -1 && errno == EAGAIN);
265-
if (shm_buffer_->eof()) {
340+
// Already peeked data has been fully copied. If more data is requested, we
341+
// need to peek from buffer_ and/or shared memory.
342+
while (len > 0) {
343+
// If buffer_ is empty, try to fetch more data from the shared memory
344+
// ringbuffer.
345+
if (buffer_data_size_ == 0) {
346+
// Check for buffered EOF.
347+
if (buffered_eof_) {
266348
peek_eof_ = true;
267349
return false;
268350
}
351+
352+
assert(buffer_data_start_ == 0);
353+
read_from_shared_memory();
354+
}
355+
356+
// Peek available data from the buffer_.
357+
assert(buffer_data_start_ < shm_ringbuffer::buffered_access_sz);
358+
assert(
359+
buffer_data_start_ + buffer_data_size_
360+
<= shm_ringbuffer::buffered_access_sz);
361+
size_t size_to_peek_from_buffer = std::min(len, buffer_data_size_);
362+
memcpy(
363+
ptr, buffer_.data() + buffer_data_start_, size_to_peek_from_buffer);
364+
peek_data_.insert(
365+
peek_data_.end(), buffer_.begin() + buffer_data_start_,
366+
buffer_.begin() + buffer_data_start_ + size_to_peek_from_buffer);
367+
ptr += size_to_peek_from_buffer;
368+
len -= size_to_peek_from_buffer;
369+
370+
buffer_data_start_ += size_to_peek_from_buffer;
371+
buffer_data_size_ -= size_to_peek_from_buffer;
372+
if (buffer_data_start_ == shm_ringbuffer::buffered_access_sz) {
373+
assert(buffer_data_size_ == 0);
374+
buffer_data_start_ = 0;
269375
}
270-
shm_buffer_->get(&ptr[i]);
271-
sem_post(space_avail_);
272-
peek_data_.push_back(ptr[i]);
273376
}
274377

378+
assert(len == 0);
275379
return true;
276380
}
277381

@@ -280,7 +384,8 @@ class proof_trace_ringbuffer : public proof_trace_buffer {
280384
void *shm_object, sem_t *data_avail, sem_t *space_avail)
281385
: shm_buffer_(static_cast<shm_ringbuffer *>(shm_object))
282386
, data_avail_(data_avail)
283-
, space_avail_(space_avail) {
387+
, space_avail_(space_avail)
388+
, buffer_() {
284389
new (shm_buffer_) shm_ringbuffer;
285390
}
286391

include/kllvm/binary/ringbuffer.h

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,16 @@ class shm_ringbuffer {
1818
// empty, we maintain the invariant that the capacity of the buffer is one byte
1919
// less than its size. This way, the buffer is empty iff read_pos == write_pos,
2020
// and it is full iff read_pos == (write_pos+1)%size.
21-
static constexpr size_t size = 1024;
21+
static constexpr size_t size = 4096;
2222

2323
// Ringbuffer capacity in bytes.
2424
// As commented above, the capacity is always equal to size-1.
2525
static constexpr size_t capacity = size - 1;
2626

27+
// The default size in bytes for put and get operations.
28+
static constexpr size_t buffered_access_sz = 64;
29+
static_assert(buffered_access_sz <= capacity);
30+
2731
private:
2832
bool eof_{false};
2933
size_t read_pos_{0};
@@ -42,23 +46,22 @@ class shm_ringbuffer {
4246
// undefined behavior.
4347
void put_eof();
4448

45-
// Returns true when the ringbuffer is empty and the EOF has been written, and
46-
// false otherwise. As commented above, the ringbuffer is empty iff
47-
// read_pos == write_pos.
49+
// Returns true when eof has been written in the ringbuffer. At that point,
50+
// the ringbuffer may still contain data, but no further writes can happen.
4851
[[nodiscard]] bool eof() const;
4952

5053
// Add data to the ringbuffer. The behavior is undefined if the buffer does not
5154
// have enough remaining space to fit the data or if EOF has been written to the
5255
// ringbuffer. The behavior is also undefined if the data pointer passed to this
5356
// function does not point to a contiguous memory chunk of the corresponding
5457
// size.
55-
void put(uint8_t const *data, size_t count = 1);
58+
void put(uint8_t const *data, size_t count = buffered_access_sz);
5659

5760
// Get and remove data from the ringbuffer. The behavior is undefined if more
5861
// data is requested than it is currently available in the ringbuffer. The
5962
// behavior is also undefined if the data pointer passed to this function does
6063
// not point to a contiguous memory chunk of the corresponding size.
61-
void get(uint8_t *data, size_t count = 1);
64+
void get(uint8_t *data, size_t count = buffered_access_sz);
6265
};
6366

6467
} // namespace kllvm

include/kllvm/binary/serializer.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,14 +188,18 @@ class proof_trace_ringbuffer_writer : public proof_trace_writer {
188188
sem_t *data_avail_;
189189
sem_t *space_avail_;
190190

191+
std::array<uint8_t, shm_ringbuffer::buffered_access_sz> buffer_;
192+
size_t buffer_data_size_{0};
193+
191194
void write(uint8_t const *ptr, size_t len = 1);
192195

193196
public:
194197
proof_trace_ringbuffer_writer(
195198
void *shm_object, sem_t *data_avail, sem_t *space_avail)
196199
: shm_buffer_(reinterpret_cast<shm_ringbuffer *>(shm_object))
197200
, data_avail_(data_avail)
198-
, space_avail_(space_avail) { }
201+
, space_avail_(space_avail)
202+
, buffer_() { }
199203

200204
~proof_trace_ringbuffer_writer() override { shm_buffer_->~shm_ringbuffer(); }
201205

lib/binary/ringbuffer.cpp

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,7 @@ void shm_ringbuffer::put_eof() {
2222
}
2323

2424
bool shm_ringbuffer::eof() const {
25-
// NOTE: for synchronization purposes, it is important that the eof_ field is
26-
// checked first. Typically, the reader process will call this, so we want to
27-
// avoid a race where the writer updates buf.writer_pos after the reader has
28-
// accessed it but before the reader has fully evaluated the condition. If
29-
// eof_ is checked first, and due to short-circuiting, we know that if eof_ is
30-
// true, the writer will not do any further updates to the write_pos_ field,
31-
// and if eof_ is false, the reader will not access write_pos_ at all.
32-
return eof_ && write_pos_ == read_pos_;
25+
return eof_;
3326
}
3427

3528
void shm_ringbuffer::put(uint8_t const *data, size_t count) {

lib/binary/serializer.cpp

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -223,10 +223,26 @@ void proof_trace_file_writer::write_string(char const *str) {
223223
}
224224

225225
void proof_trace_ringbuffer_writer::write(uint8_t const *ptr, size_t len) {
226-
for (size_t i = 0; i < len; i++) {
227-
sem_wait(space_avail_);
228-
shm_buffer_->put(&ptr[i]);
229-
sem_post(data_avail_);
226+
while (len > 0) {
227+
// Write to the buffer.
228+
assert(buffer_data_size_ < shm_ringbuffer::buffered_access_sz);
229+
size_t size_to_write_to_buffer
230+
= std::min(len, shm_ringbuffer::buffered_access_sz - buffer_data_size_);
231+
memcpy(buffer_.data() + buffer_data_size_, ptr, size_to_write_to_buffer);
232+
233+
ptr += size_to_write_to_buffer;
234+
buffer_data_size_ += size_to_write_to_buffer;
235+
len -= size_to_write_to_buffer;
236+
237+
// If the buffer is full, write its data to the shared memory ringbuffer and
238+
// empty it.
239+
if (buffer_data_size_ == shm_ringbuffer::buffered_access_sz) {
240+
sem_wait(space_avail_);
241+
shm_buffer_->put(buffer_.data());
242+
sem_post(data_avail_);
243+
244+
buffer_data_size_ = 0;
245+
}
230246
}
231247
}
232248

@@ -246,7 +262,13 @@ void proof_trace_ringbuffer_writer::write_string(char const *str) {
246262
}
247263

248264
void proof_trace_ringbuffer_writer::write_eof() {
265+
sem_wait(space_avail_);
266+
// Write any remaining buffer contents to the ringbuffer before writing EOF.
267+
shm_buffer_->put(buffer_.data(), buffer_data_size_);
249268
shm_buffer_->put_eof();
269+
sem_post(data_avail_);
270+
271+
buffer_data_size_ = 0;
250272
}
251273

252274
} // namespace kllvm

tools/kore-proof-trace/main.cpp

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ cl::opt<bool> use_shared_memory(
5050
exit(1); \
5151
} while (0)
5252

53+
static constexpr mode_t perms
54+
= S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH;
55+
5356
// NOLINTNEXTLINE(*-cognitive-complexity)
5457
int main(int argc, char **argv) {
5558
cl::HideUnrelatedOptions({&kore_proof_trace_cat});
@@ -86,8 +89,7 @@ int main(int argc, char **argv) {
8689
shm_unlink(input_filename.c_str());
8790

8891
// Open shared memory object
89-
int fd = shm_open(
90-
input_filename.c_str(), O_CREAT | O_EXCL | O_RDWR, S_IRUSR | S_IWUSR);
92+
int fd = shm_open(input_filename.c_str(), O_CREAT | O_EXCL | O_RDWR, perms);
9193
if (fd == -1) {
9294
ERR_EXIT("shm_open reader");
9395
}
@@ -114,16 +116,16 @@ int main(int argc, char **argv) {
114116
sem_unlink(space_avail_sem_name.c_str());
115117

116118
// Initialize semaphores
117-
// NOLINTNEXTLINE(*-pro-type-vararg)
118-
sem_t *data_avail = sem_open(
119-
data_avail_sem_name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, 0);
119+
sem_t *data_avail
120+
// NOLINTNEXTLINE(*-pro-type-vararg)
121+
= sem_open(data_avail_sem_name.c_str(), O_CREAT | O_EXCL, perms, 0);
120122
if (data_avail == SEM_FAILED) {
121123
ERR_EXIT("sem_init data_avail reader");
122124
}
123125
// NOLINTNEXTLINE(*-pro-type-vararg)
124126
sem_t *space_avail = sem_open(
125-
space_avail_sem_name.c_str(), O_CREAT | O_EXCL, S_IRUSR | S_IWUSR,
126-
shm_ringbuffer::capacity);
127+
space_avail_sem_name.c_str(), O_CREAT | O_EXCL, perms,
128+
shm_ringbuffer::capacity / shm_ringbuffer::buffered_access_sz);
127129
if (space_avail == SEM_FAILED) {
128130
ERR_EXIT("sem_init space_avail reader");
129131
}
@@ -132,7 +134,10 @@ int main(int argc, char **argv) {
132134
auto trace = parser.parse_proof_trace_from_shmem(
133135
shm_object, data_avail, space_avail);
134136

135-
// Close semaphores
137+
// Close the shared memory object and semaphores
138+
if (close(fd) == -1) {
139+
ERR_EXIT("close shm object reader");
140+
}
136141
if (sem_close(data_avail) == -1) {
137142
ERR_EXIT("sem_close data reader");
138143
}

0 commit comments

Comments
 (0)