Skip to content

Commit

Permalink
Merged functions to remove code duplication.
Browse files Browse the repository at this point in the history
Made changes to set a timeout on DNS requests.

Signed-off-by: Noman Ali Bajwa <nmnbajwa@gmail.com>
  • Loading branch information
Noman-Ali-Bajwa committed Oct 18, 2022
1 parent 6b95316 commit a3e9243
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 95 deletions.
113 changes: 27 additions & 86 deletions src/stirling/source_connectors/socket_tracer/protocols/dns/stitcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <rapidjson/stringbuffer.h>
#include <rapidjson/writer.h>

#include <chrono>
#include <deque>
#include <string>
#include <utility>
Expand All @@ -32,6 +33,9 @@
DEFINE_bool(include_respless_dns_requests, false,
"If true, use customStitchFrames otherwise uses simple StitchFrames");

DEFINE_uint64(dns_request_timeout_threshold_milliseconds, 2000,
"Number of seconds to wait for the in-flight response of a dns request.");

namespace px {
namespace stirling {
namespace protocols {
Expand Down Expand Up @@ -178,8 +182,9 @@ StatusOr<Record> ProcessReqRespPair(const Frame& req_frame, const Frame& resp_fr
// Currently StitchFrames() uses a response-led matching algorithm.
// For each response that is at the head of the deque, there should exist a previous request with
// the same txid. Find it, and consume both frames.
RecordsWithErrorCount<Record> PxStitchFrames(std::deque<Frame>* req_frames,
std::deque<Frame>* resp_frames) {
RecordsWithErrorCount<Record> StitchFrames(std::deque<Frame>* req_frames,
std::deque<Frame>* resp_frames,
bool include_respless_dns_requests) {
std::vector<Record> entries;
int error_count = 0;

Expand Down Expand Up @@ -240,97 +245,33 @@ RecordsWithErrorCount<Record> PxStitchFrames(std::deque<Frame>* req_frames,

resp_frames->clear();

return {entries, error_count};
}

// CustomStitchFrames is a re-implementation of the existing StitchFrames with an added
// ability to record those DNS requests as well for which we could not match any response.
// Flag include_respless_dns_requests must be set to include such DNS requests.
RecordsWithErrorCount<Record> CustomStitchFrames(std::deque<Frame>* req_frames,
std::deque<Frame>* resp_frames) {
std::vector<Record> entries;
int error_count = 0;

for (auto& resp_frame : *resp_frames) {
bool found_match = false;

// Search for matching req frame
for (auto& req_frame : *req_frames) {
// If the request timestamp is after the response, then it can't be the match.
// Nor can any subsequent requests either, so stop searching.

if (req_frame.timestamp_ns > resp_frame.timestamp_ns) {
break;
}

if (resp_frame.header.txid == req_frame.header.txid) {
StatusOr<Record> record_status = ProcessReqRespPair(req_frame, resp_frame);
if (record_status.ok()) {
entries.push_back(record_status.ConsumeValueOrDie());
} else {
VLOG(1) << record_status.ToString();
++error_count;
}

// Found a match, so remove both request and response.
// We don't remove request frames on the fly, however,
// because it could otherwise cause unnecessary churn/copying in the deque.
// This is due to the fact that responses can come out-of-order.
// Just mark the request as consumed, and clean-up when they reach the head of the queue.
// Note that responses are always head-processed, so they don't require this optimization.
found_match = true;
req_frame.consumed = true;
break;
}
}

if (!found_match) {
VLOG(1) << absl::Substitute("Did not find a request matching the response. TXID = $0",
resp_frame.header.txid);
++error_count;
}

// Clean-up consumed frames at the head.
// Do this inside the resp loop to aggressively clean-out req_frames whenever a frame consumed.
// Should speed up the req_frames search for the next iteration.
if (include_respless_dns_requests) {
// After the external loop's lifecycle comes to an end we end up with the request deque
// having only those request frames which have not been consumed yet i.e. consumed = false
// so essentially these are the requests which could not be matched with any response frame.
// Hence we iterate over this request deque, add a default response to it, make a record and
// append those records at the end of the entries vector.
auto it = req_frames->begin();
while (it != req_frames->end()) {
if (!(*it).consumed) {
break;
auto current_time = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
auto elapsed_seconds = current_time - (*it).timestamp_ns;

if (elapsed_seconds > FLAGS_dns_request_timeout_threshold_milliseconds) {
Frame default_resp_frame;
default_resp_frame.timestamp_ns = 99;
StatusOr<Record> record_status = ProcessReqRespPair(*it, default_resp_frame);
entries.push_back(record_status.ConsumeValueOrDie());
}
}
it++;
}
req_frames->erase(req_frames->begin(), it);

// TODO(oazizi): Consider removing requests that are too old, otherwise a lost response can mean
// the are never processed. This would result in a memory leak until the more drastic connection
// tracker clean-up mechanisms kick in.
}

resp_frames->clear();

// After the external loop's lifecycle comes to an end we end up with the request deque
// having only those request frames which have not been consumed yet i.e. consumed = false
// so essentially these are the requests which could not be matched with any response frame.
// Hence we iterate over this request deque, add a default response to it, make a record and
// append those records at the end of the entries vector.
auto it = req_frames->begin();
while (it != req_frames->end()) {
if (!(*it).consumed) {
Frame default_resp_frame;
default_resp_frame.timestamp_ns = 99;
StatusOr<Record> record_status = ProcessReqRespPair(*it, default_resp_frame);
entries.push_back(record_status.ConsumeValueOrDie());
}
it++;
return {entries, error_count};
} else {
return {entries, error_count};
}
return {entries, error_count};
}

RecordsWithErrorCount<Record> StitchFrames(std::deque<Frame>* req_frames,
std::deque<Frame>* resp_frames) {
return (FLAGS_include_respless_dns_requests) ? CustomStitchFrames(req_frames, resp_frames)
: PxStitchFrames(req_frames, resp_frames);
}

} // namespace dns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
// Choose either the simple implementation of StitchFrames (without missing records) or
// to use CustomStitchFrames for including responseless DNS requests as well.
DECLARE_bool(include_respless_dns_requests);
DECLARE_uint64(dns_request_timeout_threshold_milliseconds);

namespace px {
namespace stirling {
Expand All @@ -40,18 +41,21 @@ namespace dns {
*
* @param req_packets: deque of all request frames.
* @param resp_packets: deque of all response frames.
* @param include_respless_dns_requests: bool to decide whether to include responseless DNS requests
* or not.
* @return A vector of entries to be appended to table store.
*/
RecordsWithErrorCount<Record> StitchFrames(std::deque<Frame>* req_packets,
std::deque<Frame>* resp_packets);
std::deque<Frame>* resp_packets,
bool include_respless_dns_requests);

} // namespace dns

template <>
inline RecordsWithErrorCount<dns::Record> StitchFrames(std::deque<dns::Frame>* req_packets,
std::deque<dns::Frame>* resp_packets,
NoState* /* state */) {
return dns::StitchFrames(req_packets, resp_packets);
return dns::StitchFrames(req_packets, resp_packets, FLAGS_include_respless_dns_requests);
}

} // namespace protocols
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ TEST(DnsStitcherTest, RecordOutput) {
req_frames.push_back(req0_frame);
resp_frames.push_back(resp0_frame);

result = StitchFrames(&req_frames, &resp_frames);
result = StitchFrames(&req_frames, &resp_frames, FLAGS_include_respless_dns_requests);
EXPECT_TRUE(resp_frames.empty());
EXPECT_EQ(req_frames.size(), 0);
EXPECT_EQ(result.error_count, 0);
Expand Down Expand Up @@ -128,7 +128,7 @@ TEST(DnsStitcherTest, OutOfOrderMatching) {
Frame req2_frame = CreateReqFrame(++t, 2);
Frame resp2_frame = CreateRespFrame(++t, 2, std::vector<DNSRecord>());

result = StitchFrames(&req_frames, &resp_frames);
result = StitchFrames(&req_frames, &resp_frames, FLAGS_include_respless_dns_requests);
EXPECT_TRUE(resp_frames.empty());
EXPECT_EQ(req_frames.size(), 0);
EXPECT_EQ(result.error_count, 0);
Expand All @@ -137,15 +137,15 @@ TEST(DnsStitcherTest, OutOfOrderMatching) {
req_frames.push_back(req0_frame);
req_frames.push_back(req1_frame);

result = StitchFrames(&req_frames, &resp_frames);
result = StitchFrames(&req_frames, &resp_frames, FLAGS_include_respless_dns_requests);
EXPECT_TRUE(resp_frames.empty());
EXPECT_EQ(req_frames.size(), 2);
EXPECT_EQ(result.error_count, 0);
EXPECT_EQ(result.records.size(), FLAGS_include_respless_dns_requests ? 2 : 0);

resp_frames.push_back(resp1_frame);

result = StitchFrames(&req_frames, &resp_frames);
result = StitchFrames(&req_frames, &resp_frames, FLAGS_include_respless_dns_requests);
EXPECT_TRUE(resp_frames.empty());
EXPECT_EQ(req_frames.size(), 2);
EXPECT_EQ(result.error_count, 0);
Expand All @@ -154,21 +154,21 @@ TEST(DnsStitcherTest, OutOfOrderMatching) {
req_frames.push_back(req2_frame);
resp_frames.push_back(resp0_frame);

result = StitchFrames(&req_frames, &resp_frames);
result = StitchFrames(&req_frames, &resp_frames, FLAGS_include_respless_dns_requests);
EXPECT_TRUE(resp_frames.empty());
EXPECT_EQ(req_frames.size(), 1);
EXPECT_EQ(result.error_count, 0);
EXPECT_EQ(result.records.size(), FLAGS_include_respless_dns_requests ? 2 : 1);

resp_frames.push_back(resp2_frame);

result = StitchFrames(&req_frames, &resp_frames);
result = StitchFrames(&req_frames, &resp_frames, FLAGS_include_respless_dns_requests);
EXPECT_TRUE(resp_frames.empty());
EXPECT_EQ(resp_frames.size(), 0);
EXPECT_EQ(result.error_count, 0);
EXPECT_EQ(result.records.size(), 1);

result = StitchFrames(&req_frames, &resp_frames);
result = StitchFrames(&req_frames, &resp_frames, FLAGS_include_respless_dns_requests);
EXPECT_TRUE(resp_frames.empty());
EXPECT_EQ(resp_frames.size(), 0);
EXPECT_EQ(result.error_count, 0);
Expand Down

0 comments on commit a3e9243

Please sign in to comment.