Skip to content

Commit

Permalink
[fix](regression) fix file cache lru ttl evict regression (#41296)
Browse files Browse the repository at this point in the history
  • Loading branch information
freemandealer authored Oct 15, 2024
1 parent bbbc660 commit 07ba5eb
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 7 deletions.
9 changes: 9 additions & 0 deletions be/src/cloud/injection_point_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ void register_suites() {
sp->set_call_back("VOlapTableSink::close",
[](auto&&) { std::this_thread::sleep_for(std::chrono::seconds(5)); });
});
// curl be_ip:http_port/api/injection_point/apply_suite?name=test_ttl_lru_evict'
suite_map.emplace("test_ttl_lru_evict", [] {
auto* sp = SyncPoint::get_instance();
sp->set_call_back("BlockFileCache::change_limit1", [](auto&& args) {
LOG(INFO) << "BlockFileCache::change_limit1";
auto* limit = try_any_cast<size_t*>(args[0]);
*limit = 1;
});
});
suite_map.emplace("test_file_segment_cache_corruption", [] {
auto* sp = SyncPoint::get_instance();
sp->set_call_back("Segment::open:corruption", [](auto&& args) {
Expand Down
4 changes: 4 additions & 0 deletions be/src/io/cache/block_file_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "io/cache/block_file_cache.h"

#include "common/status.h"
#include "cpp/sync_point.h"

#if defined(__APPLE__)
#include <sys/mount.h>
Expand Down Expand Up @@ -872,6 +873,9 @@ bool BlockFileCache::try_reserve_for_ttl_without_lru(size_t size,
size_t removed_size = 0;
size_t cur_cache_size = _cur_cache_size;
auto limit = config::max_ttl_cache_ratio * _capacity;

TEST_INJECTION_POINT_CALLBACK("BlockFileCache::change_limit1", &limit);

if ((_cur_ttl_size + size) * 100 > limit) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,17 @@ import org.apache.http.impl.client.LaxRedirectStrategy;
// - set smaller max_ttl_cache_ratio in this test

suite("test_ttl_lru_evict") {
sql """ use @regression_cluster_name1 """
def ttlProperties = """ PROPERTIES("file_cache_ttl_seconds"="3600") """
//sql """ use @regression_cluster_name1 """
sql """ use @compute_cluster """
def ttlProperties = """ PROPERTIES("file_cache_ttl_seconds"="150") """
String[][] backends = sql """ show backends """
String backendId;
def backendIdToBackendIP = [:]
def backendIdToBackendHttpPort = [:]
def backendIdToBackendBrpcPort = [:]
for (String[] backend in backends) {
if (backend[9].equals("true") && backend[19].contains("regression_cluster_name1")) {
// if (backend[9].equals("true") && backend[19].contains("regression_cluster_name1")) {
if (backend[9].equals("true") && backend[19].contains("compute_cluster")) {
backendIdToBackendIP.put(backend[0], backend[1])
backendIdToBackendHttpPort.put(backend[0], backend[4])
backendIdToBackendBrpcPort.put(backend[0], backend[5])
Expand Down Expand Up @@ -141,11 +143,43 @@ suite("test_ttl_lru_evict") {
}
}

long org_max_ttl_cache_ratio = Long.parseLong(get_be_param.call("max_ttl_cache_ratio"))
def enable_be_injection = {
for (String id in backendIdToBackendIP.keySet()) {
def beIp = backendIdToBackendIP.get(id)
def bePort = backendIdToBackendHttpPort.get(id)
def (code, out, err) = curl("GET", String.format("http://%s:%s/api/injection_point/enable", beIp, bePort))
assertTrue(out.contains("OK"))
}
}

try {
set_be_param.call("max_ttl_cache_ratio", "2")
def clear_be_injection = {
for (String id in backendIdToBackendIP.keySet()) {
def beIp = backendIdToBackendIP.get(id)
def bePort = backendIdToBackendHttpPort.get(id)
def (code, out, err) = curl("GET", String.format("http://%s:%s/api/injection_point/clear", beIp, bePort))
assertTrue(out.contains("OK"))
}
}

def disable_be_injection = {
for (String id in backendIdToBackendIP.keySet()) {
def beIp = backendIdToBackendIP.get(id)
def bePort = backendIdToBackendHttpPort.get(id)
def (code, out, err) = curl("GET", String.format("http://%s:%s/api/injection_point/disable", beIp, bePort))
assertTrue(out.contains("OK"))
}
}

def injection_limit_for_all_be = {
for (String id in backendIdToBackendIP.keySet()) {
def beIp = backendIdToBackendIP.get(id)
def bePort = backendIdToBackendHttpPort.get(id)
def (code, out, err) = curl("GET", String.format("http://%s:%s/api/injection_point/apply_suite?name=test_ttl_lru_evict", beIp, bePort))
assertTrue(out.contains("OK"))
}
}

try {
def tables = [customer_ttl: 15000000]
def s3BucketName = getS3BucketName()
def s3WithProperties = """WITH S3 (
Expand Down Expand Up @@ -219,6 +253,10 @@ suite("test_ttl_lru_evict") {
// load for MANY times to this dup table to make cache full enough to evict
// It will takes huge amount of time, so it should be p1/p2 level case.
// But someone told me it is the right place. Never mind just do it!

enable_be_injection()
injection_limit_for_all_be()

for (int i = 0; i < 10; i++) {
load_customer_once("customer_ttl")
}
Expand Down Expand Up @@ -333,6 +371,7 @@ suite("test_ttl_lru_evict") {
assertTrue(flag1)
}
} finally {
set_be_param.call("max_ttl_cache_ratio", org_max_ttl_cache_ratio.toString())
clear_be_injection()
disable_be_injection()
}
}

0 comments on commit 07ba5eb

Please sign in to comment.