Skip to content

Commit

Permalink
Merge pull request #60 from domob1812/async-rpc
Browse files Browse the repository at this point in the history
Port over async RPC calls from Huntercoin, implement "waitforblock" RPC call.
  • Loading branch information
phelixbtc committed May 15, 2014
2 parents 426a6d8 + 70160aa commit 1bb337b
Show file tree
Hide file tree
Showing 5 changed files with 241 additions and 32 deletions.
251 changes: 221 additions & 30 deletions src/bitcoinrpc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@
#include <boost/iostreams/stream.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/assign/list_of.hpp>
#include <boost/thread/thread.hpp>

#include <memory>
#include <list>

#ifdef USE_SSL
#include <boost/asio/ssl.hpp>
Expand Down Expand Up @@ -45,8 +49,6 @@ using namespace boost::asio;
using namespace json_spirit;

void ThreadRPCServer2(void* parg);
typedef Value(*rpcfn_type)(const Array& params, bool fHelp);
extern map<string, rpcfn_type> mapCallTable;
Value sendtoaddress(const Array& params, bool fHelp);

int64 nWalletUnlockTime;
Expand Down Expand Up @@ -3191,6 +3193,43 @@ Value getrawmempool(const Array& params, bool fHelp)
return a;
}

/* Block until a new block is found and return only then. */
static Value
waitforblock (const Array& params, bool fHelp)
{
if (fHelp || params.size () > 1)
throw runtime_error (
"waitforblock [blockHash]\n"
"Wait for a change in the best chain (a new block being found)"
" and return the new block's hash when it arrives. If blockHash"
" is given, wait until a block with different hash is found.\n");

if (IsInitialBlockDownload ())
throw JSONRPCError (RPC_CLIENT_IN_INITIAL_DOWNLOAD,
"huntercoin is downloading blocks...");

uint256 lastHash;
if (params.size () > 0)
lastHash = ParseHashV (params[0], "blockHash");
else
lastHash = hashBestChain;

boost::unique_lock<boost::mutex> lock(mut_newBlock);
while (true)
{
/* Atomically check whether we have found a new best block and return
it if that's the case. We use a lock on cs_main in order to
prevent race conditions. */
CRITICAL_BLOCK(cs_main)
{
if (lastHash != hashBestChain)
return hashBestChain.GetHex ();
}

/* Wait on the condition variable. */
cv_newBlock.wait (lock);
}
}


//
Expand Down Expand Up @@ -3263,6 +3302,7 @@ pair<string, rpcfn_type> pCallTable[] =
make_pair("signrawtransaction", &signrawtransaction),
make_pair("sendrawtransaction", &sendrawtransaction),
make_pair("getrawmempool", &getrawmempool),
make_pair("waitforblock", &waitforblock),
};
map<string, rpcfn_type> mapCallTable(pCallTable, pCallTable + sizeof(pCallTable)/sizeof(pCallTable[0]));

Expand Down Expand Up @@ -3298,6 +3338,14 @@ string pAllowInSafeMode[] =
};
set<string> setAllowInSafeMode(pAllowInSafeMode, pAllowInSafeMode + sizeof(pAllowInSafeMode)/sizeof(pAllowInSafeMode[0]));

/* Methods that will be called in a new thread and can block waiting for
some condition without hurting the RPC server performance. */
string pCallAsync[] =
{
"waitforblock",
};
set<string> setCallAsync(pCallAsync, pCallAsync + sizeof(pCallAsync)/sizeof(pCallAsync[0]));




Expand Down Expand Up @@ -3598,6 +3646,107 @@ class SSLIOStreamDevice : public iostreams::device<iostreams::bidirectional> {
};
#endif

/**
* Class encapsulating the state necessary for writing to a client connection.
* This is used to hold the state for async method calls.
*/
class ClientConnectionOutput
{

private:

/* The stream for outputting. */
#ifdef USE_SSL
SSLStream* sslStream;
SSLIOStreamDevice* d;
iostreams::stream<SSLIOStreamDevice>* stream;
#else
ip::tcp::iostream* stream;
#endif

public:

/* Basic constructor. */
#ifdef USE_SSL
inline ClientConnectionOutput (asio::io_service& io, ssl::context& c,
bool fUseSSL)
: sslStream(new SSLStream (io, c)),
d(new SSLIOStreamDevice (*sslStream, fUseSSL)),
stream(new iostreams::stream<SSLIOStreamDevice> (*d))
{}
#else
inline ClientConnectionOutput ()
: stream(new ip::tcp::iostream ())
{}
#endif

/* Destructor freeing everything. */
inline ~ClientConnectionOutput ()
{
delete stream;
#ifdef USE_SSL
delete d;
delete sslStream;
#endif
}

/* Wait for incoming connection. */
inline void
waitForConnection (ip::tcp::acceptor& acc, ip::tcp::endpoint& peer)
{
#ifdef USE_SSL
acc.accept(sslStream->lowest_layer(), peer);
#else
acc.accept(*stream->rdbuf(), peer);
#endif
}

/* Return the stream held. */
#ifdef USE_SSL
inline iostreams::stream<SSLIOStreamDevice>&
#else
inline ip::tcp::iostream&
#endif
getStream ()
{
return *stream;
}

};

/* Execute an RPC call, can be used as thread object for async calls. */
static void
ExecuteRpcCall (ClientConnectionOutput* out, rpcfn_type method,
json_spirit::Array params, json_spirit::Value id)
{
try
{
// Execute
Value result = method (params, false);

// Send reply
string strReply = JSONRPCReply (result, json_spirit::Value::null, id);
out->getStream () << HTTPReply (200, strReply) << std::flush;
}
catch (Object& objError)
{
ErrorReply (out->getStream (), objError, id);
}
catch (const boost::thread_interrupted& e)
{
ErrorReply (out->getStream (),
JSONRPCError (RPC_ASYNC_INTERRUPT,
"async method interrupted"), id);
}
catch (const std::exception& e)
{
ErrorReply (out->getStream (),
JSONRPCError (RPC_MISC_ERROR, e.what ()), id);
}

delete out;
}

void ThreadRPCServer(void* parg)
{
IMPLEMENT_RANDOMIZE_STACK(ThreadRPCServer(parg));
Expand Down Expand Up @@ -3669,41 +3818,83 @@ void ThreadRPCServer2(void* parg)
throw runtime_error("-rpcssl=1, but namecoin compiled without full openssl libraries.");
#endif

// Threads running async methods at the moment.
typedef std::list<boost::thread*> ThreadList;
ThreadList asyncThreads;

loop
{
/* Shut down. Do this here before we block again in the accept call
below, when the last command was "stop", it can exit now. */
if (fShutdown)
{
printf("Waiting for %d async RPC call threads to finish...\n",
asyncThreads.size());

for (ThreadList::iterator i = asyncThreads.begin();
i != asyncThreads.end(); ++i)
{
(*i)->interrupt();
}

for (ThreadList::iterator i = asyncThreads.begin();
i != asyncThreads.end(); ++i)
{
(*i)->join();
delete *i;
}

return;
}

// Clean up async threads.
printf("Trying to clean up %d async RPC call threads...\n",
asyncThreads.size());
for (ThreadList::iterator i = asyncThreads.begin();
i != asyncThreads.end(); )
{
if ((*i)->timed_join(boost::posix_time::seconds(0)))
{
printf("Async RPC call thread finished.\n");
delete *i;
i = asyncThreads.erase (i);
}
else
{
/* Explicitly increment iterator here in case we didn't
delete the element above. */
++i;
}
}

// Accept connection
std::auto_ptr<ClientConnectionOutput> out;
#ifdef USE_SSL
SSLStream sslStream(io_service, context);
SSLIOStreamDevice d(sslStream, fUseSSL);
iostreams::stream<SSLIOStreamDevice> stream(d);
out.reset(new ClientConnectionOutput(io_service, context, fUseSSL));
#else
ip::tcp::iostream stream;
out.reset(new ClientConnectionOutput());
#endif

ip::tcp::endpoint peer;
vnThreadsRunning[4]--;
#ifdef USE_SSL
acceptor.accept(sslStream.lowest_layer(), peer);
#else
acceptor.accept(*stream.rdbuf(), peer);
#endif
out->waitForConnection (acceptor, peer);
vnThreadsRunning[4]++;
if (fShutdown)
return;

// Restrict callers by IP
if (!ClientAllowed(peer.address().to_string()))
{
// Only send a 403 if we're not using SSL to prevent a DoS during the SSL handshake.
if (!fUseSSL)
stream << HTTPReply(403, "") << std::flush;
out->getStream() << HTTPReply(403, "") << std::flush;
continue;
}

map<string, string> mapHeaders;
string strRequest;

boost::thread api_caller(ReadHTTP, boost::ref(stream), boost::ref(mapHeaders), boost::ref(strRequest));
boost::thread api_caller(ReadHTTP, boost::ref(out->getStream()),
boost::ref(mapHeaders),
boost::ref(strRequest));
if (!api_caller.timed_join(boost::posix_time::seconds(GetArg("-rpctimeout", 30))))
{ // Timed out:
acceptor.cancel();
Expand All @@ -3714,7 +3905,7 @@ void ThreadRPCServer2(void* parg)
// Check authorization
if (mapHeaders.count("authorization") == 0)
{
stream << HTTPReply(401, "") << std::flush;
out->getStream() << HTTPReply(401, "") << std::flush;
continue;
}
if (!HTTPAuthorized(mapHeaders))
Expand All @@ -3723,7 +3914,7 @@ void ThreadRPCServer2(void* parg)
if (mapArgs["-rpcpassword"].size() < 15)
Sleep(50);

stream << HTTPReply(401, "") << std::flush;
out->getStream() << HTTPReply(401, "") << std::flush;
printf("ThreadRPCServer incorrect password attempt\n");
continue;
}
Expand Down Expand Up @@ -3770,27 +3961,27 @@ void ThreadRPCServer2(void* parg)
if (strWarning != "" && !GetBoolArg("-disablesafemode") && !setAllowInSafeMode.count(strMethod))
throw JSONRPCError(RPC_FORBIDDEN_BY_SAFE_MODE, string("Safe mode: ") + strWarning);

try
{
// Execute
Value result = (*(*mi).second)(params, false);

// Send reply
string strReply = JSONRPCReply(result, Value::null, id);
stream << HTTPReply(200, strReply) << std::flush;
}
catch (std::exception& e)
// Check for asynchronous execution and call the method.
const bool async = (setCallAsync.count(strMethod) > 0);
if (!async)
ExecuteRpcCall(out.release(), (*mi).second, params, id);
else
{
ErrorReply(stream, JSONRPCError(RPC_MISC_ERROR, e.what()), id);
std::auto_ptr<boost::thread> runner;
runner.reset (new boost::thread (&ExecuteRpcCall,
out.release(),
(*mi).second, params, id));
asyncThreads.push_back (runner.release());
}
}
catch (Object& objError)
{
ErrorReply(stream, objError, id);
ErrorReply(out->getStream(), objError, id);
}
catch (std::exception& e)
{
ErrorReply(stream, JSONRPCError(RPC_PARSE_ERROR, e.what()), id);
ErrorReply(out->getStream(),
JSONRPCError(RPC_PARSE_ERROR, e.what()), id);
}
}
}
Expand Down
7 changes: 7 additions & 0 deletions src/bitcoinrpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ void RPCTypeCheck(const json_spirit::Object& o,
extern std::string HelpRequiringPassphrase();
extern void EnsureWalletIsUnlocked();

typedef json_spirit::Value(*rpcfn_type)(const json_spirit::Array& params, bool fHelp);
extern std::map<std::string, rpcfn_type> mapCallTable;
extern std::set<std::string> setCallAsync;

// Bitcoin RPC error codes
enum RPCErrorCode
{
Expand Down Expand Up @@ -65,6 +69,9 @@ enum RPCErrorCode
RPC_WALLET_WRONG_ENC_STATE = -15, // Command given in wrong wallet encryption state (encrypting an encrypted wallet etc.)
RPC_WALLET_ENCRYPTION_FAILED = -16, // Failed to encrypt the wallet
RPC_WALLET_ALREADY_UNLOCKED = -17, // Wallet is already unlocked

// Async method call interrupted.
RPC_ASYNC_INTERRUPT = -100,
};

#endif
7 changes: 7 additions & 0 deletions src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ set<CWallet*> setpwalletRegistered;

CCriticalSection cs_main;

boost::mutex mut_newBlock;
boost::condition_variable cv_newBlock;

map<uint256, CTransaction> mapTransactions;
CCriticalSection cs_mapTransactions;
unsigned int nTransactionsUpdated = 0;
Expand Down Expand Up @@ -1382,6 +1385,10 @@ bool CBlock::SetBestChain(CTxDB& txdb, CBlockIndex* pindexNew)
nTransactionsUpdated++;
printf("SetBestChain: new best=%s height=%d work=%s\n", hashBestChain.ToString().substr(0,20).c_str(), nBestHeight, bnBestChainWork.ToString().c_str());

/* When everything is done, notify threads waiting for a change in the
currently best chain. */
cv_newBlock.notify_all ();

return true;
}

Expand Down
Loading

0 comments on commit 1bb337b

Please sign in to comment.