Skip to content

Commit

Permalink
Hookup check cache with cache keys. (#45)
Browse files Browse the repository at this point in the history
* Hookup check cache with cache keys.

* Update comment.

* update comment again.
  • Loading branch information
qiwzhang authored Mar 22, 2017
1 parent 370f4b7 commit 4cd95ef
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 55 deletions.
5 changes: 5 additions & 0 deletions mixerclient/include/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#define MIXERCLIENT_OPTIONS_H

#include <memory>
#include <set>

namespace istio {
namespace mixer_client {
Expand Down Expand Up @@ -55,6 +56,10 @@ struct CheckOptions {
// deletion is triggered by a timer. This value must be larger than
// flush_interval_ms.
const int expiration_ms;

// Only the attributes in this set are used to caclculate cache key.
// If empty, check cache is disabled.
std::set<std::string> cache_keys;
};

// Options controlling report behavior.
Expand Down
33 changes: 20 additions & 13 deletions mixerclient/src/check_cache.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ CheckCache::CheckCache(const CheckOptions& options) : options_(options) {
flush_interval_in_cycle_ =
options_.flush_interval_ms * SimpleCycleTimer::Frequency() / 1000;

if (options.num_entries >= 0) {
if (options.num_entries >= 0 && !options.cache_keys.empty()) {
cache_.reset(new CheckLRUCache(
options.num_entries, std::bind(&CheckCache::OnCacheEntryDelete, this,
std::placeholders::_1)));
Expand All @@ -50,9 +50,19 @@ bool CheckCache::ShouldFlush(const CacheElem& elem) {
return age >= flush_interval_in_cycle_;
}

Status CheckCache::Check(const Attributes& attributes,
CheckResponse* response) {
string request_signature = GenerateSignature(attributes);
Status CheckCache::Check(const Attributes& attributes, CheckResponse* response,
std::string* signature) {
if (!cache_) {
// By returning NOT_FOUND, caller will send request to server.
return Status(Code::NOT_FOUND, "");
}

std::string request_signature =
GenerateSignature(attributes, options_.cache_keys);
if (signature) {
*signature = request_signature;
}

std::lock_guard<std::mutex> lock(cache_mutex_);
CheckLRUCache::ScopedLookup lookup(cache_.get(), request_signature);

Expand All @@ -75,12 +85,10 @@ Status CheckCache::Check(const Attributes& attributes,
return Status::OK;
}

Status CheckCache::CacheResponse(const Attributes& attributes,
Status CheckCache::CacheResponse(const std::string& request_signature,
const CheckResponse& response) {
std::lock_guard<std::mutex> lock(cache_mutex_);

if (cache_) {
string request_signature = GenerateSignature(attributes);
std::lock_guard<std::mutex> lock(cache_mutex_);
CheckLRUCache::ScopedLookup lookup(cache_.get(), request_signature);

int64_t now = SimpleCycleTimer::Now();
Expand All @@ -97,12 +105,12 @@ Status CheckCache::CacheResponse(const Attributes& attributes,
return Status::OK;
}

// TODO: need to hook up a timer to call Flush.
// Flush aggregated requests whom are longer than flush_interval.
// Called at time specified by GetNextFlushInterval().
Status CheckCache::Flush() {
std::lock_guard<std::mutex> lock(cache_mutex_);

if (cache_) {
std::lock_guard<std::mutex> lock(cache_mutex_);
cache_->RemoveExpiredEntries();
}

Expand All @@ -114,10 +122,9 @@ void CheckCache::OnCacheEntryDelete(CacheElem* elem) { delete elem; }
// Flush out aggregated check requests, clear all cache items.
// Usually called at destructor.
Status CheckCache::FlushAll() {
GOOGLE_LOG(INFO) << "Remove all entries of check cache.";
std::lock_guard<std::mutex> lock(cache_mutex_);

if (cache_) {
GOOGLE_LOG(INFO) << "Remove all entries of check cache.";
std::lock_guard<std::mutex> lock(cache_mutex_);
cache_->RemoveAll();
}

Expand Down
6 changes: 4 additions & 2 deletions mixerclient/src/check_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "include/client.h"
#include "include/options.h"
#include "mixer/v1/service.pb.h"
#include "src/signature.h"
#include "utils/simple_lru_cache.h"
#include "utils/simple_lru_cache_inl.h"

Expand All @@ -45,11 +46,12 @@ class CheckCache {
// caller has to send the request to mixer.
// Otherwise, returns OK and cached response.
virtual ::google::protobuf::util::Status Check(
const Attributes& request, ::istio::mixer::v1::CheckResponse* response);
const Attributes& request, ::istio::mixer::v1::CheckResponse* response,
std::string* signature);

// Caches a response from a remote mixer call.
virtual ::google::protobuf::util::Status CacheResponse(
const Attributes& request,
const std::string& signature,
const ::istio::mixer::v1::CheckResponse& response);

// Invalidates expired check responses.
Expand Down
65 changes: 45 additions & 20 deletions mixerclient/src/check_cache_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,13 @@ class CheckCacheTest : public ::testing::Test {
ASSERT_TRUE(
TextFormat::ParseFromString(kSuccessResponse1, &pass_response1_));
CheckOptions options(1 /*entries*/, kFlushIntervalMs, kExpirationMs);
options.cache_keys = {"string-key"};

cache_ = std::unique_ptr<CheckCache>(new CheckCache(options));
ASSERT_TRUE((bool)(cache_));

Attributes::Value string_value;
string_value.type = Attributes::Value::STRING;
string_value.str_v = "this-is-a-string-value";
attributes1_.attributes["string-key"] = string_value;
attributes1_.attributes["string-key"] =
Attributes::StringValue("this-is-a-string-value");
}

Attributes attributes1_;
Expand All @@ -69,56 +68,82 @@ class CheckCacheTest : public ::testing::Test {
std::unique_ptr<CheckCache> cache_;
};

TEST_F(CheckCacheTest, TestDisableCache) {
TEST_F(CheckCacheTest, TestDisableCache1) {
// 0 cache entries. cache is disabled
CheckOptions options(0 /*entries*/, 1000, 2000);
options.cache_keys = {"string-key"};
cache_ = std::unique_ptr<CheckCache>(new CheckCache(options));

ASSERT_TRUE((bool)(cache_));
CheckResponse response;
EXPECT_ERROR_CODE(Code::NOT_FOUND, cache_->Check(attributes1_, &response));

EXPECT_ERROR_CODE(Code::NOT_FOUND,
cache_->Check(attributes1_, &response, nullptr));
}

TEST_F(CheckCacheTest, TestDisableCache2) {
// empty cache keys. cache is disabled
CheckOptions options(1000 /*entries*/, 1000, 2000);
cache_ = std::unique_ptr<CheckCache>(new CheckCache(options));

ASSERT_TRUE((bool)(cache_));
CheckResponse response;

EXPECT_ERROR_CODE(Code::NOT_FOUND,
cache_->Check(attributes1_, &response, nullptr));
}

TEST_F(CheckCacheTest, TestCachePassResponses) {
CheckResponse response;
EXPECT_ERROR_CODE(Code::NOT_FOUND, cache_->Check(attributes1_, &response));
std::string signature;
EXPECT_ERROR_CODE(Code::NOT_FOUND,
cache_->Check(attributes1_, &response, &signature));

EXPECT_OK(cache_->CacheResponse(attributes1_, pass_response1_));
EXPECT_OK(cache_->Check(attributes1_, &response));
EXPECT_OK(cache_->CacheResponse(signature, pass_response1_));
EXPECT_OK(cache_->Check(attributes1_, &response, &signature));
}

TEST_F(CheckCacheTest, TestRefresh) {
CheckResponse response;
EXPECT_ERROR_CODE(Code::NOT_FOUND, cache_->Check(attributes1_, &response));
EXPECT_OK(cache_->CacheResponse(attributes1_, pass_response1_));
EXPECT_OK(cache_->Check(attributes1_, &response));
std::string signature;
EXPECT_ERROR_CODE(Code::NOT_FOUND,
cache_->Check(attributes1_, &response, &signature));

EXPECT_OK(cache_->CacheResponse(signature, pass_response1_));
EXPECT_OK(cache_->Check(attributes1_, &response, &signature));
// sleep 0.12 second.
usleep(120000);

// First one should be NOT_FOUND for refresh
EXPECT_ERROR_CODE(Code::NOT_FOUND, cache_->Check(attributes1_, &response));
EXPECT_ERROR_CODE(Code::NOT_FOUND,
cache_->Check(attributes1_, &response, &signature));
// Second one use cached response.
EXPECT_OK(cache_->CacheResponse(attributes1_, pass_response1_));
EXPECT_OK(cache_->Check(attributes1_, &response));
EXPECT_OK(cache_->CacheResponse(signature, pass_response1_));
EXPECT_OK(cache_->Check(attributes1_, &response, &signature));
EXPECT_TRUE(MessageDifferencer::Equals(response, pass_response1_));

EXPECT_OK(cache_->FlushAll());
EXPECT_ERROR_CODE(Code::NOT_FOUND, cache_->Check(attributes1_, &response));
EXPECT_ERROR_CODE(Code::NOT_FOUND,
cache_->Check(attributes1_, &response, &signature));
}

TEST_F(CheckCacheTest, TestCacheExpired) {
CheckResponse response;
EXPECT_ERROR_CODE(Code::NOT_FOUND, cache_->Check(attributes1_, &response));
std::string signature;
EXPECT_ERROR_CODE(Code::NOT_FOUND,
cache_->Check(attributes1_, &response, &signature));

EXPECT_OK(cache_->CacheResponse(attributes1_, pass_response1_));
EXPECT_OK(cache_->Check(attributes1_, &response));
EXPECT_OK(cache_->CacheResponse(signature, pass_response1_));
EXPECT_OK(cache_->Check(attributes1_, &response, &signature));
EXPECT_TRUE(MessageDifferencer::Equals(response, pass_response1_));

// sleep 0.22 second to cause cache expired.
usleep(220000);
EXPECT_OK(cache_->Flush());

// First one should be NOT_FOUND for refresh
EXPECT_ERROR_CODE(Code::NOT_FOUND, cache_->Check(attributes1_, &response));
EXPECT_ERROR_CODE(Code::NOT_FOUND,
cache_->Check(attributes1_, &response, &signature));
}

} // namespace mixer_client
Expand Down
38 changes: 28 additions & 10 deletions mixerclient/src/client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,21 +40,39 @@ MixerClientImpl::MixerClientImpl(const MixerClientOptions &options)
check_transport_.reset(new CheckTransport(transport));
report_transport_.reset(new ReportTransport(transport));
quota_transport_.reset(new QuotaTransport(transport));

check_cache_ =
std::unique_ptr<CheckCache>(new CheckCache(options.check_options));
}

MixerClientImpl::~MixerClientImpl() {}
MixerClientImpl::~MixerClientImpl() { check_cache_->FlushAll(); }

void MixerClientImpl::Check(const Attributes &attributes, DoneFunc on_done) {
auto response = new CheckResponse;
check_transport_->Send(attributes, response,
[response, on_done](const Status &status) {
if (status.ok()) {
on_done(ConvertRpcStatus(response->result()));
} else {
on_done(status);
}
delete response;
});
std::string signature;
Status status = check_cache_->Check(attributes, response, &signature);
if (status.error_code() == Code::NOT_FOUND) {
std::shared_ptr<CheckCache> check_cache_copy = check_cache_;
check_transport_->Send(
attributes, response,
[check_cache_copy, signature, response, on_done](const Status &status) {
if (status.ok()) {
check_cache_copy->CacheResponse(signature, *response);
on_done(ConvertRpcStatus(response->result()));
} else {
on_done(status);
}
delete response;
});
return;
}

if (status.ok()) {
on_done(ConvertRpcStatus(response->result()));
} else {
on_done(status);
}
delete response;
}

void MixerClientImpl::Report(const Attributes &attributes, DoneFunc on_done) {
Expand Down
3 changes: 3 additions & 0 deletions mixerclient/src/client_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#define MIXERCLIENT_CLIENT_IMPL_H

#include "include/client.h"
#include "src/check_cache.h"
#include "src/grpc_transport.h"
#include "src/transport.h"

Expand All @@ -42,6 +43,8 @@ class MixerClientImpl : public MixerClient {
std::unique_ptr<QuotaTransport> quota_transport_;
std::unique_ptr<GrpcTransport> grpc_transport_;

std::shared_ptr<CheckCache> check_cache_;

GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(MixerClientImpl);
};

Expand Down
7 changes: 6 additions & 1 deletion mixerclient/src/signature.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,15 @@ const char kDelimiter[] = "\0";
const int kDelimiterLength = 1;
} // namespace

string GenerateSignature(const Attributes& attributes) {
string GenerateSignature(const Attributes& attributes,
const std::set<std::string>& cache_keys) {
MD5 hasher;

for (const auto& attribute : attributes.attributes) {
// Skip the attributes not in the cache keys
if (cache_keys.find(attribute.first) == cache_keys.end()) {
continue;
}
hasher.Update(attribute.first);
hasher.Update(kDelimiter, kDelimiterLength);
switch (attribute.second.type) {
Expand Down
3 changes: 2 additions & 1 deletion mixerclient/src/signature.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ namespace istio {
namespace mixer_client {

// Generates signature for Attributes.
std::string GenerateSignature(const Attributes& attributes);
std::string GenerateSignature(const Attributes& attributes,
const std::set<std::string>& cache_keys);

} // namespace mixer_client
} // namespace istio
Expand Down
24 changes: 16 additions & 8 deletions mixerclient/src/signature_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,47 +64,55 @@ class SignatureUtilTest : public ::testing::Test {
attributes_.attributes[key] = Attributes::StringMapValue(std::move(value));
}

std::set<std::string> GetKeys() const {
std::set<std::string> keys;
for (const auto& it : attributes_.attributes) {
keys.insert(it.first);
}
return keys;
}

Attributes attributes_;
};

TEST_F(SignatureUtilTest, Attributes) {
AddString("string-key", "this is a string value");
EXPECT_EQ("26f8f724383c46e7f5803380ab9c17ba",
MD5::DebugString(GenerateSignature(attributes_)));
MD5::DebugString(GenerateSignature(attributes_, GetKeys())));

AddBytes("bytes-key", "this is a bytes value");
EXPECT_EQ("1f409524b79b9b5760032dab7ecaf960",
MD5::DebugString(GenerateSignature(attributes_)));
MD5::DebugString(GenerateSignature(attributes_, GetKeys())));

AddDoublePair("double-key", 99.9);
EXPECT_EQ("6183342ff222018f6300de51cdcd4501",
MD5::DebugString(GenerateSignature(attributes_)));
MD5::DebugString(GenerateSignature(attributes_, GetKeys())));

AddInt64Pair("int-key", 35);
EXPECT_EQ("d681b9c72d648f9c831d95b4748fe1c2",
MD5::DebugString(GenerateSignature(attributes_)));
MD5::DebugString(GenerateSignature(attributes_, GetKeys())));

AddBoolPair("bool-key", true);
EXPECT_EQ("958930b41f0d8b43f5c61c31b0b092e2",
MD5::DebugString(GenerateSignature(attributes_)));
MD5::DebugString(GenerateSignature(attributes_, GetKeys())));

// default to Clock's epoch.
std::chrono::time_point<std::chrono::system_clock> time_point;
AddTime("time-key", time_point);
EXPECT_EQ("f7dd61e1a5881e2492d93ad023ab49a2",
MD5::DebugString(GenerateSignature(attributes_)));
MD5::DebugString(GenerateSignature(attributes_, GetKeys())));

std::chrono::seconds secs(5);
AddDuration("duration-key",
std::chrono::duration_cast<std::chrono::nanoseconds>(secs));
EXPECT_EQ("13ae11ea2bea216da46688cd9698645e",
MD5::DebugString(GenerateSignature(attributes_)));
MD5::DebugString(GenerateSignature(attributes_, GetKeys())));

std::map<std::string, std::string> string_map = {{"key1", "value1"},
{"key2", "value2"}};
AddStringMap("string-map-key", std::move(string_map));
EXPECT_EQ("c861f02e251c896513eb0f7c97aa2ce7",
MD5::DebugString(GenerateSignature(attributes_)));
MD5::DebugString(GenerateSignature(attributes_, GetKeys())));
}

} // namespace
Expand Down

0 comments on commit 4cd95ef

Please sign in to comment.