Aggregate trial statistics (#112)
* Aggregate trial statistics

* Fix merging for ensemble models

* Add documentation

* review edit
Tabrizian authored Jun 8, 2022
1 parent efd1cbe commit 82986cf
Showing 3 changed files with 256 additions and 7 deletions.
217 changes: 216 additions & 1 deletion src/c++/perf_analyzer/inference_profiler.cc
@@ -573,8 +573,10 @@ InferenceProfiler::ProfileHelper(
LoadStatus load_status;
size_t completed_trials = 0;
std::queue<cb::Error> error;
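// Rolling window of the measurements from the most recent trials.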
std::deque<PerfStatus> perf_status;

do {
PerfStatus status_summary;
RETURN_IF_ERROR(manager_->CheckHealth());

// Needed to obtain stable measurements
@@ -587,8 +589,11 @@ InferenceProfiler::ProfileHelper(
} else {
error.push(Measure(status_summary, measurement_request_count_, true));
}
if (error.size() >= load_parameters_.stability_window) {
perf_status.push_back(status_summary);

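// Keep only the most recent stability_window measurements and errors.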
if (error.size() > load_parameters_.stability_window) {
error.pop();
perf_status.pop_front();
}

if (error.back().IsOk()) {
@@ -648,6 +653,7 @@ InferenceProfiler::ProfileHelper(
(latency_threshold_ms_ * 1000 * 1000))) {
within_threshold = true;
}

// We call it complete only if stability_window measurements are within
// +/-(stability_threshold)% of the average infer per second and latency
if ((load_status.infer_per_sec[idx] <
@@ -681,6 +687,7 @@ InferenceProfiler::ProfileHelper(
completed_trials++;
} while ((!early_exit) && (completed_trials < max_trials_));


// Return the appropriate error which might have occurred in the
// stability_window so that it can be handled properly.
while (!error.empty()) {
@@ -690,13 +697,219 @@ InferenceProfiler::ProfileHelper(
error.pop();
}
}
RETURN_IF_ERROR(MergePerfStatusReports(perf_status, status_summary));

if (early_exit) {
return cb::Error("Received exit signal.");
}
return cb::Error::Success;
}
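The loop above maintains a rolling window: each trial's PerfStatus is pushed onto the deque, and once more than stability_window entries accumulate, the oldest is dropped. A minimal self-contained sketch of the same pattern, with a hypothetical Measurement type standing in for PerfStatus:

#include <cstddef>
#include <deque>

// Hypothetical stand-in for the PerfStatus record kept per trial.
struct Measurement {
  double infer_per_sec;
  double avg_latency_ns;
};

// Push the newest measurement; evict the oldest once the window is full,
// so the deque always holds at most `window_size` recent trials.
void UpdateWindow(
    std::deque<Measurement>& window, const Measurement& m,
    const size_t window_size)
{
  window.push_back(m);
  if (window.size() > window_size) {
    window.pop_front();
  }
}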

cb::Error
InferenceProfiler::MergeServerSideStats(
std::vector<ServerSideStats>& server_side_stats,
ServerSideStats& server_side_summary)
{
auto& server_side_stat = server_side_stats[0];

// Make sure that the composing model structure matches across all the
// trials.
for (size_t i = 1; i < server_side_stats.size(); i++) {
if (server_side_stats[i].composing_models_stat.size() !=
server_side_stat.composing_models_stat.size()) {
return cb::Error(
"Inconsistent ensemble setting detected between the trials.");
}
}

// Initialize the server stats for the merged report.
server_side_summary.inference_count = 0;
server_side_summary.execution_count = 0;
server_side_summary.cache_hit_count = 0;
server_side_summary.cache_miss_count = 0;
server_side_summary.success_count = 0;
server_side_summary.queue_count = 0;
server_side_summary.compute_input_count = 0;
server_side_summary.compute_output_count = 0;
server_side_summary.compute_infer_count = 0;
server_side_summary.cumm_time_ns = 0;
server_side_summary.queue_time_ns = 0;
server_side_summary.compute_input_time_ns = 0;
server_side_summary.compute_infer_time_ns = 0;
server_side_summary.compute_output_time_ns = 0;
server_side_summary.cache_hit_time_ns = 0;
server_side_summary.cache_miss_time_ns = 0;
server_side_summary.composing_models_stat.clear();
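// Recursively merge the stats of each composing model across all trials.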
for (auto& composing_model_stat : server_side_stat.composing_models_stat) {
std::vector<ServerSideStats> composing_model_stats;
for (auto& server_side_stat : server_side_stats) {
composing_model_stats.push_back(
server_side_stat.composing_models_stat[composing_model_stat.first]);
}

ServerSideStats merged_composing_model_stats;
RETURN_IF_ERROR(MergeServerSideStats(
composing_model_stats, merged_composing_model_stats));
server_side_summary.composing_models_stat.insert(
{composing_model_stat.first, merged_composing_model_stats});
}

for (auto& server_side_stat : server_side_stats) {
// Aggregated Server Stats
server_side_summary.inference_count += server_side_stat.inference_count;
server_side_summary.execution_count += server_side_stat.execution_count;
server_side_summary.cache_hit_count += server_side_stat.cache_hit_count;
server_side_summary.cache_miss_count += server_side_stat.cache_miss_count;
server_side_summary.success_count += server_side_stat.success_count;
server_side_summary.queue_count += server_side_stat.queue_count;
server_side_summary.compute_input_count +=
server_side_stat.compute_input_count;
server_side_summary.compute_infer_count +=
server_side_stat.compute_infer_count;
server_side_summary.compute_output_count +=
server_side_stat.compute_output_count;
server_side_summary.cumm_time_ns += server_side_stat.cumm_time_ns;
server_side_summary.queue_time_ns += server_side_stat.queue_time_ns;
server_side_summary.compute_input_time_ns +=
server_side_stat.compute_input_time_ns;
server_side_summary.compute_infer_time_ns +=
server_side_stat.compute_infer_time_ns;
server_side_summary.compute_output_time_ns +=
server_side_stat.compute_output_time_ns;
server_side_summary.cache_hit_time_ns += server_side_stat.cache_hit_time_ns;
server_side_summary.cache_miss_time_ns +=
server_side_stat.cache_miss_time_ns;
}

return cb::Error::Success;
}
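Because MergeServerSideStats calls itself on the per-trial stats of each composing model, nested ensembles are merged to arbitrary depth. A simplified sketch of that recursion, assuming a hypothetical pared-down Stats type:

#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical, pared-down stats record with nested composing models.
struct Stats {
  uint64_t inference_count = 0;
  std::map<std::string, Stats> composing_models;
};

// Sum the counts across trials, recursing into each composing model.
// Assumes at least one trial, as in the function above.
Stats MergeTrials(const std::vector<Stats>& trials)
{
  Stats merged;
  for (const auto& trial : trials) {
    merged.inference_count += trial.inference_count;
  }
  for (const auto& child : trials.front().composing_models) {
    std::vector<Stats> child_trials;
    for (const auto& trial : trials) {
      child_trials.push_back(trial.composing_models.at(child.first));
    }
    merged.composing_models[child.first] = MergeTrials(child_trials);
  }
  return merged;
}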

cb::Error
InferenceProfiler::MergePerfStatusReports(
std::deque<PerfStatus>& perf_status_reports, PerfStatus& summary_status)
{
if (perf_status_reports.size() != load_parameters_.stability_window) {
return cb::Error(
"Perf Status reports size must match the stability window.");
}

auto& perf_status = perf_status_reports[0];

// Make sure that the profiling settings of the perf status reports match
// each other.
for (size_t i = 1; i < perf_status_reports.size(); i++) {
perf_status.concurrency = summary_status.concurrency;
perf_status.request_rate = summary_status.request_rate;

if (perf_status_reports[i].on_sequence_model !=
perf_status.on_sequence_model) {
return cb::Error("Incosistent sequence setting detected.");
}

if (perf_status_reports[i].batch_size != perf_status.batch_size) {
return cb::Error("Incosistent batch size detected.");
}

if (perf_status_reports[i].server_stats.composing_models_stat.size() !=
perf_status.server_stats.composing_models_stat.size()) {
return cb::Error(
"Inconsistent ensemble setting detected between the trials.");
}
}

summary_status.batch_size = perf_status.batch_size;
summary_status.on_sequence_model = perf_status.on_sequence_model;

// Initialize the client stats for the merged report.
summary_status.client_stats.request_count = 0;
summary_status.client_stats.sequence_count = 0;
summary_status.client_stats.delayed_request_count = 0;
summary_status.client_stats.duration_ns = 0;
summary_status.client_stats.avg_latency_ns = 0;
summary_status.client_stats.percentile_latency_ns.clear();
summary_status.client_stats.latencies.clear();
summary_status.client_stats.std_us = 0;
summary_status.client_stats.avg_request_time_ns = 0;
summary_status.client_stats.avg_send_time_ns = 0;
summary_status.client_stats.avg_receive_time_ns = 0;
summary_status.client_stats.infer_per_sec = 0;
summary_status.client_stats.sequence_per_sec = 0;
summary_status.client_stats.completed_count = 0;
summary_status.stabilizing_latency_ns = 0;

std::vector<ServerSideStats> server_side_stats;
for (auto& perf_status : perf_status_reports) {
// Aggregated Client Stats
summary_status.client_stats.request_count +=
perf_status.client_stats.request_count;
summary_status.client_stats.sequence_count +=
perf_status.client_stats.sequence_count;
summary_status.client_stats.delayed_request_count +=
perf_status.client_stats.delayed_request_count;
summary_status.client_stats.duration_ns +=
perf_status.client_stats.duration_ns;

server_side_stats.push_back(perf_status.server_stats);

summary_status.client_stats.latencies.insert(
summary_status.client_stats.latencies.end(),
perf_status.client_stats.latencies.begin(),
perf_status.client_stats.latencies.end());
}

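// Per-window average times are combined as a weighted mean, with each
// window's completed request count as the weight.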
if (include_lib_stats_) {
for (auto& perf_status : perf_status_reports) {
summary_status.client_stats.completed_count +=
perf_status.client_stats.completed_count;

summary_status.client_stats.avg_request_time_ns +=
perf_status.client_stats.avg_request_time_ns *
perf_status.client_stats.completed_count;

summary_status.client_stats.avg_send_time_ns +=
perf_status.client_stats.avg_send_time_ns *
perf_status.client_stats.completed_count;

summary_status.client_stats.avg_receive_time_ns +=
perf_status.client_stats.avg_receive_time_ns *
perf_status.client_stats.completed_count;
}

if (summary_status.client_stats.completed_count != 0) {
summary_status.client_stats.avg_request_time_ns =
summary_status.client_stats.avg_request_time_ns /
summary_status.client_stats.completed_count;

summary_status.client_stats.avg_send_time_ns =
summary_status.client_stats.avg_send_time_ns /
summary_status.client_stats.completed_count;

summary_status.client_stats.avg_receive_time_ns =
summary_status.client_stats.avg_receive_time_ns /
summary_status.client_stats.completed_count;
}
}

RETURN_IF_ERROR(
MergeServerSideStats(server_side_stats, summary_status.server_stats));

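// Sort the combined latencies so percentiles can be computed over all windows.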
std::sort(
summary_status.client_stats.latencies.begin(),
summary_status.client_stats.latencies.end());

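// Throughput is recomputed from the aggregated totals: summed request count
// (times batch size) over the summed window durations.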
float client_duration_sec =
(float)summary_status.client_stats.duration_ns / NANOS_PER_SECOND;
summary_status.client_stats.sequence_per_sec =
summary_status.client_stats.sequence_count / client_duration_sec;
summary_status.client_stats.infer_per_sec =
(summary_status.client_stats.request_count * summary_status.batch_size) /
client_duration_sec;
RETURN_IF_ERROR(
SummarizeLatency(summary_status.client_stats.latencies, summary_status));

return cb::Error::Success;
}
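The avg_request/send/receive times above are merged as a weighted mean. A small standalone sketch of that arithmetic (hypothetical helper, assuming the two vectors are the same length):

#include <cstddef>
#include <cstdint>
#include <vector>

// Weighted mean of per-window averages, weighted by completed counts.
// E.g. windows averaging 100us, 110us and 120us over 10, 20 and 10 completed
// requests merge to (100*10 + 110*20 + 120*10) / 40 = 110us.
uint64_t WeightedAverage(
    const std::vector<uint64_t>& window_avgs,
    const std::vector<uint64_t>& completed_counts)
{
  uint64_t weighted_sum = 0;
  uint64_t total_count = 0;
  for (size_t i = 0; i < window_avgs.size(); i++) {
    weighted_sum += window_avgs[i] * completed_counts[i];
    total_count += completed_counts[i];
  }
  return (total_count == 0) ? 0 : weighted_sum / total_count;
}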

cb::Error
InferenceProfiler::GetServerSideStatus(
std::map<cb::ModelIdentifier, cb::ModelStatistics>* model_stats)
@@ -793,6 +1006,7 @@ InferenceProfiler::Summarize(
RETURN_IF_ERROR(SummarizeClientStat(
start_stat, end_stat, valid_range.second - valid_range.first,
latencies.size(), valid_sequence_count, delayed_request_count, summary));
summary.client_stats.latencies = std::move(latencies);

if (include_server_stats_) {
RETURN_IF_ERROR(SummarizeServerStats(
@@ -951,6 +1165,7 @@ InferenceProfiler::SummarizeClientStat(
end_stat.completed_request_count - start_stat.completed_request_count;
uint64_t request_time_ns = end_stat.cumulative_total_request_time_ns -
start_stat.cumulative_total_request_time_ns;
summary.client_stats.completed_count = completed_count;
uint64_t send_time_ns =
end_stat.cumulative_send_time_ns - start_stat.cumulative_send_time_ns;
uint64_t receive_time_ns = end_stat.cumulative_receive_time_ns -
28 changes: 26 additions & 2 deletions src/c++/perf_analyzer/inference_profiler.h
@@ -25,6 +25,7 @@
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#pragma once

#include <deque>
#include <thread>
#include "concurrency_manager.h"
#include "custom_load_manager.h"
@@ -116,6 +117,8 @@ struct ClientSideStats {
uint64_t avg_latency_ns;
// an ordered map of percentiles to be reported (<percentile, value> pair)
std::map<size_t, uint64_t> percentile_latency_ns;
// List of all the valid latencies.
std::vector<uint64_t> latencies;
// Using usec to avoid square of large number (large in nsec)
uint64_t std_us;
uint64_t avg_request_time_ns;
@@ -124,6 +127,9 @@
// Per sec stat
double infer_per_sec;
double sequence_per_sec;

// Completed request count reported by the client library
uint64_t completed_count;
};

/// The entire statistics record.
@@ -291,8 +297,8 @@ class InferenceProfiler {
const size_t concurrent_request_count, std::vector<PerfStatus>& summary,
bool* meets_threshold);

/// Similar to above function, but instead of setting the concurrency, it
/// sets the specified request rate for measurements.
/// \param request_rate The request rate for inferences.
/// \param summary Appends the measurements summary at the end of this list.
/// \param meets_threshold Returns whether the setting meets the threshold.
@@ -442,6 +448,24 @@ class InferenceProfiler {
/// \return True if all MPI ranks are stable.
bool AllMPIRanksAreStable(bool current_rank_stability);

/// Merge individual perf status reports into a single perf status. This
/// function is used to merge the results from multiple Measure runs into a
/// single report.
/// \param perf_status List of perf status reports to be merged.
/// \param summary_status Final merged summary status.
/// \return cb::Error object indicating success or failure.
cb::Error MergePerfStatusReports(
std::deque<PerfStatus>& perf_status, PerfStatus& summary_status);

/// Merge individual server side statistics into a single server side report.
/// \param server_side_stats List of server side statistics reports to be
/// merged.
/// \param server_side_summary Final merged summary status.
/// \return cb::Error object indicating success or failure.
cb::Error MergeServerSideStats(
std::vector<ServerSideStats>& server_side_stats,
ServerSideStats& server_side_summary);

bool verbose_;
uint64_t measurement_window_ms_;
uint64_t measurement_request_count_;
18 changes: 14 additions & 4 deletions src/c++/perf_analyzer/perf_analyzer.cc
@@ -82,6 +82,14 @@ SignalHandler(int signum)
// specified, the selected percentile value will be reported instead of
// the average value.
//
// Perf Analyzer determines the stability of throughput and latency by
// observing measurements in different trials. If the latency and throughput
// are within the stability percentage (see the --stability-percentage
// option), Perf Analyzer will report the average of the throughput and
// latency numbers observed in the last three trials. All the measurements
// gathered during the last three trials are aggregated to generate a single
// report. The number of total requests is the sum of all the requests in the
// individual measurement windows.
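// For example, with batch size 1, if the last three stable windows measured
// 980, 1000 and 1020 requests over one second each, the merged report covers
// 3000 requests over three seconds, i.e. 1000 infer/sec.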
//
// There are broadly three ways to load server for the data collection using
// perf_analyzer:
// - Maintaining Target Concurrency:
@@ -172,16 +180,15 @@ SignalHandler(int signum)
// --concurrency-range: The range of concurrency levels perf_analyzer will use.
// A concurrency level indicates the number of concurrent requests in queue.
// --request-rate-range: The range of request rates perf_analyzer will use to
// load the server.
// --request-intervals: File containing time intervals (in microseconds) to use
// between successive requests.
// --latency-threshold: latency threshold in msec.
// --measurement-interval: time interval for each measurement window in msec.
// --async: Enables Asynchronous inference calls.
// --binary-search: Enables binary search within the specified range.
// --request-distribution: Allows user to specify the distribution for selecting
// the time intervals between the request dispatch.
//
// For details of the options not listed, please refer to the usage.
//
Expand Down Expand Up @@ -483,7 +490,10 @@ Usage(char** argv, const std::string& msg = std::string())
"latency measurements when determining if a result is stable. The "
"measurement is considered as stable if the recent 3 measurements "
"are within +/- (stability percentage)% of their average in terms "
"of both infer per second and latency. Default is 10(%).",
"of both infer per second and latency. When perf analyzer "
"determines that the measurements are stable, it returns average "
"of the measurements collected in the last 3 windows. Default is "
"10(%).",
18)
<< std::endl;
std::cerr << FormatMessage(