Skip to content

Commit

Permalink
Merge pull request #2522 from DataDog/pg-block
Browse files Browse the repository at this point in the history
Support PG calls with a block
  • Loading branch information
TonyCTHsu authored Jan 24, 2023
2 parents 291dbd2 + a5a6cdb commit d6a1a60
Show file tree
Hide file tree
Showing 2 changed files with 1,643 additions and 676 deletions.
75 changes: 44 additions & 31 deletions lib/datadog/tracing/contrib/pg/instrumentation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,63 +20,66 @@ def self.included(base)

# PG::Connection patch methods
module InstanceMethods
def exec(sql, *args)
trace(Ext::SPAN_EXEC, sql: sql) do |sql_statement|
super(sql_statement, *args)
def exec(sql, *args, &block)
trace(Ext::SPAN_EXEC, sql: sql, block: block) do |sql_statement, wrapped_block|
super(sql_statement, *args, &wrapped_block)
end
end

def exec_params(sql, params, *args)
trace(Ext::SPAN_EXEC_PARAMS, sql: sql) do |sql_statement|
super(sql_statement, params, *args)
def exec_params(sql, params, *args, &block)
trace(Ext::SPAN_EXEC_PARAMS, sql: sql, block: block) do |sql_statement, wrapped_block|
super(sql_statement, params, *args, &wrapped_block)
end
end

def exec_prepared(statement_name, params, *args)
trace(Ext::SPAN_EXEC_PREPARED, statement_name: statement_name) do
super(statement_name, params, *args)
def exec_prepared(statement_name, params, *args, &block)
trace(Ext::SPAN_EXEC_PREPARED, statement_name: statement_name, block: block) do |_, wrapped_block|
super(statement_name, params, *args, &wrapped_block)
end
end

def async_exec(sql, *args)
trace(Ext::SPAN_ASYNC_EXEC, sql: sql) do |sql_statement|
super(sql_statement, *args)
# async_exec is an alias to exec
def async_exec(sql, *args, &block)
trace(Ext::SPAN_ASYNC_EXEC, sql: sql, block: block) do |sql_statement, wrapped_block|
super(sql_statement, *args, &wrapped_block)
end
end

def async_exec_params(sql, params, *args)
trace(Ext::SPAN_ASYNC_EXEC_PARAMS, sql: sql) do |sql_statement|
super(sql_statement, params, *args)
# async_exec_params is an alias to exec_params
def async_exec_params(sql, params, *args, &block)
trace(Ext::SPAN_ASYNC_EXEC_PARAMS, sql: sql, block: block) do |sql_statement, wrapped_block|
super(sql_statement, params, *args, &wrapped_block)
end
end

def async_exec_prepared(statement_name, params, *args)
trace(Ext::SPAN_ASYNC_EXEC_PREPARED, statement_name: statement_name) do
super(statement_name, params, *args)
# async_exec_prepared is an alias to exec_prepared
def async_exec_prepared(statement_name, params, *args, &block)
trace(Ext::SPAN_ASYNC_EXEC_PREPARED, statement_name: statement_name, block: block) do |_, wrapped_block|
super(statement_name, params, *args, &wrapped_block)
end
end

def sync_exec(sql, *args)
trace(Ext::SPAN_SYNC_EXEC, sql: sql) do |sql_statement|
super(sql_statement, *args)
def sync_exec(sql, *args, &block)
trace(Ext::SPAN_SYNC_EXEC, sql: sql, block: block) do |sql_statement, wrapped_block|
super(sql_statement, *args, &wrapped_block)
end
end

def sync_exec_params(sql, params, *args)
trace(Ext::SPAN_SYNC_EXEC_PARAMS, sql: sql) do |sql_statement|
super(sql_statement, params, *args)
def sync_exec_params(sql, params, *args, &block)
trace(Ext::SPAN_SYNC_EXEC_PARAMS, sql: sql, block: block) do |sql_statement, wrapped_block|
super(sql_statement, params, *args, &wrapped_block)
end
end

def sync_exec_prepared(statement_name, params, *args)
trace(Ext::SPAN_SYNC_EXEC_PREPARED, statement_name: statement_name) do
super(statement_name, params, *args)
def sync_exec_prepared(statement_name, params, *args, &block)
trace(Ext::SPAN_SYNC_EXEC_PREPARED, statement_name: statement_name, block: block) do |_, wrapped_block|
super(statement_name, params, *args, &wrapped_block)
end
end

private

def trace(name, sql: nil, statement_name: nil)
def trace(name, sql: nil, statement_name: nil, block: nil)
service = Datadog.configuration_for(self, :service_name) || datadog_configuration[:service_name]
resource = statement_name || sql

Expand All @@ -101,9 +104,18 @@ def trace(name, sql: nil, statement_name: nil)
)
end

result = yield(propagated_sql_statement)
annotate_span_with_result!(span, result)
result
# Read metadata from PG::Result
if block
yield(propagated_sql_statement, proc do |result|
ret = block.call(result)
annotate_span_with_result!(span, result)
ret
end)
else
result = yield(propagated_sql_statement)
annotate_span_with_result!(span, result)
result
end
end
end

Expand All @@ -128,6 +140,7 @@ def annotate_span_with_query!(span, service)
span.set_tag(Tracing::Metadata::Ext::NET::TAG_DESTINATION_PORT, port)
end

# @param [PG::Result] result
def annotate_span_with_result!(span, result)
span.set_tag(Contrib::Ext::DB::TAG_ROW_COUNT, result.ntuples)
end
Expand Down
Loading

0 comments on commit d6a1a60

Please sign in to comment.