Skip to content

Refactor workitems.{h,cc} #230

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
206 changes: 206 additions & 0 deletions c_src/workitems.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,141 @@ OpenTask::DoWork()

} // OpenTask::DoWork()

/**
* WriteTask functions
*/

WriteTask::WriteTask(ErlNifEnv* _owner_env, ERL_NIF_TERM _caller_ref,
DbObjectPtr_t & _db_handle,
leveldb::WriteBatch* _batch,
leveldb::WriteOptions* _options)
: WorkTask(_owner_env, _caller_ref, _db_handle),
batch(_batch),
options(_options)
{}

WriteTask::~WriteTask()
{
delete batch;
delete options;
}

work_result
WriteTask::DoWork()
{
leveldb::Status status = m_DbPtr->m_Db->Write(*options, batch);

return (status.ok() ? work_result(ATOM_OK) : work_result(local_env(), ATOM_ERROR_DB_WRITE, status));
}

/**
* GetTask functions
*/

GetTask::GetTask(ErlNifEnv *_caller_env,
ERL_NIF_TERM _caller_ref,
DbObjectPtr_t & _db_handle,
ERL_NIF_TERM _key_term,
leveldb::ReadOptions &_options)
: WorkTask(_caller_env, _caller_ref, _db_handle),
options(_options)
{
ErlNifBinary key;

enif_inspect_binary(_caller_env, _key_term, &key);
m_Key.assign((const char *)key.data, key.size);
}

GetTask::~GetTask() {}

work_result
GetTask::DoWork()
{
ERL_NIF_TERM value_bin;
BinaryValue value(local_env(), value_bin);
leveldb::Slice key_slice(m_Key);

leveldb::Status status = m_DbPtr->m_Db->Get(options, key_slice, &value);

if(!status.ok()){
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Original code was:

 if(!status.ok())
    return work_result(ATOM_NOT_FOUND);

if ( status.IsNotFound() )
return work_result(ATOM_NOT_FOUND);
else
return work_result(local_env(), ATOM_ERROR, status);
}

return work_result(local_env(), ATOM_OK, value_bin);
}

/**
* IterTask functions
*/

IterTask::IterTask(ErlNifEnv *_caller_env,
ERL_NIF_TERM _caller_ref,
DbObjectPtr_t & _db_handle,
const bool _keys_only,
leveldb::ReadOptions &_options)
: WorkTask(_caller_env, _caller_ref, _db_handle),
keys_only(_keys_only), options(_options)
{}

IterTask::~IterTask() {}

work_result
IterTask::DoWork()
{
ItrObject * itr_ptr=0;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Originally these were not initialized to zero

void * itr_ptr_ptr=0;

// NOTE: transferring ownership of options to ItrObject
itr_ptr_ptr=ItrObject::CreateItrObject(m_DbPtr, keys_only, options);

// Copy caller_ref to reuse in future iterator_move calls
itr_ptr=((ItrObjErlang*)itr_ptr_ptr)->m_ItrPtr;
itr_ptr->itr_ref_env = enif_alloc_env();
itr_ptr->itr_ref = enif_make_copy(itr_ptr->itr_ref_env, caller_ref());

ERL_NIF_TERM result = enif_make_resource(local_env(), itr_ptr_ptr);

// release reference created during CreateItrObject()
enif_release_resource(itr_ptr_ptr);

return work_result(local_env(), ATOM_OK, result);
} // operator()

/**
* MoveTask functions
*/

// Constructor with no seek target:

MoveTask::MoveTask(ErlNifEnv *_caller_env, ERL_NIF_TERM _caller_ref,
ItrObjectPtr_t & Iter, action_t& _action)
: WorkTask(NULL, _caller_ref, Iter->m_DbPtr),
m_Itr(Iter), action(_action)
{
// special case construction
local_env_=NULL;
enif_self(_caller_env, &local_pid);
}

// Constructor with seek target:

MoveTask::MoveTask(ErlNifEnv *_caller_env, ERL_NIF_TERM _caller_ref,
ItrObjectPtr_t & Iter, action_t& _action,
std::string& _seek_target)
: WorkTask(NULL, _caller_ref, Iter->m_DbPtr),
m_Itr(Iter), action(_action),
seek_target(_seek_target)
{
// special case construction
local_env_=NULL;
enif_self(_caller_env, &local_pid);
}

MoveTask::~MoveTask() {};

work_result
MoveTask::DoWork()
{
Expand Down Expand Up @@ -359,6 +488,83 @@ MoveTask::recycle()

} // MoveTask::recycle

/**
* CloseTask functions
*/

CloseTask::CloseTask(ErlNifEnv* _owner_env, ERL_NIF_TERM _caller_ref,
DbObjectPtr_t & _db_handle)
: WorkTask(_owner_env, _caller_ref, _db_handle)
{}

CloseTask::~CloseTask()
{
}

work_result
CloseTask::DoWork()
{
DbObject * db_ptr;

// get db pointer then clear reference count to it
db_ptr=m_DbPtr.get();
m_DbPtr.assign(NULL);

if (NULL!=db_ptr)
{
// set closing flag, this is blocking
db_ptr->InitiateCloseRequest();

// db_ptr no longer valid
db_ptr=NULL;

return(work_result(ATOM_OK));
} // if
else
{
return work_result(local_env(), ATOM_ERROR, ATOM_BADARG);
} // else
}

/**
* ItrCloseTask functions
*/

ItrCloseTask::ItrCloseTask(ErlNifEnv* _owner_env, ERL_NIF_TERM _caller_ref,
ItrObjectPtr_t & _itr_handle)
: WorkTask(_owner_env, _caller_ref),
m_ItrPtr(_itr_handle)
{}

ItrCloseTask::~ItrCloseTask()
{
}

work_result
ItrCloseTask::DoWork()
{
ItrObject * itr_ptr;

// get iterator pointer then clear reference count to it
itr_ptr=m_ItrPtr.get();
m_ItrPtr.assign(NULL);

if (NULL!=itr_ptr)
{
// set closing flag, this is blocking
itr_ptr->InitiateCloseRequest();

// itr_ptr no longer valid
itr_ptr=NULL;

return(work_result(ATOM_OK));
} // if
else
{
return work_result(local_env(), ATOM_ERROR, ATOM_BADARG);
} // else
}

/**
* DestroyTask functions
*/
Expand Down
Loading