increase commenting of indicators #37

Merged 1 commit on Jun 17, 2020
increase commenting of indicators
sebxwolf committed Jun 17, 2020
commit 08f1d7cf94467e7079c979c1d5662bbe7da42386
200 changes: 179 additions & 21 deletions cdr-aggregation/notebooks/modules/priority_aggregator.py
@@ -282,47 +282,122 @@ def attempt_aggregation(self,
if attempts == 4:
print('Tried creating and saving indicators 4 times, but failed.')


######## Priority Indicators ########



#### Indicator 1

# result:
# - apply sample period filter
# - groupby region and frequency
# - then count observations
# - apply privacy filter

def transactions(self, time_filter, frequency):

result = self.df.where(time_filter)\
.groupby(frequency, 'region')\
.count()\
.where(F.col('count') > self.privacy_filter)

return result
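A minimal usage sketch, not part of this PR: time_filter is assumed to be a PySpark Column predicate delimiting the sample period, and frequency the name of an existing column such as 'day' or 'week'; the aggregator instance and date bounds below are made up.

import pyspark.sql.functions as F

# Hypothetical call on a configured aggregator instance
time_filter = F.col('call_datetime').between('2020-02-01', '2020-03-31')
daily_transactions = aggregator.transactions(time_filter, frequency='day')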



#### Indicator 2 + 3

# result:
# - apply sample period filter
# - groupby region and frequency
# - then count distinct sims
# - apply privacy filter

def unique_subscribers(self, time_filter, frequency):

result = self.df.where(time_filter)\
.groupby(frequency, 'region')\
.agg(F.countDistinct('msisdn').alias('count'))\
.where(F.col('count') > self.privacy_filter)

return result



#### Indicator 3

# result:
# - apply sample period filter
# - groupby frequency
# - then count distinct sims
# - apply privacy filter

def unique_subscribers_country(self, time_filter, frequency):

result = self.df.where(time_filter)\
.groupby(frequency)\
.agg(F.countDistinct('msisdn').alias('count'))\
.where(F.col('count') > self.privacy_filter)

return result



#### Indicator 4

# prep:
# - apply sample period filter
# - then count distinct sims over full sample period

# result:
# - compute distinct sims per day
# - calculate percentage: distinct sims per day / distinct sims over the full sample period

def percent_of_all_subscribers_active(self, time_filter, frequency):

prep = self.df.where(time_filter)\
.select('msisdn')\
.distinct()\
.count()

result = self.unique_subscribers_country(
time_filter, frequency).withColumn('percent_active',
F.col('count') / prep)

return result
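For example, if 10,000 distinct SIMs appear over the full sample period and 2,500 of them are active on a given day, that day's percent_active is 2,500 / 10,000 = 0.25.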



#### Indicator 5

# assert correct frequency

# result (intra-day od):
# - get intra-day od matrix using the Flowminder definition

# prep (inter-day od):
# - apply sample period filter
# - create timestamp lag per user
# - create day lag per user, with a max value of 7 days
# - filter for observations that involve a day change (since we have intra-day already)
# - also filter for region changes only, since we are computing an od matrix
# - groupby (o)rigin and (d)estination, and frequency
# - count observations
# - apply privacy filter

# result (joining intra-day od (result) and inter-day od (prep)):
# - join on o, d, and frequency
# - fill NAs that arose in the merge, so that we have complete columns
# - compute total od summing intra and inter od count

def origin_destination_connection_matrix(self, time_filter, frequency):

assert frequency == 'day', 'This indicator is only defined for daily frequency'

result = self.spark.sql(self.sql_code['directed_regional_pair_connections_per_day'])

prep = self.df.where(time_filter)\
.withColumn('call_datetime_lag', F.lag('call_datetime').over(user_window))\
.withColumn('day_lag',
@@ -334,6 +409,7 @@ def origin_destination_connection_matrix(self, time_filter, frequency):
.groupby(frequency, 'region', 'region_lag')\
.agg(F.count(F.col('msisdn')).alias('od_count'))\
.where(F.col('od_count') > 15)

result = result.join(prep, (prep.region == result.region_to)\
& (prep.region_lag == result.region_from)\
& (prep.day == result.connection_date), 'full')\
@@ -346,27 +422,34 @@ def origin_destination_connection_matrix(self, time_filter, frequency):
.na.fill({'od_count' : 0, 'subscriber_count' : 0})\
.withColumn('total_count', F.col('subscriber_count') + F.col('od_count'))\
.drop('region').drop('region_lag').drop('day')

return result
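user_window is not defined in this diff; judging from the F.lag calls above and the user_frequency_window defined in Indicator 10 below, it is presumably a per-SIM window ordered by timestamp, along these lines:

from pyspark.sql import Window

# Assumed definition, mirroring user_frequency_window in Indicator 10
user_window = Window.partitionBy('msisdn').orderBy('call_datetime')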



#### Indicator 6 helper method to find home locations for a given frequency

# define user-day and user-frequency windows

# result:
# - apply sample period filter
# - Get last timestamp of the day for each user
# - Dummy when an observation is the last of the day
# - Count how many times a region is the last of the day
# - Dummy for the region with highest count
# - Keep only the region with highest count
# - Keep only one observation of that region

def assign_home_locations(self, time_filter, frequency):
# A window for each user-day with descending time

user_day = Window\
.orderBy(F.desc_nulls_last('call_datetime'))\
.partitionBy('msisdn', 'day')
# A window for each user-frequency with descending region count

user_frequency = Window\
.orderBy(F.desc_nulls_last('last_region_count'))\
.partitionBy('msisdn', frequency)

result = self.df.where(time_filter)\
.na.fill({'region' : self.missing_value_code })\
.withColumn('last_timestamp',
@@ -383,18 +466,45 @@ def assign_home_locations(self, time_filter, frequency):
.where(F.col('modal_region') == 1)\
.groupby('msisdn', frequency)\
.agg(F.last('region').alias('home_region'))

return result
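Concretely: a SIM whose last observation of the day falls in region X on four of the five days observed in a given week ends up with home_region X for that week.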



#### Indicator 6 + 11

# result:
# - find home locations using helper method
# - group by frequency and home region
# - count observations
# - apply privacy_filter

def unique_subscriber_home_locations(self, time_filter, frequency):

result = self.assign_home_locations(time_filter, frequency)\
.groupby(frequency, 'home_region')\
.count()\
.where(F.col('count') > self.privacy_filter)

return result



#### Indicator 7 + 8

# prep:
# - apply sample period filter
# - create time and location lags

# result:
# - join prep with distances matrix
# - group by user, frequency and home region
# - sum distances
# - group by frequency and home region
# - get mean and standard deviation of distance

def mean_distance(self, time_filter, frequency):

prep = self.df.where(time_filter)\
.withColumn('location_id_lag', F.lag('location_id').over(user_window))\
.withColumn('call_datetime_lag', F.lag('call_datetime').over(user_window))\
@@ -403,6 +513,7 @@ def mean_distance(self, time_filter, frequency):
F.col('call_datetime_lag').cast('long')) <= (self.cutoff_days * 24 * 60 * 60),
F.lag('location_id').over(user_window))\
.otherwise(None))

result = prep.join(self.distances_df,
(prep.location_id==self.distances_df.destination) &\
(prep.location_id_lag==self.distances_df.origin),
@@ -412,11 +523,36 @@ def mean_distance(self, time_filter, frequency):
.groupby('home_region', frequency)\
.agg(F.mean('distance').alias('mean_distance'),
F.stddev_pop('distance').alias('stdev_distance'))

return result
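The join above implies that distances_df holds one row per ordered region pair, with origin, destination and distance columns; a hypothetical fixture (region ids and distances made up, assuming a live SparkSession named spark) might look like:

# Hypothetical distance matrix rows for exercising the join
distances_df = spark.createDataFrame(
    [('r1', 'r2', 12.4), ('r2', 'r1', 12.4), ('r1', 'r1', 0.0)],
    ['origin', 'destination', 'distance'])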



#### Indicator 9

# get home_locations

# prep (day locations):
# - create timestamp lead, filling the final (null) lead with the end of the sample period
# - calculate duration using lead timestamp
# - constrain max duration to cutoff_days
# - group by user, region and frequency (keeping home_location_frequency)
# - get total duration
# - group by user and frequency (keeping home_location_frequency)
# - get the region with the longest duration
# - rename vars to avoid duplicates in merge

# result:
# - merge home with day locations per user and home_location_frequency
# - fill NAs that arose in the merge
# - group by home_region, region and frequency
# - calculate mean and standard deviation of duration, and count sims
# - apply privacy filter

def home_vs_day_location(self, time_filter, frequency, home_location_frequency = 'week', **kwargs):

home_locations = self.assign_home_locations(time_filter, home_location_frequency)

prep = self.df.where(time_filter)\
.withColumn('call_datetime_lead',
F.when(F.col('call_datetime_lead').isNull(),
@@ -432,6 +568,7 @@ def home_vs_day_location(self, time_filter, frequency, home_location_frequency =
.agg(F.last('region').alias('region'), F.last('total_duration').alias('duration'))\
.withColumnRenamed('msisdn', 'msisdn2')\
.withColumnRenamed(home_location_frequency, home_location_frequency + '2')

result = prep.join(home_locations,
(prep.msisdn2 == home_locations.msisdn) & \
(prep[home_location_frequency + '2'] == \
@@ -442,11 +579,31 @@ def home_vs_day_location(self, time_filter, frequency, home_location_frequency =
F.stddev_pop('duration').alias('stdev_duration'),
F.count('msisdn').alias('count'))\
.where(F.col('count') > self.privacy_filter)

return result
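Roughly: a SIM whose weekly home_region is A but which spends the longest total time of a given day in region B contributes that day's duration to the (home_region = A, region = B) cell; mean_duration and stdev_duration then summarize those durations across all SIMs in the cell.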



#### Indicator 10

# result:
# - apply sample period filter
# - drop all observations that don't imply a region change from lag or lead
# - create timestamp lead, replacing missing values with end of sample period
# - calculate duration
# - constrain duration to max seven days
# - get the lead duration
# - sum durations for stops without lead switch
# - set max duration to 21 days
# - get lag duration
# - drop all observations with lead rather than lag switch
# - group by frequency and origin (lag) and destination (lead)
# - calculate avg, std, sums and counts of o and d durations

def origin_destination_matrix_time(self, time_filter, frequency):

user_frequency_window = Window.partitionBy('msisdn').orderBy('call_datetime')

result = self.df.where(time_filter)\
.where((F.col('region_lag') != F.col('region')) | \
(F.col('region_lead') != F.col('region')) | \
@@ -477,4 +634,5 @@ def origin_destination_matrix_time(self, time_filter, frequency):
F.avg('duration_change_only_lag').alias('avg_duration_origin'),
F.count('duration_change_only_lag').alias('count_origin'),
F.stddev_pop('duration_change_only_lag').alias('stddev_duration_origin'))

return result
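Taken together, the caps described in the comment block above mean each individual duration contributes at most seven days and a summed stop at most 21 days, so a SIM that disappears for a month adds no more than 21 days of dwell time to its last observed origin-destination pair.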