From 36be331de21f77342773305983da7f21b4e1cd97 Mon Sep 17 00:00:00 2001 From: Jan Radon Date: Fri, 14 Jul 2017 14:36:38 +0200 Subject: [PATCH] #43: imaptest - code cleanup, removed unused variables - write_operations and write_buffer now in rados_mail_object. - bugfix: mail_buffer size now correctly calculated (get_used_size, instead buffer_size) - added additional buffer (checks / free) statements to make sure read and write buffer are empty in every case. --- src/librmb/rados-mail-object.cpp | 3 +- src/librmb/rados-mail-object.h | 18 ++-- src/storage-rbox/rbox-copy.cpp | 4 +- src/storage-rbox/rbox-mail.cpp | 12 ++- src/storage-rbox/rbox-save.cpp | 177 +++++++++++++++---------------- src/storage-rbox/rbox-save.h | 4 +- 6 files changed, 109 insertions(+), 109 deletions(-) diff --git a/src/librmb/rados-mail-object.cpp b/src/librmb/rados-mail-object.cpp index 73d3c341..789e9fe8 100644 --- a/src/librmb/rados-mail-object.cpp +++ b/src/librmb/rados-mail-object.cpp @@ -30,8 +30,7 @@ RadosMailObject::RadosMailObject() { this->save_date = 0; this->received_date = 0; memset(this->guid, 0, GUID_128_SIZE); - completion_private = librados::Rados::aio_create_completion(); - // std::make_shared(*librados::Rados::aio_create_completion()); this->object_size = -1; this->active_op = false; + this->mail_buffer = NULL; } diff --git a/src/librmb/rados-mail-object.h b/src/librmb/rados-mail-object.h index cec71fdc..8a82833d 100644 --- a/src/librmb/rados-mail-object.h +++ b/src/librmb/rados-mail-object.h @@ -7,7 +7,7 @@ #include #include #include - +#include #define GUID_128_SIZE 16 namespace librmb { @@ -16,8 +16,7 @@ class RadosMailObject { public: RadosMailObject(); virtual ~RadosMailObject() { - completion_private->release(); - write_op.remove(); + } void set_oid(const char* oid) { this->oid = oid; } @@ -46,11 +45,14 @@ class RadosMailObject { const uint64_t get_object_size() { return this->object_size; } void set_object_size(uint64_t& size) { this->object_size = size; } - librados::ObjectWriteOperation& get_write_op() { return this->write_op; } - librados::AioCompletion* get_completion_private() { return this->completion_private; } bool has_active_op() { return active_op; } void set_active_op(bool active) { this->active_op = active; } + std::map* get_completion_op_map() { + return &completion_op; + } + void set_mail_buffer(char* mail_buffer) { this->mail_buffer = mail_buffer; } + char* get_mail_buffer() { return this->mail_buffer; } private: std::string oid; @@ -65,11 +67,11 @@ class RadosMailObject { uint8_t guid[GUID_128_SIZE]; uint64_t object_size; // byte + std::map completion_op; - librados::ObjectWriteOperation write_op; - - librados::AioCompletion* completion_private; bool active_op; + // used as pointer to a buffer_t (to avoid using dovecot datatypes in library) + char* mail_buffer; public: // X_ATTRIBUTES diff --git a/src/storage-rbox/rbox-copy.cpp b/src/storage-rbox/rbox-copy.cpp index 0143e1d8..e69292be 100644 --- a/src/storage-rbox/rbox-copy.cpp +++ b/src/storage-rbox/rbox-copy.cpp @@ -91,8 +91,8 @@ static int rbox_mail_storage_try_copy(struct mail_save_context **_ctx, struct ma librados::IoCtx dest_io_ctx = r_storage->s->get_io_ctx(); librados::IoCtx src_io_ctx; - char *ns_src_mail = mail->box->list->ns->owner->username; - char *ns_dest_mail = ctx->dest_mail->box->list->ns->owner->username; + const char *ns_src_mail = mail->box->list->ns->owner->username; + const char *ns_dest_mail = ctx->dest_mail->box->list->ns->owner->username; i_debug("rbox_mail_storage_try_copy: mail = %p", mail); debug_print_mail_save_context(*_ctx, "rbox_mail_storage_try_copy", NULL); diff --git a/src/storage-rbox/rbox-mail.cpp b/src/storage-rbox/rbox-mail.cpp index ff3a825c..1231fc61 100644 --- a/src/storage-rbox/rbox-mail.cpp +++ b/src/storage-rbox/rbox-mail.cpp @@ -40,8 +40,9 @@ int rbox_get_index_record(struct mail *_mail) { struct rbox_mail *rmail = (struct rbox_mail *)_mail; struct rbox_mailbox *rbox = (struct rbox_mailbox *)_mail->transaction->box; - i_debug("last_seq %lu, mail_seq %lu, ext_id = %lu, uid=%d, old_oid=%s", rmail->last_seq, _mail->seq, rbox->ext_id, - _mail->uid, rmail->mail_object->get_oid().c_str()); + i_debug("last_seq %lu, mail_seq %lu, ext_id = %lu, uid=%lu, old_oid=%s", (unsigned long)rmail->last_seq, + (unsigned long)_mail->seq, (unsigned long)rbox->ext_id, (unsigned long)_mail->uid, + rmail->mail_object->get_oid().c_str()); if (rmail->last_seq != _mail->seq) { const struct obox_mail_index_record *obox_rec; @@ -270,6 +271,10 @@ static int rbox_mail_get_stream(struct mail *_mail, bool get_body ATTR_UNUSED, s return -1; } + if (rmail->mail_buffer != NULL) { + i_free(rmail->mail_buffer); + } + rmail->mail_buffer = p_new(default_pool, char, size_r + 1); if (rmail->mail_buffer == NULL) { FUNC_END_RET("ret == -1; out of memory"); @@ -296,6 +301,8 @@ static int rbox_mail_get_stream(struct mail *_mail, bool get_body ATTR_UNUSED, s } mail_data_bl.copy(0, ret, &rmail->mail_buffer[0]); + i_debug("rbox_mail_get_stream(oid=%s, size_r = %lu, read_from_rados = %d):", + rmail->mail_object->get_oid().c_str(), size_r, ret); offset += ret; } while (ret > 0); @@ -344,6 +351,7 @@ void rbox_index_mail_set_seq(struct mail *_mail, uint32_t seq, bool saving) { i_free(rmail_->mail_buffer); } if (rmail_->mail_object != NULL) { + rmail_->mail_object->get_completion_op_map()->clear(); delete rmail_->mail_object; rmail_->mail_object = NULL; } diff --git a/src/storage-rbox/rbox-save.cpp b/src/storage-rbox/rbox-save.cpp index d0e47ac9..8294d903 100644 --- a/src/storage-rbox/rbox-save.cpp +++ b/src/storage-rbox/rbox-save.cpp @@ -85,11 +85,6 @@ void rbox_add_to_index(struct mail_save_context *_ctx) { guid_128_generate(r_ctx->mail_oid); - // save old ptr. - if (r_ctx->current_object != nullptr) { - r_ctx->objects.push_back(r_ctx->current_object); - } - r_ctx->current_object = new RadosMailObject(); r_ctx->current_object->set_oid(guid_128_to_string(r_ctx->mail_oid)); @@ -125,24 +120,28 @@ int rbox_save_begin(struct mail_save_context *_ctx, struct istream *input) { i_stream_unref(&crlf_input); } - r_ctx->mail_buffer = buffer_create_dynamic(default_pool, 1014); - if (r_ctx->mail_buffer == NULL) { + if (r_ctx->current_object->get_mail_buffer() != NULL) { + buffer_t *buffer = (buffer_t *)r_ctx->current_object->get_mail_buffer(); + // make 100% sure, buffer is empty! + buffer_free(&buffer); + } + r_ctx->current_object->set_mail_buffer((char *)buffer_create_dynamic(default_pool, 1014)); + // r_ctx->mail_buffer = ; + if (r_ctx->current_object->get_mail_buffer() == NULL) { FUNC_END_RET("ret == -1"); return -1; } - if (_ctx->data.output != NULL) { o_stream_unref(&_ctx->data.output); } - _ctx->data.output = o_stream_create_buffer(r_ctx->mail_buffer); - - bufferlist version_bl; - version_bl.append(RadosMailObject::X_ATTR_VERSION_VALUE); - r_ctx->current_object->get_write_op().setxattr(RadosMailObject::X_ATTR_VERSION.c_str(), version_bl); + // _ctx->data.output = o_stream_create_buffer(r_ctx->mail_buffer); + _ctx->data.output = o_stream_create_buffer((buffer_t *)r_ctx->current_object->get_mail_buffer()); debug_print_mail_save_context(_ctx, "rbox-save::rbox_save_begin", NULL); + r_ctx->objects.push_back(r_ctx->current_object); + FUNC_END(); return 0; } @@ -187,36 +186,41 @@ int rbox_save_continue(struct mail_save_context *_ctx) { return 0; } -int rbox_save_mail_write_metadata(struct rbox_save_context *ctx) { +int rbox_save_mail_write_metadata(struct rbox_save_context *ctx, librados::ObjectWriteOperation *write_op_xattr) { FUNC_START(); struct mail_save_data *mdata = &ctx->ctx.data; + { + bufferlist version_bl; + version_bl.append(RadosMailObject::X_ATTR_VERSION_VALUE); + write_op_xattr->setxattr(RadosMailObject::X_ATTR_VERSION.c_str(), version_bl); + } { bufferlist bl; bl.append(guid_128_to_string(ctx->mail_guid)); - ctx->current_object->get_write_op().setxattr(RadosMailObject::X_ATTR_GUID.c_str(), bl); + write_op_xattr->setxattr(RadosMailObject::X_ATTR_GUID.c_str(), bl); } { bufferlist bl; long ts = static_cast(mdata->save_date); bl.append(std::to_string(ts)); - ctx->current_object->get_write_op().setxattr(RadosMailObject::X_ATTR_SAVE_DATE.c_str(), bl); + write_op_xattr->setxattr(RadosMailObject::X_ATTR_SAVE_DATE.c_str(), bl); } if (mdata->pop3_uidl != NULL) { i_assert(strchr(mdata->pop3_uidl, '\n') == NULL); bufferlist bl; bl.append(mdata->pop3_uidl); - ctx->current_object->get_write_op().setxattr(RadosMailObject::X_ATTR_POP3_UIDL.c_str(), bl); + write_op_xattr->setxattr(RadosMailObject::X_ATTR_POP3_UIDL.c_str(), bl); } if (mdata->pop3_order != 0) { bufferlist bl; bl.append(std::to_string(mdata->pop3_order)); - ctx->current_object->get_write_op().setxattr(RadosMailObject::X_ATTR_POP3_ORDER.c_str(), bl); + write_op_xattr->setxattr(RadosMailObject::X_ATTR_POP3_ORDER.c_str(), bl); } - ctx->current_object->get_write_op().mtime(&mdata->received_date); + write_op_xattr->mtime(&mdata->received_date); FUNC_END(); return 0; @@ -229,22 +233,46 @@ void remove_from_rados(librmb::RadosStorage *_storage, const std::string &_oid) i_debug("removed oid=%s", _oid.c_str()); } +bool wait_for_rados_operations(std::vector &object_list) { + bool ctx_failed = false; + // wait for all writes to finish! + // imaptest shows it's possible that begin -> continue -> finish cycle is invoked several times before + // rbox_transaction_save_commit_pre is called. + for (std::vector::iterator it_cur_obj = object_list.begin(); + it_cur_obj != object_list.end(); ++it_cur_obj) { + // if we come from copy mail, there is no operation to wait for. + if ((*it_cur_obj)->has_active_op()) { + for (std::map::iterator map_it = + (*it_cur_obj)->get_completion_op_map()->begin(); + map_it != (*it_cur_obj)->get_completion_op_map()->end(); map_it++) { + map_it->first->wait_for_complete_and_cb(); + ctx_failed = map_it->first->get_return_value() < 0 || ctx_failed ? true : false; + // clean up + map_it->first->release(); + map_it->second->remove(); + delete map_it->second; + } + i_debug("OID %s , SAVED success=%s", (*it_cur_obj)->get_oid().c_str(), + ctx_failed ? "false" : "true"); //, file_size); + (*it_cur_obj)->get_completion_op_map()->clear(); + (*it_cur_obj)->set_active_op(false); + } + } + return ctx_failed; +} void clean_up_failed(struct rbox_save_context *r_ctx) { struct rbox_storage *r_storage = (struct rbox_storage *)&r_ctx->mbox->storage->storage; - int ret = r_ctx->current_object->get_completion_private()->wait_for_safe_and_cb(); + wait_for_rados_operations(r_ctx->objects); - if (ret >= 0) { - if (r_ctx->current_object->get_completion_private()->get_return_value() >= 0) { - r_ctx->current_object->get_write_op().remove(); - remove_from_rados(r_storage->s, r_ctx->current_object->get_oid()); - } + for (std::vector::iterator it_cur_obj = r_ctx->objects.begin(); + it_cur_obj != r_ctx->objects.end(); ++it_cur_obj) { + remove_from_rados(r_storage->s, (*it_cur_obj)->get_oid()); } // clean up index mail_index_expunge(r_ctx->trans, r_ctx->seq); mail_cache_transaction_reset(r_ctx->ctx.transaction->cache_trans); - r_ctx->current_object->get_completion_private()->release(); r_ctx->mail_count--; } @@ -260,21 +288,19 @@ void clean_up_write_finish(struct mail_save_context *_ctx) { index_save_context_free(_ctx); } -int split_buffer_and_exec(const buffer_t *buffer, size_t buffer_length, uint64_t max_size, - struct rbox_save_context *r_ctx) { +int split_buffer_and_exec_op(const buffer_t *buffer, size_t buffer_length, uint64_t max_size, + struct rbox_save_context *r_ctx, librados::ObjectWriteOperation *write_op_xattr) { struct rbox_storage *r_storage = (struct rbox_storage *)&r_ctx->mbox->storage->storage; size_t write_buffer_size = buffer_length; int ret_val = 0; assert(max_size > 0); int rest = write_buffer_size % max_size; - // TODO(jrse) move op_list to rados_object - std::vector op_list; - int div = write_buffer_size / max_size + (rest > 0 ? 1 : 0); for (int i = 0; i < div; i++) { int offset = i * max_size; - librados::ObjectWriteOperation op; + + librados::ObjectWriteOperation *op = i == 0 ? write_op_xattr : new librados::ObjectWriteOperation(); int length = max_size; if (buffer_length < ((i + 1) * length)) { @@ -283,21 +309,22 @@ int split_buffer_and_exec(const buffer_t *buffer, size_t buffer_length, uint64_t const char *buf = (char *)buffer->data + offset; librados::bufferlist tmp_buffer; tmp_buffer.append(buf, length); - op.write(offset, tmp_buffer); + op->write(offset, tmp_buffer); AioCompletion *completion = librados::Rados::aio_create_completion(); + completion->set_complete_callback(r_ctx->current_object, nullptr); - i_debug("creation aio operation %s:", r_ctx->current_object->get_oid().c_str()); - // MAKE SYNC, ASYNC - r_storage->s->get_io_ctx().aio_operate(r_ctx->current_object->get_oid(), completion, &op); - op_list.push_back(completion); - } + (*r_ctx->current_object->get_completion_op_map())[completion] = op; - for (AioCompletion *n : op_list) { - n->wait_for_safe_and_cb(); + i_debug("creation aio operation %s , div=%d, offset=%d, length=%d", r_ctx->current_object->get_oid().c_str(), div, + offset, length); - n->release(); + ret_val = r_storage->s->get_io_ctx().aio_operate(r_ctx->current_object->get_oid(), completion, op); + if (ret_val < 0) { + break; + } } + return ret_val; } @@ -320,30 +347,18 @@ int rbox_save_finish(struct mail_save_context *_ctx) { if (!r_ctx->failed) { if (ret == 0) { if (r_ctx->copying != TRUE) { - ret = r_ctx->current_object->get_completion_private()->set_complete_callback(r_ctx->current_object, nullptr); - rbox_save_mail_write_metadata(r_ctx); + // delete write_op_xattr is called after operation completes (wait_for_rados_operations) + librados::ObjectWriteOperation *write_op_xattr = new librados::ObjectWriteOperation(); + rbox_save_mail_write_metadata(r_ctx, write_op_xattr); - size_t write_buffer_size = buffer_get_size(r_ctx->mail_buffer); + buffer_t *mail_buffer = (buffer_t *)r_ctx->current_object->get_mail_buffer(); + size_t write_buffer_size = buffer_get_used_size(mail_buffer); int max_write_size = r_storage->s->get_max_write_size_bytes(); i_debug("OSD_MAX_WRITE_SIZE=%dmb", (max_write_size / 1024 / 1024)); - if (write_buffer_size > max_write_size) { - i_debug("file to big %lu buffer , max %d ", (unsigned long)write_buffer_size, - r_storage->s->get_max_write_size_bytes()); - - ret = split_buffer_and_exec(r_ctx->mail_buffer, write_buffer_size, max_write_size, r_ctx); - - } else { - librados::bufferlist mail_data_bl; - mail_data_bl.append(str_c(r_ctx->mail_buffer)); - r_ctx->current_object->get_write_op().write_full(mail_data_bl); - } - // MAKE SYNC, ASYNC - ret = r_storage->s->get_io_ctx().aio_operate(r_ctx->current_object->get_oid(), - r_ctx->current_object->get_completion_private(), - &r_ctx->current_object->get_write_op()); - // set current_objects operation to active! + // ObjectWriteOperation write_op_xattr is used for mails with data < max_write_size + ret = split_buffer_and_exec_op(mail_buffer, write_buffer_size, max_write_size, r_ctx, write_op_xattr); r_ctx->current_object->set_active_op(true); i_debug("async operate executed oid: %s , ret=%d", r_ctx->current_object->get_oid().c_str(), ret); } @@ -385,42 +400,19 @@ int rbox_transaction_save_commit_pre(struct mail_save_context *_ctx) { i_assert(r_ctx->finished); - r_ctx->objects.push_back(r_ctx->current_object); + r_ctx->failed = wait_for_rados_operations(r_ctx->objects); - // wait for all writes to finish! - // imaptest shows it's possible that begin -> continue -> finish cycle is invoked several times before - // rbox_transaction_save_commit_pre is called. + // if one write fails! all writes will be reverted and r_ctx->failed is true! + if (r_ctx->failed) { for (std::vector::iterator it = r_ctx->objects.begin(); it != r_ctx->objects.end(); ++it) { r_ctx->current_object = *it; - // if we come from copy mail, there is no operation to wait for. - if (r_ctx->current_object->has_active_op()) { - // note: wait_for_complete_and_cb will also wait if there is no active op. - int ret = r_ctx->current_object->get_completion_private()->wait_for_complete_and_cb(); - if (ret != 0) { - r_ctx->failed = true; - } else if (r_ctx->current_object->get_completion_private()->get_return_value() < 0) { - r_ctx->failed = true; - } - i_debug("OID %s , SAVED success=%s", r_ctx->current_object->get_oid().c_str(), - r_ctx->failed ? "false" : "true"); //, file_size); - - if (r_ctx->failed) { - break; - } - } - } - // if one write fails! all writes will be reverted and r_ctx->failed is true! - if (r_ctx->failed) { - for (std::vector::iterator it = r_ctx->objects.begin(); it != r_ctx->objects.end(); - ++it) { - r_ctx->current_object = *it; - // delete index entry and delete object if it exist - // remove entry from index is not successful in rbox_transaction_commit_post - // clean up will wait for object operation to complete - clean_up_failed(r_ctx); - } + // delete index entry and delete object if it exist + // remove entry from index is not successful in rbox_transaction_commit_post + // clean up will wait for object operation to complete + clean_up_failed(r_ctx); } + } if (rbox_sync_begin(r_ctx->mbox, &r_ctx->sync_ctx, TRUE) < 0) { r_ctx->failed = TRUE; @@ -477,8 +469,9 @@ void rbox_transaction_save_rollback(struct mail_save_context *_ctx) { debug_print_mail_save_context(_ctx, "rbox-save::rbox_transaction_save_rollback", NULL); - buffer_free(&r_ctx->mail_buffer); for (std::vector::iterator it = r_ctx->objects.begin(); it != r_ctx->objects.end(); ++it) { + buffer_t *mail_buffer = (buffer_t *)(*it)->get_mail_buffer(); + buffer_free(&mail_buffer); delete *it; } r_ctx->objects.clear(); diff --git a/src/storage-rbox/rbox-save.h b/src/storage-rbox/rbox-save.h index 14791943..d269fde2 100644 --- a/src/storage-rbox/rbox-save.h +++ b/src/storage-rbox/rbox-save.h @@ -24,8 +24,7 @@ class rbox_save_context { current_object(NULL), failed(1), finished(1), - copying(0), - mail_buffer(NULL) {} + copying(0) {} struct mail_save_context ctx; @@ -46,7 +45,6 @@ class rbox_save_context { librmb::RadosStorage &rados_storage; std::vector objects; librmb::RadosMailObject *current_object; - buffer_t *mail_buffer; unsigned int failed : 1; unsigned int finished : 1;