Skip to content

Commit

Permalink
Merge pull request #635 from mattmundell/chunk-result-inserts
Browse files Browse the repository at this point in the history
Buffer inserts when adding results from a slave
  • Loading branch information
timopollmeier authored Jul 10, 2019
2 parents 9bbf1a6 + 9587713 commit 490a357
Showing 1 changed file with 209 additions and 30 deletions.
239 changes: 209 additions & 30 deletions src/manage_sql.c
Original file line number Diff line number Diff line change
Expand Up @@ -22679,23 +22679,19 @@ report_set_source_iface (report_t report, const gchar *iface)
* @param[in] report The report.
* @param[in] result The result.
*/
void
report_add_result (report_t report, result_t result)
static void
report_add_result_for_buffer (report_t report, result_t result)
{
double severity, ov_severity;
int qod;
rowid_t rowid;
iterator_t cache_iterator;
user_t previous_user = 0;

if (report == 0 || result == 0)
return;
assert (result);

sql ("UPDATE results SET report = %llu,"
" owner = (SELECT reports.owner"
" FROM reports WHERE id = %llu)"
" WHERE id = %llu;",
report, report, result);
if (report == 0)
return;

if (sql_int ("SELECT NOT EXISTS (SELECT * from result_nvt_reports"
" WHERE result_nvt = (SELECT result_nvt"
Expand Down Expand Up @@ -22807,6 +22803,27 @@ report_add_result (report_t report, result_t result)

}
cleanup_iterator (&cache_iterator);
}

/**
* @brief Add a result to a report.
*
* @param[in] report The report.
* @param[in] result The result.
*/
void
report_add_result (report_t report, result_t result)
{
if (report == 0 || result == 0)
return;

sql ("UPDATE results SET report = %llu,"
" owner = (SELECT reports.owner"
" FROM reports WHERE id = %llu)"
" WHERE id = %llu;",
report, report, result);

report_add_result_for_buffer (report, result);

sql ("UPDATE report_counts"
" SET end_time = (SELECT coalesce(min(overrides.end_time), 0)"
Expand Down Expand Up @@ -48367,6 +48384,162 @@ set_slave_commit_size (int new_commit_size)
slave_commit_size = new_commit_size;
}

/**
* @brief Buffer a result to be inserted.
*
* @param[in] buffer Buffer to store SQL.
* @param[in] task The task associated with the result.
* @param[in] host Host IP address.
* @param[in] hostname Hostname.
* @param[in] port The port the result refers to.
* @param[in] nvt The OID of the NVT that produced the result.
* @param[in] type Type of result. "Security Hole", etc.
* @param[in] description Description of the result.
* @param[in] report Report that result belongs to.
* @param[in] owner Owner of report.
*
* @return A result descriptor for the new result, 0 if error.
*/
static result_t
buffer_insert (GString *buffer, task_t task, const char* host,
const char *hostname, const char* port, const char* nvt,
const char* type, const char* description,
report_t report, user_t owner)
{
gchar *nvt_revision, *severity;
gchar *quoted_hostname, *quoted_descr, *quoted_qod_type;
int qod, first;
nvt_t nvt_id = 0;

assert (report);

if (nvt && strcmp (nvt, "") && (find_nvt (nvt, &nvt_id) || nvt_id <= 0))
{
g_warning ("NVT '%s' not found. Result not created", nvt);
return -1;
}

if (nvt && strcmp (nvt, ""))
{
nvti_t *nvti;

nvti = lookup_nvti (nvt);
if (nvti)
{
gchar *qod_str, *qod_type;
qod_str = tag_value (nvti_tag (nvti), "qod");
qod_type = tag_value (nvti_tag (nvti), "qod_type");

if (qod_str == NULL || sscanf (qod_str, "%d", &qod) != 1)
qod = qod_from_type (qod_type);

quoted_qod_type = sql_quote (qod_type);

g_free (qod_str);
g_free (qod_type);
}
else
{
qod = QOD_DEFAULT;
quoted_qod_type = g_strdup ("");
}

nvt_revision = g_strdup_printf ("SELECT iso_time (modification_time)"
" FROM nvts"
" WHERE uuid = '%s';",
nvt);
}
else
{
qod = QOD_DEFAULT;
quoted_qod_type = g_strdup ("");
nvt_revision = g_strdup ("");
}
severity = nvt_severity (nvt, type);
if (!severity)
{
g_warning ("NVT '%s' has no severity. Result not created.", nvt);
return -1;
}

if (!strcmp (severity, ""))
{
g_free (severity);
severity = g_strdup ("0.0");
}
quoted_hostname = sql_quote (hostname ? hostname : "");
quoted_descr = sql_quote (description ?: "");
result_nvt_notice (nvt);
first = (strlen (buffer->str) == 0);

if (first)
g_string_append (buffer,
"INSERT into results"
" (owner, date, task, host, hostname, port,"
" nvt, nvt_version, severity, type,"
" description, uuid, qod, qod_type, result_nvt"
" report)"
" VALUES");
g_string_append_printf (buffer,
"%s"
" (%llu, m_now (), %llu, '%s', '%s', '%s',"
" '%s', '%s', '%s', '%s',"
" '%s', make_uuid (), %i, '%s',"
" (SELECT id FROM result_nvts WHERE nvt = '%s'),"
" %llu)",
first ? "" : ",",
owner,
task, host ?: "", quoted_hostname, port ?: "",
nvt ?: "", nvt_revision, severity, type,
quoted_descr, qod, quoted_qod_type, nvt ? nvt : "",
report);

g_free (quoted_hostname);
g_free (quoted_descr);
g_free (quoted_qod_type);
g_free (nvt_revision);
g_free (severity);
return 0;
}

/**
* @brief Run INSERT for update_from_slave.
*
* @param[in] buffer Buffer.
* @param[in] report Report.
*/
static void
update_from_slave_insert (GString *buffer, report_t report)
{
if (buffer && strlen (buffer->str))
{
if (report)
{
iterator_t ids;

g_string_append (buffer, " RETURNING id;");

init_iterator (&ids, buffer->str);
while (next (&ids))
report_add_result_for_buffer (report, iterator_int64 (&ids, 0));
cleanup_iterator (&ids);

sql ("UPDATE report_counts"
" SET end_time = (SELECT coalesce(min(overrides.end_time), 0)"
" FROM overrides, results"
" WHERE overrides.nvt = results.nvt"
" AND results.report = %llu"
" AND overrides.end_time >= m_now ())"
" WHERE report = %llu AND override = 1;",
report, report);
}
else
sql (buffer->str);

g_string_truncate (buffer, 0);
}
}

/**
* @brief Update the local task from the slave task.
*
Expand All @@ -48384,6 +48557,8 @@ update_from_slave (task_t task, entity_t get_report, entity_t *report,
entity_t entity, host_start, start;
entities_t results, hosts, entities;
int current_commit_size;
GString *buffer;
user_t owner;

entity = entity_child (get_report, "report");
if (entity == NULL)
Expand Down Expand Up @@ -48455,9 +48630,13 @@ update_from_slave (task_t task, entity_t get_report, entity_t *report,

assert (global_current_report);

owner = sql_int64_0 ("SELECT reports.owner FROM reports WHERE id = %llu;",
global_current_report);

sql_begin_immediate ();
results = entity->entities;
current_commit_size = 0;
buffer = g_string_new ("");
while ((entity = first_entity (results)))
{
if (strcmp (entity_name (entity), "result") == 0)
Expand Down Expand Up @@ -48490,32 +48669,32 @@ update_from_slave (task_t task, entity_t get_report, entity_t *report,
if (description == NULL)
goto rollback_fail;

{
result_t result;

result = make_result (task,
entity_text (host),
hostname ? entity_text (hostname) : "",
entity_text (port),
oid,
threat_message_type (entity_text (threat)),
entity_text (description));
if (global_current_report)
report_add_result (global_current_report, result);

current_commit_size++;
if (slave_commit_size && current_commit_size >= slave_commit_size)
{
sql_commit ();
sql_begin_immediate ();
current_commit_size = 0;
}
}
buffer_insert (buffer,
task,
entity_text (host),
hostname ? entity_text (hostname) : "",
entity_text (port),
oid,
threat_message_type (entity_text (threat)),
entity_text (description),
global_current_report,
owner);

current_commit_size++;
if (slave_commit_size && current_commit_size >= slave_commit_size)
{
update_from_slave_insert (buffer, global_current_report);
sql_commit ();
sql_begin_immediate ();
current_commit_size = 0;
}

(*next_result)++;
}
results = next_entities (results);
}
update_from_slave_insert (buffer, global_current_report);
g_string_free (buffer, TRUE);
sql_commit ();
return 0;

Expand Down

0 comments on commit 490a357

Please sign in to comment.