-
Notifications
You must be signed in to change notification settings - Fork 16
Description
Description
I'm not sure where the memory leak is, but it appears we're leaking memory (and performance possibly) from Deephaven UI in Python.
Steps to reproduce
Ran this code from @alexpeters1208 and let it sit for a bit. Re-ran the code a few times and changed a few inputs. My memory usage grew pretty significantly while I was idling and at one point was listing 6 GB.
Code collapsed since it's big
from deephaven import ui, agg, empty_table
from deephaven.stream.table_publisher import table_publisher
from deephaven.stream import blink_to_append_only
from deephaven.plot import express as dx
from deephaven import updateby as uby
from deephaven import dtypes as dht
stocks = dx.data.stocks().reverse()
@ui.component
def line_plot(
filtered_source,
exchange, window_size, bol_bands):
window_size_key = {
"5 seconds": ("priceAvg5s", "priceStd5s"),
"30 seconds": ("priceAvg30s", "priceStd30s"),
"1 minute": ("priceAvg1m", "priceStd1m"),
"5 minutes": ("priceAvg5m", "priceStd5m")}
bol_bands_key = {"None": None, "80%": 1.282, "90%": 1.645, "95%": 1.960, "99%": 2.576}
if exchange == "ALL":
base_plot = dx.line(filtered_source, x="timestamp", y="price", by="exchange",
unsafe_update_figure=lambda fig: fig.update_traces(opacity=0.4))
else:
base_plot = dx.line(filtered_source, x="timestamp", y="price",
unsafe_update_figure=lambda fig: fig.update_traces(opacity=0.4))
avg_plot = dx.line(filtered_source,
x="timestamp", y=window_size_key[window_size][0],
color_discrete_sequence=["orange"],
labels={window_size_key[window_size][0]: "Rolling Average"})
def set_bol_properties(fig):
fig.update_layout(showlegend=False)
fig.update_traces(fill="tonexty", fillcolor='rgba(255,165,0,0.08)')
if bol_bands_key[bol_bands] is not None:
bol_plot = dx.line(filtered_source \
.update([
f"errorY={window_size_key[window_size][0]} + {bol_bands_key[bol_bands]}*{window_size_key[window_size][1]}",
f"errorYMinus={window_size_key[window_size][0]} - {bol_bands_key[bol_bands]}*{window_size_key[window_size][1]}",
]),
x="timestamp", y=["errorYMinus", "errorY"],
color_discrete_sequence=["rgba(255,165,0,0.3)", "rgba(255,165,0,0.3)"],
unsafe_update_figure=set_bol_properties)
return ui.panel(dx.layer(base_plot, avg_plot, bol_plot), title="Prices")
return ui.panel(dx.layer(base_plot, avg_plot), title="Prices")
@ui.component
def full_table(source):
return ui.panel(source, title="Full Table")
@ui.component
def filtered_table(source):
return ui.panel(source \
.drop_columns([
"priceAvg5s", "priceStd5s", "priceAvg30s", "priceStd30s",
"priceAvg1m", "priceStd1m", "priceAvg5m", "priceStd5m"]) \
.reverse(), title="Filtered Table")
@ui.component
def parameters_panel(
symbols,
exchanges,
symbol, set_symbol,
exchange, set_exchange,
window_size, set_window_size,
bol_bands, set_bol_bands):
symbol_picker = ui.picker(
*symbols,
label="Symbol",
on_selection_change=set_symbol,
selected_key=symbol,
marginX=10,
marginTop=10
)
exchange_picker = ui.picker(
*exchanges,
label="Exchange",
on_selection_change=set_exchange,
selected_key=exchange,
marginX=10,
marginTop=10
)
window_size_selector = ui.button_group(
ui.button("5 seconds", on_press=lambda: set_window_size("5 seconds")),
ui.button("30 seconds", on_press=lambda: set_window_size("30 seconds")),
ui.button("1 minute", on_press=lambda: set_window_size("1 minute")),
ui.button("5 minutes", on_press=lambda: set_window_size("5 minutes")),
margin_x=10
)
bolinger_band_selector = ui.button_group(
ui.button("None", on_press=lambda: set_bol_bands("None")),
ui.button("80%", on_press=lambda: set_bol_bands("80%")),
ui.button("90%", on_press=lambda: set_bol_bands("90%")),
ui.button("95%", on_press=lambda: set_bol_bands("95%")),
ui.button("99%", on_press=lambda: set_bol_bands("99%")),
margin_x=10
)
return ui.panel(
ui.row(
symbol_picker,
exchange_picker
),
ui.row(
ui.column(
ui.text(
"Window size:",
margin_x=10,
marginTop=20,
margin_bottom=10
),
window_size_selector
)
),
ui.row(
ui.column(
ui.text(
"Bolinger bands:",
margin_x=10,
marginTop=20,
margin_bottom=10
),
bolinger_band_selector
)
),
title="Parameters"
)
@ui.component
def orderbook_panel(symbols):
symbol, set_symbol = ui.use_state("")
size, set_size = ui.use_state(0)
blink_table, publisher = ui.use_memo(
lambda: table_publisher(
"Order table", {"sym": dht.string, "size": dht.int32, "side": dht.string}
),
[],
)
t = ui.use_memo(lambda: blink_to_append_only(blink_table), [blink_table])
def submit_order(order_sym, order_size, side):
publisher.add(
empty_table(1).update(
[f"sym=`{order_sym}`", f"size={order_size}", f"side=`{side}`"]
)
)
def handle_buy(_):
submit_order(symbol, size, "buy")
def handle_sell(_):
submit_order(symbol, size, "sell")
symbol_picker = ui.picker(
*symbols,
label="Symbol",
label_position="side",
on_selection_change=set_symbol,
selected_key=symbol,
marginX=10,
marginTop=4
)
size_selector = ui.number_field(
label="Size",
label_position="side",
value=size,
on_change=set_size
)
return ui.panel(
ui.flex(
symbol_picker,
size_selector,
ui.button("Buy", on_press=handle_buy, variant="accent", style="fill"),
ui.button("Sell", on_press=handle_sell, variant="negative", style="fill"),
gap=10,
marginX=10,
marginTop=4,
wrap=True,
),
t,
title="Order Book"
)
@ui.component
def my_layout(source, source_with_stats):
# lists of symbols and exchanges will inform drop-down selectors
symbols = ui.use_column_data(source.agg_by(agg.unique(cols="sym"), by="sym"))
exchanges = ui.use_column_data(source.agg_by(agg.unique(cols="exchange"), by="exchange"))
exchanges.append("ALL")
# state variables
symbol, set_symbol = ui.use_state(symbols[0])
exchange, set_exchange = ui.use_state("ALL")
window_size, set_window_size = ui.use_state("30 seconds")
bol_bands, set_bol_bands = ui.use_state("90%")
# use state variables to par down source table before sending off to other functions
if exchange == "ALL":
single_symbol = source_with_stats \
.where([f"sym == `{symbol}`"]) \
.drop_columns([
"priceAvg5s", "priceStd5s", "priceAvg30s", "priceStd30s",
"priceAvg1m", "priceStd1m", "priceAvg5m", "priceStd5m"]) \
.rename_columns([
"priceAvg5s=priceAvg5sAvg", "priceStd5s=priceStd5sAvg",
"priceAvg30s=priceAvg30sAvg", "priceStd30s=priceStd30sAvg",
"priceAvg1m=priceAvg1mAvg", "priceStd1m=priceStd1mAvg",
"priceAvg5m=priceAvg5mAvg", "priceStd5m=priceStd5mAvg"])
else:
single_symbol = source_with_stats \
.where([f"sym == `{symbol}`", f"exchange == `{exchange}`"]) \
.drop_columns([
"priceAvg5sAvg", "priceStd5sAvg", "priceAvg30sAvg", "priceStd30sAvg",
"priceAvg1mAvg", "priceStd1mAvg", "priceAvg5mAvg", "priceStd5mAvg"]) \
return ui.row(
ui.column(
line_plot(
single_symbol, exchange, window_size, bol_bands
),
ui.stack(
full_table(source),
filtered_table(single_symbol),
),
width=65
),
ui.column(
ui.row(
parameters_panel(
symbols, exchanges,
symbol, set_symbol,
exchange, set_exchange,
window_size, set_window_size,
bol_bands, set_bol_bands
),
height=40
),
ui.row(
orderbook_panel(symbols),
height=60
),
width=35
)
)
# precompute all of the rolling statistics
sorted_stocks = stocks.sort("timestamp")
stocks_with_stats = sorted_stocks \
.update_by([
uby.rolling_avg_time("timestamp", "priceAvg5sAvg=price", "PT2.5s", "PT2.5s"),
uby.rolling_avg_time("timestamp", "priceAvg30sAvg=price", "PT15s", "PT15s"),
uby.rolling_avg_time("timestamp", "priceAvg1mAvg=price", "PT30s", "PT30s"),
uby.rolling_avg_time("timestamp", "priceAvg5mAvg=price", "PT150s", "PT150s"),
uby.rolling_std_time("timestamp", "priceStd5sAvg=price", "PT2.5s", "PT2.5s"),
uby.rolling_std_time("timestamp", "priceStd30sAvg=price", "PT15s", "PT15s"),
uby.rolling_std_time("timestamp", "priceStd1mAvg=price", "PT30s", "PT30s"),
uby.rolling_std_time("timestamp", "priceStd5mAvg=price", "PT150s", "PT150s"),
], by = ["sym"]) \
.update_by([
uby.rolling_avg_time("timestamp", "priceAvg5s=price", "PT2.5s", "PT2.5s"),
uby.rolling_avg_time("timestamp", "priceAvg30s=price", "PT15s", "PT15s"),
uby.rolling_avg_time("timestamp", "priceAvg1m=price", "PT30s", "PT30s"),
uby.rolling_avg_time("timestamp", "priceAvg5m=price", "PT150s", "PT150s"),
uby.rolling_std_time("timestamp", "priceStd5s=price", "PT2.5s", "PT2.5s"),
uby.rolling_std_time("timestamp", "priceStd30s=price", "PT15s", "PT15s"),
uby.rolling_std_time("timestamp", "priceStd1m=price", "PT30s", "PT30s"),
uby.rolling_std_time("timestamp", "priceStd5m=price", "PT150s", "PT150s"),
], by = ["sym", "exchange"])
dashboard = ui.dashboard(my_layout(stocks, stocks_with_stats))Expected results
Relatively stable memory usage
Actual results
Memory growth at a much more rapid pace than the table should grow.
Additional details and attachments
I'm not quite sure what the issue is, but we might want to look at liveness scope and make sure we're not retaining any references in things like use_memo. Also need to ensure the client is closing connections to unused tables and not keeping things open on the server.
There was some slowness reported while developing this dashboard, so it was probably run many more times than I ran it to see memory growth.
Versions
Deephaven UI: 0.11.0
Engine Version: 0.34.0-SNAPSHOT
Web UI Version: 0.69.1
Java Version: 17.0.10
Barrage Version: 0.6.0
Browser Name: Chrome 123
OS Name: Linux