[Bug]: Simple processing of 1M rows takes more than 10 minutes #21
Comments
Additionally, I was not really able to run Pathway with multiple processes on WSL Linux (there was some kind of TCP issue?). What addresses do the processes use for inter-process communication? Is it some kind of internal server?
The culprit here is the `pw.io.csv.write(clean_data, 'clean_data.csv')` part. If you execute the pipeline with `pw.run()`, it should be significantly faster, and additionally it will provide some monitoring output.
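For concreteness, a minimal sketch of that pattern (the file names and the schema are assumptions, not taken from the thread):

```python
import pathway as pw

# Hypothetical schema for the two columns described in the report.
class InputSchema(pw.Schema):
    datetime: str
    val1: float

clean_data = pw.io.csv.read("data.csv", schema=InputSchema, mode="static")
pw.io.csv.write(clean_data, "clean_data.csv")  # registers the output connector
pw.run()  # executes the whole dataflow and prints monitoring output
```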
What you have done is probably the best way currently.
Hi @embe-pw, thanks a lot for taking a look! This helped, now the ingestion is fast. However, I am seeing that subsequent operations (sliding windows, group by, join) on 1M rows of data take a long time, at least 5 minutes. Is this expected? Are there any benchmarks for Pathway to compare the performance on my machine against? Or what is the best way to profile Pathway?
So I have this little Pathway snippet that takes a long time to execute (10+ minutes). Can you please advise what could be the reason? The data is a simple CSV with 2 columns: datetime, val1 (float). I read the file with a CSV reader and then write it with a CSV writer (static or streaming mode).

```python
import datetime as dt
import pathway as pw
import math

def myfunc(val1: float, val2: float) -> float:
    return math.log(val1) / val2

def calculate(data_stream):
    data1 = data_stream
    data2 = data1.copy()
    calc = (data1.interval_join(data2,
                                data1.datetime,
                                data2.datetime,
                                pw.temporal.interval(-dt.timedelta(minutes=1), -dt.timedelta(minutes=1)))
            .select(datetime=data1.datetime,
                    val3=pw.apply_with_type(myfunc, float, data1.val1, data2.val1)))
    agg = (calc.windowby(pw.this.datetime,
                         window=pw.temporal.sliding(hop=dt.timedelta(minutes=1), duration=dt.timedelta(minutes=50)),
                         behavior=pw.temporal.exactly_once_behavior())
           .reduce(datetime=pw.this._pw_window_end,
                   avg=pw.reducers.avg(pw.this.val3)))
    return agg
```

It's very hard to debug what could be the reason behind this slowdown. Is it some kind of (silent) engine error on my machine? Or maybe the sliding window is silently failing or taking too long? How can I profile the performance of this query to find the slowest parts? My version is 0.8.3.
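One low-tech way to localize the slow stage, without any special tooling (a sketch, assuming the table is read as in the snippets in this thread): run the pipeline several times with later stages disabled (ingestion only, then ingestion plus the join, then the full pipeline) and compare wall-clock times; the monitoring output printed by `pw.run()` also shows progress while the pipeline executes.

```python
import time

# Hypothetical bisection harness: swap `result` between partial pipelines
# (raw input only, join only, join + window) and compare elapsed times.
result = calculate(data)  # `data` is the table returned by the CSV reader
pw.io.csv.write(result, "out.csv")

t0 = time.perf_counter()
pw.run()
print(f"elapsed: {time.perf_counter() - t0:.1f}s")
```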
Hey @ilyanoskov, I can confirm that the code can be slow and there's nothing incorrect on your side. The optimization of sliding windows is planned for the future. In the meantime, the same result can be computed faster like this:

```python
def calculate(data_stream):
    data1 = data_stream
    data2 = data1.copy()
    calc = (data1.interval_join(data2,
                                data1.datetime,
                                data2.datetime,
                                pw.temporal.interval(-dt.timedelta(minutes=1), -dt.timedelta(minutes=1)))
            .select(datetime=data1.datetime,
                    val3=pw.apply_with_type(myfunc, float, data1.val1, data2.val1)))
    agg = (
        calc.windowby(
            pw.this.datetime,
            window=pw.temporal.tumbling(duration=dt.timedelta(minutes=1)),
            behavior=pw.temporal.exactly_once_behavior(),
        )
        .reduce(
            datetime=pw.this._pw_window_start,
            sum=pw.reducers.sum(pw.this.val3),
            cnt=pw.reducers.count(),
        )
        .windowby(
            pw.this.datetime,
            window=pw.temporal.sliding(
                hop=dt.timedelta(minutes=1), duration=dt.timedelta(minutes=50)
            ),
        )
        .reduce(
            datetime=pw.this._pw_window_end,
            avg=pw.reducers.sum(pw.this.sum) / pw.reducers.sum(pw.this.cnt),
        )
    )
    return agg
```
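If I read the rewrite correctly (this explanation is mine, not stated in the thread): the 1-minute tumbling window first collapses the raw rows into one pre-aggregated row per minute, so each 50-minute sliding window then combines only ~50 rows instead of all raw rows. Keeping per-bucket `sum` and `cnt` rather than a per-bucket average matters, because an average of averages weights sparse buckets incorrectly:

```python
# Two one-minute buckets with different row counts.
b1 = [1.0, 1.0, 1.0, 1.0]  # sum = 4.0, cnt = 4
b2 = [9.0]                 # sum = 9.0, cnt = 1

true_avg = (sum(b1) + sum(b2)) / (len(b1) + len(b2))       # 13 / 5 = 2.6
avg_of_avgs = (sum(b1) / len(b1) + sum(b2) / len(b2)) / 2  # (1 + 9) / 2 = 5.0
print(true_avg, avg_of_avgs)  # 2.6 5.0, not the same
```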
Thank you! How long does it run now?
I don't have access to your data, and the execution time depends on the data distribution (it influences the number of rows that is produced in the interval join). On the data I generated myself, the improved version runs in about 25 seconds.
@KamilPiechowiak Thanks for this fix. @ilyanoskov Thanks for reporting this. Happy to hear whether the improved version addresses your efficiency concerns sufficiently.
@KamilPiechowiak thanks a lot for taking a look. I am afraid 25 seconds is still quite a long time for this. ClickHouse takes ~15 seconds to calculate similar features (but 2x as many, and over more windows), and Polars is at around the same performance as ClickHouse in my tests. I understand that these are not really streaming frameworks and it's not a fair comparison, but here I am looking for the fastest way to calculate the features and then keep them up to date.

I was doing a POC for my use case with this small example, but in reality I will be calculating hundreds of features across many windows, on a much larger dataset (~1B records). So it looks like Pathway is perhaps not the best tool here. Unless there is a good way to pre-compute the features on the large dataset (with ClickHouse or Polars) and then update them incrementally with Pathway in a streaming fashion? But that would force one to write two versions of the code, one for batch and one for streaming...

P.S. I think it would be really good to introduce some kind of benchmark for window aggregation with Pathway, just to be able to track its performance.
@ilyanoskov It's an excellent point. In a mixed setup like this, usually one of the two costs/efforts (1. batch, 2. streaming/incremental) dominates the other. Most use cases focus on optimizing the streaming part only, as it will dominate the overall cost/effort by orders of magnitude; it doesn't make sense to over-focus on the batch part, which will remain relatively small even if it's a factor of, say, 5x away from optimal - BUT your use case may be different.
Thanks for the idea. @KamilPiechowiak, I'm all for adding this one to the benchmarking in our CI/CD tests.
@ilyanoskov, did you take the number from my comment or did you measure it? I measured it on data I generated on my own, so the distribution might be different from yours. For example, in my dataset each input entry to the interval join results in 121 output entries. If your dataset is less dense, the number will be smaller and the computation faster.
Yes, we can add it. |
Steps to reproduce
I have a simple Pathway function that takes more than 30 seconds to run. I am curious whether this is expected or something is wrong. How long does this take for you?
The code is:
Just create a CSV file with 1M rows to test this.
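Purely for illustration, one possible way to generate such a file (the row rate, timestamp format, and value range are assumptions, not from the report):

```python
import csv
import datetime as dt
import random

# Hypothetical test-data generator: 1M rows, one per second,
# with the two columns described above: datetime, val1 (float).
start = dt.datetime(2024, 1, 1)
with open("data.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["datetime", "val1"])
    for i in range(1_000_000):
        ts = start + dt.timedelta(seconds=i)
        writer.writerow([ts.strftime("%Y-%m-%d %H:%M:%S"),
                         round(random.uniform(1.0, 100.0), 3)])
```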
Now, I am trying to understand why this is taking so long. What is the best way to profile Pathway performance? Also, what is the best way to load the data with the DateTimeNaive datatype from CSV? The logs from previous runs are telling me:

`parsing DateTimeNaive from an external datasource is not supported`
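A common workaround (a sketch, not an official recipe from this thread; the format string, column names, and file name are assumptions): read the timestamp column as `str` and parse it inside the pipeline with the `.dt.strptime()` expression, which yields DateTimeNaive.

```python
import pathway as pw

class RawSchema(pw.Schema):
    datetime: str  # read as a plain string first
    val1: float

raw = pw.io.csv.read("data.csv", schema=RawSchema, mode="static")

# Parse the string column into DateTimeNaive inside the pipeline;
# the format string below is an assumption about the file's contents.
data = raw.select(
    datetime=pw.this.datetime.dt.strptime("%Y-%m-%d %H:%M:%S"),
    val1=pw.this.val1,
)
```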
Relevant log output
What did you expect to happen?
I expected that this operation would take a few hundred ms, or maybe a second, at most.
Version
0.8.3
Docker Versions (if used)
No response
OS
Linux
On which CPU architecture did you run Pathway?
x86-64