diff --git a/googleurl b/googleurl deleted file mode 160000 index bd7a971c..00000000 --- a/googleurl +++ /dev/null @@ -1 +0,0 @@ -Subproject commit bd7a971c6179fca9b0705db573346f4b58be58dd diff --git a/jml b/jml deleted file mode 160000 index 8b684799..00000000 --- a/jml +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 8b684799236668951b0286c0ad5e1550395309b9 diff --git a/jml-build b/jml-build deleted file mode 160000 index b60c8629..00000000 --- a/jml-build +++ /dev/null @@ -1 +0,0 @@ -Subproject commit b60c8629fd38b3d622c2c09b16d6769b42872006 diff --git a/jsoncpp/json_reader.cpp b/jsoncpp/json_reader.cpp index c19b0d50..524d2b7c 100644 --- a/jsoncpp/json_reader.cpp +++ b/jsoncpp/json_reader.cpp @@ -156,10 +156,6 @@ Json::Value parse(std::istream & ifs) Json::Value parseFromFile(const std::string& filename) { std::ifstream ifs(filename); - if (!ifs.is_open()) { - std::string msg = "unable to open JSON file: " + filename; - throw Exception(msg); - } return parse(ifs); } diff --git a/jsoncpp/json_value.cpp b/jsoncpp/json_value.cpp index 67f4a636..d381a9cd 100644 --- a/jsoncpp/json_value.cpp +++ b/jsoncpp/json_value.cpp @@ -563,8 +563,7 @@ Value::operator=( const Value &other ) Value & Value::operator=( Value &&other ) { - Value temp( other ); - swap( temp ); + swap( other ); return *this; } @@ -1179,6 +1178,13 @@ Value::append( const Value &value ) } +Value & +Value::append( Value &&value ) +{ + return (*this)[size()] = std::move(value); +} + + Value Value::get( const char *key, const Value &defaultValue ) const diff --git a/jsoncpp/value.h b/jsoncpp/value.h index 5c412b4a..5a4acd9c 100644 --- a/jsoncpp/value.h +++ b/jsoncpp/value.h @@ -402,6 +402,7 @@ namespace Json { /// /// Equivalent to jsonvalue[jsonvalue.size()] = value; Value &append( const Value &value ); + Value &append( Value &&value ); /// Access an object value by name, create a null member if it does not exist. Value &atStr( const char *key ); diff --git a/launcher/launcher.h b/launcher/launcher.h index 5b2fdca7..5959bbbd 100644 --- a/launcher/launcher.h +++ b/launcher/launcher.h @@ -37,7 +37,7 @@ struct Launcher { struct Task { - Task() : pid(-1), log(false), delay(45.0) { + Task() : pid(-1), log(false), delay(45.0), once(false) { } std::string const & getName() const { @@ -52,8 +52,10 @@ struct Launcher } void restart(std::string const & node) { - stop(); - start(node); + if (!once) { + stop(); + start(node); + } } void start(std::string const & node) { @@ -150,6 +152,9 @@ struct Launcher else if(i.memberName() == "delay") { result.delay = i->asDouble(); } + else if(i.memberName() == "once") { + result.once = i->asBool(); + } else if(i.memberName() == "arg") { auto & json = *i; if(!json.empty() && !json.isArray()) { @@ -275,6 +280,7 @@ struct Launcher std::vector arg; bool log; double delay; + bool once; }; struct Node diff --git a/logger/kvp_logger_interface.cc b/logger/kvp_logger_interface.cc index b44eeed6..51a71b02 100644 --- a/logger/kvp_logger_interface.cc +++ b/logger/kvp_logger_interface.cc @@ -1,5 +1,4 @@ #include "kvp_logger_interface.h" -#include "kvp_logger_mongodb.h" #include "kvp_logger_void.h" #include @@ -8,9 +7,7 @@ namespace Datacratic{ std::shared_ptr IKvpLogger ::kvpLoggerFactory(const std::string& type, const KvpLoggerParams& params){ - if(type == "mongodb"){ - return std::shared_ptr(new KvpLoggerMongoDb(params)); - }else if(type == "void"){ + if (type == "void"){ return std::shared_ptr(new KvpLoggerVoid()); }else if(type == "metricsLogger"){ @@ -27,11 +24,7 @@ ::kvpLoggerFactory(const std::string& configKey){ json_parser::read_json(getenv("CONFIG"), pt); pt = pt.get_child(configKey); string type = pt.get("type"); - if(type == "mongodb"){ - - }else if(type == "void"){ - - }else if(type == "mongodbMetrics"){ + if(type == "void"){ } throw ML::Exception("Unknown KvpLogger [" + type + "]"); diff --git a/logger/kvp_logger_mongodb.cc b/logger/kvp_logger_mongodb.cc index 0afa1b9b..ebfd2281 100644 --- a/logger/kvp_logger_mongodb.cc +++ b/logger/kvp_logger_mongodb.cc @@ -12,8 +12,7 @@ ::makeInitFct(const string& hostAndPort, const string& db, { function init = [&] (){ cerr << hostAndPort << endl; - mongo::HostAndPort mongoHostAndPort(hostAndPort); - conn.connect(mongoHostAndPort); + conn.connect(hostAndPort); string err; if(!conn.auth(db, user, pwd, err)){ throw ML::Exception("MongoDB connection failed with msg [" diff --git a/logger/logger.mk b/logger/logger.mk index f29504e4..b8dbfdd3 100644 --- a/logger/logger.mk +++ b/logger/logger.mk @@ -17,8 +17,8 @@ $(eval $(call library,logger,$(LIBLOGGER_SOURCES),$(LIBLOGGER_LINK))) $(eval $(call nodejs_addon,logger,logger_js.cc filter_js.cc,logger js sigslot)) LIBLOG_METRICS_SOURCES := \ - kvp_logger_interface.cc kvp_logger_mongodb.cc easy_kvp_logger.cc logger_metrics_interface.cc \ - logger_metrics_mongo.cc logger_metrics_term.cc + kvp_logger_interface.cc easy_kvp_logger.cc logger_metrics_interface.cc \ + logger_metrics_term.cc LIBLOG_METRICS_LINK := \ mongoclient boost_filesystem boost_program_options types diff --git a/logger/logger_metrics_interface.cc b/logger/logger_metrics_interface.cc index ed035aba..d48ccdda 100644 --- a/logger/logger_metrics_interface.cc +++ b/logger/logger_metrics_interface.cc @@ -1,146 +1,92 @@ -/* logger_metrics_interface.cc - François-Michel L'Heureux, 21 May 2013 - Copyright (c) 2013 Datacratic. All rights reserved. -*/ - -#include "soa/jsoncpp/reader.h" -#include "jml/utils/exc_assert.h" #include "soa/logger/logger_metrics_interface.h" -#include "soa/logger/logger_metrics_mongo.h" #include "soa/logger/logger_metrics_void.h" #include "soa/logger/logger_metrics_term.h" +#include "soa/jsoncpp/reader.h" +namespace Datacratic{ using namespace std; -using namespace Datacratic; - -namespace { std::mutex m; -std::shared_ptr logger; - -// getenv that sanely deals with empty strings -string getEnv(const char * variable) -{ - const char * c = getenv(variable); - return c ? c : ""; -} - -} // file scope - - -/****************************************************************************/ -/* LOGGER METRICS */ -/****************************************************************************/ - +std::mutex m2; +string ILoggerMetrics::parentObjectId = ""; +bool mustSetup = true; +shared_ptr logger; const string ILoggerMetrics::METRICS = "metrics"; const string ILoggerMetrics::PROCESS = "process"; const string ILoggerMetrics::META = "meta"; +bool ILoggerMetrics::failSafe; -bool ILoggerMetrics::failSafe(true); -string ILoggerMetrics::parentObjectId; - -shared_ptr -ILoggerMetrics:: -setup(const string & configKey, const string & coll, const string & appName) +// getenv that sanely deals with empty strings +static std::string getEnv(const char * variable) { - std::lock_guard lock(m); - - if (logger) { - throw ML::Exception("Cannot setup more than once"); - } - - string configFile(getEnv("CONFIG")); - - if (configFile.empty()) { - cerr << ("Logger Metrics setup: CONFIG is not defined," - " logging disabled\n"); - logger.reset(new LoggerMetricsVoid(coll)); - } - else if (configKey.empty()) { - cerr << ("Logger Metrics setup: configKey is empty," - " logging disabled\n"); - logger.reset(new LoggerMetricsVoid(coll)); - } - else { - Json::Value config = Json::parseFromFile(getEnv("CONFIG")); - config = config[configKey]; - - setupLogger(config, coll, appName); - } - - return logger; + const char * c = getenv(variable); + return c ? c : ""; } -shared_ptr -ILoggerMetrics:: -setupFromJson(const Json::Value & config, - const string & coll, const string & appName) +shared_ptr ILoggerMetrics +::setup(const string& configKey, const string& coll, + const string& appName) { std::lock_guard lock(m); - - if (logger) { - throw ML::Exception("Cannot setup more than once"); - } - - setupLogger(config, coll, appName); - - return logger; -} - -void -ILoggerMetrics:: -setupLogger(const Json::Value & config, - const string & coll, const string & appName) -{ - ExcAssert(!config.isNull()); - - ILoggerMetrics::parentObjectId = getEnv("METRICS_PARENT_ID"); - - if (config["type"].isNull()) { - throw ML::Exception("Your LoggerMetrics config needs to " - "specify a [type] key."); - } - - string loggerType = config["type"].asString(); - failSafe = config["failSafe"].asBool(); - auto fct = [&] { - if (loggerType == "mongo") { - logger.reset(new LoggerMetricsMongo(config, coll, appName)); - } - else if (loggerType == "term" || loggerType == "terminal") { - logger.reset(new LoggerMetricsTerm(config, coll, appName)); - } - else if (loggerType == "void") { - logger.reset(new LoggerMetricsVoid(config, coll, appName)); - } - else { - throw ML::Exception("Unknown logger type [%s]", loggerType.c_str()); - } - }; - - if (failSafe) { - try { - fct(); - } - catch (const exception & exc) { - cerr << "Logger fail safe caught: " << exc.what() << endl; + if(mustSetup){ + mustSetup = false; + parentObjectId = getEnv("METRICS_PARENT_ID"); + if(!getenv("CONFIG") || configKey == ""){ + cerr << "Logger Metrics Setup: either CONFIG is not defined " + "or configKey empty. " + "Will not log." << endl; + Json::Value fooConfig; logger = shared_ptr( - new LoggerMetricsTerm(config, coll, appName)); + new LoggerMetricsVoid(fooConfig, coll, appName)); + }else{ + Json::Value config = Json::parseFromFile(getEnv("CONFIG")); + config = config[configKey]; + if(config.isNull()){ + throw ML::Exception("Your configKey [" + configKey + "] is invalid or your " + "config file is empty"); + } + if(config["type"].isNull()){ + throw ML::Exception("Your LoggerMetrics config needs to " + "specify a [type] key."); + } + string loggerType = config["type"].asString(); + failSafe = config["failSafe"].asBool(); + function fct = [&]{ + if(loggerType == "term" || loggerType == "terminal"){ + logger = shared_ptr( + new LoggerMetricsTerm(config, coll, appName)); + }else if(loggerType == "void"){ + logger = shared_ptr( + new LoggerMetricsVoid(config, coll, appName)); + }else{ + throw ML::Exception("Unknown logger type [%s]", loggerType.c_str()); + } + }; + if(failSafe){ + try{ + fct(); + }catch(const exception& exc){ + cerr << "Logger fail safe caught: " << exc.what() << endl; + logger = shared_ptr( + new LoggerMetricsTerm(config, coll, appName)); + } + }else{ + fct(); + } } - } - else { - fct(); + }else{ + throw ML::Exception("Cannot setup more than once"); } - auto getCmdResult = [&] (const char* cmd) { + function getCmdResult = [](const char* cmd) -> string{ FILE* pipe = popen(cmd, "r"); - if (!pipe) { - return string("ERROR"); + if(!pipe){ + return "ERROR"; } char buffer[128]; stringstream result; - while (!feof(pipe)) { - if (fgets(buffer, 128, pipe) != NULL) { + while(!feof(pipe)){ + if(fgets(buffer, 128, pipe) != NULL){ result << buffer; } } @@ -164,87 +110,74 @@ setupLogger(const Json::Value & config, // Log environment variable RUNID. Useful to give a name to an // experiment. v["runid"] = getEnv("RUNID"); - logger->logProcess(v); setenv("METRICS_PARENT_ID", logger->getProcessId().c_str(), 1); -} -shared_ptr -ILoggerMetrics:: -getSingleton() -{ - std::lock_guard lock(m); + return logger; +} - if (!logger) { - cerr << ("Calling getSingleton without calling setup first," - " logging implicitly disabled.\n"); - logger.reset(new LoggerMetricsVoid("")); +shared_ptr ILoggerMetrics +::getSingleton(){ + if(mustSetup){ + std::lock_guard lock(m2); + if(mustSetup){ + cerr << "Calling getSingleton without calling setup first." + << "Will return a logger metrics terminal." << endl; + return setup("", "", ""); + } } - return logger; } -void -ILoggerMetrics:: -logMetrics(const Json::Value & json) -{ +void ILoggerMetrics::logMetrics(const Json::Value& json){ vector stack; - std::function doit; - doit = [&] (const Json::Value & v) { - for (auto it = v.begin(); it != v.end(); ++it) { - string memberName = it.memberName(); - if (v[memberName].isObject()) { - stack.push_back(memberName); - doit(v[memberName]); + function doit; + doit = [&](const Json::Value& v){ + for(auto it = v.begin(); it != v.end(); ++it){ + if(v[it.memberName()].isObject()){ + stack.push_back(it.memberName()); + doit(v[it.memberName()]); stack.pop_back(); - } - else { - const Json::Value & current = v[memberName]; - if (!(current.isInt() || current.isUInt() || current.isDouble() - || current.isNumeric())) { + }else{ + Json::Value current = v[it.memberName()]; + if(!(current.isInt() || current.isUInt() || current.isDouble() + || current.isNumeric())) + { stringstream key; - for (const string & s: stack) { + for(string s: stack){ key << s << "."; } - key << memberName; + key << it.memberName(); string value = current.toString(); cerr << value << endl; value = value.substr(1, value.length() - 3); - throw ML::Exception("logMetrics only accepts numerical" - " values. Key [%s] has value [%s].", - key.str().c_str(), value.c_str()); + throw new ML::Exception("logMetrics only accepts numerical " + "values. Key [%s] has value [%s].", + key.str().c_str(), value.c_str()); } } } }; - auto fct = [&] () { + function fct = [&]{ doit(json); logInCategory(METRICS, json); }; failSafeHelper(fct); } -void -ILoggerMetrics:: -failSafeHelper(const std::function & fct) -{ - if (failSafe) { - try { +void ILoggerMetrics::failSafeHelper(std::function fct){ + if(failSafe){ + try{ fct(); - } - catch (const exception & exc) { + }catch(const exception& exc){ cerr << "Logger fail safe caught: " << exc.what() << endl; } - } - else { + }else{ fct(); } } -void -ILoggerMetrics:: -close() -{ +void ILoggerMetrics::close(){ Json::Value v; Date endDate = Date::now(); v["endDate"] = endDate.printClassic(); @@ -252,3 +185,5 @@ close() logInCategory(PROCESS, v); logProcess(v); } + +} diff --git a/logger/logger_metrics_interface.h b/logger/logger_metrics_interface.h index 016a4178..493e90f0 100644 --- a/logger/logger_metrics_interface.h +++ b/logger/logger_metrics_interface.h @@ -1,26 +1,18 @@ -/* logger_metrics_interface.h -*- C++ -*- - François-Michel L'Heureux, 21 May 2013 - Copyright (c) 2013 Datacratic. All rights reserved. -*/ - #pragma once #include #include #include -#include -#include -#include "boost/variant.hpp" +#include #include "jml/arch/exception.h" #include "soa/jsoncpp/json.h" +#include +#include "boost/variant.hpp" +#include #include "soa/types/date.h" -namespace Datacratic { - -/****************************************************************************/ -/* LOGGER METRICS */ -/****************************************************************************/ +namespace Datacratic{ /** * KvpLogger are key-value-pair loggers @@ -30,122 +22,107 @@ namespace Datacratic { * - Provides adaptor functions to avoid defining redundant functions in * implementations */ -struct ILoggerMetrics { - // ORDER OF VARIANT IMPORTANT! - typedef boost::variant Numeric; - typedef boost::variant NumOrStr; - - ILoggerMetrics() = delete; - - static std::shared_ptr setup(const std::string & configKey, - const std::string & coll, - const std::string & appName); - static std::shared_ptr - setupFromJson(const Json::Value & config, - const std::string & coll, const std::string & appName); - - /** - * Factory like getter for kvp - */ - static std::shared_ptr getSingleton(); - - void logMetrics(const Json::Value &); - template - void logMetrics(const jsonifiable & j) - { - auto fct = [&] () { - Json::Value root = j.toJson(); - logMetrics(root); +class ILoggerMetrics{ + + private: + static bool failSafe; + const Date startDate; + ILoggerMetrics(){}; + + protected: + // ORDER OF VARIANT IMPORTANT! + typedef boost::variant Numeric; + typedef boost::variant NumOrStr; + + const static std::string METRICS; + const static std::string PROCESS; + const static std::string META; + + const std::string coll; + static std::string parentObjectId; + + ILoggerMetrics(const std::string& coll) : + startDate(Date::now()), coll(coll){}; + virtual void logInCategory(const std::string& category, + const std::vector& path, + const NumOrStr& val) = 0; + virtual void logInCategory(const std::string& category, + const Json::Value& j) = 0; + + void failSafeHelper(std::function); + virtual const std::string getProcessId() const = 0; + + public: + static std::shared_ptr setup( + const std::string& configKey, + const std::string& coll, + const std::string& appName); + /** + * Factory like getter for kvp + */ + static std::shared_ptr getSingleton(); + + void logMetrics(const Json::Value&); + void logProcess(const Json::Value& j){ + std::function fct = [&](){ + logInCategory(PROCESS, j); + }; + failSafeHelper(fct); + } + void logMeta(const Json::Value& j){ + std::function fct = [&](){ + logInCategory(META, j); + }; + failSafeHelper(fct); + } + + template + void logMetrics(const jsonifiable& j){ + std::function fct = [&](){ + Json::Value root = j.toJson(); + logMetrics(root); + }; + failSafeHelper(fct); }; - failSafeHelper(fct); - }; - void logMetrics(const std::vector & path, const Numeric & val) - { - auto fct = [&] () { - logInCategory(METRICS, path, val); + template + void logProcess(const jsonifiable& j){ + std::function fct = [&](){ + Json::Value root = j.toJson(); + logProcess(root); + }; + failSafeHelper(fct); }; - failSafeHelper(fct); - } - - void logProcess(const Json::Value & j) - { - auto fct = [&]() { - logInCategory(PROCESS, j); - }; - failSafeHelper(fct); - } - template - void logProcess(const jsonifiable & j) - { - auto fct = [&] () { - Json::Value root = j.toJson(); - logProcess(root); + template + void logMeta(const jsonifiable& j){ + std::function fct = [&](){ + Json::Value root = j.toJson(); + logMeta(root); + }; + failSafeHelper(fct); }; - failSafeHelper(fct); - }; - void logProcess(const std::vector & path, const NumOrStr & val) - { - auto fct = [&] () { - logInCategory(PROCESS, path, val); - }; - failSafeHelper(fct); - } - void logMeta(const Json::Value & j) - { - auto fct = [&] () { - logInCategory(META, j); - }; - failSafeHelper(fct); - } - template - void logMeta(const jsonifiable & j) - { - auto fct = [&] () { - Json::Value root = j.toJson(); - logMeta(root); - }; - failSafeHelper(fct); - }; - void logMeta(const std::vector & path, const NumOrStr & val) - { - auto fct = [&] () { - logInCategory(META, path, val); - }; - failSafeHelper(fct); - } - - void close(); - virtual ~ILoggerMetrics(){}; - -protected: - const static std::string METRICS; - const static std::string PROCESS; - const static std::string META; - - ILoggerMetrics(const std::string & coll) - : coll(coll), startDate(Date::now()) - { - } - virtual void logInCategory(const std::string & category, - const std::vector & path, - const NumOrStr & val) = 0; - virtual void logInCategory(const std::string & category, - const Json::Value & j) = 0; - virtual std::string getProcessId() const = 0; - - void failSafeHelper(const std::function & fct); - - std::string coll; - -private: - static void setupLogger(const Json::Value & config, - const std::string & coll, - const std::string & appName); - static bool failSafe; - static std::string parentObjectId; - - const Date startDate; + void logMetrics(const std::vector& path, const Numeric& val){ + std::function fct = [&](){ + logInCategory(METRICS, path, val); + }; + failSafeHelper(fct); + } + void logProcess(const std::vector& path, const NumOrStr& val){ + std::function fct = [&](){ + logInCategory(PROCESS, path, val); + }; + failSafeHelper(fct); + } + void logMeta(const std::vector& path, const NumOrStr& val){ + std::function fct = [&](){ + logInCategory(META, path, val); + }; + failSafeHelper(fct); + } + + void close(); + virtual ~ILoggerMetrics(){}; + }; +}//namespace Datacratic -} // namespace Datacratic diff --git a/logger/logger_metrics_mongo.cc b/logger/logger_metrics_mongo.cc index 14ae4b05..adcd368c 100644 --- a/logger/logger_metrics_mongo.cc +++ b/logger/logger_metrics_mongo.cc @@ -1,13 +1,10 @@ -/* logger_metrics_interface.cc - François-Michel L'Heureux, 21 May 2013 - Copyright (c) 2013 Datacratic. All rights reserved. -*/ - +#include "logger_metrics_mongo.h" #include "mongo/bson/bson.h" #include "mongo/util/net/hostandport.h" #include "jml/utils/string_functions.h" -#include "logger_metrics_mongo.h" +#include "soa/utils/mongo_init.h" +namespace Datacratic{ using namespace std; using namespace mongo; @@ -23,8 +20,8 @@ LoggerMetricsMongo(Json::Value config, const string & coll, const string & appName) : ILoggerMetrics(coll) { - for (const string & s: {"hostAndPort", "database", "user", "pwd"}) { - if (config[s].isNull()) { + for(string s: {"hostAndPort", "database", "user", "pwd"}){ + if(config[s].isNull()){ throw ML::Exception("Missing LoggerMetricsMongo parameter [%s]", s.c_str()); } @@ -34,59 +31,72 @@ LoggerMetricsMongo(Json::Value config, const string & coll, if (hapStrs.size() > 1) { vector haps; for (const string & hapStr: hapStrs) { - haps.emplace_back(hapStr); + haps.emplace_back(hapStr); } conn.reset(new mongo::DBClientReplicaSet(hapStrs[0], haps, 100)); } else { - auto tmpConn = make_shared(); + std::shared_ptr tmpConn = + make_shared(); tmpConn->connect(hapStrs[0]); conn = tmpConn; } db = config["database"].asString(); - string err; - if (!conn->auth(db, config["user"].asString(), config["pwd"].asString(), - err)) { - throw ML::Exception("MongoDB connection failed with msg [%s]", - err.c_str()); + + auto impl = [&] (string mechanism) { + BSONObj b = BSON("user" << config["user"].asString() + << "pwd" << config["pwd"].asString() + << "mechanism" << mechanism + << "db" << db); + try { + conn->auth(b); + } + catch (const UserException & _) { + return false; + } + return true; + }; + + if (!impl("SCRAM-SHA-1")) { + cerr << "Failed to authenticate with SCRAM-SHA-1, " + "trying with MONGODB-CR" << endl; + if (!impl("MONGODB-CR")) { + cerr << "Failed with MONGODB-CR as well" << endl; + throw ("Failed to auth"); + } } + BSONObj obj = BSON(GENOID); conn->insert(db + "." + coll, obj); objectId = obj["_id"].OID(); logToTerm = config["logToTerm"].asBool(); } -void -LoggerMetricsMongo:: -logInCategory(const string & category, const Json::Value & json) +void LoggerMetricsMongo::logInCategory(const string& category, + const Json::Value& json) { + BSONObjBuilder bson; vector stack; - function doit; - - doit = [&] (const Json::Value & v) { - for (auto it = v.begin(); it != v.end(); ++it) { - string memberName = it.memberName(); - if (v[memberName].isObject()) { - if (memberName.find(".") != std::string::npos) { - throw ML::Exception("mongo does not support dotted keys," - " hence \"%s\" is invalid", memberName.c_str()); - } - stack.push_back(memberName); - doit(v[memberName]); + function doit; + + doit = [&](const Json::Value& v){ + for(auto it = v.begin(); it != v.end(); ++it){ + if(v[it.memberName()].isObject()){ + stack.push_back(it.memberName()); + doit(v[it.memberName()]); stack.pop_back(); - } - else { - Json::Value current = v[memberName]; + }else{ + Json::Value current = v[it.memberName()]; stringstream key; key << category; - for (const string & s: stack) { + for(string s: stack){ key << "." << s; } - key << "." << memberName; - if (current.isArray()) { + key << "." << it.memberName(); + if(current.isArray()){ BSONArrayBuilder arr; - for (const Json::Value el: current) { + for(const Json::Value el: current){ if (el.isInt()) { arr.append(el.asInt()); } @@ -121,32 +131,29 @@ logInCategory(const string & category, const Json::Value & json) }; doit(json); - if (logToTerm) { + if(logToTerm){ cout << objectId << "." << coll << "." << category << ": " << json.toStyledString() << endl; } - conn->update(db + "." + coll, BSON("_id" << objectId), - BSON("$set" << bson.obj()), true); + conn->update(db + "." + coll, + BSON("_id" << objectId), + BSON("$set" << bson.obj()), + true); } -void -LoggerMetricsMongo:: -logInCategory(const std::string & category, - const std::vector & path, - const NumOrStr & val) +void LoggerMetricsMongo +::logInCategory(const std::string& category, + const std::vector& path, + const NumOrStr& val) { - if (path.empty()) { - throw ML::Exception("You need to specify a path where to log" - " the value"); + if(path.size() == 0){ + throw new ML::Exception( + "You need to specify a path where to log the value"); } stringstream newCat; newCat << category; - for (const string & part: path) { - if (part.find(".") != std::string::npos) { - throw ML::Exception("mongo does not support dotted keys," - " hence \"%s\" is invalid", part.c_str()); - } + for(string part: path){ newCat << "." << part; } string newCatStr = newCat.str(); @@ -179,11 +186,13 @@ logInCategory(const std::string & category, } bsonObj = BSON(newCatStr << str); } - if (logToTerm) { + if(logToTerm){ cerr << bsonObj.toString() << endl; } - conn->update(db + "." + coll, BSON("_id" << objectId), - BSON("$set" << bsonObj), true); + conn->update(db + "." + coll, + BSON("_id" << objectId), + BSON("$set" << bsonObj), + true); } std::string @@ -191,5 +200,8 @@ LoggerMetricsMongo:: getProcessId() const { - return objectId.toString(); + return objectId.toString(); } + + +}//namespace Datacratic diff --git a/logger/logger_metrics_mongo.h b/logger/logger_metrics_mongo.h index 77abea35..9718d210 100644 --- a/logger/logger_metrics_mongo.h +++ b/logger/logger_metrics_mongo.h @@ -1,38 +1,28 @@ -/* logger_metrics_mongo.h -*- C++ -*- - François-Michel L'Heureux, 21 May 2013 - Copyright (c) 2013 Datacratic. All rights reserved. -*/ - #pragma once -#include "mongo/client/dbclient.h" #include "logger_metrics_interface.h" +#include "mongo/client/dbclient.h" - -namespace Datacratic { - -/****************************************************************************/ -/* LOGGER METRICS MONGO */ -/****************************************************************************/ - -struct LoggerMetricsMongo : public ILoggerMetrics { +namespace Datacratic{ +class LoggerMetricsMongo : public ILoggerMetrics{ friend class ILoggerMetrics; -protected: - mongo::OID objectId; - std::string db; - std::shared_ptr conn; - - LoggerMetricsMongo(Json::Value config, const std::string & coll, - const std::string & appName); - void logInCategory(const std::string &, const Json::Value &); - void logInCategory(const std::string & category, - const std::vector & path, - const NumOrStr & val); - std::string getProcessId() const; - -private: - bool logToTerm; + protected: + mongo::OID objectId; + std::string db; + std::shared_ptr conn; + + LoggerMetricsMongo(Json::Value config, + const std::string& coll, + const std::string& appName); + void logInCategory(const std::string&, + const Json::Value&); + void logInCategory(const std::string& category, + const std::vector& path, + const NumOrStr& val); + const std::string getProcessId() const; + + private: + bool logToTerm; }; - }//namespace Datacratic diff --git a/logger/logger_metrics_term.cc b/logger/logger_metrics_term.cc index 80a365ea..0dd2ef3a 100644 --- a/logger/logger_metrics_term.cc +++ b/logger/logger_metrics_term.cc @@ -1,25 +1,13 @@ -/* logger_metrics_term.cc - François-Michel L'Heureux, 3 June 2013 - Copyright (c) 2013 Datacratic. All rights reserved. -*/ - +#include "logger_metrics_term.h" #include #include -#include "logger_metrics_term.h" +namespace Datacratic{ using namespace std; -using namespace Datacratic; - -/****************************************************************************/ -/* LOGGER METRICS TERM */ -/****************************************************************************/ - -LoggerMetricsTerm:: -LoggerMetricsTerm(Json::Value config, - const string & coll, const string & appName) - : ILoggerMetrics(coll) +LoggerMetricsTerm::LoggerMetricsTerm(Json::Value config, + const string& coll, const string& appName) : ILoggerMetrics(coll) { stringstream ss; ss << getpid(); @@ -27,39 +15,35 @@ LoggerMetricsTerm(Json::Value config, cout << "Logger Metrics terminal: app " << appName << " under pid " << pid << endl; } -void -LoggerMetricsTerm:: -logInCategory(const string & category, - const Json::Value & json) +void LoggerMetricsTerm::logInCategory(const string& category, + const Json::Value& json) { cout << pid << "." << coll << "." << category << ": " << json.toStyledString() << endl; } -void -LoggerMetricsTerm:: -logInCategory(const std::string & category, - const std::vector & path, - const NumOrStr & val) +void LoggerMetricsTerm +::logInCategory(const std::string& category, + const std::vector& path, + const NumOrStr& val) { - if (path.size() == 0) { - throw ML::Exception("You need to specify a path where to log" - " the value"); + if(path.size() == 0){ + throw new ML::Exception( + "You need to specify a path where to log the value"); } stringstream ss; ss << val; stringstream newCat; newCat << pid << "." << coll << "." << category; - for (const string & part: path) { + for(string part: path){ newCat << "." << part; } cout << newCat.str() << ": " << ss.str() << endl; } -std::string -LoggerMetricsTerm:: -getProcessId() - const -{ +const std::string LoggerMetricsTerm::getProcessId() const{ return pid; } + + +}//namespace Datacratic diff --git a/logger/logger_metrics_term.h b/logger/logger_metrics_term.h index 0cbe7e0e..bd51f75b 100644 --- a/logger/logger_metrics_term.h +++ b/logger/logger_metrics_term.h @@ -1,34 +1,23 @@ -/* logger_metrics_term.h -*- C++ -*- - François-Michel L'Heureux, 3 June 2013 - Copyright (c) 2013 Datacratic. All rights reserved. -*/ - #pragma once #include "logger_metrics_interface.h" - -namespace Datacratic { - -/****************************************************************************/ -/* LOGGER METRICS TERM */ -/****************************************************************************/ - -struct LoggerMetricsTerm : public ILoggerMetrics { +namespace Datacratic{ +class LoggerMetricsTerm : public ILoggerMetrics{ friend class ILoggerMetrics; -protected: - LoggerMetricsTerm(Json::Value config, - const std::string & coll, - const std::string & appName); - void logInCategory(const std::string &, const Json::Value &); - void logInCategory(const std::string & category, - const std::vector & path, - const NumOrStr & val); - std::string getProcessId() const; - -private: - std::string pid; + protected: + LoggerMetricsTerm(Json::Value config, + const std::string& coll, + const std::string& appName); + void logInCategory(const std::string&, + const Json::Value&); + void logInCategory(const std::string& category, + const std::vector& path, + const NumOrStr& val); + const std::string getProcessId() const; + + private: + std::string pid; }; - -} // namespace Datacratic +}//namespace Datacratic diff --git a/logger/logger_metrics_void.h b/logger/logger_metrics_void.h index d015d509..79469b09 100644 --- a/logger/logger_metrics_void.h +++ b/logger/logger_metrics_void.h @@ -1,48 +1,20 @@ -/* logger_metrics_void.h -*- C++ -*- - François-Michel L'Heureux, 21 May 2013 - Copyright (c) 2013 Datacratic. All rights reserved. -*/ - #pragma once #include "logger_metrics_interface.h" - -namespace Datacratic { - -/****************************************************************************/ -/* LOGGER METRICS VOID */ -/****************************************************************************/ - -struct LoggerMetricsVoid : public ILoggerMetrics { +namespace Datacratic{ +class LoggerMetricsVoid : public ILoggerMetrics{ friend class ILoggerMetrics; -protected: - LoggerMetricsVoid(Json::Value config, - const std::string & coll, - const std::string & appName) - : ILoggerMetrics(coll) - { - } - LoggerMetricsVoid(const std::string & coll) - : ILoggerMetrics(coll) - { - } - - void logInCategory(const std::string &, const Json::Value &) - { - } - void logInCategory(const std::string & category, - const std::vector & path, - const NumOrStr & val) - { - } - - std::string getProcessId() - const - { - return ""; - } + protected: + LoggerMetricsVoid(Json::Value config, + const std::string& coll, + const std::string& appName) : ILoggerMetrics(coll){} + void logInCategory(const std::string&, + const Json::Value&){} + void logInCategory(const std::string& category, + const std::vector& path, + const NumOrStr& val){} + const std::string getProcessId() const{ return ""; } }; - -} // namespace Datacratic +}//namespace Datacratic diff --git a/logger/testing/logger_metrics_config.json b/logger/testing/logger_metrics_config.json index 62bf1189..a4993817 100644 --- a/logger/testing/logger_metrics_config.json +++ b/logger/testing/logger_metrics_config.json @@ -1,7 +1,7 @@ { "metricsLogger" : { "type" : "mongo", - "failSafe" : true, + "failSafe" : false, "hostAndPort" : "localhost:28356", "database" : "test", "user" : "testuser", diff --git a/logger/testing/logger_metrics_test.cc b/logger/testing/logger_metrics_test.cc index 8ef2b969..4d0ed359 100644 --- a/logger/testing/logger_metrics_test.cc +++ b/logger/testing/logger_metrics_test.cc @@ -21,41 +21,39 @@ #include "mongo/bson/bsonobj.h" #include "mongo/client/dbclient.h" #include "jml/utils/filter_streams.h" -#include "jml/arch/timers.h" #include "soa/service/testing/mongo_temporary_server.h" #include "soa/logger/logger_metrics_interface.h" -using namespace std; using namespace ML; using namespace Datacratic; using namespace mongo; -using namespace bson; -#if 1 BOOST_AUTO_TEST_CASE( test_logger_metrics ) { Mongo::MongoTemporaryServer mongo; - string filename("soa/logger/testing/logger_metrics_config.json"); - Json::Value config = Json::parseFromFile(filename); - const Json::Value & metricsLogger = config["metricsLogger"]; - auto logger = ILoggerMetrics::setupFromJson(metricsLogger, - "metrics_test", "test_app"); + setenv("CONFIG", "logger/testing/logger_metrics_config.json", 1); + shared_ptr logger = + ILoggerMetrics::setup("metricsLogger", "lalmetrics", "test"); logger->logMeta({"a", "b"}, "taratapom"); - string database = metricsLogger["database"].asString(); + Json::Value config; + filter_istream cfgStream("logger/testing/logger_metrics_config.json"); + cfgStream >> config; + + Json::Value metricsLogger = config["metricsLogger"]; auto conn = std::make_shared(); conn->connect(metricsLogger["hostAndPort"].asString()); string err; - if (!conn->auth(database, + if (!conn->auth(metricsLogger["database"].asString(), metricsLogger["user"].asString(), metricsLogger["pwd"].asString(), err)) { throw ML::Exception("Failed to log to mongo tmp server: %s", err.c_str()); } - BOOST_CHECK_EQUAL(conn->count("test.metrics_test"), 1); - auto cursor = conn->query("test.metrics_test", mongo::BSONObj()); + BOOST_CHECK_EQUAL(conn->count("test.lalmetrics"), 1); + auto cursor = conn->query("test.lalmetrics", mongo::BSONObj()); BOOST_CHECK(cursor->more()); { mongo::BSONObj p = cursor->next(); @@ -63,8 +61,8 @@ BOOST_AUTO_TEST_CASE( test_logger_metrics ) } logger->logMetrics({"fooValue"}, 123); - ML::sleep(2.0); // Leave time for async write - cursor = conn->query("test.metrics_test", mongo::BSONObj()); + ML::sleep(1); // Leave time for async write + cursor = conn->query("test.lalmetrics", mongo::BSONObj()); BOOST_CHECK(cursor->more()); { mongo::BSONObj p = cursor->next(); @@ -77,8 +75,8 @@ BOOST_AUTO_TEST_CASE( test_logger_metrics ) block["coco"] = Json::objectValue; block["coco"]["sanchez"] = 3; logger->logMetrics(block); - ML::sleep(2.0); // Leave time for async write - cursor = conn->query("test.metrics_test", mongo::BSONObj()); + ML::sleep(1); // Leave time for async write + cursor = conn->query("test.lalmetrics", mongo::BSONObj()); BOOST_CHECK(cursor->more()); { mongo::BSONObj p = cursor->next(); @@ -105,28 +103,23 @@ BOOST_AUTO_TEST_CASE( test_logger_metrics ) mongo::OID objectId(objectIdStr); mongo::BSONObj where = BSON("_id" << objectId); cursor = conn->query(database + ".metrics_test", where); + BOOST_CHECK(cursor->more()); { mongo::BSONObj p = cursor->next(); cerr << p.toString() << endl; // conn->remove(database + ".metrics_test", p, 1); - BOOST_CHECK_EQUAL(p["process"]["appName"].toString(), - "appName: \"test_app\""); - BOOST_CHECK_EQUAL(p["metrics"]["coco"].toString(), "coco: 123"); - BOOST_CHECK_EQUAL(p["meta"]["octo"].toString(), "octo: \"sanchez\""); - BOOST_CHECK_EQUAL(p["process"]["expos"]["city"].toString(), - "city: \"baseball\""); - BOOST_CHECK_EQUAL(p["process"]["expos"]["sport"].toString(), - "sport: \"montreal\""); - vector players; - BSONObj bsonPlayers = p.getObjectField("process") - .getObjectField("expos") - .getObjectField("players"); - bsonPlayers.Vals(players); + BOOST_CHECK_EQUAL(p["process"]["appName"].String(), "test_app"); + BOOST_CHECK_EQUAL(p["metrics"]["coco"].Long(), 123); + BOOST_CHECK_EQUAL(p["meta"]["octo"].String(), "sanchez"); + BOOST_CHECK_EQUAL(p["process"]["expos"]["city"].String(), "baseball"); + BOOST_CHECK_EQUAL(p["process"]["expos"]["sport"].String(), "montreal"); + //BSONObj bsonPlayers = BSON("process.expos.players" << players); + auto players = p.getFieldDotted("process.expos.players").Array(); BOOST_CHECK_EQUAL(players.size(), 3); - BOOST_CHECK_EQUAL(players[0], "pedro"); - BOOST_CHECK_EQUAL(players[1], "mario"); - BOOST_CHECK_EQUAL(players[2], "octo"); + BOOST_CHECK_EQUAL(players[0].String(), "pedro"); + BOOST_CHECK_EQUAL(players[1].String(), "mario"); + BOOST_CHECK_EQUAL(players[2].String(), "octo"); BOOST_CHECK(p["process"]["endDate"].toString() != "EOO"); BOOST_CHECK(p["process"]["duration"].toString() != "EOO"); } @@ -143,4 +136,3 @@ BOOST_AUTO_TEST_CASE( test_logger_metrics ) BOOST_CHECK_EQUAL(objectIdStr, result); } -#endif diff --git a/logger/testing/logger_testing.mk b/logger/testing/logger_testing.mk index e6c59c65..b8b372fc 100644 --- a/logger/testing/logger_testing.mk +++ b/logger/testing/logger_testing.mk @@ -12,4 +12,4 @@ $(eval $(call test,rotating_file_logger_test,logger,manual boost)) $(eval $(call vowscoffee_test,logger_metrics_interface_js_test,iloggermetricscpp)) -$(eval $(call test,logger_metrics_test,log_metrics mongo_tmp_server utils,manual boost)) +#$(eval $(call test,logger_metrics_test,log_metrics mongo_tmp_server utils,manual boost)) diff --git a/service/carbon_connector.cc b/service/carbon_connector.cc index 5a426f00..1fb9808b 100644 --- a/service/carbon_connector.cc +++ b/service/carbon_connector.cc @@ -124,7 +124,7 @@ StatAggregator * createNewOutcome(const std::vector& percentiles) void MultiAggregator:: record(const std::string & stat, - EventType type, + StatEventType type, float value, std::initializer_list extra) { diff --git a/service/carbon_connector.h b/service/carbon_connector.h index 47048a62..67887cf1 100644 --- a/service/carbon_connector.h +++ b/service/carbon_connector.h @@ -59,7 +59,7 @@ struct MultiAggregator { /** Record, generic version. */ void record(const std::string & stat, - EventType type = ET_COUNT, + StatEventType type = ET_COUNT, float value = 1.0, std::initializer_list extra = DefaultOutcomePercentiles); diff --git a/service/chunked_http_endpoint.cc b/service/chunked_http_endpoint.cc index 7df06c47..8ca8d281 100644 --- a/service/chunked_http_endpoint.cc +++ b/service/chunked_http_endpoint.cc @@ -92,7 +92,7 @@ status() const void ChunkedHttpHandler:: doEvent(const char * eventName, - EventType type, + StatEventType type, float value, const char * units) { diff --git a/service/chunked_http_endpoint.h b/service/chunked_http_endpoint.h index 0b506532..60bcdb1d 100644 --- a/service/chunked_http_endpoint.h +++ b/service/chunked_http_endpoint.h @@ -44,7 +44,7 @@ struct ChunkedHttpHandler virtual void handleError(const std::string & message); void doEvent(const char * eventName, - EventType type = ET_COUNT, + StatEventType type = ET_COUNT, float value = 1.0, const char * units = ""); @@ -79,14 +79,14 @@ struct ChunkedHttpEndpoint: public HttpEndpoint { 4. The units of the event (currently unused) */ typedef boost::function OnEvent; OnEvent onEvent; protected: void doEvent(const std::string & eventName, - EventType type = ET_COUNT, + StatEventType type = ET_COUNT, float value = 1.0, const char * units = "") { diff --git a/service/endpoint.cc b/service/endpoint.cc index 65e46f38..807d2809 100644 --- a/service/endpoint.cc +++ b/service/endpoint.cc @@ -117,7 +117,7 @@ spinup(int num_threads, bool synchronous) threadsActive_ = 0; - totalSleepTime.resize(num_threads, 1.0); + resourceUsage.resize(num_threads); for (unsigned i = 0; i < num_threads; ++i) { boost::thread * thread @@ -539,6 +539,7 @@ EndpointBase:: doMinCtxSwitchPolling(int threadNum, int numThreads) { bool debug = false; + int epoch = 0; //debug = name() == "Backchannel"; //debug = threadNum == 7; @@ -601,6 +602,23 @@ doMinCtxSwitchPolling(int threadNum, int numThreads) << endl; } + // sync with the loop monitor request + int i = resourceEpoch; + if(i != epoch) { + // query the kernel for performance metrics + rusage now; + getrusage(RUSAGE_THREAD, &now); + + // if we're just started, assume we know nothing and don't update the usage + long s = now.ru_utime.tv_sec+now.ru_stime.tv_sec; + if(s > 1) { + std::lock_guard guard(usageLock); + resourceUsage[threadNum] = now; + } + + epoch = i; + } + // Are we in our timeslice? if (/* forceInSlice || */(fracms >= myStartUs && fracms < myEndUs)) { @@ -609,7 +627,6 @@ doMinCtxSwitchPolling(int threadNum, int numThreads) if (usToWait < 0 || usToWait > timesliceUs) usToWait = timesliceUs; - totalSleepTime[threadNum] += double(usToWait) / 1000000.0; int numHandled = handleEvents(usToWait, 4, handleEvent, beforeSleep, afterSleep); if (debug && false) @@ -640,7 +657,6 @@ doMinCtxSwitchPolling(int threadNum, int numThreads) cerr << "sleeping for " << usToSleep << " micros" << endl; double secToSleep = double(usToSleep) / 1000000.0; - totalSleepTime[threadNum] += secToSleep; ML::sleep(secToSleep); duty.notifyAfterSleep(); @@ -660,27 +676,27 @@ void EndpointBase:: doMinLatencyPolling(int threadNum, int numThreads) { - bool wasBusy = false; - Date sleepStart = Date::now(); + int epoch = 0; while (!shutdown_) { - // Busy loop polling which reduces the latency jitter caused by - // the fancy polling scheme below. Should eventually be replaced - // something a little less CPU intensive. - Date beforePoll = Date::now(); - bool isBusy = handleEvents(0, 4, handleEvent) > 0; - - // This ensures that our load sampling mechanism is still somewhat - // meaningfull even though we never sleep. - if (wasBusy != isBusy) { - - if (wasBusy && !isBusy) sleepStart = beforePoll; - - // We don't want to include the time we spent doing stuff. - else totalSleepTime[threadNum] += beforePoll - sleepStart; + // sync with the loop monitor request + int i = resourceEpoch; + if(i != epoch) { + // query the kernel for performance metrics + rusage now; + getrusage(RUSAGE_THREAD, &now); + + // if we're just started, assume we know nothing and don't update the usage + long s = now.ru_utime.tv_sec+now.ru_stime.tv_sec; + if(s > 1) { + std::lock_guard guard(usageLock); + resourceUsage[threadNum] = now; + } - wasBusy = isBusy; + epoch = i; } + + handleEvents(0, 1, handleEvent); } } diff --git a/service/endpoint.h b/service/endpoint.h index dff0e6ae..ebd8cf96 100644 --- a/service/endpoint.h +++ b/service/endpoint.h @@ -23,6 +23,7 @@ #include "soa/service/epoller.h" #include #include +#include namespace Datacratic { @@ -106,7 +107,13 @@ struct EndpointBase : public Epoller { /** Total number of seconds that this message loop has spent sleeping. Can be polled regularly to determine the duty cycle of the loop. */ - std::vector totalSleepSeconds() const { return totalSleepTime; } + std::vector getResourceUsage() const { + resourceEpoch++; + std::vector result; + std::lock_guard guard(usageLock); + result = resourceUsage; + return std::move(result); + } /** Thing to notify when a connection is closed. Will be called before the normal cleanup. @@ -281,6 +288,9 @@ struct EndpointBase : public Epoller { std::map numTransportsByHost; std::vector totalSleepTime; + std::vector resourceUsage; + mutable std::mutex usageLock; + mutable std::atomic resourceEpoch; /** Run a thread to handle events. */ void runEventThread(int threadNum, int numThreads); diff --git a/service/epoll_loop.cc b/service/epoll_loop.cc index f5380125..fdc0fad1 100644 --- a/service/epoll_loop.cc +++ b/service/epoll_loop.cc @@ -68,18 +68,12 @@ loop(int maxEvents, int timeout) for (int i = 0; i < res; i++) { auto * fn = static_cast(events[i].data.ptr); - ExcAssert(fn != nullptr); (*fn)(events[i]); } - map delayedUnregistrations; - { - std::unique_lock guard(callbackLock_); - delayedUnregistrations = move(delayedUnregistrations_); - delayedUnregistrations_.clear(); - } - for (const auto & unreg: delayedUnregistrations) { - unregisterFdCallback(unreg.first, false, unreg.second); + for (auto & unreg: delayedUnregistrations_) { + auto cb = move(unreg.second); + unregisterFdCallback(unreg.first, false, cb); } } catch (const std::exception & exc) { @@ -166,7 +160,6 @@ void EpollLoop:: registerFdCallback(int fd, const EpollCallback & cb) { - std::unique_lock guard(callbackLock_); if (delayedUnregistrations_.count(fd) == 0) { if (fdCallbacks_.find(fd) != fdCallbacks_.end()) { throw ML::Exception("callback already registered for fd"); @@ -183,7 +176,6 @@ EpollLoop:: unregisterFdCallback(int fd, bool delayed, const OnUnregistered & onUnregistered) { - std::unique_lock guard(callbackLock_); if (fdCallbacks_.find(fd) == fdCallbacks_.end()) { throw ML::Exception("callback not registered for fd"); } diff --git a/service/epoll_loop.h b/service/epoll_loop.h index 6d0183ea..9c8c54a0 100644 --- a/service/epoll_loop.h +++ b/service/epoll_loop.h @@ -15,9 +15,8 @@ #include #include -#include #include -#include +#include #include "soa/service/async_event_source.h" @@ -47,7 +46,7 @@ struct EpollLoop : public AsyncEventSource typedef std::function EpollCallback; EpollLoop(const OnException & onException); - virtual ~EpollLoop(); + ~EpollLoop(); /* AsyncEventSource interface */ virtual int selectFd() const @@ -93,7 +92,7 @@ struct EpollLoop : public AsyncEventSource /* Remove a file descriptor from the internal epoll queue. If * "unregisterCallback" is specified, "unregisterFdCallback" will be - * called on the given fd, in delayed mode. */ + * specified on the given fd, in delayed mode. */ void removeFd(int fd, bool unregisterCallback = false); /* Associate a callback with a file descriptor for future epoll @@ -123,7 +122,6 @@ struct EpollLoop : public AsyncEventSource int epollFd_; size_t numFds_; - std::mutex callbackLock_; std::map fdCallbacks_; std::map delayedUnregistrations_; diff --git a/service/http_monitor.h b/service/http_monitor.h index 013777cd..30593282 100644 --- a/service/http_monitor.h +++ b/service/http_monitor.h @@ -44,10 +44,10 @@ struct HttpMonitor : public HttpEndpoint /** Starts the server on the given port. The given argument will be made available to the handlers whenever a connection is created. */ - void start(int port, Arg a = Arg()) + void start(int port, Arg a = Arg(), int numThreads = 1) { arg = a; - init(port, "0.0.0.0"); + init(port, "0.0.0.0", numThreads); } private: @@ -92,6 +92,13 @@ struct HttpMonitorHandler : virtual void doGet(const std::string& resource) {} + /** Handle a PUT message on the given resource and payload. + + Should call either sendResponse() or sendErrorResponse(). + */ + virtual void + doPut(const std::string& resource, const std::string& payload) {} + /** Handle a POST message on the given resource and payload. Should call either sendResponse() or sendErrorResponse(). @@ -142,6 +149,9 @@ handleHttpPayload(const HttpHeader & header, const std::string & payload) if (header.verb == "GET") doGet(header.resource); + else if (header.verb == "PUT") + doPut(header.resource, payload); + else if (header.verb == "POST") doPost(header.resource, payload); diff --git a/service/http_rest_proxy.cc b/service/http_rest_proxy.cc index 01df300a..b5c8b839 100644 --- a/service/http_rest_proxy.cc +++ b/service/http_rest_proxy.cc @@ -265,8 +265,7 @@ doneConnection(curlpp::Easy * conn) JsonRestProxy:: JsonRestProxy(const string & url) - : HttpRestProxy(url), protocolDate(0), - maxRetries(10), maxBackoffTime(900) + : HttpRestProxy(url), maxRetries(10), maxBackoffTime(900) { if (url.find("https://") == 0) { cerr << "warning: no validation will be performed on the SSL cert.\n"; @@ -291,9 +290,6 @@ performWithBackoff(const string & method, const string & resource, if (authToken.size() > 0) { headers.emplace_back(make_pair("Cookie", "token=\"" + authToken + "\"")); } - if (protocolDate > 0) { - headers.emplace_back(make_pair("X-Protocol-Date", to_string(protocolDate))); - } pid_t tid = gettid(); for (int retries = 0; diff --git a/service/http_rest_proxy.h b/service/http_rest_proxy.h index ae33ac28..f9feaa54 100644 --- a/service/http_rest_proxy.h +++ b/service/http_rest_proxy.h @@ -304,8 +304,6 @@ struct JsonRestProxy : HttpRestProxy { /* authentication token */ std::string authToken; - int protocolDate; - /* number of exponential backoffs, -1 = unlimited */ int maxRetries; diff --git a/service/json_endpoint.cc b/service/json_endpoint.cc index 4f9a1fc0..37a32007 100644 --- a/service/json_endpoint.cc +++ b/service/json_endpoint.cc @@ -35,9 +35,16 @@ handleHttpHeader(const HttpHeader & header) throw Exception("invalid content type '%s' for JSON", header.contentType.c_str()); if (header.verb != "POST") - throw Exception("invalid verb"); + handleUnknownHeader(header); } +void +JsonConnectionHandler:: +handleUnknownHeader(const HttpHeader& header) { + throw Exception("invalid verb"); +} + + void JsonConnectionHandler:: handleHttpPayload(const HttpHeader & header, diff --git a/service/json_endpoint.h b/service/json_endpoint.h index 532a7674..e8f7ba3f 100644 --- a/service/json_endpoint.h +++ b/service/json_endpoint.h @@ -32,6 +32,8 @@ struct JsonConnectionHandler : public HttpConnectionHandler { virtual void handleHttpHeader(const HttpHeader & header); + virtual void handleUnknownHeader(const HttpHeader& header); + virtual void handleHttpPayload(const HttpHeader & header, const std::string & payload); diff --git a/service/loop_monitor.cc b/service/loop_monitor.cc index de8c7938..a74b3a4c 100644 --- a/service/loop_monitor.cc +++ b/service/loop_monitor.cc @@ -49,6 +49,7 @@ doLoops(uint64_t numTimeouts) LoadSample maxLoad; maxLoad.sequence = curLoad.sequence + 1; + for (auto& loop : loops) { double load = loop.second(updatePeriod * numTimeouts); if (load < 0.0 || load > 1.0) { @@ -73,14 +74,36 @@ LoopMonitor:: addMessageLoop(const string& name, const MessageLoop* loop) { // acts as a private member variable for sampleFn. - double lastTimeSlept = 0.0; + rusage lastSample; + auto lastTime = Date(); auto sampleFn = [=] (double elapsedTime) mutable { - double timeSlept = loop->totalSleepSeconds(); - double delta = std::min(timeSlept - lastTimeSlept, 1.0); - lastTimeSlept = timeSlept; + auto sample = loop->getResourceUsage(); + + // get how much time elapsed since last time + auto now = Date::now(); + auto dt = now.secondsSince(lastTime); + + // first time? + if (lastTime == Date()) { + lastSample = sample; + } + + lastTime = now; + auto sec = double(sample.ru_utime.tv_sec - lastSample.ru_utime.tv_sec) + + double(sample.ru_stime.tv_sec - lastSample.ru_stime.tv_sec); + auto usec = double(sample.ru_utime.tv_usec - lastSample.ru_utime.tv_usec) + + double(sample.ru_stime.tv_usec - lastSample.ru_stime.tv_usec); + + auto load = sec + usec * 0.000001; + if (load >= dt) { + load = 1.0; + } else { + load /= dt; + } - return 1.0 - (delta / elapsedTime); + lastSample = sample; + return load; }; addCallback(name, sampleFn); diff --git a/service/message_loop.cc b/service/message_loop.cc index 8856cce4..4b0ec80c 100644 --- a/service/message_loop.cc +++ b/service/message_loop.cc @@ -296,10 +296,20 @@ runWorkerThread() return; // Do any outstanding work now - while (processOne()) + int i = 0; + while (processOne()) { if (shutdown_) return; + if (i >= 50) { + getrusage(RUSAGE_THREAD, &resourceUsage); + i = 0; + } + i++; + } + + getrusage(RUSAGE_THREAD, &resourceUsage); + // At this point, we've done as much work as we can (there is no more // work to do). We will now sleep for the maximum allowable delay // time minus the time we spent working. This allows us to batch up diff --git a/service/message_loop.h b/service/message_loop.h index a16a45ae..f43a882a 100644 --- a/service/message_loop.h +++ b/service/message_loop.h @@ -17,6 +17,7 @@ #include "async_event_source.h" #include "typed_message_channel.h" #include "logs.h" +#include "rusage.h" namespace Datacratic { @@ -136,6 +137,7 @@ struct MessageLoop : public Epoller { Can be polled regularly to determine the duty cycle of the loop. */ double totalSleepSeconds() const { return totalSleepTime_; } + rusage getResourceUsage() const { return resourceUsage; } void debug(bool debugOn); @@ -203,6 +205,7 @@ struct MessageLoop : public Epoller { /** Number of secs that the message loop has spent sleeping. */ double totalSleepTime_; + rusage resourceUsage; /** Number of seconds of latency we're allowed to add in order to reduce the number of context switches. diff --git a/service/redis.cc b/service/redis.cc index c1be3488..cccdda84 100644 --- a/service/redis.cc +++ b/service/redis.cc @@ -14,7 +14,6 @@ #include "jml/arch/atomic_ops.h" #include "jml/arch/backtrace.h" #include "jml/arch/futex.h" -#include "jml/arch/wakeup_fd.h" #include "jml/utils/vector_utils.h" @@ -417,7 +416,7 @@ size_t eventLoopsDestroyed = 0; struct AsyncConnection::EventLoop { - ML::Wakeup_Fd wakeupfd; + int wakeupfd[2]; volatile bool finished; AsyncConnection * connection; std::shared_ptr thread; @@ -425,14 +424,18 @@ struct AsyncConnection::EventLoop { volatile int disconnected; EventLoop(AsyncConnection * connection) - : wakeupfd(O_NONBLOCK) - , finished(false) - , connection(connection) - , disconnected(1) + : finished(false), connection(connection), disconnected(1) { ML::atomic_inc(eventLoopsCreated); - fds[0].fd = wakeupfd.fd(); + int res = pipe2(wakeupfd, O_NONBLOCK); + if (res == -1) + throw ML::Exception(errno, "pipe2"); + + //cerr << "connection on fd " << connection->context_->c.fd << endl; + + + fds[0].fd = wakeupfd[0]; fds[0].events = POLLIN; fds[1].fd = connection->context_->c.fd; fds[1].events = 0; @@ -440,6 +443,13 @@ struct AsyncConnection::EventLoop { registerMe(connection->context_); thread.reset(new boost::thread(boost::bind(&EventLoop::run, this))); + +#if 0 + char buf[1]; + res = read(wakeupfd[0], buf, 1); + if (res == -1) + throw ML::Exception(errno, "read"); +#endif } ~EventLoop() @@ -457,11 +467,16 @@ struct AsyncConnection::EventLoop { wakeup(); thread->join(); thread.reset(); + ::close(wakeupfd[0]); + ::close(wakeupfd[1]); } void wakeup() { - wakeupfd.signal(); + int res = write(wakeupfd[1], "x", 1); + if (res == -1) + throw ML::Exception("error waking up fd %d: %s", wakeupfd[1], + strerror(errno)); } void registerMe(redisAsyncContext * context) @@ -516,18 +531,12 @@ struct AsyncConnection::EventLoop { if (fds[0].revents & POLLIN) { //cerr << "got wakeup" << endl; - wakeupfd.read(); + char buf[128]; + int res = read(fds[0].fd, buf, 128); + if (res == -1) + throw ML::Exception(errno, "read from wakeup pipe"); //cerr << "woken up with " << res << " messages" << endl; } - if ((fds[1].revents & POLLHUP) - || (fds[1].revents & POLLERR)) { - /* For now, we simply disconnect when we receive a POLLHUP, but eventually - * we should try to reconnect or at least notify the user through a callback - */ - onDisconnect(REDIS_ERR_IO); - break; - } - if ((fds[1].revents & POLLOUT) && (fds[1].events & POLLOUT)) { //cerr << "got write on " << fds[1].fd << endl; diff --git a/service/runner.cc b/service/runner.cc index 1eb79df0..5c6547df 100644 --- a/service/runner.cc +++ b/service/runner.cc @@ -7,10 +7,13 @@ */ #include +#include #include #include #include #include +#include +#include #include #include #include @@ -32,8 +35,6 @@ #include "soa/types/basic_value_descriptions.h" -#include - using namespace std; using namespace Datacratic; @@ -42,7 +43,7 @@ timevalDescription:: timevalDescription() { addField("tv_sec", &timeval::tv_sec, "seconds"); - addField("tv_usec", &timeval::tv_usec, "micro seconds", (long)0); + addField("tv_usec", &timeval::tv_usec, "micro seconds"); } rusageDescription:: @@ -50,27 +51,30 @@ rusageDescription() { addField("utime", &rusage::ru_utime, "user CPU time used"); addField("stime", &rusage::ru_stime, "system CPU time used"); - addField("maxrss", &rusage::ru_maxrss, "maximum resident set size", (long)0); - addField("ixrss", &rusage::ru_ixrss, "integral shared memory size", (long)0); - addField("idrss", &rusage::ru_idrss, "integral unshared data size", (long)0); - addField("isrss", &rusage::ru_isrss, "integral unshared stack size", (long)0); - addField("minflt", &rusage::ru_minflt, "page reclaims (soft page faults)", (long)0); - addField("majflt", &rusage::ru_majflt, "page faults (hard page faults)", (long)0); - addField("nswap", &rusage::ru_nswap, "swaps", (long)0); - addField("inblock", &rusage::ru_inblock, "block input operations", (long)0); - addField("oublock", &rusage::ru_oublock, "block output operations", (long)0); - addField("msgsnd", &rusage::ru_msgsnd, "IPC messages sent", (long)0); - addField("msgrcv", &rusage::ru_msgrcv, "IPC messages received", (long)0); - addField("nsignals", &rusage::ru_nsignals, "signals received", (long)0); - addField("nvcsw", &rusage::ru_nvcsw, "voluntary context switches", (long)0); - addField("nivcsw", &rusage::ru_nivcsw, "involuntary context switches", (long)0); + addField("maxrss", &rusage::ru_maxrss, "maximum resident set size"); + addField("ixrss", &rusage::ru_ixrss, "integral shared memory size"); + addField("idrss", &rusage::ru_idrss, "integral unshared data size"); + addField("isrss", &rusage::ru_isrss, "integral unshared stack size"); + addField("minflt", &rusage::ru_minflt, "page reclaims (soft page faults)"); + addField("majflt", &rusage::ru_majflt, "page faults (hard page faults)"); + addField("nswap", &rusage::ru_nswap, "swaps"); + addField("inblock", &rusage::ru_inblock, "block input operations"); + addField("oublock", &rusage::ru_oublock, "block output operations"); + addField("msgsnd", &rusage::ru_msgsnd, "IPC messages sent"); + addField("msgrcv", &rusage::ru_msgrcv, "IPC messages received"); + addField("nsignals", &rusage::ru_nsignals, "signals received"); + addField("nvcsw", &rusage::ru_nvcsw, "voluntary context switches"); + addField("nivcsw", &rusage::ru_nivcsw, "involuntary context switches"); } namespace { + + Logging::Category warnings("Runner::warning"); + tuple CreateStdPipe(bool forWriting) { @@ -90,35 +94,63 @@ CreateStdPipe(bool forWriting) } // namespace - namespace Datacratic { -/****************************************************************************/ -/* RUNNER */ -/****************************************************************************/ - -std::string Runner::runnerHelper; +/* ASYNCRUNNER */ Runner:: Runner() - : EpollLoop(nullptr), - closeStdin(false), runRequests_(0), activeRequest_(0), running_(false), + : closeStdin(false), running_(false), startDate_(Date::negativeInfinity()), endDate_(startDate_), - childPid_(-1), childStdinFd_(-1), + childPid_(-1), + wakeup_(EFD_NONBLOCK | EFD_CLOEXEC), statusRemaining_(sizeof(ProcessStatus)) { + Epoller::init(4); + + handleEvent = [&] (const struct epoll_event & event) { + return this->handleEpollEvent(event); + }; } Runner:: ~Runner() { - kill(SIGTERM, false); + waitTermination(); +} + +Epoller::HandleEventResult +Runner:: +handleEpollEvent(const struct epoll_event & event) +{ + if (event.data.ptr == &task_.statusFd) { + // fprintf(stderr, "parent: handle child status input\n"); + handleChildStatus(event); + } + else if (event.data.ptr == &task_.stdOutFd) { + // fprintf(stderr, "parent: handle child output from stdout\n"); + handleOutputStatus(event, task_.stdOutFd, stdOutSink_); + } + else if (event.data.ptr == &task_.stdErrFd) { + // fprintf(stderr, "parent: handle child output from stderr\n"); + handleOutputStatus(event, task_.stdErrFd, stdErrSink_); + } + else if (event.data.ptr == nullptr) { + // stdInSink cleanup for now... + handleWakeup(event); + } + else { + throw ML::Exception("this should never occur"); + } + + return Epoller::DONE; } void Runner:: handleChildStatus(const struct epoll_event & event) { + // cerr << "handleChildStatus\n"; ProcessStatus status; if ((event.events & EPOLLIN) != 0) { @@ -145,6 +177,7 @@ handleChildStatus(const struct epoll_event & event) statusRemaining_ -= s; if (statusRemaining_ > 0) { + cerr << "warning: reading status fd in multiple chunks\n"; continue; } @@ -153,35 +186,52 @@ handleChildStatus(const struct epoll_event & event) // Set up for next message statusRemaining_ = sizeof(statusBuffer_); +#if 0 + cerr << "got status " << status.state + << " " << Task::statusStateAsString(status.state) + << " " << status.pid << " " << status.childStatus + << " " << status.launchErrno << " " + << strerror(status.launchErrno) << " " + << strLaunchError(status.launchErrorCode) + << endl; +#endif + task_.statusState = status.state; task_.runResult.usage = status.usage; if (status.launchErrno || status.launchErrorCode != LaunchError::NONE) { + //cerr << "*** launch error" << endl; // Error task_.runResult.updateFromLaunchError (status.launchErrno, strLaunchError(status.launchErrorCode)); task_.statusState = ProcessState::STOPPED; + childPid_ = -2; + ML::futex_wake(childPid_); + + if (stdInSink_ && stdInSink_->state != OutputSink::CLOSED) { + stdInSink_->requestClose(); + } + attemptTaskTermination(); + break; } switch (status.state) { case ProcessState::LAUNCHING: childPid_ = status.pid; + // cerr << " childPid_ = status.pid (launching)\n"; break; case ProcessState::RUNNING: childPid_ = status.pid; + // cerr << " childPid_ = status.pid (running)\n"; ML::futex_wake(childPid_); break; case ProcessState::STOPPED: - if (task_.runResult.state == RunResult::LAUNCH_ERROR) { - childPid_ = -2; - } - else { - task_.runResult.updateFromStatus(status.childStatus); - childPid_ = -3; - } + childPid_ = -3; + // cerr << " childPid_ = -3 (stopped)\n"; ML::futex_wake(childPid_); + task_.runResult.updateFromStatus(status.childStatus); task_.statusState = ProcessState::DONE; if (stdInSink_ && stdInSink_->state != OutputSink::CLOSED) { stdInSink_->requestClose(); @@ -201,35 +251,13 @@ handleChildStatus(const struct epoll_event & event) } if ((event.events & EPOLLHUP) != 0) { - // This happens when the thread that launched the process exits, - // and the child process follows. - removeFd(task_.statusFd, true); + //cerr << "*** hangup" << endl; + removeFd(task_.statusFd); ::close(task_.statusFd); task_.statusFd = -1; - - if (task_.statusState == ProcessState::RUNNING - || task_.statusState == ProcessState::LAUNCHING) { - cerr << "*************************************************************" << endl; - cerr << " HANGUP ON STATUS FD: RUNNER FORK THREAD EXITED?" << endl; - cerr << "*************************************************************" << endl; - cerr << "state = " << jsonEncode(task_.runResult.state) << endl; - cerr << "statusState = " << (int)task_.statusState << endl; - cerr << "childPid_ = " << childPid_ << endl; - - // We will never get another event, so we need to clean up - // everything here. - childPid_ = -3; - ML::futex_wake(childPid_); - - task_.runResult.state = RunResult::PARENT_EXITED; - task_.runResult.signum = SIGHUP; - task_.statusState = ProcessState::DONE; - if (stdInSink_ && stdInSink_->state != OutputSink::CLOSED) { - stdInSink_->requestClose(); - } - attemptTaskTermination(); - } } + + // cerr << "handleChildStatus done\n"; } void @@ -244,7 +272,9 @@ handleOutputStatus(const struct epoll_event & event, if ((event.events & EPOLLIN) != 0) { while (1) { ssize_t len = ::read(outputFd, buffer, sizeof(buffer)); + // cerr << "returned len: " << len << endl; if (len < 0) { + // perror(" len -1"); if (errno == EWOULDBLOCK) { break; } @@ -270,8 +300,12 @@ handleOutputStatus(const struct epoll_event & event, } if (data.size() > 0) { + // cerr << "sending child output to output handler\n"; sink->notifyReceived(move(data)); } + else { + cerr << "ignoring child output due to size == 0\n"; + } } if (closedFd || (event.events & EPOLLHUP) != 0) { @@ -279,7 +313,7 @@ handleOutputStatus(const struct epoll_event & event, sink->notifyClosed(); sink.reset(); if (outputFd > -1) { - removeFd(outputFd, true); + removeFd(outputFd); ::close(outputFd); outputFd = -1; } @@ -287,6 +321,27 @@ handleOutputStatus(const struct epoll_event & event, } } +void +Runner:: +handleWakeup(const struct epoll_event & event) +{ + // cerr << "handleWakup\n"; + while (!wakeup_.tryRead()); + + if ((event.events & EPOLLIN) != 0) { + if (stdInSink_) { + if (stdInSink_->connectionState_ + == AsyncEventSource::DISCONNECTED) { + attemptTaskTermination(); + removeFd(wakeup_.fd()); + } + else { + wakeup_.signal(); + } + } + } +} + void Runner:: attemptTaskTermination() @@ -315,38 +370,23 @@ attemptTaskTermination() must be performed when they are all met. This is what "attemptTaskTermination" does. */ - if ((!stdInSink_ - || stdInSink_->state == OutputSink::CLOSED - || stdInSink_->state == OutputSink::CLOSING) + if ((!stdInSink_ || stdInSink_->state == OutputSink::CLOSED) && !stdOutSink_ && !stdErrSink_ && childPid_ < 0 && (task_.statusState == ProcessState::STOPPED || task_.statusState == ProcessState::DONE)) { - auto runResult = move(task_.runResult); - auto onTerminate = move(task_.onTerminate); task_.postTerminate(*this); if (stdInSink_) { stdInSink_.reset(); } - endDate_ = Date::now(); - - ExcAssert(onTerminate); - onTerminate(runResult); - - /* Setting running_ to false must be done after "onTerminate" is - invoked, since "waitTermination" guarantees that "onTerminate" has - been called. In async mode, doing it here will not be a problem, - since "running_" will be reset to true when the MessageLoop - processes its delayed jobs. */ + // cerr << "terminated task\n"; running_ = false; + endDate_ = Date::now(); ML::futex_wake(running_); } - /* This block is useful for debugging the termination workflow of the - subprocess, therefore it should be kept 2 years after this date: - 2015-07-02. If uncommented, this date should be updated to the current - date. */ - else if (false) { +#if 0 + else { cerr << "cannot terminate yet because:\n"; if ((stdInSink_ && stdInSink_->state != OutputSink::CLOSED)) { cerr << "stdin sink active\n"; @@ -361,43 +401,32 @@ attemptTaskTermination() cerr << "childPid_ >= 0\n"; } if (!(task_.statusState == ProcessState::STOPPED - || task_.statusState == ProcessState::DONE)) { + || task_.statusState == DONE)) { cerr << "task status != stopped/done\n"; } } +#endif } OutputSink & Runner:: getStdInSink() { - if (stdInSink_) { + if (running_) + throw ML::Exception("already running"); + if (stdInSink_) throw ML::Exception("stdin sink already set"); - } - ExcAssertEqual(childStdinFd_, -1); auto onClose = [&] () { if (task_.stdInFd != -1) { ::close(task_.stdInFd); task_.stdInFd = -1; } - removeFd(stdInSink_->selectFd(), true); - if (task_.wrapperPid > -1) { - attemptTaskTermination(); - } + parent_->removeSource(stdInSink_.get()); + wakeup_.signal(); }; stdInSink_.reset(new AsyncFdOutputSink(onClose, onClose)); - tie(task_.stdInFd, childStdinFd_) = CreateStdPipe(true); - ML::set_file_flag(task_.stdInFd, O_NONBLOCK); - stdInSink_->init(task_.stdInFd); - - auto stdinCopy = stdInSink_; - auto stdinCb = [=] (const epoll_event & event) { - stdinCopy->processOne(); - }; - addFd(stdInSink_->selectFd(), true, false, stdinCb); - return *stdInSink_; } @@ -412,107 +441,23 @@ run(const vector & command, LOG(warnings) << ML::format("Runner %p is not connected to any MessageLoop\n", this); } - if (!onTerminate) { - throw ML::Exception("'onTerminate' parameter is mandatory"); - } - ExcAssert(runRequests_ < std::numeric_limits::max()); - runRequests_++; - - /* We run this in the message loop thread, which becomes the parent of the - child process. This is to avoid problems when the thread we're calling - run from exits, and since it's the parent process of the fork, causes - the subprocess to exit to due to PR_SET_DEATHSIG being set .*/ - auto toRun = [=] () { - try { - if (running_) { - throw ML::Exception("already running"); - } - this->doRunImpl(command, onTerminate, stdOutSink, stdErrSink); - } - catch (const std::exception & exc) { - /* Exceptions must be returned via onTerminate in order to provide - a consistent behaviour when "run" is called from the original - Runner thread or from the MessageLoop thread. "onTerminate" is - mandatory and is thus guaranteed to exist here. */ - RunResult result; - result.updateFromLaunchException(std::current_exception()); - ExcAssert(onTerminate); - onTerminate(result); - } - catch (...) { - cerr << ("FATAL: Runner::runImpl::toRun caught an unhandled" - " exception. MessageLoop thread will die.\n"); - throw; - } - }; - ExcAssert(parent_ != nullptr); - bool res = parent_->runInMessageLoopThread(toRun); - ExcAssert(res); -} -RunResult -Runner:: -runSync(const vector & command, - const shared_ptr & stdOutSink, - const shared_ptr & stdErrSink, - const string & stdInData) -{ - if (running_) { + if (running_) throw ML::Exception("already running"); - } - ExcAssert(runRequests_ < std::numeric_limits::max()); - runRequests_++; - - RunResult result; - bool terminated(false); - auto onTerminate = [&] (const RunResult & newResult) { - result = newResult; - terminated = true; - }; - OutputSink * sink(nullptr); - if (stdInData.size() > 0) { - sink = &getStdInSink(); - } - doRunImpl(command, onTerminate, stdOutSink, stdErrSink); - if (sink) { - sink->write(stdInData); - sink->requestClose(); - } - - while (!terminated) { - loop(-1, -1); - } - - if (result.state == RunResult::LAUNCH_EXCEPTION) { - std::rethrow_exception(result.launchExc); - } - - return result; -} - -void -Runner:: -doRunImpl(const vector & command, - const OnTerminate & onTerminate, - const shared_ptr & stdOutSink, - const shared_ptr & stdErrSink) -{ - running_ = true; - activeRequest_++; - ML::futex_wake(activeRequest_); startDate_ = Date::now(); endDate_ = Date::negativeInfinity(); + running_ = true; + ML::futex_wake(running_); + task_.statusState = ProcessState::UNKNOWN; task_.onTerminate = onTerminate; ProcessFds childFds; tie(task_.statusFd, childFds.statusFd) = CreateStdPipe(false); if (stdInSink_) { - ExcAssert(childStdinFd_ != -1); - childFds.stdIn = childStdinFd_; - childStdinFd_ = -1; + tie(task_.stdInFd, childFds.stdIn) = CreateStdPipe(true); } else if (closeStdin) { childFds.stdIn = -1; @@ -530,46 +475,32 @@ doRunImpl(const vector & command, ::flockfile(stderr); ::fflush_unlocked(NULL); task_.wrapperPid = fork(); - int savedErrno = errno; ::funlockfile(stderr); ::funlockfile(stdout); if (task_.wrapperPid == -1) { - throw ML::Exception(savedErrno, "Runner::run fork"); + throw ML::Exception(errno, "Runner::run fork"); } else if (task_.wrapperPid == 0) { - try { - task_.runWrapper(command, childFds); - } - catch (...) { - ProcessStatus status; - status.state = ProcessState::STOPPED; - status.setErrorCodes(errno, LaunchError::SUBTASK_LAUNCH); - childFds.writeStatus(status); - - exit(-1); - } + task_.runWrapper(command, childFds); } else { task_.statusState = ProcessState::LAUNCHING; ML::set_file_flag(task_.statusFd, O_NONBLOCK); - auto statusCb = [&] (const epoll_event & event) { - handleChildStatus(event); - }; - addFd(task_.statusFd, true, false, statusCb); + if (stdInSink_) { + ML::set_file_flag(task_.stdInFd, O_NONBLOCK); + stdInSink_->init(task_.stdInFd); + parent_->addSource("stdInSink", stdInSink_); + addFd(wakeup_.fd()); + } + addFd(task_.statusFd, &task_.statusFd); if (stdOutSink) { ML::set_file_flag(task_.stdOutFd, O_NONBLOCK); - auto outputCb = [=] (const epoll_event & event) { - handleOutputStatus(event, task_.stdOutFd, stdOutSink_); - }; - addFd(task_.stdOutFd, true, false, outputCb); + addFd(task_.stdOutFd, &task_.stdOutFd); } if (stdErrSink) { ML::set_file_flag(task_.stdErrFd, O_NONBLOCK); - auto outputCb = [=] (const epoll_event & event) { - handleOutputStatus(event, task_.stdErrFd, stdErrSink_); - }; - addFd(task_.stdErrFd, true, false, outputCb); + addFd(task_.stdErrFd, &task_.stdErrFd); } childFds.close(); @@ -612,45 +543,19 @@ waitStart(double secondsToWait) const Date deadline = Date::now().plusSeconds(secondsToWait); while (childPid_ == -1) { + //cerr << "waitStart childPid_ = " << childPid_ << endl; double timeToWait = Date::now().secondsUntil(deadline); if (timeToWait < 0) break; if (isfinite(timeToWait)) ML::futex_wait(childPid_, -1, timeToWait); else ML::futex_wait(childPid_, -1); + //cerr << "waitStart childPid_ now = " << childPid_ << endl; } return childPid_ > 0; } -bool -Runner:: -waitRunning(double secondsToWait) const -{ - bool timeout(false); - - Date deadline = Date::now().plusSeconds(secondsToWait); - while (true) { - int currentActive(activeRequest_); - if (currentActive >= runRequests_) { - break; - } - double timeToWait = Date::now().secondsUntil(deadline); - if (isfinite(timeToWait)) { - if (timeToWait < 0) { - timeout = true; - break; - } - ML::futex_wait(activeRequest_, currentActive, timeToWait); - } - else { - ML::futex_wait(activeRequest_, currentActive); - } - } - - return !timeout; -} - void Runner:: waitTermination() const @@ -689,102 +594,79 @@ void Runner::Task:: runWrapper(const vector & command, ProcessFds & fds) { - // Find runner_helper path - string runnerHelper = findRunnerHelper(); + static const char * appendStr = "../../../" BIN "/runner_helper"; - vector preArgs = { /*"gdb", "--tty", "/dev/pts/48", "--args"*/ /*"../strace-code/strace", "-b", "execve", "-ftttT", "-o", "runner_helper.strace"*/ }; + auto dieWithErrno = [&] (const char * message) { + ProcessStatus status; + status.state = ProcessState::STOPPED; + status.setErrorCodes(errno, LaunchError::SUBTASK_LAUNCH); + fds.writeStatus(status); - // Set up the arguments before we fork, as we don't want to call malloc() - // from the fork, and it can be called from c_str() in theory. - auto len = command.size(); - char * argv[len + 3 + preArgs.size()]; + throw ML::Exception(errno, message); + }; - for (unsigned i = 0; i < preArgs.size(); ++i) - argv[i] = (char *)preArgs[i].c_str(); + /* We need to deduce the absolute path to the helper by using the current + program as reference. The trick is to read the value of the + "/proc/self/exe" link and then to substitute the current program name + with a relative path to the helper program. */ + char exeBuffer[16384]; + ssize_t len = ::readlink("/proc/self/exe", + exeBuffer, sizeof(exeBuffer) - 1); + if (len == -1) { + dieWithErrno("determining current program"); + } + + /* Since readlink does not return a null-terminated string, we need to add + one by hand if we want to avoid buffer problems with strrchr. */ + exeBuffer[len] = '\0'; + char * slash = ::strrchr(exeBuffer, '/'); + slash++; + size_t appendSize = ::strlen(appendStr); + if (slash + appendSize > (exeBuffer + sizeof(exeBuffer) - 2)) { + dieWithErrno("preparing program value"); + } + ::memcpy(slash, appendStr, appendSize); + slash[appendSize] = '\0'; - int idx = preArgs.size(); + // Set up the arguments before we fork, as we don't want to call malloc() + // from the fork, and it can be called from c_str() in theory. + len = command.size(); + char * argv[len + 3]; - argv[idx++] = (char *) runnerHelper.c_str(); + argv[0] = exeBuffer; size_t channelsSize = 4*2*4+3+1; char channels[channelsSize]; fds.encodeToBuffer(channels, channelsSize); - argv[idx++] = channels; + argv[1] = channels; for (int i = 0; i < len; i++) { - argv[idx++] = (char *) command[i].c_str(); + argv[2+i] = (char *) command[i].c_str(); } - argv[idx++] = nullptr; + argv[2+len] = nullptr; - std::vector env; - - char * const * p = environ; - - while (*p) { - env.push_back(*p); - ++p; - } - - env.push_back(nullptr); - - char * const * envp = &env[0]; - - int res = execve(argv[0], argv, envp); + int res = execv(argv[0], argv); if (res == -1) { - throw ML::Exception(errno, "launching runner helper"); + dieWithErrno("launching runner helper"); } throw ML::Exception("You are the King of Time!"); } -string -Runner::Task:: -findRunnerHelper() -{ - string runnerHelper = Runner::runnerHelper; - - if (runnerHelper.empty()) { - static string staticHelper; - - if (staticHelper.empty()) { - string binDir; - char * cBin = ::getenv("BIN"); - if (cBin) { - binDir = cBin; - } - if (binDir.empty()) { - char binBuffer[16384]; - char * res = ::getcwd(binBuffer, 16384); - ExcAssert(res != NULL); - binDir = res; - binDir += "/" BIN; - } - staticHelper = binDir + "/runner_helper"; - - // Make sure the deduced path is right - struct stat sb; - int res = ::stat(staticHelper.c_str(), &sb); - if (res != 0) { - throw ML::Exception(errno, "checking static helper"); - } - } - runnerHelper = staticHelper; - } - - return runnerHelper; -} - /* This method *must* be called from attemptTaskTermination, in order to * respect the natural order of things. */ void Runner::Task:: postTerminate(Runner & runner) { + // cerr << "postTerminate\n"; + if (wrapperPid <= 0) { throw ML::Exception("wrapperPid <= 0, has postTerminate been executed before?"); } + // cerr << "waiting for wrapper pid: " << wrapperPid << endl; int wrapperPidStatus; while (true) { int res = ::waitpid(wrapperPid, &wrapperPidStatus, 0); @@ -802,8 +684,10 @@ postTerminate(Runner & runner) } wrapperPid = -1; + //cerr << "finished waiting for wrapper with status " << wrapperPidStatus + // << endl; + if (stdInFd != -1) { - runner.removeFd(stdInFd, true); ::close(stdInFd); stdInFd = -1; } @@ -812,7 +696,7 @@ postTerminate(Runner & runner) if (fd > -1) { JML_TRACE_EXCEPTIONS(false); try { - runner.removeFd(fd, true); + runner.removeFd(fd); } catch (const ML::Exception & exc) { } @@ -824,9 +708,12 @@ postTerminate(Runner & runner) unregisterFd(stdErrFd); command.clear(); + + if (onTerminate) { + onTerminate(runResult); + onTerminate = nullptr; + } runResult = RunResult(); - onTerminate = nullptr; - statusState = ProcessState::UNKNOWN; } @@ -881,14 +768,6 @@ processStatus() return status; } -void -RunResult:: -updateFromLaunchException(const std::exception_ptr & excPtr) -{ - state = LAUNCH_EXCEPTION; - launchExc = excPtr; -} - void RunResult:: updateFromLaunchError(int launchErrno, @@ -912,11 +791,9 @@ to_string(const RunResult::State & state) { switch (state) { case RunResult::UNKNOWN: return "UNKNOWN"; - case RunResult::LAUNCH_EXCEPTION: return "LAUNCH_EXCEPTION"; case RunResult::LAUNCH_ERROR: return "LAUNCH_ERROR"; case RunResult::RETURNED: return "RETURNED"; case RunResult::SIGNALED: return "SIGNALED"; - case RunResult::PARENT_EXITED: return "PARENT_EXITED"; } return ML::format("RunResult::State(%d)", state); @@ -953,7 +830,6 @@ RunResultStateDescription() "Command was unable to be launched"); addValue("RETURNED", RunResult::RETURNED, "Command returned"); addValue("SIGNALED", RunResult::SIGNALED, "Command exited with a signal"); - addValue("PARENT_EXITED", RunResult::PARENT_EXITED, "Parent process exited forcing child to die"); } @@ -967,14 +843,31 @@ execute(MessageLoop & loop, const string & stdInData, bool closeStdin) { - static bool notified(false); + RunResult result; + auto onTerminate = [&](const RunResult & runResult) { + result = runResult; + }; - if (!notified) { - cerr << "warning: the \"MessageLoop\"-based \"execute\" function is deprecated\n"; - notified = true; + Runner runner; + + loop.addSource("runner", runner); + + if (stdInData.size() > 0) { + auto & sink = runner.getStdInSink(); + runner.run(command, onTerminate, stdOutSink, stdErrSink); + sink.write(stdInData); + sink.requestClose(); + } + else { + runner.closeStdin = closeStdin; + runner.run(command, onTerminate, stdOutSink, stdErrSink); } - return execute(command, stdOutSink, stdErrSink, stdInData, closeStdin); + runner.waitTermination(); + loop.removeSource(&runner); + runner.waitConnectionState(AsyncEventSource::DISCONNECTED); + + return result; } RunResult @@ -984,11 +877,14 @@ execute(const vector & command, const string & stdInData, bool closeStdin) { - Runner runner; + MessageLoop loop(1, 0, -1); - runner.closeStdin = closeStdin; + loop.start(); + RunResult result = execute(loop, command, stdOutSink, stdErrSink, + stdInData, closeStdin); + loop.shutdown(); - return runner.runSync(command, stdOutSink, stdErrSink, stdInData); + return result; } } // namespace Datacratic diff --git a/service/runner.h b/service/runner.h index 73af105d..f377576d 100644 --- a/service/runner.h +++ b/service/runner.h @@ -20,7 +20,7 @@ #include "soa/types/date.h" #include "soa/types/value_description.h" -#include "epoll_loop.h" +#include "epoller.h" #include "runner_common.h" #include "sink.h" @@ -59,9 +59,6 @@ struct RunResult { /** Extract the process return code as would be returned by a shell. */ int processStatus() const; - /** Update the state in response to a launch error. */ - void updateFromLaunchException(const std::exception_ptr & excPtr); - /** Update the state in response to a launch error. */ void updateFromLaunchError(int launchErrno, const std::string & launchError); @@ -70,17 +67,14 @@ struct RunResult { enum State { UNKNOWN, ///< State is not known LAUNCH_ERROR, ///< Command was unable to be launched - LAUNCH_EXCEPTION, ///< Exception thrown when launching the command RETURNED, ///< Command returned - SIGNALED, ///< Command exited with a signal - PARENT_EXITED ///< Parent exited, killing the child + SIGNALED ///< Command exited with a signal }; - + State state; int signum; ///< Signal number it returned with int returnCode; ///< Return code if command exited - std::exception_ptr launchExc; /// OnTerminate; Runner(); @@ -118,20 +109,12 @@ struct Runner : public EpollLoop { /* Close stdin at launch time if stdin sink was not queried. */ bool closeStdin; - /** Run a program asynchronously, requiring to be attached to a - * MessageLoop. */ + /** Run the subprocess. */ void run(const std::vector & command, - const OnTerminate & onTerminate, + const OnTerminate & onTerminate = nullptr, const std::shared_ptr & stdOutSink = nullptr, const std::shared_ptr & stdErrSink = nullptr); - /** Run a program synchronously. This method does not need any preliminary - * registration to a MessageLoop. */ - RunResult runSync(const std::vector & command, - const std::shared_ptr & stdOutSink = nullptr, - const std::shared_ptr & stdErrSink = nullptr, - const std::string & stdInData = ""); - /** Kill the subprocess with the given signal, then wait for it to terminate. @@ -154,19 +137,10 @@ struct Runner : public EpollLoop { /** Synchronous wait for the subprocess to start. Returns true if the process started, or false if it wasn't able to start. - Will wait for a maximum of secondsToWait seconds. Returns "true" when - the condition was met or "false" in case of a timeout. + Will wait for a maximum of secondsToWait seconds. */ bool waitStart(double secondsToWait = INFINITY) const; - /** Synchronous wait for the subprocess to be marked as started from the - MessageLoop thread. - - Will wait for a maximum of secondsToWait seconds. Returns "true" when - the condition was met or "false" in case of a timeout. - */ - bool waitRunning(double secondsToWait = INFINITY) const; - /** Synchronous wait for termination of the subprocess and the closing of * all related resources. */ void waitTermination() const; @@ -187,20 +161,6 @@ struct Runner : public EpollLoop { double duration() const; private: - void runImpl(const std::vector & command, - const OnTerminate & onTerminate = nullptr, - const std::shared_ptr & stdOutSink = nullptr, - const std::shared_ptr & stdErrSink = nullptr); - - /** Implementation of the runImpl function, which is called inside the - message loop thread so that it knows the parent thread will not - go away and cause issues with death signals of the child process. - */ - void doRunImpl(const std::vector & command, - const OnTerminate & onTerminate = nullptr, - const std::shared_ptr & stdOutSink = nullptr, - const std::shared_ptr & stdErrSink = nullptr); - struct Task { Task(); @@ -209,8 +169,7 @@ struct Runner : public EpollLoop { void flushStdInBuffer(); void runWrapper(const std::vector & command, ProcessFds & fds); - std::string findRunnerHelper(); - + void postTerminate(Runner & runner); std::vector command; @@ -228,16 +187,16 @@ struct Runner : public EpollLoop { }; void prepareChild(); + Epoller::HandleEventResult + handleEpollEvent(const struct epoll_event & event); void handleChildStatus(const struct epoll_event & event); void handleOutputStatus(const struct epoll_event & event, int & fd, std::shared_ptr & sink); + void handleWakeup(const struct epoll_event & event); void attemptTaskTermination(); - int runRequests_; - int activeRequest_; - int32_t running_; - + int running_; Date startDate_; Date endDate_; @@ -248,8 +207,9 @@ struct Runner : public EpollLoop { */ pid_t childPid_; + ML::Wakeup_Fd wakeup_; + std::shared_ptr stdInSink_; - int childStdinFd_; std::shared_ptr stdOutSink_; std::shared_ptr stdErrSink_; @@ -267,17 +227,16 @@ struct Runner : public EpollLoop { Runner object and using it to run a single command. */ -/** Execute a command synchronously. */ -RunResult execute(const std::vector & command, +/** Execute a command synchronously using the specified message loop. */ +RunResult execute(MessageLoop & loop, + const std::vector & command, const std::shared_ptr & stdOutSink = nullptr, const std::shared_ptr & stdErrSink = nullptr, const std::string & stdInData = "", bool closeStdin = false); -/** (Deprecated) Execute a command synchronously using the specified message - * loop. */ -RunResult execute(MessageLoop & loop, - const std::vector & command, +/** Execute a command synchronously using its own message loop. */ +RunResult execute(const std::vector & command, const std::shared_ptr & stdOutSink = nullptr, const std::shared_ptr & stdErrSink = nullptr, const std::string & stdInData = "", diff --git a/service/s3.cc b/service/s3.cc index 750172a5..d1a42c8b 100644 --- a/service/s3.cc +++ b/service/s3.cc @@ -1569,7 +1569,7 @@ download(const std::string & bucket, // << " and offset : " << part.offset << endl; auto partResult = get(bucket, "/" + object, part); - if (partResult.code_ != 206) { + if (!(partResult.code_ == 206 || partResult.code_ == 200)) { cerr << "error getting part " << i << ": " << partResult.bodyXmlStr() << endl; failed = true; @@ -1873,8 +1873,7 @@ struct StreamingDownloadSource { auto partResult = owner->get(bucket, "/" + object, S3Api::Range(start, chunkSize)); - - if (partResult.code_ != 206) { + if (!(partResult.code_ == 206 || partResult.code_ == 200)) { throw ML::Exception("http error " + to_string(partResult.code_) + " while getting part " diff --git a/service/service_base.cc b/service/service_base.cc index 3660702f..e42b0075 100644 --- a/service/service_base.cc +++ b/service/service_base.cc @@ -79,7 +79,7 @@ void NullEventService:: onEvent(const std::string & name, const char * event, - EventType type, + StatEventType type, float value, std::initializer_list) { @@ -124,7 +124,7 @@ void CarbonEventService:: onEvent(const std::string & name, const char * event, - EventType type, + StatEventType type, float value, std::initializer_list extra) { @@ -683,7 +683,7 @@ EventRecorder(const std::string & eventPrefix, void EventRecorder:: -recordEventFmt(EventType type, +recordEventFmt(StatEventType type, float value, std::initializer_list extra, const char * fmt, ...) const diff --git a/service/service_base.h b/service/service_base.h index 17407103..49305584 100644 --- a/service/service_base.h +++ b/service/service_base.h @@ -48,7 +48,7 @@ struct EventService { virtual void onEvent(const std::string & name, const char * event, - EventType type, + StatEventType type, float value, std::initializer_list extra = DefaultOutcomePercentiles) = 0; @@ -72,7 +72,7 @@ struct NullEventService : public EventService { virtual void onEvent(const std::string & name, const char * event, - EventType type, + StatEventType type, float value, std::initializer_list extra = DefaultOutcomePercentiles); @@ -98,7 +98,7 @@ struct CarbonEventService : public EventService { virtual void onEvent(const std::string & name, const char * event, - EventType type, + StatEventType type, float value, std::initializer_list extra = std::initializer_list()); @@ -471,7 +471,7 @@ struct EventRecorder { units: the units of the event (eg, ms). Default is unitless. */ void recordEvent(const char * eventName, - EventType type = ET_COUNT, + StatEventType type = ET_COUNT, float value = 1.0, std::initializer_list extra = DefaultOutcomePercentiles) const { @@ -488,7 +488,7 @@ struct EventRecorder { es->onEvent(eventPrefix_, eventName, type, value, extra); } - void recordEventFmt(EventType type, + void recordEventFmt(StatEventType type, float value, std::initializer_list extra, const char * fmt, ...) const JML_FORMAT_STRING(5, 6); diff --git a/service/service_utils.h b/service/service_utils.h index 88957d30..0268bbde 100644 --- a/service/service_utils.h +++ b/service/service_utils.h @@ -13,8 +13,109 @@ #include #include #include +#include // ::getenv() #include +#include // dlopen() +#include "jml/utils/string_functions.h" // ML::split() +#include "jml/utils/file_functions.h" // ML::fileExists() +#include "jml/utils/json_parsing.h" +#include "jml/arch/exception.h" // ML::Exception +#include "jml/utils/environment.h" // ML::Environment +namespace { + +/******************************************************************************/ +/* HELPER FUNCTIONS FOR PRELOADING LIBS DYNAMICALLY */ +/******************************************************************************/ + +void loadLib(const std::string & file) +{ + void * handle = dlopen(file.c_str(), RTLD_NOW); + if (!handle) { + std::cerr << dlerror() << std::endl; + throw ML::Exception("couldn't load library from %s", file.c_str()); + } +} + +void loadJsonLib(const std::string & jsonFile) +{ + using std::string; + using std::vector; + + ML::File_Read_Buffer buf(jsonFile); + Json::Value jsonListLibs = Json::parse(std::string(buf.start(), buf.end())); + + if (jsonListLibs == Json::Value::null) return; + + if (!jsonListLibs.isArray()) + throw ML::Exception("Library list must be an array"); + + string file; + for (const auto & fileAsJson : jsonListLibs) { + + file = fileAsJson.asString(); + + if (file.empty()) throw ML::Exception("File name cannot be empty"); + + if (ML::endsWith(file,".so")) { + if (!ML::fileExists(file)) + throw ML::Exception("File %s in %s does not exist", file.c_str(), jsonFile.c_str()); + loadLib(file); + continue; + } + + file = file + ".so"; + if (!ML::fileExists(file)) + throw ML::Exception("File %s in %s does not exist", file.c_str(), jsonFile.c_str()); + loadLib(file); + } +} + +void loadLibList(const std::string & fileList) +{ + using std::string; + using std::vector; + + vector listToPreload=ML::split(fileList,','); + string fileSo; + string fileJson; + + for (const string & file : listToPreload) { + + if (file.empty()) throw ML::Exception("File name cannot be empty"); + + if (ML::endsWith(file,".so")) { + if (!ML::fileExists(file)) + throw ML::Exception("File %s does not exist", file.c_str()); + loadLib(file); + continue; + } + + if (ML::endsWith(file,".json")) { + if (!ML::fileExists(file)) + throw ML::Exception("File %s does not exist", file.c_str()); + loadJsonLib(file); + continue; + } + + fileSo = file + ".so"; + if (ML::fileExists(fileSo)) { + loadLib(fileSo); + continue; + } + + fileJson = file + ".json"; + if (ML::fileExists(fileJson)) { + loadJsonLib(fileJson); + continue; + } + + throw ML::Exception("File %s does not exist in neither .so nor .json format", file.c_str()); + } + +} + +} // namespace anonymuous namespace Datacratic { @@ -50,7 +151,9 @@ struct ServiceProxyArguments ("installation,I", value(&installation), "name of the current installation") ("location,L", value(&location), - "Name of the current location"); + "Name of the current location") + ("preload,P", value(&preload), + "Comma separated list of libraries to preload and/or json files"); if (opt == WITH_ZOOKEEPER) { options.add_options() @@ -69,6 +172,8 @@ struct ServiceProxyArguments std::shared_ptr makeServiceProxies(ConfigurationServiceType configurationType = CS_ZOOKEEPER) { + preloadDynamicLibs(); + auto services = std::make_shared(); if (!bootstrap.empty()) @@ -101,11 +206,24 @@ struct ServiceProxyArguments std::string carbonUri; std::string installation; std::string location; + std::string preload; private: std::string serviceName_; + void preloadDynamicLibs() const + { + char * envPreloadC = ::getenv("RTBKIT_PRELOAD"); + std::string envPreload = envPreloadC ? envPreloadC : ""; + + if(!envPreload.empty()) + loadLibList(envPreload); + + if (!preload.empty()) + loadLibList(preload); + } + }; } // namespace Datacratic diff --git a/service/stats_events.h b/service/stats_events.h index b0fb9cce..2b5aa906 100644 --- a/service/stats_events.h +++ b/service/stats_events.h @@ -10,7 +10,7 @@ namespace Datacratic { -enum EventType { +enum StatEventType { ET_HIT, ///< Represents an extra count on a counter ET_COUNT, ///< Represents an extra value accumulated ET_STABLE_LEVEL, ///< Represents the current level of a stable something diff --git a/service/testing/custom_preload_1.cc b/service/testing/custom_preload_1.cc new file mode 100644 index 00000000..275efb0b --- /dev/null +++ b/service/testing/custom_preload_1.cc @@ -0,0 +1,19 @@ +/** custom_preload_1.cc -*- C++ -*- + Sirma Cagil Altay, 20 Oct 2015 + Copyright (c) 2015 Datacratic. All rights reserved. + + File to be dynamically loaded during service_util test +*/ + +#include "soa/service/testing/dynamic_loading_test_table.h" + +namespace { + +struct AtInit { + AtInit() + { + TEST::DynamicLoading::custom_lib_1 = 1; + } +} AtInit; + +} // namespace anonymous diff --git a/service/testing/custom_preload_2.cc b/service/testing/custom_preload_2.cc new file mode 100644 index 00000000..24e299d6 --- /dev/null +++ b/service/testing/custom_preload_2.cc @@ -0,0 +1,19 @@ +/** custom_preload_2.cc -*- C++ -*- + Sirma Cagil Altay, 20 Oct 2015 + Copyright (c) 2015 Datacratic. All rights reserved. + + File to be dynamically loaded during service_util test +*/ + +#include "soa/service/testing/dynamic_loading_test_table.h" + +namespace { + +struct AtInit { + AtInit() + { + TEST::DynamicLoading::custom_lib_2 = 1; + } +} AtInit; + +} // namespace anonymous diff --git a/service/testing/custom_preload_3.cc b/service/testing/custom_preload_3.cc new file mode 100644 index 00000000..8ac70430 --- /dev/null +++ b/service/testing/custom_preload_3.cc @@ -0,0 +1,19 @@ +/** custom_preload_3.cc -*- C++ -*- + Sirma Cagil Altay, 20 Oct 2015 + Copyright (c) 2015 Datacratic. All rights reserved. + + File to be dynamically loaded during service_util test +*/ + +#include "soa/service/testing/dynamic_loading_test_table.h" + +namespace { + +struct AtInit { + AtInit() + { + TEST::DynamicLoading::custom_lib_3 = 1; + } +} AtInit; + +} // namespace anonymous diff --git a/service/testing/custom_preload_4.cc b/service/testing/custom_preload_4.cc new file mode 100644 index 00000000..6ea9c574 --- /dev/null +++ b/service/testing/custom_preload_4.cc @@ -0,0 +1,19 @@ +/** custom_preload_4.cc -*- C++ -*- + Sirma Cagil Altay, 20 Oct 2015 + Copyright (c) 2015 Datacratic. All rights reserved. + + File to be dynamically loaded during service_util test +*/ + +#include "soa/service/testing/dynamic_loading_test_table.h" + +namespace { + +struct AtInit { + AtInit() + { + TEST::DynamicLoading::custom_lib_4 = 1; + } +} AtInit; + +} // namespace anonymous diff --git a/service/testing/dynamic_loading_test_table.h b/service/testing/dynamic_loading_test_table.h new file mode 100644 index 00000000..d7a925b0 --- /dev/null +++ b/service/testing/dynamic_loading_test_table.h @@ -0,0 +1,26 @@ +/** dynamic_loading_test_table.h + Sirma Cagil Altay, 20 Oct 2015 + Copyright (c) 2015 Datacratic. All rights reserved. + + Class created for testing the dynamic library + loading during service_util test +*/ + +#pragma once + +namespace TEST{ + +class DynamicLoading { +public: + static int custom_lib_1; + static int custom_lib_2; + static int custom_lib_3; + static int custom_lib_4; +}; + +int DynamicLoading::custom_lib_1 = 0; +int DynamicLoading::custom_lib_2 = 0; +int DynamicLoading::custom_lib_3 = 0; +int DynamicLoading::custom_lib_4 = 0; + +} // namespace TEST diff --git a/service/testing/libs-to-dynamically-load.json b/service/testing/libs-to-dynamically-load.json new file mode 100644 index 00000000..98caf989 --- /dev/null +++ b/service/testing/libs-to-dynamically-load.json @@ -0,0 +1,4 @@ +[ + "build/x86_64/bin/libcustom_preload_2.so", + "build/x86_64/bin/libcustom_preload_3" +] diff --git a/service/testing/mongo_temporary_server.cc b/service/testing/mongo_temporary_server.cc index cc66f84e..2ac60a5f 100644 --- a/service/testing/mongo_temporary_server.cc +++ b/service/testing/mongo_temporary_server.cc @@ -5,113 +5,78 @@ **/ -#include -#include -#include "jml/utils/exc_assert.h" #include "mongo_temporary_server.h" -using namespace std; using namespace Mongo; -namespace fs = boost::filesystem; -using namespace Datacratic; +using namespace std; -MongoTemporaryServer:: -MongoTemporaryServer(const string & uniquePath, const int portNum) - : state(Inactive), uniquePath_(uniquePath) +MongoTemporaryServer::MongoTemporaryServer(string uniquePath) + : state(Inactive) { - static int index(0); + static int index; ++index; - if (uniquePath_.empty()) { + if (uniquePath == "") { ML::Env_Option tmpDir("TMP", "./tmp"); - uniquePath_ = ML::format("%s/mongo-temporary-server-%d-%d", - tmpDir.get(), getpid(), index); - cerr << ("starting mongo temporary server under unique path " - + uniquePath_ + "\n"); - } - - if (portNum == 0) { - int freePort = 0; - for (int i = 0; i < 100; ++ i) { - struct sockaddr_in addr; - addr.sin_family = AF_INET; - auto sockfd = socket(AF_INET, SOCK_STREAM, 0); - freePort = rand() % 15000 + 5000; // range 15000 - 20000 - addr.sin_port = htons(freePort); - addr.sin_addr.s_addr = INADDR_ANY; - int res = ::bind(sockfd, (struct sockaddr *) &addr, sizeof(addr)); - if (res == 0) { - close(sockfd); - break; - } - freePort = 0; - } - if (freePort == 0) { - throw ML::Exception("Failed to find free port"); - } - this->portNum = freePort; - } - else { - this->portNum = portNum; + uniquePath = ML::format("%s/mongo-temporary-server-%d-%d", + tmpDir.get(), getpid(), index); + cerr << "starting mongo temporary server under unique path " + << uniquePath << endl; } + this->uniquePath_ = uniquePath; start(); } -MongoTemporaryServer:: -~MongoTemporaryServer() -{ +MongoTemporaryServer::~MongoTemporaryServer() { shutdown(); } -void -MongoTemporaryServer:: -testConnection() -{ +void MongoTemporaryServer::testConnection() { // 3. Connect to the server to make sure it works - int sock = ::socket(AF_UNIX, SOCK_STREAM, 0); + int sock = socket(AF_UNIX, SOCK_STREAM, 0); if (sock == -1) { throw ML::Exception(errno, "socket"); } sockaddr_un addr; addr.sun_family = AF_UNIX; - // Wait for it to start up + namespace fs = boost::filesystem; fs::directory_iterator endItr; - fs::path socketdir(socketPath_); - bool connected(false); - for (unsigned i = 0; i < 100 && !connected; ++i) { + boost::filesystem::path socketdir(socketPath_); + int res=0; + for (unsigned i = 0; i < 1000; ++i) { // read the directory to wait for the socket file to appear - for (fs::directory_iterator itr(socketdir); itr != endItr; ++itr) { - ::strcpy(addr.sun_path, itr->path().string().c_str()); - int res = ::connect(sock, - (const sockaddr *) &addr, SUN_LEN(&addr)); - if (res == 0) { - connected = true; - } - else if (res == -1) { - if (errno != ECONNREFUSED && errno != ENOENT) { - throw ML::Exception(errno, "connect"); - } - } + bool found = false; + for( fs::directory_iterator itr(socketdir) ; itr!=endItr ; ++itr) + { + strcpy(addr.sun_path, itr->path().string().c_str()); + found = true; + } + if(found) + { + res = connect(sock, (const sockaddr *)&addr, SUN_LEN(&addr)); + if (res == 0) break; + if (res == -1 && errno != ECONNREFUSED && errno != ENOENT) + throw ML::Exception(errno, "connect"); + } + else { + ML::sleep(0.01); } - ML::sleep(0.1); } - if (!connected) { + if (res != 0) { throw ML::Exception("mongod didn't start up in 10 seconds"); } - ::close(sock); + close(sock); + cerr << "Connection to mongodb socket established " << endl; } -void -MongoTemporaryServer:: -start() -{ +void MongoTemporaryServer::start() { + namespace fs = boost::filesystem; // Check the unique path - if (uniquePath_ == "" || uniquePath_[0] == '/' || uniquePath_ == "." - || uniquePath_ == "..") { + if (uniquePath_.empty() || uniquePath_ == "." || uniquePath_ == "..") { throw ML::Exception("unacceptable unique path"); } @@ -120,12 +85,16 @@ start() // First check that it doesn't exist struct stat stats; int res = ::stat(uniquePath_.c_str(), &stats); - if (res != -1 || (errno != EEXIST && errno != ENOENT)) { - throw ML::Exception(errno, "unique path " + uniquePath_ - + " already exists"); + if (res == -1) { + if (errno != EEXIST && errno != ENOENT) { + throw ML::Exception(errno, "unhandled exception"); + } + } else if (res == 0) { + throw ML::Exception("unique path " + uniquePath_ + " already exists"); } + cerr << "creating directory " << uniquePath_ << endl; - if (!fs::create_directory(fs::path(uniquePath_))) { + if (!fs::create_directories(fs::path(uniquePath_))) { throw ML::Exception("could not create unique path " + uniquePath_); } @@ -138,8 +107,8 @@ start() } // Create unix socket directory - fs::path unixdir(socketPath_); - if (!fs::create_directory(unixdir)) { + boost::filesystem::path unixdir(socketPath_); + if( !boost::filesystem::create_directory(unixdir)) { throw ML::Exception(errno, "couldn't create unix socket directory for Mongo"); } @@ -152,48 +121,35 @@ start() loop_.addSource("runner", runner_); loop_.start(); - auto onTerminate = [&] (const RunResult & result) { - }; - runner_.run({"/usr/bin/mongod", - "--bind_ip", "localhost", "--port", to_string(portNum), - "--logpath", logfile_, "--dbpath", uniquePath_, - "--unixSocketPrefix", socketPath_, "--nojournal"}, - onTerminate, nullptr, stdOutSink); + cerr << "about to run command using runner " << endl; + runner_.run({ + "/usr/bin/mongod", + "--port", "28356", + "--logpath",logfile_.c_str(),"--bind_ip", + "localhost","--dbpath",uniquePath_.c_str(),"--unixSocketPrefix", + socketPath_.c_str(),"--nojournal"}, nullptr, nullptr, stdOutSink); // connect to the socket to make sure everything is working fine testConnection(); - string payload("db.createUser({user: 'testuser', pwd: 'testpw'," - "roles: ['userAdmin', 'dbAdmin']})"); - RunResult runRes = execute({"/usr/bin/mongo", - "localhost:" + to_string(portNum)}, - nullptr, nullptr, payload); - ExcAssertEqual(runRes.processStatus(), 0); - execute({"/usr/bin/mongo", "localhost:" + to_string(portNum)}, - nullptr, nullptr, "db.getUsers()"); - + string payload("db.addUser('testuser','testpw',true)"); + execute(loop_,{"/usr/bin/mongo","localhost:28356"}, nullptr, nullptr, + payload); state = Running; } -void -MongoTemporaryServer:: -suspend() -{ +void MongoTemporaryServer::suspend() { runner_.kill(SIGSTOP); state = Suspended; } -void -MongoTemporaryServer:: -resume() -{ +void MongoTemporaryServer::resume() { runner_.kill(SIGCONT); state = Running; } -void -MongoTemporaryServer:: -shutdown() -{ - if (runner_.childPid() < 0) { + +void MongoTemporaryServer::shutdown() { + namespace fs = boost::filesystem; + if(runner_.childPid() < 0) { return; } runner_.kill(); diff --git a/service/testing/mongo_temporary_server.h b/service/testing/mongo_temporary_server.h index d9fe720a..3be1b1a1 100644 --- a/service/testing/mongo_temporary_server.h +++ b/service/testing/mongo_temporary_server.h @@ -26,13 +26,11 @@ #include "soa/service/message_loop.h" #include "soa/service/runner.h" #include "soa/service/sink.h" - - namespace Mongo { struct MongoTemporaryServer : boost::noncopyable { - MongoTemporaryServer(const std::string & uniquePath = "", - const int portNum = 28356); + + MongoTemporaryServer(std::string uniquePath = ""); ~MongoTemporaryServer(); void testConnection(); @@ -40,9 +38,6 @@ struct MongoTemporaryServer : boost::noncopyable { void suspend(); void resume(); void shutdown(); - int getPortNum() { - return portNum; - } private: enum State { Inactive, Stopped, Suspended, Running }; @@ -53,7 +48,6 @@ struct MongoTemporaryServer : boost::noncopyable { int serverPid; Datacratic::MessageLoop loop_; Datacratic::Runner runner_; - int portNum; }; } // namespace Mongo diff --git a/service/testing/py/mongo_temp_server_wrapping.cc b/service/testing/py/mongo_temp_server_wrapping.cc index dbe60b57..9df40b71 100644 --- a/service/testing/py/mongo_temp_server_wrapping.cc +++ b/service/testing/py/mongo_temp_server_wrapping.cc @@ -18,7 +18,7 @@ struct MongoTemporaryServerPtr { MongoTemporaryServerPtr(const std::string & uniquePath = "", const int portNum = 28356) : - mongoTmpServer(new MongoTemporaryServer(uniquePath, portNum)) + mongoTmpServer(new MongoTemporaryServer(uniquePath /*, portNum */)) { } diff --git a/service/testing/redis_temporary_server.h b/service/testing/redis_temporary_server.h index a916e5e5..a50cf3db 100644 --- a/service/testing/redis_temporary_server.h +++ b/service/testing/redis_temporary_server.h @@ -31,8 +31,10 @@ struct RedisTemporaryServer : boost::noncopyable { using namespace std; if (uniquePath == "") { ML::Env_Option tmpDir("TMP", "./tmp"); + std::string dir = tmpDir; + if(dir[0] == '/') dir.insert(0, 1, '.'); uniquePath = ML::format("%s/redis-temporary-server-%d-%d", - tmpDir.get(), getpid(), index); + dir, getpid(), index); cerr << "starting redis temporary server under unique path " << uniquePath << endl; } @@ -51,8 +53,7 @@ struct RedisTemporaryServer : boost::noncopyable { using namespace std; // Check the unique path - if (uniquePath == "" || uniquePath[0] == '/' || uniquePath == "." - || uniquePath == "..") + if (uniquePath.empty() || uniquePath == "." || uniquePath == "..") throw ML::Exception("unacceptable unique path"); // 1. Create the directory diff --git a/service/testing/runner_test.cc b/service/testing/runner_test.cc index e101f03c..080ae3ea 100644 --- a/service/testing/runner_test.cc +++ b/service/testing/runner_test.cc @@ -15,7 +15,6 @@ #include "jml/arch/timers.h" #include "jml/arch/threads.h" #include "jml/utils/exc_assert.h" -#include "jml/utils/guard.h" #include "jml/utils/string_functions.h" #include "jml/utils/vector_utils.h" #include "soa/service/message_loop.h" @@ -214,7 +213,6 @@ BOOST_AUTO_TEST_CASE( test_runner_normal_exit ) stdInSink.write(string(command)); } stdInSink.requestClose(); - runner.waitRunning(); runner.waitTermination(); BOOST_CHECK_EQUAL(result.state, RunResult::RETURNED); @@ -245,7 +243,6 @@ BOOST_AUTO_TEST_CASE( test_runner_normal_exit ) stdInSink.write(string(command)); } stdInSink.requestClose(); - runner.waitRunning(); runner.waitTermination(); BOOST_CHECK_EQUAL(result.state, RunResult::SIGNALED); @@ -281,7 +278,6 @@ BOOST_AUTO_TEST_CASE( test_runner_missing_exe ) cerr << "running 1" << endl; runner.run({"/this/command/is/missing"}, onTerminate); cerr << "running 1b" << endl; - runner.waitRunning(); runner.waitTermination(); BOOST_CHECK_EQUAL(result.state, RunResult::LAUNCH_ERROR); @@ -298,7 +294,6 @@ BOOST_AUTO_TEST_CASE( test_runner_missing_exe ) loop.addSource("runner2", runner); runner.run({"/dev/null"}, onTerminate); - runner.waitRunning(); runner.waitTermination(); BOOST_CHECK_EQUAL(result.state, RunResult::LAUNCH_ERROR); @@ -314,7 +309,6 @@ BOOST_AUTO_TEST_CASE( test_runner_missing_exe ) loop.addSource("runner2", runner); runner.run({"/dev"}, onTerminate); - runner.waitRunning(); runner.waitTermination(); BOOST_CHECK_EQUAL(result.state, RunResult::LAUNCH_ERROR); @@ -369,9 +363,6 @@ BOOST_AUTO_TEST_CASE( test_runner_cleanup ) auto nullSink = make_shared(); - auto onTerminate = [&] (const RunResult & runResult) { - }; - auto performLoop = [&] (const string & loopData) { RunnerTestHelperCommands commands; commands.sendOutput(true, loopData); @@ -388,12 +379,11 @@ BOOST_AUTO_TEST_CASE( test_runner_cleanup ) auto & stdInSink = runner.getStdInSink(); runner.run({"build/x86_64/bin/runner_test_helper"}, - onTerminate, stdOutSink, nullSink); + nullptr, stdOutSink, nullSink); for (const string & command: commands) { stdInSink.write(string(command)); } stdInSink.requestClose(); - runner.waitRunning(); runner.waitTermination(); BOOST_CHECK_EQUAL(ML::hexify_string(receivedStdOut), @@ -464,20 +454,16 @@ test_runner_no_output_delay_helper(bool stdout) loop.addSource("runner", runner); loop.start(); - auto onTerminate = [&] (const RunResult & result) { - }; - auto & stdInSink = runner.getStdInSink(); runner.run({"/usr/bin/stdbuf", "-o0", "build/x86_64/bin/runner_test_helper"}, - onTerminate, stdOutSink, stdErrSink); + nullptr, stdOutSink, stdErrSink); for (const string & command: commands) { while (!stdInSink.write(string(command))) { ML::sleep(0.1); } } stdInSink.requestClose(); - runner.waitRunning(); runner.waitTermination(); BOOST_CHECK_EQUAL(sizes[0], 6); @@ -563,7 +549,6 @@ BOOST_AUTO_TEST_CASE( test_runner_fast_execution_multiple_threads ) } #endif -#if 1 BOOST_AUTO_TEST_CASE( test_timeval_value_description ) { /* printing */ @@ -586,9 +571,7 @@ BOOST_AUTO_TEST_CASE( test_timeval_value_description ) BOOST_CHECK_EQUAL(tv.tv_usec, 3456); } } -#endif -#if 1 /* This test ensures that running a program from a thread does not cause the * program to be killed when the thread exits, due to prctl PR_SET_PDEATHSIG * (http://man7.org/linux/man-pages/man2/prctl.2.html) being active when @@ -633,82 +616,3 @@ BOOST_AUTO_TEST_CASE( test_set_prctl_from_thread ) BOOST_CHECK_EQUAL(runResult.returnCode, 0); BOOST_CHECK_EQUAL(runResult.signum, -1); } -#endif - -#if 1 -/* This test ensures that onTerminate is called with the appropriate RunResult - * when the runWrapper process fails. */ -BOOST_AUTO_TEST_CASE( test_unexisting_runner_helper ) -{ - BlockedSignals blockedSigs2(SIGCHLD); - ML::Call_Guard guard([&] { Runner::runnerHelper.clear(); }); - Runner::runnerHelper = "/this/executable/does/not/exist"; - - auto runResult = execute({"/bin/sleep", "1"}); - - BOOST_CHECK_EQUAL(runResult.state, RunResult::LAUNCH_ERROR); - BOOST_CHECK_EQUAL(runResult.returnCode, -1); - BOOST_CHECK_EQUAL(runResult.signum, -1); - BOOST_CHECK_EQUAL(runResult.processStatus(), 127); /* "command not found" */ -} -#endif - -#if 1 -/* This test ensures that onTerminate is called with the appropriate RunResult - * when the runWrapper process fails and that the handling of file descriptors - * properly separates the channels between the previous and following - * processes. */ -BOOST_AUTO_TEST_CASE( test_runner_reuse ) -{ - MessageLoop loop; - loop.start(); - - auto runner = make_shared(); - loop.addSource("runner", runner); - runner->waitConnectionState(AsyncEventSource::CONNECTED); - - int terminateCount(0); - std::mutex lock; - - vector stdouts; - string currentStdout; - auto onStdOut = [&] (string && message) { - // cerr << "received message on stdout: /" + message + "/" << endl; - currentStdout += message; - }; - auto stdOutSink = make_shared(onStdOut); - - RunResult runResult; - Runner::OnTerminate onTerminate; - onTerminate = [&] (const RunResult & result) { - stdouts.push_back(currentStdout); - currentStdout.clear(); - terminateCount++; - cerr << "terminateCount: " + to_string(terminateCount) + "\n"; - if (terminateCount < 2) { - cerr << "launching subsequent process...\n"; - auto & stdInSink = runner->getStdInSink(); - stdInSink.write("second"); - stdInSink.requestClose(); - runner->run({"/bin/cat", "-"}, onTerminate, stdOutSink); - cerr << "subsequent process started\n"; - } - else { - lock.unlock(); - } - }; - - lock.lock(); - auto & stdInSink = runner->getStdInSink(); - stdInSink.write("first"); - stdInSink.requestClose(); - runner->run({"/bin/cat", "-"}, onTerminate, stdOutSink); - - lock.lock(); - loop.shutdown(); - lock.unlock(); - - BOOST_CHECK_EQUAL(stdouts[0], "first"); - BOOST_CHECK_EQUAL(stdouts[1], "second"); -} -#endif diff --git a/service/testing/service_testing.mk b/service/testing/service_testing.mk index 5071315b..813a5ea4 100644 --- a/service/testing/service_testing.mk +++ b/service/testing/service_testing.mk @@ -1,4 +1,15 @@ -$(eval $(call library,mongo_tmp_server,mongo_temporary_server.cc, services)) +#$(eval $(call library,mongo_tmp_server,mongo_temporary_server.cc, services)) + +$(eval $(call library,custom_preload_1,custom_preload_1.cc,)) +$(eval $(call library,custom_preload_2,custom_preload_2.cc,)) +$(eval $(call library,custom_preload_3,custom_preload_3.cc,)) +$(eval $(call library,custom_preload_4,custom_preload_4.cc,)) +$(eval $(call test,service_utils_test,services,boost)) + +service_utils_test: $(LIB)/libcustom_preload_1.so \ + $(LIB)/libcustom_preload_2.so \ + $(LIB)/libcustom_preload_3.so \ + $(LIB)/libcustom_preload_4.so $(eval $(call test,epoll_test,services,boost)) $(eval $(call test,epoll_wait_test,services,boost manual)) @@ -37,7 +48,7 @@ $(eval $(call test,message_loop_test,services,boost)) $(eval $(call program,runner_test_helper,utils)) $(eval $(call test,runner_test,services,boost)) -$(eval $(call test,runner_stress_test,services,boost)) +$(eval $(call test,runner_stress_test,services,boost manual)) $(TESTS)/runner_test $(TESTS)/runner_stress_test: $(BIN)/runner_test_helper $(eval $(call test,sink_test,services,boost)) @@ -63,6 +74,5 @@ $(eval $(call test,sns_mock_test,cloud services,boost)) $(eval $(call test,zmq_message_loop_test,services,boost)) $(eval $(call test,event_handler_test,cloud services,boost manual)) -$(eval $(call test,mongo_basic_test,services boost_filesystem mongo_tmp_server,boost manual)) - -$(eval $(call include_sub_makes,py)) +#$(eval $(call test,mongo_basic_test,services boost_filesystem mongo_tmp_server,boost manual)) +#$(eval $(call include_sub_makes,py)) diff --git a/service/testing/service_utils_test.cc b/service/testing/service_utils_test.cc new file mode 100644 index 00000000..597ef586 --- /dev/null +++ b/service/testing/service_utils_test.cc @@ -0,0 +1,49 @@ +/** service_utils_test.cc -*- C++ -*- + Sirma Cagil Altay, 20 Oct 2015 + Copyright (c) 2015 Datacratic. All rights reserved. + + Test dynamic library loading ability of ServiceProxyArguments +*/ + +#define BOOST_TEST_MAIN +#define BOOST_TEST_DYN_LINK + +#include +#include +#include + +#include +#include + +#include "jml/utils/filter_streams.h" +#include "jml/utils/environment.h" +#include "soa/service/testing/dynamic_loading_test_table.h" + +#include "soa/service/service_utils.h" + +using namespace std; +using namespace ML; +using namespace Datacratic; + +static const string buildOptions("build/x86_64/bin/libcustom_preload_1.so," + "soa/service/testing/libs-to-dynamically-load"); +static const string envOptions("build/x86_64/bin/libcustom_preload_4"); +static const string RtbkitPreload("RTBKIT_PRELOAD"); + +BOOST_AUTO_TEST_CASE( test_service_utils_preload ) +{ + BOOST_REQUIRE_EQUAL(TEST::DynamicLoading::custom_lib_1,0); + BOOST_REQUIRE_EQUAL(TEST::DynamicLoading::custom_lib_2,0); + BOOST_REQUIRE_EQUAL(TEST::DynamicLoading::custom_lib_3,0); + BOOST_REQUIRE_EQUAL(TEST::DynamicLoading::custom_lib_4,0); + + ServiceProxyArguments myService; + myService.preload = buildOptions; + setenv(RtbkitPreload.c_str(),envOptions.c_str(),1); + myService.makeServiceProxies(); + + BOOST_REQUIRE_EQUAL(TEST::DynamicLoading::custom_lib_1,1); + BOOST_REQUIRE_EQUAL(TEST::DynamicLoading::custom_lib_2,1); + BOOST_REQUIRE_EQUAL(TEST::DynamicLoading::custom_lib_3,1); + BOOST_REQUIRE_EQUAL(TEST::DynamicLoading::custom_lib_4,1); +} diff --git a/service/zookeeper.cc b/service/zookeeper.cc index b8c9fa03..b43349fe 100644 --- a/service/zookeeper.cc +++ b/service/zookeeper.cc @@ -69,8 +69,9 @@ ZookeeperCallbackManager::popCallback(uintptr_t id) auto found = callbacks_.find(id); if( found != callbacks_.end()) { + auto cb = found->second.callback; callbacks_.erase(id); - return found->second.callback; + return cb; } return nullptr; } diff --git a/tinyxml2 b/tinyxml2 deleted file mode 160000 index 649d1caf..00000000 --- a/tinyxml2 +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 649d1caf2d17b9ad4fb5b712e6e1884ddf89747b diff --git a/types/basic_value_descriptions.h b/types/basic_value_descriptions.h index e5d4075e..59e3986d 100644 --- a/types/basic_value_descriptions.h +++ b/types/basic_value_descriptions.h @@ -36,7 +36,7 @@ struct DefaultDescription val->val2 == 0 && val->val1 <= std::numeric_limits::max()) { context.writeInt(val->val1); } else { - context.writeString(val->toString()); + context.writeStringUtf8(Utf8String(val->toString())); } } @@ -51,7 +51,7 @@ struct StringIdDescription: public DefaultDescription { virtual void printJsonTyped(const Datacratic::Id * val, JsonPrintingContext & context) const { - context.writeString(val->toString()); + context.writeStringUtf8(Utf8String(val->toString())); } }; diff --git a/types/id.cc b/types/id.cc index 8487d7ab..7667111a 100644 --- a/types/id.cc +++ b/types/id.cc @@ -516,10 +516,6 @@ uint64_t Id:: complexHash() const { -#if ID_HASH_AS_STRING - std::string converted = toString(); - return CityHash64(converted.c_str(), converted.size()); -#else if (type == STR) return CityHash64(str, len); else if (type == COMPOUND2) { @@ -529,7 +525,6 @@ complexHash() const //else if (type == CUSTOM) // return controlFn(CF_HASH, data); else throw ML::Exception("unknown Id type"); -#endif } void diff --git a/types/id.h b/types/id.h index 58b7bd71..3f98d985 100644 --- a/types/id.h +++ b/types/id.h @@ -205,14 +205,8 @@ struct Id { uint64_t hash() const { if (type == NONE || type == NULLID) return 0; -#if ID_HASH_AS_STRING - if (type == STR) - return CityHash64(str, len); - return complexHash(); -#else if (JML_UNLIKELY(type >= STR)) return complexHash(); return Hash128to64(std::make_pair(val1, val2)); -#endif } bool complexEqual(const Id & other) const; diff --git a/types/json_parsing.cc b/types/json_parsing.cc index 18a24bf6..17c38ce4 100644 --- a/types/json_parsing.cc +++ b/types/json_parsing.cc @@ -79,6 +79,7 @@ expectStringUtf8() case '\\':c = '\\'; break; case '"': c = '"'; break; case 'u': { + (void) *(*context); int code = context->expect_hex4(); c = code; break; diff --git a/types/json_parsing.h b/types/json_parsing.h index 7c71a0b3..73071969 100644 --- a/types/json_parsing.h +++ b/types/json_parsing.h @@ -815,15 +815,8 @@ void parseJson(Id * output, Context & context) using namespace std; if (context.isString()) { - char buffer[4096]; - ssize_t realSize = context.expectStringAscii(buffer, sizeof(buffer)); - if (realSize > -1) { - *output = Id(buffer, realSize); - } - else { - std::string value = context.expectStringAscii(); - *output = Id(value); - } + Utf8String value = context.expectStringUtf8(); + *output = Id(value.rawString()); return; } diff --git a/types/testing/id_test.cc b/types/testing/id_test.cc index c0017a2c..35c92fb7 100644 --- a/types/testing/id_test.cc +++ b/types/testing/id_test.cc @@ -348,27 +348,3 @@ BOOST_AUTO_TEST_CASE( test_compound_id ) { Id id(Id("hello"), Id("world")); } - -#if ID_HASH_AS_STRING -BOOST_AUTO_TEST_CASE( test_hash_as_string ) -{ - /* COMPOUND */ - { - Id typical(string("hello:big:world"), Id::STR); - Id id(Id("hello"), Id("big:world")); - BOOST_CHECK_EQUAL(typical.hash(), id.hash()); - } - - /* BIGDEC */ - { - Id typical(string("12345678"), Id::STR); - BOOST_CHECK_EQUAL(typical.type, Id::STR); - Id id(12345678); - BOOST_CHECK_EQUAL(typical.hash(), id.hash()); - - id = Id("12345678"); - BOOST_CHECK_EQUAL(id.type, Id::BIGDEC); - BOOST_CHECK_EQUAL(typical.hash(), id.hash()); - } -} -#endif diff --git a/utils/fnv_hash.h b/utils/fnv_hash.h new file mode 100644 index 00000000..085487cd --- /dev/null +++ b/utils/fnv_hash.h @@ -0,0 +1,76 @@ +/** fnv_hash.h -*- C++ -*- + jsbejeau, 17 Nov 2015 + Copyright (c) 2015 Datacratic. All rights reserved. + + FNV Hash : Same implementation as Go +*/ + +namespace Datacratic { + +/******************************************************************************/ +/* FNV HASH */ +/******************************************************************************/ + +const uint32_t offset32 = 2166136261; +const uint64_t offset64 = 14695981039346656037ULL; +const uint64_t prime32 = 16777619 ; +const uint64_t prime64 = 1099511628211L; + +uint32_t fnv_hash32(const std::string &str) { + + uint32_t hash{offset32}; + auto i = 0; + + while(str[i]) { + hash *= prime32; + hash ^= str[i]; + i++; + } + + return hash; +} + +uint32_t fnv_hash32a(const std::string &str) { + + uint32_t hash{offset32}; + auto i = 0; + + while(str[i]) { + hash ^= str[i]; + hash *= prime32; + i++; + } + + return hash; +} + +uint64_t fnv_hash64(const std::string &str) { + + uint64_t hash{offset64}; + auto i = 0; + + while(str[i]) { + hash *= prime64; + hash ^= str[i]; + i++; + } + + return hash; +} + +uint64_t fnv_hash64a(const std::string &str) { + + uint64_t hash{offset64}; + auto i = 0; + + while(str[i]) { + hash ^= str[i]; + hash *= prime64; + i++; + } + + return hash; +} + +} // namespace Datacratic + diff --git a/utils/mongo_init.h b/utils/mongo_init.h new file mode 100644 index 00000000..98c9ab5b --- /dev/null +++ b/utils/mongo_init.h @@ -0,0 +1,33 @@ +/** + * mongo_init.h + * Mich, 2015-09-03 + * Copyright (c) 2015 Datacratic. All rights reserved. + **/ +#pragma once + +#include "mongo/bson/bson.h" +#include "mongo/util/net/hostandport.h" + + +using namespace mongo; + +namespace Datacratic { + bool _mongoInitialized; + +struct MongoAtInit { + + MongoAtInit() { + if (!_mongoInitialized) { + _mongoInitialized = true; + std::cerr << __FILE__ << ":" << __LINE__ << std::endl; + using mongo::client::initialize; + using mongo::client::Options; + auto status = initialize(); + if (!status.isOK()) { + throw ML::Exception("Mongo initialize failed"); + } + } + } +} atInit; + +} diff --git a/utils/testing/fnv_hash_test.cc b/utils/testing/fnv_hash_test.cc new file mode 100644 index 00000000..bf4b9518 --- /dev/null +++ b/utils/testing/fnv_hash_test.cc @@ -0,0 +1,17 @@ +#define BOOST_TEST_MAIN +#define BOOST_TEST_DYN_LINK + +#include +#include +#include + +using namespace std; +using namespace Datacratic; + +BOOST_AUTO_TEST_CASE( fnv_hash ) +{ + BOOST_CHECK_EQUAL(fnv_hash32("abcdef"),2670544664); + BOOST_CHECK_EQUAL(fnv_hash32a("abcdef"),4282878506); + BOOST_CHECK_EQUAL(fnv_hash64("abcdef"),2594670854942755800ULL); + BOOST_CHECK_EQUAL(fnv_hash64a("abcdef"),15567776504244095498ULL); +} diff --git a/utils/testing/testing.mk b/utils/testing/testing.mk index 2547797d..7c7ea603 100644 --- a/utils/testing/testing.mk +++ b/utils/testing/testing.mk @@ -9,5 +9,6 @@ $(eval $(call test,fixture_test,test_utils,boost)) $(eval $(call test,print_utils_test,,boost)) $(eval $(call test,variadic_hash_test,variadic_hash,boost)) +$(eval $(call test,fnv_hash_test,,boost)) $(eval $(call test,type_traits_test,,boost)) $(eval $(call test,scope_test,arch,boost)) diff --git a/valgrind.supp b/valgrind.supp index 6d59a6c8..01059f4f 100644 --- a/valgrind.supp +++ b/valgrind.supp @@ -48,6 +48,26 @@ ... } +{ + Logging::Category singletons + Memcheck:Leak + ... + fun:_ZN10Datacratic7Logging8CategoryC1EPKcS3_ + ... + fun:_dl_init + obj:/lib/x86_64-linux-gnu/ld-2.15.so +} + +{ + Logging::Category singletons + Memcheck:Leak + ... + fun:_ZN10Datacratic7Logging8CategoryC1EPKcb + ... + fun:_dl_init + obj:/lib/x86_64-linux-gnu/ld-2.15.so +} + { tcmalloc trips valgrind Memcheck:Param