diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e69de29 diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..9d18d4c --- /dev/null +++ b/LICENSE @@ -0,0 +1,202 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2010 Google Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/README b/README new file mode 100644 index 0000000..ed88fc2 --- /dev/null +++ b/README @@ -0,0 +1,118 @@ +# Description # + +oldisimulator is a framework to support benchmarks that emulate Online Data- +Intensive (OLDI) workloads, such as web search and social networking. + +# Prerequisites # + +To build oldisim you need to have SCons compiler installed in your system. + +oldisim requires a C++11 compatible compiler, e.g., g++ v.4.7.3 or later +versions. It also depends on several other external libraries. Below are the +commands to install the required packages: + +$ sudo apt-get install build-essential gengetopt libgoogle-perftools-dev +libunwind7-dev libevent-dev scons libboost-all-dev + +Note that Boost version 1.5.3 or greater is required. + +# Build oldisim # + +To build oldisimulator, run `scons` in the root directory of the project. + +If you need to create static libraries, put the following in a new file named +custom.py in the project root: + +RELEASE=1 +STATICLINK=1 +TCMALLOC=1 +CXX='$PATH_TO_g++$' +LD='$PATH_TO_LD' +AR='$PATH_TO_AR' +NM='$PATH_TO_NM' +CPPPATH=['/usr/include/', ''] +LIBPATH='/usr/lib/' + +Note that you don’t need to build the boost library, as the dependency on lock +free queues does not require a built libboost. + +To speedup compilation, scons supports parallel compilation, e.g. `scons +-j12` to compile with 12 threads in parallel. There are two build modes, +**release** and **debug**. The default build mode is **release**. The build +mode is specified via the **mode** flag, e.g. `scons mode=release`. +The output of the builds will be put into *BUILD_MODE*/ + +There are several output directories in the build, corresponding to the +different parts of oldisimulator. + ++ *BUILD_MODE*/oldisim contains the oldisim framework libraries ++ *BUILD_MODE*/workloads contains the binaries of the workloads built + +# Run oldisim: search on the cluster # + +This benchmark emulates the fanout and request time distribution for web search. +It models an example tree-based search topology. A user query is first processed +by a front-end server, and eventually fanned out to a set of leaf nodes. + +The search benchmark consists of four modules - RootNode, LeafNode, DriverNode, +and LoadBalancer. Note that LoadBalancer is only needed when there exist more +than one root. + +## Prepare the cluster ## + +To emulate a tree topology with M roots and N leafs, your cluster needs to have +M machines to run RootNode, N machines to run LeafNode and one machine to run +DriverNode. + +If M is larger than 1, one more machine is needed to enable LoadBalancer. + +Memory container groups and network container groups need to be disabled on each +machine. You can achieve this by archer a kernel with appropriate flags, i.e., + +archer file -m "" -a "cgroup_disable=net,memory" + +## Run oldisim ## + +### step 1. Start LeafNode ### + +Copy the binary (release/workloads/search/LeafNode) to all the machines +allocated for LeafNode. + +Run the following command: +$PATH_TO_BINARY/LeafNode + +You can run "$PATH_TO_BINARY/LeafNode --help" for more usage details. + +### step 2. Start RootNode ### + +Copy the binary (release/workloads/search/ParentNode) to all the machines +allocated for RootNode. + +Run the following command: +$PATH_TO_BINARY/ParentNode --leaf= ... + --leaf= + +You can run "$PATH_TO_BINARY/ParentNode --help" for more usage details. + +### step 3. Start LoadBalancer (optional) ### + +Copy the binary (release/workloads/search/LoadBalancerNode) to the +machine allocated for LoadBalancerNode. + +Run the following command: +$PATH_TO_BINARY/LoadBalancerNode --parent= ... + --parent= + +You can run "$PATH_TO_BINARY/LoadBalancerNode --help" for more usage details. + +### step 4. Start DriverNode ### + +Copy the binary (release/workloads/search/DriverNode) to the machine +allocated for DriverNode. + +Run the following command: +$PATH_TO_BINARY/DriverNode --server= ... + --server= + +You can run "$PATH_TO_BINARY/DriverNode --help" for more usage details. + diff --git a/SConstruct b/SConstruct new file mode 100644 index 0000000..d4dd7a2 --- /dev/null +++ b/SConstruct @@ -0,0 +1,61 @@ +# Copyright 2015 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# user specified variables +vars = Variables('custom.py') +vars.Add(BoolVariable('RELEASE', 'Set to 1 to build for release', 1)) +vars.Add(BoolVariable('STATICLINK', 'Set to 1 to link libraries statically', 0)) +vars.Add(BoolVariable('TCMALLOC', 'Set to 1 to use tcmalloc', 1)) +vars.Add(('CXX', 'The default C++ compiler used')) +vars.Add(('LD', 'The default linker used')) +vars.Add(('AR', 'The default make library archive used')) +vars.Add(('NM', 'The default symbol table viewer used')) +vars.Add(('CPPPATH', 'System default C++ include directory')) +vars.Add(('LIBPATH', 'System default library directory')) + +env = Environment(variables = vars) + +if env['RELEASE']: + mymode = 'release' + env.Append(CCFLAGS=['-O3', '-DNDEBUG']) +else: + mymode = 'debug' + +#tell the user what we're doing +print '**** Compiling in ' + mymode + ' mode...' +if env['STATICLINK']: + print '**** Using static linking...' + env.Append(LINKFLAGS=['--static']) + +buildroot = '#' + mymode #holds the root of the build directory tree + +# add project-wide includes +env.Append(CPPPATH = ['#oldisim/include/', '#third_party/cereal/include/', '#third_party/boost_1_53_0']) + +# use pthreads +env.Append(CXXFLAGS=['-pthread']) +env.Append(LINKFLAGS=['-pthread']) + +#make sure the sconscripts can get to the variables +Export('env', 'buildroot') + +#put all .sconsign files in one place +env.SConsignFile() + +#specify the sconscript for myprogram +project = 'oldisim' +SConscript(project + '/SConscript', exports=['project']) + +project = 'workloads' +SConscript(project + '/SConscript', exports=['project']) diff --git a/oldisim/SConscript b/oldisim/SConscript new file mode 100644 index 0000000..d01566c --- /dev/null +++ b/oldisim/SConscript @@ -0,0 +1,68 @@ +# Copyright 2015 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import glob + +#get all the build variables we need +Import('env', 'project', 'buildroot') +localenv = env.Clone() + +builddir = buildroot + '/' + project #holds the build directory for this project +targetpath = builddir + '/' + project #holds the path to the executable in the build directory + +#specify the build directory +localenv.VariantDir(builddir, ".") + +localenv.Append(CCFLAGS = '-std=c++11 -D_GNU_SOURCE -ggdb') + +conf = localenv.Configure(config_h = "config.h") +conf.Define("__STDC_FORMAT_MACROS") + +if not conf.CheckCXX(): + print "A compiler with C++11 support is required." + Exit(1) + +conf.CheckLib("rt", "clock_gettime", language="C++") + +if not conf.CheckLibWithHeader("event", "event2/event.h", "C++"): + print "libevent required" + Exit(1) + +import SCons +if SCons.Conftest.CheckLib(SCons.SConf.CheckContext(conf), ["event_pthreads"], "evthread_use_pthreads", extra_libs=["event", "rt"], language="C++"): + print "libevent_pthreads required" + Exit(1) + +if not conf.CheckLibWithHeader("pthread", "pthread.h", "C++"): + print "pthread required" + Exit(1) + +if not conf.CheckCXXHeader("boost/lockfree/queue.hpp"): + print "libboost with lockfree required" + Exit(1) + +# Use TCMalloc +if env['TCMALLOC']: + # only need unwind on a static link + tcmalloc_extralibs = [] + if env['STATICLINK']: + tcmalloc_extralibs = ["unwind"] + if SCons.Conftest.CheckLib(SCons.SConf.CheckContext(conf), ["tcmalloc"], extra_libs=tcmalloc_extralibs, language="C++"): + print "tcmalloc required" + Exit(1) + +localenv = conf.Finish() + +srclst = map(lambda x: builddir + '/' + x, glob.glob('src/*.cc')) +localenv.Library(targetpath, source=srclst) diff --git a/oldisim/include/oldisim/Callbacks.h b/oldisim/include/oldisim/Callbacks.h new file mode 100644 index 0000000..b09da53 --- /dev/null +++ b/oldisim/include/oldisim/Callbacks.h @@ -0,0 +1,50 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef OLDISIM_CALLBACKS_H +#define OLDISIM_CALLBACKS_H + +#include +#include + +namespace oldisim { + +class ChildConnection; +class FanoutManager; +class NodeThread; +class ParentConnection; +class QueryContext; +class ResponseContext; +class TestDriver; + +typedef std::function AcceptCallback; + +typedef std::function LeafNodeThreadStartupCallback; +typedef std::function LeafNodeQueryCallback; + +typedef std::function + ParentNodeThreadStartupCallback; +typedef std::function + ParentNodeQueryCallback; + +typedef std::function + DriverNodeThreadStartupCallback; +typedef std::function + DriverNodeResponseCallback; +typedef std::function + DriverNodeMakeRequestCallback; +} // namespace oldisim + +#endif // OLDISIM_CALLBACKS_H + diff --git a/oldisim/include/oldisim/ChildConnection.h b/oldisim/include/oldisim/ChildConnection.h new file mode 100644 index 0000000..f5033db --- /dev/null +++ b/oldisim/include/oldisim/ChildConnection.h @@ -0,0 +1,61 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef OLDISIM_CHILD_CONNECTION_H +#define OLDISIM_CHILD_CONNECTION_H + +#include +#include +#include +#include + +#include +#include + +#include "oldisim/ChildConnectionStats.h" + +namespace oldisim { + +class ConnectionUtil; +class Query; + +/** + * This classes represents one part of a bi-directional connection between + * two nodes in the fanout tree. Specifically, this class is owned by the + * parent node in the tree and represents the connection established to the + * child node by the parent node of the tree. This class allows the parent to + * send requests to the child node. + */ +class ChildConnection { + friend ConnectionUtil; + + public: + ~ChildConnection(); + + void IssueRequest(uint32_t type, uint64_t request_id, const void* payload, + uint32_t length); + void Reset(); + + void set_priority(int pri); + int GetNumOutstandingRequests() const; + + private: + class ChildConnectionImpl; + const std::unique_ptr impl_; + explicit ChildConnection(std::unique_ptr impl); +}; +} // namespace oldisim + +#endif // OLDISIM_CHILD_CONNECTION_H + diff --git a/oldisim/include/oldisim/ChildConnectionStats.h b/oldisim/include/oldisim/ChildConnectionStats.h new file mode 100644 index 0000000..5ee7a25 --- /dev/null +++ b/oldisim/include/oldisim/ChildConnectionStats.h @@ -0,0 +1,136 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef OLDISIM_CHILD_CONNECTION_STATS_H +#define OLDISIM_CHILD_CONNECTION_STATS_H + +#include + +#include +#include +#include +#include + +#include "oldisim/Log.h" +#include "oldisim/LogHistogramSampler.h" +#include "oldisim/Query.h" +#include "oldisim/Response.h" +#include "oldisim/Util.h" + +namespace oldisim { + +class ChildConnectionStats { + public: + explicit ChildConnectionStats(const std::set& query_types) { + const int kHistogramBins = 1000; + for (auto type : query_types) { + // query_samplers_.emplace(type, std::unique_ptr(new + // LogHistogramSampler(kHistogramBins))); + query_samplers_.insert( + std::make_pair(type, LogHistogramSampler(kHistogramBins))); + query_processing_time_samplers_.insert( + std::make_pair(type, LogHistogramSampler(kHistogramBins))); + tx_bytes_[type] = 0; + rx_bytes_[type] = 0; + query_counts_[type] = 0; + dropped_requests_[type] = 0; + } + start_time_ = GetTimeAccurateNano(); + } + + uint64_t start_time_; + uint64_t end_time_; + std::map query_samplers_; + std::map query_processing_time_samplers_; + std::map tx_bytes_; + std::map rx_bytes_; + std::map query_counts_; + std::map dropped_requests_; + + void LogRequest(const Query& request) { + assert(tx_bytes_.count(request.GetType()) > 0); + assert(query_counts_.count(request.GetType()) > 0); + + tx_bytes_.at(request.GetType()) += request.GetQueryPacketSize(); + query_counts_.at(request.GetType())++; + } + + void LogResponse(const Query& originating_request, const Response& response) { + assert(query_samplers_.count(originating_request.GetType()) > 0); + assert(query_processing_time_samplers_.count( + originating_request.GetType()) > 0); + assert(tx_bytes_.count(response.GetType()) > 0); + + query_samplers_.at(originating_request.GetType()) + .sample(originating_request.Time()); + query_processing_time_samplers_.at(originating_request.GetType()) + .sample(response.GetProcessingTime()); + rx_bytes_.at(response.GetType()) += response.GetResponsePacketSize(); + } + + void LogDroppedRequest(uint32_t request_type) { + assert(dropped_requests_.count(request_type) > 0); + dropped_requests_.at(request_type)++; + } + + void Accumulate(const ChildConnectionStats& cs) { + assert(cs.query_samplers_.size() == query_samplers_.size()); + assert(cs.query_processing_time_samplers_.size() == + query_processing_time_samplers_.size()); + assert(cs.tx_bytes_.size() == tx_bytes_.size()); + assert(cs.rx_bytes_.size() == rx_bytes_.size()); + assert(cs.query_counts_.size() == query_counts_.size()); + assert(cs.dropped_requests_.size() == dropped_requests_.size()); + + for (const auto& sampler : cs.query_samplers_) { + query_samplers_.at(sampler.first).accumulate(sampler.second); + } + + for (const auto& sampler : cs.query_processing_time_samplers_) { + query_processing_time_samplers_.at(sampler.first) + .accumulate(sampler.second); + } + + for (const auto& stat : cs.tx_bytes_) { + tx_bytes_[stat.first] += stat.second; + } + + for (const auto& stat : cs.rx_bytes_) { + rx_bytes_[stat.first] += stat.second; + } + + for (const auto& stat : cs.query_counts_) { + query_counts_[stat.first] += stat.second; + } + + for (const auto& stat : cs.dropped_requests_) { + dropped_requests_[stat.first] += stat.second; + } + } + + void Reset() { + for (const auto& stat : query_samplers_) { + query_samplers_.at(stat.first).Reset(); + query_processing_time_samplers_.at(stat.first).Reset(); + tx_bytes_[stat.first] = 0; + rx_bytes_[stat.first] = 0; + query_counts_[stat.first] = 0; + dropped_requests_[stat.first] = 0; + } + start_time_ = GetTimeAccurateNano(); + } +}; +} // namespace oldisim + +#endif // OLDISIM_CHILD_CONNECTION_STATS_H diff --git a/oldisim/include/oldisim/DriverNode.h b/oldisim/include/oldisim/DriverNode.h new file mode 100644 index 0000000..0a0b2e3 --- /dev/null +++ b/oldisim/include/oldisim/DriverNode.h @@ -0,0 +1,80 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef OLDISIM_DRIVER_NODE_H +#define OLDISIM_DRIVER_NODE_H + +#include + +#include +#include + +#include "oldisim/Callbacks.h" + +namespace oldisim { + +class ChildConnection; +class NodeThread; + +class DriverNode { + public: + DriverNode(const std::string& hostname, uint16_t port); + ~DriverNode(); + void Run(uint32_t num_threads, bool thread_pinning, + uint32_t num_connections_per_thread, uint32_t max_connection_depth); + void Shutdown(); + + /** + * Set the callback to run after a thread has started up. + * It will run in the context of the newly started thread. + */ + void SetThreadStartupCallback( + const DriverNodeThreadStartupCallback& callback); + + /** + * Set the callback to run when the driver needs a request to send to the + * service under test. It will run in the context of the driver thread. + */ + void SetMakeRequestCallback(const DriverNodeMakeRequestCallback& callback); + + /** + * Set the callback to run after a reply is received from the workload. + * It will run in the context of the event thread that is responsible + * for the connection. The callback will be used for incoming replies + * of the given type. + */ + void RegisterReplyCallback(uint32_t type, + const DriverNodeResponseCallback& callback); + + /** + * Inform the driver node that it can send requests of the specified + * type + */ + void RegisterRequestType(uint32_t type); + + /** + * Enable remote statistics monitoring at a given port. + * It exposes a HTTP server with several URLs that provide diagnostic + * and monitoring information + */ + void EnableMonitoring(uint16_t port); + + private: + struct DriverNodeImpl; + struct DriverNodeThread; + std::unique_ptr impl_; +}; +} // namespace oldisim + +#endif // OLDISIM_DRIVER_NODE_H diff --git a/oldisim/include/oldisim/FanoutManager.h b/oldisim/include/oldisim/FanoutManager.h new file mode 100644 index 0000000..f570b4f --- /dev/null +++ b/oldisim/include/oldisim/FanoutManager.h @@ -0,0 +1,80 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef OLDISIM_FAN_OUT_MANAGER_H +#define OLDISIM_FAN_OUT_MANAGER_H + +#include +#include + +#include +#include + +namespace oldisim { + +class ChildConnection; +class ParentNodeServer; +class QueryContext; + +struct FanoutRequest { + uint32_t child_node_id; + uint32_t request_type; + const void* request_data; + uint32_t request_data_length; +}; + +struct FanoutReply { + bool timed_out; + uint32_t child_node_id; + uint32_t request_type; + std::unique_ptr reply_data; + uint32_t reply_data_length; + float latency_ms; +}; + +struct FanoutReplyTracker { + uint64_t starting_request_id; + int num_requests; + int num_replies_received; + std::vector replies; + bool closed; // Marked as such when timed out or when all replies received + uint64_t start_time; +}; + +class FanoutManager { + friend ParentNodeServer; + + public: + // Methods to create child connections, used by user programs + void MakeChildConnection(uint32_t child_node_id); + void MakeChildConnections(uint32_t child_node_id, int num); + + // Methods to perform fanout queries + typedef std::function + FanoutDoneCallback; + void Fanout(QueryContext&& originating_query, const FanoutRequest* requests, + int num_requests, const FanoutDoneCallback& callback, + double timeout_ms = 0.0); + void FanoutAll(QueryContext&& originating_query, const FanoutRequest& request, + const FanoutDoneCallback& callback, double timeout_ms = 0.0); + + private: + struct FanoutManagerImpl; + std::unique_ptr impl_; + explicit FanoutManager(std::unique_ptr impl); +}; +} // namespace oldisim + +#endif // OLDISIM_FAN_OUT_MANAGER_H + diff --git a/oldisim/include/oldisim/LeafNodeServer.h b/oldisim/include/oldisim/LeafNodeServer.h new file mode 100644 index 0000000..4b7fca0 --- /dev/null +++ b/oldisim/include/oldisim/LeafNodeServer.h @@ -0,0 +1,84 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef OLDISIM_LEAF_NODE_SERVER_H +#define OLDISIM_LEAF_NODE_SERVER_H + +#include + +#include + +#include "oldisim/Callbacks.h" + +namespace oldisim { + +class NodeThread; +class ParentConnection; +class QueryContext; +class LeafNodeServerThread; + +class LeafNodeServer { + public: + explicit LeafNodeServer(uint16_t port); + ~LeafNodeServer(); + + /* Set various configuration parameters for the leaf node server */ + void SetNumThreads(uint32_t num_threads); + void SetThreadPinning(bool use_thread_pinning); + void SetThreadLoadBalancing(bool use_thread_lb); + void SetThreadLoadBalancingParams(int lb_process_connections_batch_size, + int lb_process_request_batch_size); + + void Run(); + void Shutdown(); + + /** + * Set the callback to run after a thread has started up. + * It will run in the context of the newly started thread. + */ + void SetThreadStartupCallback(const LeafNodeThreadStartupCallback& callback); + + /** + * Set the callback to run after an incoming connection is accepted and a + * ParentConnection object representing that connection has been made. + * It will run in the context of the thread that the connection is assigned + * to. + */ + void SetAcceptCallback(const AcceptCallback& callback); + + /** + * Set the callback to run after an incoming query is received. + * It will run in the context of the event thread that is responsible + * for the connection. The callback will be used for incoming queries + * of a given type + */ + void RegisterQueryCallback(uint32_t type, + const LeafNodeQueryCallback& callback); + + /** + * Enable remote statistics monitoring at a given port. + * It exposes a HTTP server with several URLs that provide diagnostic + * and monitoring information + */ + void EnableMonitoring(uint16_t port); + + private: + struct LeafNodeServerImpl; + struct LeafNodeServerThread; + std::unique_ptr impl_; +}; +} // namespace oldisim + +#endif // OLDISIM_LEAF_NODE_SERVER_H + diff --git a/oldisim/include/oldisim/LeafNodeStats.h b/oldisim/include/oldisim/LeafNodeStats.h new file mode 100644 index 0000000..5c9708a --- /dev/null +++ b/oldisim/include/oldisim/LeafNodeStats.h @@ -0,0 +1,108 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef OLDISIM_LEAF_NODE_STATS_H +#define OLDISIM_LEAF_NODE_STATS_H + +#include +#include + +#include +#include + +#include "oldisim/Log.h" +#include "oldisim/LogHistogramSampler.h" +#include "oldisim/QueryContext.h" +#include "oldisim/Response.h" + +namespace oldisim { + +class LeafNodeStats { + public: + explicit LeafNodeStats(const std::set& query_types) { + const int kHistogramBins = 200; + for (auto type : query_types) { + tx_bytes_[type] = 0; + rx_bytes_[type] = 0; + query_counts_[type] = 0; + response_counts_[type] = 0; + processing_time_samplers_.insert( + std::make_pair(type, LogHistogramSampler(kHistogramBins))); + } + } + + std::map tx_bytes_; + std::map rx_bytes_; + std::map query_counts_; + std::map response_counts_; + std::map processing_time_samplers_; + + void LogQuery(const QueryContext& query) { + assert(rx_bytes_.count(query.type) > 0); + assert(query_counts_.count(query.type) > 0); + rx_bytes_.at(query.type) += query.packet_length; + query_counts_.at(query.type)++; + } + + void LogResponse(const Response& response) { + assert(tx_bytes_.count(response.GetType()) > 0); + assert(response_counts_.count(response.GetType()) > 0); + assert(processing_time_samplers_.count(response.GetType()) > 0); + tx_bytes_.at(response.GetType()) += response.GetResponsePacketSize(); + response_counts_.at(response.GetType())++; + processing_time_samplers_.at(response.GetType()) + .sample(response.GetProcessingTime()); + } + + void Accumulate(const LeafNodeStats& cs) { + assert(cs.tx_bytes_.size() == tx_bytes_.size()); + assert(cs.rx_bytes_.size() == rx_bytes_.size()); + assert(cs.query_counts_.size() == query_counts_.size()); + assert(cs.processing_time_samplers_.size() == + processing_time_samplers_.size()); + + for (auto& stat : cs.tx_bytes_) { + tx_bytes_.at(stat.first) += stat.second; + } + + for (auto& stat : cs.rx_bytes_) { + rx_bytes_.at(stat.first) += stat.second; + } + + for (auto& stat : cs.query_counts_) { + query_counts_.at(stat.first) += stat.second; + } + + for (auto& stat : cs.response_counts_) { + response_counts_.at(stat.first) += stat.second; + } + + for (const auto& sampler : cs.processing_time_samplers_) { + processing_time_samplers_.at(sampler.first).accumulate(sampler.second); + } + } + + void Reset() { + for (auto& stat : tx_bytes_) { + tx_bytes_[stat.first] = 0; + rx_bytes_[stat.first] = 0; + query_counts_[stat.first] = 0; + response_counts_[stat.first] = 0; + processing_time_samplers_.at(stat.first).Reset(); + } + } +}; +} // namespace oldisim + +#endif // OLDISIM_LEAF_NODE_STATS_H diff --git a/oldisim/include/oldisim/Log.h b/oldisim/include/oldisim/Log.h new file mode 100644 index 0000000..d062833 --- /dev/null +++ b/oldisim/include/oldisim/Log.h @@ -0,0 +1,46 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef OLDISIM_LOG_H +#define OLDISIM_LOG_H + +#include + +enum log_level_t { DEBUG, VERBOSE, INFO, WARN, QUIET }; +extern log_level_t log_level; + +void log_file_line(log_level_t level, const char* file, int line, + const char* format, ...); +#define L(level, args...) log_file_line(level, __FILE__, __LINE__, args) + +#define D(args...) L(DEBUG, args) +#define V(args...) L(VERBOSE, args) +#define I(args...) L(INFO, args) +#define W(args...) L(WARN, args) + +#define DIE(args...) \ + do { \ + W(args); \ + exit(-1); \ + } while (0) + +#define NOLOG(x) \ + do { \ + log_level_t old = log_level; \ + log_level = QUIET; \ + (x); \ + log_level = old; \ + } while (0) + +#endif // OLDISIM_LOG_H diff --git a/oldisim/include/oldisim/LogHistogramSampler.h b/oldisim/include/oldisim/LogHistogramSampler.h new file mode 100644 index 0000000..a234bc2 --- /dev/null +++ b/oldisim/include/oldisim/LogHistogramSampler.h @@ -0,0 +1,142 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef OLDISIM_LOGHISTOGRAMSAMPLER_H +#define OLDISIM_LOGHISTOGRAMSAMPLER_H + +#include +#include +#include + +#include +#include +#include + +#define _POW 1.1 + +namespace oldisim { + +class LogHistogramSampler { + public: + std::vector bins_; + + double sum_; + double sum_sq_; + + LogHistogramSampler() = delete; + explicit LogHistogramSampler(int _bins) : sum_(0), sum_sq_(0) { + assert(_bins > 0); + + bins_.resize(_bins + 1, 0); + } + + void sample(double s) { + assert(s >= 0); + size_t bin = log(s) / log(_POW); + + sum_ += s; + sum_sq_ += s * s; + + if ((int64_t)bin < 0) { + bin = 0; + } else if (bin >= bins_.size()) { + bin = bins_.size() - 1; + } + + bins_[bin]++; + } + + double average() const { + if (total() == 0) { + return std::numeric_limits::quiet_NaN(); + } + return sum_ / total(); + } + + double stddev() const { + if (total() == 0) { + return std::numeric_limits::quiet_NaN(); + } + return sqrt(sum_sq_ / total() - pow(sum_ / total(), 2.0)); + } + + double minimum() const { + if (total() == 0) { + return std::numeric_limits::quiet_NaN(); + } + for (size_t i = 0; i < bins_.size(); i++) { + if (bins_.at(i) > 0) { + return pow(_POW, static_cast(i) + 0.5); + } + } + DIE("Not implemented"); + } + + double get_nth(double nth) const { + if (total() == 0) { + return std::numeric_limits::quiet_NaN(); + } + + uint64_t count = total(); + uint64_t n = 0; + double target = count * nth / 100; + + for (size_t i = 0; i < bins_.size(); i++) { + n += bins_.at(i); + + if (n > target) { // The nth is inside bins_[i]. + double left = target - (n - bins_.at(i)); + return pow(_POW, static_cast(i)) + + left / bins_.at(i) * + (pow(_POW, static_cast(i + 1)) - + pow(_POW, static_cast(i))); + } + } + + return pow(_POW, bins_.size()); + } + + uint64_t total() const { + uint64_t sum_ = 0.0; + + for (auto i : bins_) { + sum_ += i; + } + + return sum_; + } + + void accumulate(const LogHistogramSampler& h) { + assert(bins_.size() == h.bins_.size()); + + for (size_t i = 0; i < bins_.size(); i++) { + bins_[i] += h.bins_[i]; + } + + sum_ += h.sum_; + sum_sq_ += h.sum_sq_; + } + + void Reset() { + for (auto& i : bins_) { + i = 0; + } + sum_ = 0; + sum_sq_ = 0; + } +}; +} // namespace oldisim + +#endif // OLDISIM_LOGHISTOGRAMSAMPLER_H + diff --git a/oldisim/include/oldisim/NodeThread.h b/oldisim/include/oldisim/NodeThread.h new file mode 100644 index 0000000..0310446 --- /dev/null +++ b/oldisim/include/oldisim/NodeThread.h @@ -0,0 +1,50 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef OLDISIM_NODE_THREAD_H +#define OLDISIM_NODE_THREAD_H + +#include +#include + +#include +#include + +namespace oldisim { + +class ChildConnection; +class LeafNodeServer; +class ParentNodeServer; +class DriverNode; + +class NodeThread { + friend LeafNodeServer; + friend ParentNodeServer; + friend DriverNode; + + public: + int get_thread_num() const; + pthread_t get_pthread() const; + event_base* get_event_base() const; + + private: + struct NodeThreadImpl; + std::unique_ptr impl_; + + NodeThread(); +}; +} // namespace oldisim + +#endif // OLDISIM_NODE_THREAD_H + diff --git a/oldisim/include/oldisim/ParentConnection.h b/oldisim/include/oldisim/ParentConnection.h new file mode 100644 index 0000000..48cfeb5 --- /dev/null +++ b/oldisim/include/oldisim/ParentConnection.h @@ -0,0 +1,61 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef OLDISIM_PARENT_CONNECTION_H +#define OLDISIM_PARENT_CONNECTION_H + +#include +#include + +#include +#include +#include +#include + +namespace oldisim { + +class ConnectionUtil; +class QueryContext; +class ParentConnectionStats; +class Response; + +/** + * This classes represents one part of a bi-directional connection between + * two nodes in the fanout tree. Specifically, this class is owned by the + * child node in the tree and represents the connection established to the child + * node by the parent node of the tree. This class allows the child to + * send replies to requests sent to the child by the parent node. + */ +class ParentConnection { + friend QueryContext; + friend ConnectionUtil; + + public: + ~ParentConnection(); + ParentConnection(const ParentConnection& that) = delete; + void SendResponse(uint32_t response_type, uint64_t query_id, + uint64_t start_time, uint64_t processing_time, + const void* data, uint32_t data_length, + std::function logger = nullptr); + + private: + struct ParentConnectionImpl; + const std::unique_ptr impl_; + + explicit ParentConnection(std::unique_ptr impl); +}; +} // namespace oldisim + +#endif // OLDISIM_PARENT_CONNECTION_H + diff --git a/oldisim/include/oldisim/ParentNodeServer.h b/oldisim/include/oldisim/ParentNodeServer.h new file mode 100644 index 0000000..9ddfdd2 --- /dev/null +++ b/oldisim/include/oldisim/ParentNodeServer.h @@ -0,0 +1,91 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef OLDISIM_PARENT_NODE_SERVER_H +#define OLDISIM_PARENT_NODE_SERVER_H + +#include + +#include +#include + +#include "oldisim/Callbacks.h" + +namespace oldisim { + +class ChildConnection; +class NodeThread; +class ParentConnection; +class QueryContext; +class ResponseContext; + +class ParentNodeServer { + public: + explicit ParentNodeServer(uint16_t port); + ~ParentNodeServer(); + void Run(uint32_t num_threads, bool thread_pinning); + void Shutdown(); + + /** + * Set the callback to run after a thread has started up. + * It will run in the context of the newly started thread. + */ + void SetThreadStartupCallback( + const ParentNodeThreadStartupCallback& callback); + + /** + * Set the callback to run after an incoming connection from a higher + * level parent is accepted and a + * ParentConnection object representing that connection has been made. + * It will run in the context of main event loop thread. + */ + void SetAcceptCallback(const AcceptCallback& callback); + + /** + * Set the callback to run after an incoming query is received from a parent. + * It will run in the context of the event thread that is responsible + * for the connection. The callback will be used for incoming queries + * of the given type. + */ + void RegisterQueryCallback(uint32_t type, + const ParentNodeQueryCallback& callback); + + /** + * Inform the parent node server that it can send requests of the specified + * type + */ + void RegisterRequestType(uint32_t type); + + /** + * Add a hostname:port as a child node that requests can be sent to. + * Note that it is up to the thread to create the actual connections. + */ + void AddChildNode(std::string hostname, uint16_t port); + + /** + * Enable remote statistics monitoring at a given port. + * It exposes a HTTP server with several URLs that provide diagnostic + * and monitoring information + */ + void EnableMonitoring(uint16_t port); + + private: + struct ParentNodeServerImpl; + struct ParentNodeServerThread; + std::unique_ptr impl_; +}; +} // namespace oldisim + +#endif // OLDISIM_PARENT_NODE_SERVER_H + diff --git a/oldisim/include/oldisim/Query.h b/oldisim/include/oldisim/Query.h new file mode 100644 index 0000000..3be22d8 --- /dev/null +++ b/oldisim/include/oldisim/Query.h @@ -0,0 +1,68 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef OLDISIM_QUERY_H +#define OLDISIM_QUERY_H + +#include + +#include +#include + +namespace oldisim { + +struct __attribute__((__packed__)) QueryPacketHeader { + uint32_t type; + uint64_t request_id; + uint64_t start_time; + uint32_t payload_length; // does not include header length +}; + +/** + * Internal tracking data structure of a query. + * The user does not directly interact with this, only with blobs + * of data + */ +class Query { + public: + uint64_t end_time_; + QueryPacketHeader query_header_; + + uint64_t Time() const { return (end_time_ - query_header_.start_time); } + + uint32_t GetQueryPacketSize() const { + return query_header_.payload_length + sizeof(query_header_); + } + + uint32_t GetPayloadLength() const { return query_header_.payload_length; } + + uint32_t GetType() const { return query_header_.type; } + + uint64_t GetRequestID() const { return query_header_.request_id; } + + uint64_t GetStartTime() const { return query_header_.start_time; } + + QueryPacketHeader GetHeaderNetworkOrder() const { + QueryPacketHeader header = query_header_; + header.type = htobe32(header.type); + header.request_id = htobe64(header.request_id); + header.start_time = htobe64(header.start_time); + header.payload_length = htobe32(header.payload_length); + + return header; + } +}; +} // namespace oldisim + +#endif // OLDISIM_QUERY_H diff --git a/oldisim/include/oldisim/QueryContext.h b/oldisim/include/oldisim/QueryContext.h new file mode 100644 index 0000000..93d125f --- /dev/null +++ b/oldisim/include/oldisim/QueryContext.h @@ -0,0 +1,63 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef OLDISIM_QUERY_CONTEXT_H +#define OLDISIM_QUERY_CONTEXT_H + +#include +#include + +#include "oldisim/ParentConnection.h" + +namespace oldisim { + +class ParentConnection; +class LeafNodeServer; +class ParentNodeServer; + +class QueryContext { + friend ParentConnection; + friend LeafNodeServer; + friend ParentNodeServer; + + public: + QueryContext(QueryContext&& other); + QueryContext(const QueryContext& other) = delete; + ~QueryContext(); + + const uint32_t type; + const uint64_t request_id; + const uint64_t start_time; + const uint64_t received_time; + const uint32_t payload_length; + const uint32_t packet_length; + void* const payload; + void SendResponse(const void* data, uint32_t data_length); + + private: + ParentConnection& connection; + bool response_sent; + bool is_active; + bool is_payload_heap; + std::function logger; + + QueryContext(ParentConnection& _connection, uint32_t _type, + uint64_t _request_id, uint64_t start_time, + uint32_t _payload_length, uint32_t _packet_length, + void* _payload, bool _is_payload_heap); +}; +} // namespace oldisim + +#endif // OLDISIM_QUERY_CONTEXT_H + diff --git a/oldisim/include/oldisim/Response.h b/oldisim/include/oldisim/Response.h new file mode 100644 index 0000000..e09d62b --- /dev/null +++ b/oldisim/include/oldisim/Response.h @@ -0,0 +1,113 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef OLDISIM_RESPONSE_H +#define OLDISIM_RESPONSE_H + +#include + +#include +#include + +#include "oldisim/Query.h" + +namespace oldisim { + +struct __attribute__((__packed__)) ResponsePacketHeader { + uint32_t type; + uint64_t request_id; + uint64_t start_time; + uint64_t processing_time; + uint32_t payload_length; // does not include header length + + ResponsePacketHeader(uint32_t _type, uint64_t _request_id, + uint64_t _start_time, uint64_t _processing_time, + uint32_t _payload_length) + : type(_type), + request_id(_request_id), + start_time(_start_time), + processing_time(_processing_time), + payload_length(_payload_length) {} +}; + +/** + * Internal tracking data structure of a response going back up the tree. + * The user does not directly interact with this, only with blobs + * of data + */ +class Response { + public: + ResponsePacketHeader response_header_; + const void* payload_; // only used to store received response + + Response() : response_header_(0, 0, 0, 0, 0), payload_(nullptr) {} + + Response(uint32_t type, uint64_t request_id, uint64_t start_time, + uint64_t processing_time, uint32_t payload_length) + : response_header_(type, request_id, start_time, processing_time, + payload_length), + payload_(nullptr) {} + + ResponsePacketHeader GetHeaderNetworkOrder() const { + ResponsePacketHeader header = response_header_; + header.type = htobe32(header.type); + header.request_id = htobe64(header.request_id); + header.start_time = htobe64(header.start_time); + header.processing_time = htobe64(header.processing_time); + header.payload_length = htobe32(header.payload_length); + + return header; + } + + static Response FromHeaderNetworkOrder(const ResponsePacketHeader* header) { + Response result; + ResponsePacketHeader& result_header = result.response_header_; + result_header.type = be32toh(header->type); + result_header.request_id = be64toh(header->request_id); + result_header.start_time = be64toh(header->start_time); + result_header.processing_time = be64toh(header->processing_time); + result_header.payload_length = be32toh(header->payload_length); + + return result; + } + + uint32_t GetResponsePacketSize() const { + return response_header_.payload_length + sizeof(response_header_); + } + + uint32_t GetPayloadLength() const { return response_header_.payload_length; } + + uint32_t GetType() const { return response_header_.type; } + + uint64_t GetRequestID() const { return response_header_.request_id; } + + uint64_t GetStartTime() const { return response_header_.start_time; } + + uint64_t GetProcessingTime() const { + return response_header_.processing_time; + } + + Query RebuildOriginatingQuery() const { + Query originating_query; + originating_query.query_header_.type = response_header_.type; + originating_query.query_header_.request_id = response_header_.request_id; + originating_query.query_header_.start_time = response_header_.start_time; + + return originating_query; + } +}; +} // namespace oldisim + +#endif // OLDISIM_RESPONSE_H + diff --git a/oldisim/include/oldisim/ResponseContext.h b/oldisim/include/oldisim/ResponseContext.h new file mode 100644 index 0000000..f5d23d9 --- /dev/null +++ b/oldisim/include/oldisim/ResponseContext.h @@ -0,0 +1,48 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef OLDISIM_RESPONSE_CONTEXT_H +#define OLDISIM_RESPONSE_CONTEXT_H + +#include + +#include + +namespace oldisim { + +class ChildConnection; + +class ResponseContext { + friend ChildConnection; + + public: + uint32_t type; + uint64_t request_id; + uint32_t payload_length; + uint32_t packet_length; + const void* payload; + bool timed_out; + uint64_t request_timestamp; + uint64_t response_timestamp; + + private: + ResponseContext(uint32_t _type, uint64_t _request_id, + uint32_t _payload_length, uint32_t _packet_length, + const void* _payload, bool _timed_out, + uint64_t _request_timestamp, uint64_t _response_timestamp); +}; +} // namespace oldisim + +#endif // OLDISIM_RESPONSE_CONTEXT_H + diff --git a/oldisim/include/oldisim/TestDriver.h b/oldisim/include/oldisim/TestDriver.h new file mode 100644 index 0000000..2c1bded --- /dev/null +++ b/oldisim/include/oldisim/TestDriver.h @@ -0,0 +1,46 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef OLDISIM_TEST_DRIVER_H +#define OLDISIM_TEST_DRIVER_H + +#include + +#include + +#include "oldisim/Callbacks.h" + +namespace oldisim { + +class ChildConnectionStats; +class DriverNode; + +class TestDriver { + friend DriverNode; + + public: + void Start(); + void SendRequest(uint32_t type, const void* payload, uint32_t payload_length, + uint64_t next_request_delay_us); + const ChildConnectionStats& GetConnectionStats() const; + + private: + struct TestDriverImpl; + std::unique_ptr impl_; + TestDriver(); +}; +} // namespace oldisim + +#endif // OLDISIM_TEST_DRIVER_H + diff --git a/oldisim/include/oldisim/Util.h b/oldisim/include/oldisim/Util.h new file mode 100644 index 0000000..15ecf2e --- /dev/null +++ b/oldisim/include/oldisim/Util.h @@ -0,0 +1,132 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef OLDISIM_UTIL_H +#define OLDISIM_UTIL_H + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "oldisim/Log.h" + +#define USE_CLOCK_GETTIME 1 + +inline std::string MakeAddress(std::string hostname, uint16_t port) { + std::stringstream ss; + ss << hostname << ":" << port; + return ss.str(); +} + +inline void MicroToTv(uint64_t val, struct timeval *tv) { + uint64_t secs = val / 1000000; + uint64_t usecs = val % 1000000; + + tv->tv_sec = secs; + tv->tv_usec = usecs; +} + +inline uint64_t TvToNano(struct timeval *tv) { + return (((uint64_t)tv->tv_sec) * 1000000 + tv->tv_usec) * 1000; +} + +inline void NanoToTv(uint64_t val, struct timeval *tv) { + uint64_t secs = val / 1000000000; + uint64_t usecs = (val % 1000000000) / 1000; + + tv->tv_sec = secs; + tv->tv_usec = usecs; +} + +inline double TvToDouble(struct timeval *tv) { + return tv->tv_sec + static_cast(tv->tv_usec) / 1000000; +} + +inline void DoubleToTv(double val, struct timeval *tv) { + uint64_t secs = (int64_t)val; + uint64_t usecs = (int64_t)((val - secs) * 1000000); + + tv->tv_sec = secs; + tv->tv_usec = usecs; +} + +inline uint64_t GetTimeAccurateNano() { +#if USE_CLOCK_GETTIME + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC_RAW, &ts); + return ((uint64_t)ts.tv_sec) * 1000000000 + ts.tv_nsec; +#else + struct timeval tv; + gettimeofday(&tv, nullptr); + return TvToNano(&tv); +#endif +} + +inline uint64_t GetTimeNano() { + struct timeval tv; + gettimeofday(&tv, nullptr); + return TvToNano(&tv); +} + +inline double GetTimeAccurate() { +#if USE_CLOCK_GETTIME + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC_RAW, &ts); + return ts.tv_sec + static_cast(ts.tv_nsec) / 1000000000; +#else + struct timeval tv; + gettimeofday(&tv, nullptr); + return TvToDouble(&tv); +#endif +} + +inline double GetTime() { + struct timeval tv; + gettimeofday(&tv, nullptr); + return TvToDouble(&tv); +} + +inline addrinfo *ResolveHost(std::string hostname, uint16_t port) { + addrinfo hints; + addrinfo *result = nullptr; + int err; + + // Set to resolve IP address + std::memset(&hints, 0, sizeof(hints)); + hints.ai_socktype = SOCK_STREAM; + hints.ai_family = AF_INET; + + // Resolve + if ((err = getaddrinfo(hostname.c_str(), NULL, &hints, &result)) != 0) { + DIE("Could not resolve %s, got error %d\n", hostname.c_str(), err); + } + + // Set port in ai_addr, treat as IPv4 + ((struct sockaddr_in *)(result->ai_addr))->sin_port = htobe16(port); + + return result; +} + +void sleep_time(double duration); + +#endif // OLDISIM_UTIL_H diff --git a/oldisim/src/AutoSnapshot.h b/oldisim/src/AutoSnapshot.h new file mode 100644 index 0000000..3b258e9 --- /dev/null +++ b/oldisim/src/AutoSnapshot.h @@ -0,0 +1,96 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef AUTO_SNAPSHOT_H +#define AUTO_SNAPSHOT_H + +#include +#include + +#include +#include +#include + +namespace oldisim { + +template +class AutoSnapshot { + public: + typedef std::function SnapshotDataCallback; + typedef std::function PostSnapshotCallback; + + AutoSnapshot(event_base* base, int snapshot_interval_sec, + const SnapshotDataCallback& data_callback, + const PostSnapshotCallback& post_callback = nullptr) + : snapshot_interval_sec_(snapshot_interval_sec), + data_callback_(data_callback), + post_callback_(post_callback) { + timer_event_ = evtimer_new(base, SnapshotTimerHandler, this); + } + + ~AutoSnapshot() { event_free(timer_event_); } + + void Enable() { AddTimer(*this); } + + void Disable() { evtimer_del(timer_event_); } + + unsigned int GetNumberSnapshots() const { + std::lock_guard lock(snapshots_lock_); + return snapshots_.size(); + } + + T PopSnapshot() { + std::lock_guard lock(snapshots_lock_); + T front = snapshots_.front(); + snapshots_.pop_front(); + return front; + } + + static void SnapshotTimerHandler(evutil_socket_t listener, int16_t flags, + void* arg) { + AutoSnapshot* self = reinterpret_cast(arg); + + // Insert into snapshots + T data = std::move(self->data_callback_()); + { + std::lock_guard lock(self->snapshots_lock_); + self->snapshots_.emplace_back(std::move(data)); + } + + // Call post snapshot function if any + if (self->post_callback_ != nullptr) { + self->post_callback_(); + } + + // Re-add the timer + AddTimer(*self); + } + + static void AddTimer(const AutoSnapshot& snapshotter) { + timeval t = {snapshotter.snapshot_interval_sec_, 0}; + evtimer_add(snapshotter.timer_event_, &t); + } + + private: + std::deque snapshots_; + mutable std::mutex snapshots_lock_; + event* timer_event_; + int snapshot_interval_sec_; + SnapshotDataCallback data_callback_; + PostSnapshotCallback post_callback_; +}; +} // namespace oldisim + +#endif // AUTO_SNAPSHOT_H + diff --git a/oldisim/src/CerealMapAsJSObject.h b/oldisim/src/CerealMapAsJSObject.h new file mode 100644 index 0000000..e6a9e8f --- /dev/null +++ b/oldisim/src/CerealMapAsJSObject.h @@ -0,0 +1,46 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef CEREAL_MAP_AS_JS_OBJECT_H +#define CEREAL_MAP_AS_JS_OBJECT_H + +#include +#include +#include + +// Clever hack to make cereal serialize a map as a JavaScript object +// Answer adopted from +// https://stackoverflow.com/questions/22569832/is-there-a-way-to-specify-a-simpler-json-de-serialization-for-stdmap-using-c +namespace cereal { +//! Saving for std::map +template +inline void save(Archive& ar, const std::map& map) { + for (const auto& i : map) { + ar(cereal::make_nvp(i.first, i.second)); + } +} + +//! Saving for std::map +template +inline void save(Archive& ar, const std::map& map) { + for (const auto& i : map) { + std::stringstream ss; + ss << i.first; + ar(cereal::make_nvp(ss.str(), i.second)); + } +} +} // namespace cereal + +#endif // CEREAL_MAP_AS_JS_OBJECT_H + diff --git a/oldisim/src/ChildConnection.cc b/oldisim/src/ChildConnection.cc new file mode 100644 index 0000000..a6678fa --- /dev/null +++ b/oldisim/src/ChildConnection.cc @@ -0,0 +1,261 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "oldisim/ChildConnection.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "ChildConnectionImpl.h" +#include "oldisim/Response.h" +#include "oldisim/ResponseContext.h" +#include "oldisim/Util.h" + +namespace oldisim { + +ChildConnection::ChildConnection(std::unique_ptr impl) + : impl_(std::move(impl)) {} + +ChildConnection::~ChildConnection() { + bufferevent_free(impl_->bev_); +} + +void ChildConnection::Reset() { + assert(impl_->num_outstanding_requests == 0); + impl_->read_state_ = ChildConnectionImpl::ReadState::WAITING; +} + +void ChildConnection::IssueRequest(uint32_t type, uint64_t request_id, + const void* payload, uint32_t length) { + // Start tracking the query in the system + Query query_internal; + + // Set the query header + query_internal.query_header_.type = type; + query_internal.query_header_.request_id = request_id; + query_internal.query_header_.payload_length = length; + + // Start timing begin of operation + query_internal.query_header_.start_time = GetTimeAccurateNano(); + + // Write out operation on the wire + QueryPacketHeader packet_header = + std::move(query_internal.GetHeaderNetworkOrder()); + bufferevent_write(impl_->bev_, &packet_header, sizeof(packet_header)); + if (length > 0) { + bufferevent_write(impl_->bev_, payload, length); + } + + // Log the request + impl_->thread_conn_stats_.LogRequest(query_internal); + + // Keep track of one more request_id + impl_->num_outstanding_requests++; +} + +void ChildConnection::set_priority(int pri) { + if (bufferevent_priority_set(impl_->bev_, pri)) + DIE("bufferevent_set_priority(bev_, %d) failed", pri); +} + +int ChildConnection::GetNumOutstandingRequests() const { + return impl_->num_outstanding_requests; +} + +/** + * Implementation details for ChildConnectionImpl + */ +ChildConnection::ChildConnectionImpl::ChildConnectionImpl( + const ResponseCallback& response_handler, const ClosedCallback& _closed_cb, + event_base* base, const addrinfo* address, + ChildConnectionStats& thread_conn_stats, bool store_queries, bool no_delay) + : base_(base), + closed_cb(_closed_cb), + response_cb(response_handler), + no_delay_(no_delay), + start_time_(GetTimeAccurate()), + thread_conn_stats_(thread_conn_stats), + read_state_(ReadState::WAITING), + num_outstanding_requests(0) { + // Open socket + int sockfd = 0; + if ((sockfd = socket(address->ai_family, address->ai_socktype, + address->ai_protocol)) < 0) { + DIE("\n Error : Could not create socket \n"); + } + + if (connect(sockfd, address->ai_addr, address->ai_addrlen) < 0) { + char ipstr[INET6_ADDRSTRLEN]; + void* addr; + + // Get the pointer to the address itself, + // different fields in IPv4 and IPv6: + if (address->ai_family == AF_INET) { // IPv4 + struct sockaddr_in* ipv4 = (struct sockaddr_in*)address->ai_addr; + addr = &(ipv4->sin_addr); + } else { // IPv6 + struct sockaddr_in6* ipv6 = (struct sockaddr_in6*)address->ai_addr; + addr = &(ipv6->sin6_addr); + } + inet_ntop(address->ai_family, addr, ipstr, sizeof ipstr); + DIE("\n Error : Connect Failed to %s \n", ipstr); + } + + // Make it send back without delay + if (no_delay_) { + int optval = 1; + if (setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &optval, sizeof(optval))) { + DIE("setsockopt(TCP_NODELAY) failed: %s", strerror(errno)); + } + } + + // Make it non-blocking + evutil_make_socket_nonblocking(sockfd); + + // Make buffer event + bev_ = bufferevent_socket_new(base_, sockfd, BEV_OPT_CLOSE_ON_FREE); +} + +// The followings are C trampolines for libevent callbacks. +void ChildConnection::ChildConnectionImpl::bev_event_cb(struct bufferevent* bev, + int16_t events, + void* ptr) { + ChildConnection* conn = reinterpret_cast(ptr); + + if (events & BEV_EVENT_TIMEOUT) { + DIE("Timeout from child"); + } else if (events & BEV_EVENT_ERROR) { + } else if (events & BEV_EVENT_EOF) { + D("Child closed connection"); + conn->impl_->read_state_ = ReadState::CLOSED; + bufferevent_disable(conn->impl_->bev_, EV_READ | EV_WRITE); + if (conn->impl_->closed_cb != nullptr) { + conn->impl_->closed_cb(*conn); + } + } +} + +/** + * Check to see if the buffer contains at least one full response that is ready + * for retrieval. If so, it returns true; false otherwise + * + * @param input evbuffer to read response from + * @return true if a response is ready to be read, false if not enough data in + *buffer + */ +static bool BufferContainsResponse(evbuffer* input) { + // Check length of input buffer + size_t buffer_length = evbuffer_get_length(input); + const size_t header_length = sizeof(ResponsePacketHeader); + if (buffer_length < header_length) { + return false; + } + ResponsePacketHeader* h = reinterpret_cast( + evbuffer_pullup(input, header_length)); + assert(h); + + // Not whole query + uint32_t packet_length = header_length + be32toh(h->payload_length); + if (buffer_length < packet_length) { + return false; + } + + // Must be full query + return true; +} + +void ChildConnection::ChildConnectionImpl::bev_read_cb(struct bufferevent* bev, + void* ptr) { + ChildConnection* conn = reinterpret_cast(ptr); + struct evbuffer* input = bufferevent_get_input(conn->impl_->bev_); + + // Protocol processing loop. + if (conn->impl_->num_outstanding_requests == 0) { + V("Spurious read callback."); + return; + } + + while (true) { + switch (conn->impl_->read_state_) { + case ReadState::CLOSED: { + DIE("event from closed connection"); + } + case ReadState::WAITING: { + if (BufferContainsResponse(input)) { + // Read out response header + const size_t header_length = sizeof(ResponsePacketHeader); + + ResponsePacketHeader* h = reinterpret_cast( + evbuffer_pullup(input, header_length)); + assert(h); + Response response = Response::FromHeaderNetworkOrder(h); + + // Remove the header + evbuffer_drain(input, header_length); + + // Linearize evbuffer to allow for payload reading + void* payload = + evbuffer_pullup(input, response.response_header_.payload_length); + response.payload_ = payload; + + uint64_t request_id = response.GetRequestID(); + + // Log query statistics + Query originating_query = response.RebuildOriginatingQuery(); + originating_query.end_time_ = GetTimeAccurateNano(); + + // Drained one request + conn->impl_->num_outstanding_requests--; + + // Call user provided callback, handing off full packet + ResponseContext context( + response.GetType(), response.GetRequestID(), + response.GetPayloadLength(), response.GetResponsePacketSize(), + response.payload_, false, originating_query.GetStartTime(), + originating_query.end_time_); + assert(conn->impl_->response_cb != nullptr); + conn->impl_->response_cb(context); + + // Remove the payload from the evbuffer + evbuffer_drain(input, response.response_header_.payload_length); + + conn->impl_->thread_conn_stats_.LogResponse(originating_query, + response); + break; + } else { + return; + } + } + default: { DIE("not implemented"); } + } + } +} + +void ChildConnection::ChildConnectionImpl::bev_write_cb(struct bufferevent* bev, + void* ptr) { + ChildConnection* conn = reinterpret_cast(ptr); + // Currently write cb does nothing +} +} // namespace oldisim + diff --git a/oldisim/src/ChildConnectionImpl.h b/oldisim/src/ChildConnectionImpl.h new file mode 100644 index 0000000..cf584dd --- /dev/null +++ b/oldisim/src/ChildConnectionImpl.h @@ -0,0 +1,77 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef CHILD_CONNECTION_IMPL_H +#define CHILD_CONNECTION_IMPL_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include "oldisim/ChildConnection.h" +#include "oldisim/ChildConnectionStats.h" +#include "InternalCallbacks.h" + +namespace oldisim { + +class Query; + +class ChildConnection::ChildConnectionImpl { + public: + event_base *base_; + bufferevent *bev_; + bool no_delay_; + + double start_time_; // Time when this connection began operations. + + ChildConnectionStats & + thread_conn_stats_; // Aggregates data over whole thread + + enum class ReadState { + WAITING, + CLOSED, + }; + ReadState read_state_; + + uint32_t num_outstanding_requests; + + typedef std::function ClosedCallback; + ClosedCallback closed_cb; + + ResponseCallback response_cb; + + ChildConnectionImpl(const ResponseCallback &response_handler, + const ClosedCallback &_closed_cb, event_base *base, + const addrinfo *address, + ChildConnectionStats &thread_conn_stats, + bool store_queries, bool no_delay); + + // The followings are C trampolines for libevent callbacks. + static void bev_event_cb(struct bufferevent *bev, int16_t events, void *ptr); + static void bev_read_cb(struct bufferevent *bev, void *ptr); + static void bev_write_cb(struct bufferevent *bev, void *ptr); +}; +} // namespace oldisim + +#endif // CHILD_CONNECTION_IMPL_H + diff --git a/oldisim/src/ConnectionUtil.cc b/oldisim/src/ConnectionUtil.cc new file mode 100644 index 0000000..ac32c64 --- /dev/null +++ b/oldisim/src/ConnectionUtil.cc @@ -0,0 +1,177 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ConnectionUtil.h" + +#include +#include +#include +#include +#include + +#include +#include + +#include "ChildConnectionImpl.h" +#include "ConnectionUtil.h" +#include "ParentConnectionImpl.h" +#include "oldisim/ChildConnectionStats.h" +#include "oldisim/LeafNodeStats.h" +#include "oldisim/Log.h" +#include "oldisim/NodeThread.h" +#include "oldisim/ParentConnection.h" +#include "oldisim/Query.h" + +namespace oldisim { + +std::unique_ptr ConnectionUtil::MakeParentConnection( + const ParentConnectionReceivedCallback& request_handler, + const ParentConnection::ParentConnectionImpl::ClosedCallback& close_handler, + const NodeThread& node_thread, int socket_fd, bool store_queries, + bool use_locking) { + typedef ParentConnection::ParentConnectionImpl ParentConnectionImpl; + + // Set socket to send without delay + int optval = 1; + if (setsockopt(socket_fd, IPPROTO_TCP, TCP_NODELAY, &optval, + sizeof(optval))) { + DIE("setsockopt(TCP_NODELAY) failed: %s", strerror(errno)); + } + evutil_make_socket_nonblocking(socket_fd); + + // Create buffer event for connection, associate it with an event base for + // a thread + int locking_opts = 0; + if (use_locking) { + locking_opts = BEV_OPT_THREADSAFE; + } + bufferevent* const bev = + bufferevent_socket_new(node_thread.get_event_base(), socket_fd, + BEV_OPT_CLOSE_ON_FREE | locking_opts); + + // Construct implementation details and connection + std::unique_ptr impl(new ParentConnectionImpl( + request_handler, close_handler, bev, use_locking)); + std::unique_ptr conn(new ParentConnection(std::move(impl))); + + // Set handlers for event base now that ParentConnection is constructed + bufferevent_setcb(bev, ParentConnectionImpl::bev_read_cb, NULL, + ParentConnectionImpl::bev_event_cb, conn.get()); + bufferevent_setwatermark(bev, EV_READ, sizeof(QueryPacketHeader), 0); + + return std::move(conn); +} + +void ConnectionUtil::EnableParentConnection(ParentConnection& connection) { + typedef ParentConnection::ParentConnectionImpl ParentConnectionImpl; + + connection.impl_->read_state = ParentConnectionImpl::ReadState::WAITING; + bufferevent_enable(connection.impl_->bev, EV_READ | EV_WRITE); +} + +std::unique_ptr ConnectionUtil::MakeChildConnection( + const ResponseCallback& response_handler, + const ChildConnection::ChildConnectionImpl::ClosedCallback& close_handler, + const NodeThread& node_thread, const addrinfo* address, + ChildConnectionStats& thread_conn_stats, bool store_queries, + bool no_delay) { + typedef ChildConnection::ChildConnectionImpl ChildConnectionImpl; + + // Construct implemntation details and connection + std::unique_ptr impl(new ChildConnectionImpl( + response_handler, close_handler, node_thread.get_event_base(), address, + thread_conn_stats, store_queries, no_delay)); + std::unique_ptr conn(new ChildConnection(std::move(impl))); + + // Set handlers for event base now that ParentConnection is constructed + bufferevent_setcb(conn->impl_->bev_, ChildConnectionImpl::bev_read_cb, NULL, + ChildConnectionImpl::bev_event_cb, conn.get()); + bufferevent_enable(conn->impl_->bev_, EV_READ | EV_WRITE); + bufferevent_setwatermark(conn->impl_->bev_, EV_READ, + sizeof(ResponsePacketHeader), 0); + + return std::move(conn); +} + +std::map> +ConnectionUtil::MakeChildConnectionStatsMap(const ChildConnectionStats& stats, + double elapsed_time) { + // Return QPS, RX BW, TX BW, mean, 50%, 90%, 95%, 99% latencies + std::map> results; + // Create stats for each query type + for (const auto& sampler_pair : stats.query_samplers_) { + uint32_t type = sampler_pair.first; + auto& sampler = sampler_pair.second; + double qps = stats.query_counts_.at(type) / elapsed_time; + double rx_mbps = stats.rx_bytes_.at(type) / elapsed_time / 1024 / 1024; + double tx_mbps = stats.tx_bytes_.at(type) / elapsed_time / 1024 / 1024; + double latency_mean = stats.query_samplers_.at(type).average() / 1000000; + double latency_50p = stats.query_samplers_.at(type).get_nth(50) / 1000000; + double latency_90p = stats.query_samplers_.at(type).get_nth(90) / 1000000; + double latency_95p = stats.query_samplers_.at(type).get_nth(95) / 1000000; + double latency_99p = stats.query_samplers_.at(type).get_nth(99) / 1000000; + double dropped_requests = stats.dropped_requests_.at(type) / elapsed_time; + results.insert( + std::make_pair(type, std::map( + {{"qps", qps}, + {"rx_mbps", rx_mbps}, + {"tx_mbps", tx_mbps}, + {"latency_mean", latency_mean}, + {"latency_50p", latency_50p}, + {"latency_90p", latency_90p}, + {"latency_95p", latency_95p}, + {"latency_99p", latency_99p}, + {"dropped_requests", dropped_requests}}))); + } + + return results; +} + +std::map> +ConnectionUtil::MakeLeafNodeStatsMap(const LeafNodeStats& stats, + double elapsed_time) { + // Return QPS, RX BW, TX BW, mean, 50%, 90%, 95%, 99% latencies + std::map> results; + // Create stats for each query type + for (const auto& sampler_pair : stats.processing_time_samplers_) { + uint32_t type = sampler_pair.first; + auto& sampler = sampler_pair.second; + double qps = stats.query_counts_.at(type) / elapsed_time; + double rx_mbps = stats.rx_bytes_.at(type) / elapsed_time / 1024 / 1024; + double tx_mbps = stats.tx_bytes_.at(type) / elapsed_time / 1024 / 1024; + double latency_mean = + stats.processing_time_samplers_.at(type).average() / 1000000; + double latency_50p = + stats.processing_time_samplers_.at(type).get_nth(50) / 1000000; + double latency_90p = + stats.processing_time_samplers_.at(type).get_nth(90) / 1000000; + double latency_95p = + stats.processing_time_samplers_.at(type).get_nth(95) / 1000000; + double latency_99p = + stats.processing_time_samplers_.at(type).get_nth(99) / 1000000; + results.insert(std::make_pair( + type, std::map({{"qps", qps}, + {"rx_mbps", rx_mbps}, + {"tx_mbps", tx_mbps}, + {"latency_mean", latency_mean}, + {"latency_50p", latency_50p}, + {"latency_90p", latency_90p}, + {"latency_95p", latency_95p}, + {"latency_99p", latency_99p}}))); + } + + return results; +} +} // namespace oldisim + diff --git a/oldisim/src/ConnectionUtil.h b/oldisim/src/ConnectionUtil.h new file mode 100644 index 0000000..1b3f1de --- /dev/null +++ b/oldisim/src/ConnectionUtil.h @@ -0,0 +1,77 @@ +// Copyright 2015 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef CONNECTION_UTIL_H +#define CONNECTION_UTIL_H + +#include +#include +#include + +#include +#include +#include +#include +#include + +#include "oldisim/Callbacks.h" +#include "oldisim/ChildConnection.h" +#include "ChildConnectionImpl.h" +#include "oldisim/ParentConnection.h" +#include "ParentConnectionImpl.h" + +namespace oldisim { + +class ChildConnectionStats; +class LeafNodeStats; + +class ConnectionUtil { + public: + template