Commit 650bdf8

the querier is working much better now
1 parent 8b9814f commit 650bdf8

File tree

1 file changed: +46 −35 lines

powerwatch/analysis/outage_aggregator.py

+46-35
@@ -1,15 +1,16 @@
 #!/usr/bin/env python
 from pyspark.sql import SparkSession
-from pyspark.sql.functions import col, window, asc, desc, lead, lag, udf, hour, month, dayofmonth, collect_list, lit, year, date_trunc, dayofweek
+from pyspark.sql.functions import col, window, asc, desc, lead, lag, udf, hour, month, dayofmonth, dayofyear, collect_list, lit, year, date_trunc, dayofweek, when, unix_timestamp
 import pyspark.sql.functions as F
 from pyspark.sql.window import Window
 from pyspark.sql.types import FloatType, IntegerType, DateType, TimestampType
 from pyspark import SparkConf
-import datetime
+from datetime import datetime, timedelta
 import os
 from math import isnan
 import argparse
 import json
+import calendar

 #read arguments
 parser = argparse.ArgumentParser()
@@ -21,51 +22,62 @@
 #initiate spark context
 spark = SparkSession.builder.appName("SAIDI/SAIFI cluster size").getOrCreate()

-#connect to the database
-#it's more efficient to do the bulk of filter in the database, especially in the time dimensions
-query = "(SELECT core_id, time, is_powered, product_id,millis, last_unplug_millis, last_plug_millis FROM powerwatch WHERE time > '2018-07-01' AND time < '2018-12-01' AND (product_id = 7008 OR product_id = 7009)) alias"
-
-pw_df = spark.read.jdbc("jdbc:postgresql://timescale.ghana.powerwatch.io/powerwatch", query,
-        properties={"user": args.user, "password": args.password, "driver":"org.postgresql.Driver"})
-
-#caching this in memory up front makes this faster if it all fits
-pw_df.cache();
-
+### It's really important to partition this data load! Otherwise the executors will time out and the whole job will fail
+start_time = '2018-07-01'
+end_time = '2019-05-15'
+
+#Roughly one partition per week of data is pretty fast and doesn't require too much shuffling
+num_partitions = 30
+
+#This builds a list of predicates so the data can be queried in parallel, which makes everything much faster
+start_time_timestamp = calendar.timegm(datetime.strptime(start_time, "%Y-%m-%d").timetuple())
+end_time_timestamp = calendar.timegm(datetime.strptime(end_time, "%Y-%m-%d").timetuple())
+stride = (end_time_timestamp - start_time_timestamp)/num_partitions
+predicates = []
+for i in range(0,num_partitions):
+    begin_timestamp = start_time_timestamp + i*stride
+    end_timestamp = start_time_timestamp + (i+1)*stride
+    pred_string = "time >= '" + datetime.utcfromtimestamp(int(begin_timestamp)).strftime("%Y-%m-%d %H:%M:%S")
+    pred_string += "' AND "
+    pred_string += "time < '" + datetime.utcfromtimestamp(int(end_timestamp)).strftime("%Y-%m-%d %H:%M:%S") + "'"
+    predicates.append(pred_string)
+
+#This query should only get data from deployed devices in the deployment table
+query = "(SELECT core_id, time, is_powered, product_id, millis, last_unplug_millis, last_plug_millis FROM powerwatch WHERE time >= '" + start_time + "' AND time < '" + end_time + "' AND (product_id = 7008 OR product_id = 7009 OR product_id = 7010 OR product_id = 7011 OR product_id = 8462)) alias"
+
+pw_df = spark.read.jdbc(
+        url = "jdbc:postgresql://timescale.ghana.powerwatch.io/powerwatch",
+        table = query,
+        predicates = predicates,
+        properties={"user": args.user, "password": args.password, "driver":"org.postgresql.Driver"})
+
+#if you have multiple saves below, this prevents reloading the data every time
+pw_df.cache()

 #now we need to create a window function that looks at the leading/lagging edge of is_powered and detects transitions
 #then we can filter out all data that is not a transition
-def detectTransition(value1, value2):
-    if(value1 == value2):
-        return 0
-    else:
-        return 1
-udfDetectTransition = udf(detectTransition, IntegerType())
 w = Window.partitionBy("core_id").orderBy(asc("time"))
-is_powered_lag = lag("is_powered",1).over(w)
-pw_df = pw_df.withColumn("transition", udfDetectTransition("is_powered",is_powered_lag))
+pw_df = pw_df.withColumn("previous_power_state", lag("is_powered").over(w))

-#filter out all transitions
-pw_df = pw_df.filter("transition != 0")
+#filter out every row where the state does not change
+pw_df = pw_df.filter(col("previous_power_state") != col("is_powered"))

-#now count each outage (really restoration)
-def countOutage(value1, value2, value3):
-    if(value1 == False and value2 == True and value3 == True):
-        return 1
-    else:
-        return 0
-udfCountTransition = udf(countOutage, IntegerType())
+#now we should only count this as an outage if the pattern is (on, off, on)
 is_powered_lead = lead("is_powered",1).over(w)
 is_powered_lag = lag("is_powered",1).over(w)
-pw_df = pw_df.withColumn("outage", udfCountTransition("is_powered", is_powered_lead, is_powered_lag))
+pw_df = pw_df.withColumn("lagging_power",is_powered_lag)
+pw_df = pw_df.withColumn("leading_power",is_powered_lead)
+pw_df = pw_df.withColumn("outage", when((col("is_powered") == 0) & (col("lagging_power") == 1) & (col("leading_power") == 1), 1).otherwise(0))

+#now we need the most accurate outage time possible for each outage event
 #now find all the exact outage and restore times using millis
 def timeCorrect(time, millis, unplugMillis):
     if(unplugMillis == 0 or millis == None or unplugMillis == None or isnan(millis) or isnan(unplugMillis)):
         return time
     elif unplugMillis > millis:
         return time
     else:
-        return time - datetime.timedelta(microseconds = (int(millis)-int(unplugMillis))*1000)
+        return time - timedelta(microseconds = (int(millis)-int(unplugMillis))*1000)
 udftimeCorrect = udf(timeCorrect, TimestampType())
 pw_df = pw_df.withColumn("outage_time", udftimeCorrect("time","millis","last_unplug_millis"))
 pw_df = pw_df.withColumn("r_time", udftimeCorrect("time","millis","last_plug_millis"))
@@ -77,7 +89,6 @@ def timeCorrect(time, millis, unplugMillis):
 #now filter out everything that is not an outage. We should have a time and end_time for every outage
 pw_df = pw_df.filter("outage != 0")

-
 #record the duration of the outage
 def calculateDuration(startTime, endTime):
     delta = endTime-startTime
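For reference, here is a self-contained toy run of the lag/lead/when pattern used above to flag (on, off, on) outages, on invented data rather than PowerWatch readings; replacing the old Python UDFs with built-in column expressions keeps the logic inside Catalyst instead of round-tripping through Python.

#toy example of the outage flag; core_id values and timestamps are invented
from pyspark.sql import SparkSession
from pyspark.sql.functions import asc, col, lag, lead, when
from pyspark.sql.window import Window

spark = SparkSession.builder.appName("outage flag sketch").getOrCreate()

rows = [("a", 1, 1), ("a", 2, 0), ("a", 3, 1),   #on -> off -> on: the middle row is an outage
        ("b", 1, 1), ("b", 2, 1), ("b", 3, 0)]   #off with no restoration yet: not counted
toy = spark.createDataFrame(rows, ["core_id", "time", "is_powered"])

w = Window.partitionBy("core_id").orderBy(asc("time"))
toy = toy.withColumn("lagging_power", lag("is_powered", 1).over(w))
toy = toy.withColumn("leading_power", lead("is_powered", 1).over(w))
toy = toy.withColumn("outage",
        when((col("is_powered") == 0) &
             (col("lagging_power") == 1) &
             (col("leading_power") == 1), 1).otherwise(0))
toy.orderBy("core_id", "time").show()   #only ("a", 2) gets outage == 1
#note: the real pipeline first drops rows where the power state did not change,
#so lead/lag there look at neighbouring transitions, not neighbouring raw samples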
@@ -88,7 +99,7 @@ def calculateDuration(startTime, endTime):
 pw_df = pw_df.withColumn("outage_duration", udfcalculateDuration("outage_time","restore_time"))

 window_size = 150
-w = Window.orderBy(asc("outage_time")).rowsBetween(-1*window_size,window_size)
+w = Window.partitionBy(dayofyear("outage_time")).orderBy(asc("outage_time")).rowsBetween(-1*window_size,window_size)
 pw_df = pw_df.withColumn("outage_window_list",collect_list(F.struct("outage_time","core_id")).over(w))

 def filterOutage(time, core_id, timeList):
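A note on the window change just above: partitioning by dayofyear bounds the ±150-row collect_list to a single calendar day, which avoids one giant unpartitioned window over the whole dataset, at the cost that outages right at a day boundary no longer see neighbours on the other side of midnight. If the clustering radius is really about elapsed time rather than row count, a range frame over unix seconds is one possible alternative; the 600-second radius below is an invented placeholder and the snippet assumes the pw_df built in this script.

#sketch of a time-based clustering window (assumes pw_df from above; the radius is a placeholder)
import pyspark.sql.functions as F
from pyspark.sql.functions import col, collect_list, dayofyear
from pyspark.sql.window import Window

cluster_radius_seconds = 600   #collect outages within 10 minutes either side

w_time = (Window
          .partitionBy(dayofyear("outage_time"))            #keep the per-day parallelism
          .orderBy(col("outage_time").cast("long"))         #unix seconds, required for rangeBetween
          .rangeBetween(-cluster_radius_seconds, cluster_radius_seconds))
pw_df = pw_df.withColumn("outage_window_list",
                         collect_list(F.struct("outage_time", "core_id")).over(w_time))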
@@ -111,7 +122,7 @@ def filterOutage(time, core_id, timeList):
 pw_df = pw_df.withColumn("outage_number",lit(1))

 #okay now we have a list of all outages where at least one other device also had an outage within a time window
-
+#pw_df.cache()

 ### SAIFI ###
 #note that this is the raw number of sensors that go out rather than a single metric per "outage"
@@ -134,7 +145,7 @@ def filterOutage(time, core_id, timeList):
 outages_by_hour = outages_by_hour.groupBy("outage_date_hour").sum()
 outages_by_hour = outages_by_hour.withColumn("outage_hour", hour("outage_date_hour"))
 outages_by_hour = outages_by_hour.groupBy("outage_hour").avg().orderBy("outage_hour")
-outages_by_hour.show()
+outages_by_hour.show(30)


 #pw_df = pw_df.select("time","core_id","outage_duration","outage_number")
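The hourly summary above aggregates twice: it first sums outages within each concrete date-hour, then averages those sums across days for each hour of the day (show(30) just makes sure all 24 rows print instead of the default 20). A self-contained toy version of the same two-level aggregation, written with explicit aliases and invented timestamps:

#toy version of the hourly aggregation; the three timestamps are invented
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import col, date_trunc, hour, lit

spark = SparkSession.builder.appName("hourly aggregation sketch").getOrCreate()

toy = spark.createDataFrame(
    [("2018-07-01 18:05:00",), ("2018-07-01 18:40:00",), ("2018-07-02 18:10:00",)],
    ["outage_time"]).withColumn("outage_time", col("outage_time").cast("timestamp"))
toy = toy.withColumn("outage_number", lit(1))

#outages per concrete date-hour, then the average of those sums per hour of day
per_date_hour = (toy
    .withColumn("outage_date_hour", date_trunc("hour", col("outage_time")))
    .groupBy("outage_date_hour")
    .agg(F.sum("outage_number").alias("outages_in_hour")))
per_hour_of_day = (per_date_hour
    .withColumn("outage_hour", hour("outage_date_hour"))
    .groupBy("outage_hour")
    .agg(F.avg("outages_in_hour").alias("avg_outages"))
    .orderBy("outage_hour"))
per_hour_of_day.show(30)   #hour 18 averages (2 + 1) / 2 = 1.5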
