Skip to content

Commit cd10ca9

Browse files
authored
Update build_summary_table function (#578)
* Update build_summary_table function This patch updates build_summary_table to match the same function in impala-shell. https://github.com/apache/impala/blob/a07bf84/shell/impala_client.py#L113 Testing: Run and pass the following command ``` tox -- -ktest_build_summary_table ``` * Copy exec_summary.py from apache/impala@e73e2d4 * Remove one more cur.close_operation()
1 parent 4b975de commit cd10ca9

File tree

3 files changed

+245
-121
lines changed

3 files changed

+245
-121
lines changed

impala/exec_summary.py

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Licensed to the Apache Software Foundation (ASF) under one
5+
# or more contributor license agreements. See the NOTICE file
6+
# distributed with this work for additional information
7+
# regarding copyright ownership. The ASF licenses this file
8+
# to you under the Apache License, Version 2.0 (the
9+
# "License"); you may not use this file except in compliance
10+
# with the License. You may obtain a copy of the License at
11+
#
12+
# http://www.apache.org/licenses/LICENSE-2.0
13+
#
14+
# Unless required by applicable law or agreed to in writing,
15+
# software distributed under the License is distributed on an
16+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
17+
# KIND, either express or implied. See the License for the
18+
# specific language governing permissions and limitations
19+
# under the License.
20+
21+
from impala._thrift_gen.ExecStats.ttypes import TExecStats
22+
23+
24+
def build_exec_summary_table(summary, idx, indent_level, new_indent_level, output,
                             is_prettyprint=True, separate_prefix_column=False):
  """Direct translation of Coordinator::PrintExecSummary() to recursively build a list
  of rows of summary statistics, one per exec node

  summary: the TExecSummary object that contains all the summary data

  idx: the index of the node to print

  indent_level: the number of spaces to print before writing the node's label, to give
  the appearance of a tree. The 0th child of a node has the same indent_level as its
  parent. All other children have an indent_level of one greater than their parent.

  new_indent_level: If true, this indent level is different from the previous row's.

  output: the list of rows into which to append the rows produced for this node and its
  children.

  is_prettyprint: Optional. If True, print time, units, and bytes columns in pretty
  printed format.

  separate_prefix_column: Optional. If True, the prefix and operator name will be
  returned as separate column. Otherwise, prefix and operator name will be concatenated
  into single column.

  Returns the index of the next exec node in summary.exec_nodes that should be
  processed, used internally to this method only.
  """
  if not summary.nodes:
    # Summary nodes is empty or None. Nothing to build.
    return
  assert idx < len(summary.nodes), (
    "Index ({0}) must be less than exec summary count ({1})").format(
      idx, len(summary.nodes))

  # The per-instance stats that are aggregated (summed / maxed) across instances.
  attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"]

  # Initialise aggregate and maximum stats
  agg_stats, max_stats = TExecStats(), TExecStats()
  for attr in attrs:
    setattr(agg_stats, attr, 0)
    setattr(max_stats, attr, 0)

  node = summary.nodes[idx]
  instances = 0
  if node.exec_stats:
    # exec_stats is not None or an empty list.
    instances = len(node.exec_stats)
    for stats in node.exec_stats:
      for attr in attrs:
        val = getattr(stats, attr)
        if val is not None:
          # Missing (None) values are skipped rather than treated as 0 samples.
          setattr(agg_stats, attr, getattr(agg_stats, attr) + val)
          setattr(max_stats, attr, max(getattr(max_stats, attr), val))
    avg_time = agg_stats.latency_ns / instances
  else:
    avg_time = 0

  # node_id == -1 marks a sink (e.g. a fragment root sender), which has no
  # meaningful row count of its own.
  is_sink = node.node_id == -1
  # If the node is a broadcast-receiving exchange node, the cardinality of rows produced
  # is the max over all instances (which should all have received the same number of
  # rows). Otherwise, the cardinality is the sum over all instances which process
  # disjoint partitions.
  if is_sink:
    cardinality = -1
  elif node.is_broadcast:
    cardinality = max_stats.cardinality
  else:
    cardinality = agg_stats.cardinality

  est_stats = node.estimated_stats
  # Build the tree-drawing prefix: "|" for each enclosing level, then "--" when
  # this row starts a new indent level, otherwise spaces of the same width.
  label_prefix = ""
  if indent_level > 0:
    label_prefix = "|"
    label_prefix += "  |" * (indent_level - 1)
    if new_indent_level:
      label_prefix += "--"
    else:
      label_prefix += "  "

  def prettyprint(val, units, divisor):
    # Scale val down by divisor until it fits the next unit; integers for the
    # base unit, two decimals for scaled values.
    for unit in units:
      if val < divisor:
        if unit == units[0]:
          return "%d%s" % (val, unit)
        else:
          return "%3.2f%s" % (val, unit)
      val /= divisor

  def prettyprint_bytes(byte_val):
    return prettyprint(byte_val, [' B', ' KB', ' MB', ' GB', ' TB'], 1024.0)

  def prettyprint_units(unit_val):
    return prettyprint(unit_val, ["", "K", "M", "B"], 1000.0)

  def prettyprint_time(time_val):
    return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0)

  latency = max_stats.latency_ns
  cardinality_est = est_stats.cardinality
  memory_used = max_stats.memory_used
  memory_est = est_stats.memory_used
  if (is_prettyprint):
    avg_time = prettyprint_time(avg_time)
    latency = prettyprint_time(latency)
    # Sinks render blank cardinality columns instead of "-1".
    cardinality = "" if is_sink else prettyprint_units(cardinality)
    cardinality_est = "" if is_sink else prettyprint_units(cardinality_est)
    memory_used = prettyprint_bytes(memory_used)
    memory_est = prettyprint_bytes(memory_est)

  row = list()
  if separate_prefix_column:
    row.append(label_prefix)
    row.append(node.label)
  else:
    row.append(label_prefix + node.label)

  row.extend([
    node.num_hosts,
    instances,
    avg_time,
    latency,
    cardinality,
    cardinality_est,
    memory_used,
    memory_est,
    node.label_detail])

  output.append(row)
  try:
    sender_idx = summary.exch_to_sender_map[idx]
    # This is an exchange node or a join node with a separate builder, so the source
    # is a fragment root, and should be printed next.
    sender_indent_level = indent_level + node.num_children
    sender_new_indent_level = node.num_children > 0
    build_exec_summary_table(summary, sender_idx, sender_indent_level,
                             sender_new_indent_level, output, is_prettyprint,
                             separate_prefix_column)
  except (KeyError, TypeError):
    # Fall through if idx not in map, or if exch_to_sender_map itself is not set
    pass

  idx += 1
  if node.num_children > 0:
    # The 0th child keeps its parent's indent level and is printed LAST (below
    # its siblings), so its rows are buffered and appended at the end.
    first_child_output = []
    idx = build_exec_summary_table(summary, idx, indent_level, False, first_child_output,
                                   is_prettyprint, separate_prefix_column)
    for _ in range(1, node.num_children):
      # All other children are indented
      idx = build_exec_summary_table(summary, idx, indent_level + 1, True, output,
                                     is_prettyprint, separate_prefix_column)
    output += first_child_output
  return idx

impala/hiveserver2.py

Lines changed: 5 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
from thrift.transport.TTransport import TTransportException
3030
from thrift.Thrift import TApplicationException
3131
from thrift.protocol.TBinaryProtocol import TBinaryProtocolAccelerated
32+
from impala._thrift_gen.ExecStats.ttypes import TExecStats
3233
from impala._thrift_gen.TCLIService.ttypes import (
3334
TOpenSessionReq, TFetchResultsReq, TCloseSessionReq,
3435
TExecuteStatementReq, TGetInfoReq, TGetInfoType, TTypeId,
@@ -44,6 +45,7 @@
4445
from impala.compat import (Decimal, _xrange as xrange)
4546
from impala.error import (NotSupportedError, OperationalError,
4647
ProgrammingError, HiveServer2Error, HttpError)
48+
from impala.exec_summary import build_exec_summary_table
4749
from impala.interface import Connection, Cursor, _bind_parameters
4850
from impala.util import get_logger_and_init_null
4951

@@ -727,8 +729,9 @@ def get_summary(self):
727729

728730
def build_summary_table(self, summary, output, idx=0,
                        is_fragment_root=False, indent_level=0):
    """Append one row per exec node of `summary` to `output`.

    Delegates to the shared impala-shell port in impala.exec_summary;
    `is_fragment_root` is forwarded as its `new_indent_level` argument to
    keep this cursor method's historical signature intact.
    """
    return build_exec_summary_table(
        summary, idx, indent_level, is_fragment_root, output,
        is_prettyprint=True, separate_prefix_column=False)
732735

733736
def get_databases(self):
734737
def op():
@@ -1550,122 +1553,3 @@ def get_result_schema(self):
15501553
log.debug('get_result_schema: schema=%s', schema)
15511554

15521555
return schema
1553-
1554-
1555-
def build_summary_table(summary, idx, is_fragment_root, indent_level, output):
    """Direct translation of Coordinator::PrintExecSummary() to recursively
    build a list of rows of summary statistics, one per exec node

    summary: the TExecSummary object that contains all the summary data

    idx: the index of the node to print

    is_fragment_root: true if the node to print is the root of a fragment (and
    therefore feeds into an exchange)

    indent_level: the number of spaces to print before writing the node's
    label, to give the appearance of a tree. The 0th child of a node has the
    same indent_level as its parent. All other children have an indent_level
    of one greater than their parent.

    output: the list of rows into which to append the rows produced for this
    node and its children.

    Returns the index of the next exec node in summary.exec_nodes that should
    be processed, used internally to this method only.
    """
    # pylint: disable=too-many-locals

    # Per-instance stats aggregated (summed) and maxed across all instances.
    attrs = ["latency_ns", "cpu_time_ns", "cardinality", "memory_used"]

    # Initialise aggregate and maximum stats
    agg_stats, max_stats = TExecStats(), TExecStats()
    for attr in attrs:
        setattr(agg_stats, attr, 0)
        setattr(max_stats, attr, 0)

    node = summary.nodes[idx]
    for stats in node.exec_stats:
        for attr in attrs:
            val = getattr(stats, attr)
            if val is not None:
                # Missing (None) values are skipped, not counted as zero.
                setattr(agg_stats, attr, getattr(agg_stats, attr) + val)
                setattr(max_stats, attr, max(getattr(max_stats, attr), val))

    if len(node.exec_stats) > 0:
        avg_time = agg_stats.latency_ns / len(node.exec_stats)
    else:
        avg_time = 0

    # If the node is a broadcast-receiving exchange node, the cardinality of
    # rows produced is the max over all instances (which should all have
    # received the same number of rows). Otherwise, the cardinality is the sum
    # over all instances which process disjoint partitions.
    if node.is_broadcast and is_fragment_root:
        cardinality = max_stats.cardinality
    else:
        cardinality = agg_stats.cardinality

    est_stats = node.estimated_stats
    # Tree-drawing prefix: fragment roots get spaces after "|", other
    # indented nodes get "--" markers.
    label_prefix = ""
    if indent_level > 0:
        label_prefix = "|"
        if is_fragment_root:
            label_prefix += "  " * indent_level
        else:
            label_prefix += "--" * indent_level

    def prettyprint(val, units, divisor):
        # Scale val by divisor until it fits the next unit; integer format for
        # the base unit, two decimals once scaled.
        for unit in units:
            if val < divisor:
                if unit == units[0]:
                    return "%d%s" % (val, unit)
                else:
                    return "%3.2f%s" % (val, unit)
            val /= divisor

    def prettyprint_bytes(byte_val):
        return prettyprint(
            byte_val, [' B', ' KB', ' MB', ' GB', ' TB'], 1024.0)

    def prettyprint_units(unit_val):
        return prettyprint(unit_val, ["", "K", "M", "B"], 1000.0)

    def prettyprint_time(time_val):
        return prettyprint(time_val, ["ns", "us", "ms", "s"], 1000.0)

    row = [label_prefix + node.label,
           len(node.exec_stats),
           prettyprint_time(avg_time),
           prettyprint_time(max_stats.latency_ns),
           prettyprint_units(cardinality),
           prettyprint_units(est_stats.cardinality),
           prettyprint_bytes(max_stats.memory_used),
           prettyprint_bytes(est_stats.memory_used),
           node.label_detail]

    output.append(row)
    try:
        sender_idx = summary.exch_to_sender_map[idx]
        # This is an exchange node, so the sender is a fragment root, and
        # should be printed next.
        build_summary_table(summary, sender_idx, True, indent_level, output)
    except (KeyError, TypeError):
        # Fall through if idx not in map, or if exch_to_sender_map itself is
        # not set
        pass

    idx += 1
    if node.num_children > 0:
        # The 0th child keeps its parent's indent level and is printed last,
        # so its rows are buffered and appended after the other children.
        first_child_output = []
        idx = build_summary_table(summary, idx, False, indent_level,
                                  first_child_output)
        # pylint: disable=unused-variable
        # TODO: is child_idx supposed to be unused? See #120
        for child_idx in range(1, node.num_children):
            # All other children are indented (we only have 0, 1 or 2 children
            # for every exec node at the moment)
            idx = build_summary_table(summary, idx, False, indent_level + 1,
                                      output)
        output += first_child_output
    return idx

impala/tests/test_impala.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,3 +112,67 @@ def validate_log(cur):
112112
assert len(node.exec_stats) >= node.num_hosts
113113
profile = cur.get_profile()
114114
assert profile is not None
115+
116+
def test_build_summary_table(tmp_db, cur, empty_table):
    """Test the build_summary_table cursor method of impyla.

    Runs a self-join against an empty table (with and without mt_dop) and
    checks the rendered exec-summary rows against known expected output.
    """
    tmp_db_lower = tmp_db.lower()
    # Assert column Operator, #Host, #Inst, #Rows, Est. #Rows, Est. Peak Mem, and Detail.
    # Skip column Avg Time, Max Time, and Peak Mem.

    def skip_cols(row):
        # Drop the timing/peak-memory columns (indices 3, 4, 7), which vary
        # run to run, leaving only the stable columns for comparison.
        assert len(row) == 10, row
        output = list(row)
        del output[7]
        del output[4]
        del output[3]
        return output

    def validate_summary_table(table, expected):
        # Compare each summary row (minus volatile columns) to the expectation.
        for i in range(0, len(expected)):
            row = skip_cols(table[i])
            assert expected[i] == row, 'Expect {0} but found {1}'.format(
                str(expected[i]), str(row))

    query = """SELECT * FROM {0} a INNER JOIN {1} b ON (a.i = b.i)""".format(
        empty_table, empty_table)
    cur.execute(query)
    cur.fetchall()
    summary = cur.get_summary()
    output_dop_0 = list()
    cur.build_summary_table(summary, output_dop_0)
    assert len(output_dop_0) == 8, output_dop_0
    expected_dop_0 = [
        ['F02:ROOT', 1, 1, '', '', '4.00 MB', ''],
        ['04:EXCHANGE', 1, 1, '0', '0', '16.00 KB', 'UNPARTITIONED'],
        ['F00:EXCHANGE SENDER', 1, 1, '', '', '64.00 KB', ''],
        ['02:HASH JOIN', 1, 1, '0', '0', '1.94 MB', 'INNER JOIN, BROADCAST'],
        ['|--03:EXCHANGE', 1, 1, '0', '0', '16.00 KB', 'BROADCAST'],
        ['|  F01:EXCHANGE SENDER', 1, 1, '', '', '32.00 KB', ''],
        ['|  01:SCAN HDFS', 1, 1, '0', '0', '0 B',
         '{0}.{1} b'.format(tmp_db_lower, empty_table)],
        ['00:SCAN HDFS', 1, 1, '0', '0', '0 B',
         '{0}.{1} a'.format(tmp_db_lower, empty_table)],
    ]
    validate_summary_table(output_dop_0, expected_dop_0)

    # With mt_dop=2 the join build becomes a separate F03:JOIN BUILD fragment,
    # so one extra summary row is expected.
    cur.execute(query, configuration={'mt_dop': '2'})
    cur.fetchall()
    summary = cur.get_summary()
    output_dop_2 = list()
    cur.build_summary_table(summary, output_dop_2)
    assert len(output_dop_2) == 9, output_dop_2
    expected_dop_2 = [
        ['F02:ROOT', 1, 1, '', '', '4.00 MB', ''],
        ['04:EXCHANGE', 1, 1, '0', '0', '16.00 KB', 'UNPARTITIONED'],
        ['F00:EXCHANGE SENDER', 1, 1, '', '', '64.00 KB', ''],
        ['02:HASH JOIN', 1, 1, '0', '0', '0 B', 'INNER JOIN, BROADCAST'],
        ['|--F03:JOIN BUILD', 1, 1, '', '', '3.88 MB', ''],
        ['|  03:EXCHANGE', 1, 1, '0', '0', '16.00 KB', 'BROADCAST'],
        ['|  F01:EXCHANGE SENDER', 1, 1, '', '', '32.00 KB', ''],
        ['|  01:SCAN HDFS', 1, 1, '0', '0', '0 B',
         '{0}.{1} b'.format(tmp_db_lower, empty_table)],
        ['00:SCAN HDFS', 1, 1, '0', '0', '0 B',
         '{0}.{1} a'.format(tmp_db_lower, empty_table)],
    ]
    validate_summary_table(output_dop_2, expected_dop_2)

0 commit comments

Comments
 (0)