Skip to content

Commit 4ff7510

Browse files
bstablerBlake Rosenthal
andauthored
write trip matrices (#311)
require newer pandas add ability to save trips table to pipeline and begin to finish example all time periods fix trip scheduling bug add support for odt skims as well Co-authored-by: Ben Stabler <benstabler@yahoo.com> Co-authored-by: Blake Rosenthal <blake.rosenthal@rsginc.com>
1 parent 4255f03 commit 4ff7510

File tree

14 files changed

+616
-7
lines changed

14 files changed

+616
-7
lines changed

activitysim/abm/models/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,4 @@
2727
from . import trip_purpose
2828
from . import trip_purpose_and_destination
2929
from . import trip_scheduling
30+
from . import trip_matrices
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
# ActivitySim
2+
# See full license in LICENSE.txt.
3+
4+
import logging
5+
6+
import openmatrix as omx
7+
import pandas as pd
8+
import numpy as np
9+
10+
from activitysim.core import config
11+
from activitysim.core import inject
12+
from activitysim.core import pipeline
13+
14+
from .util import expressions
15+
from .util.expressions import skim_time_period_label
16+
17+
logger = logging.getLogger(__name__)
18+
19+
20+
@inject.step()
21+
def write_trip_matrices(trips, skim_dict, skim_stack):
22+
"""
23+
Write trip matrices step.
24+
25+
Adds boolean columns to local trips table via annotation expressions,
26+
then aggregates trip counts and writes OD matrices to OMX. Save annotated
27+
trips table to pipeline if desired.
28+
"""
29+
30+
model_settings = config.read_model_settings('write_trip_matrices.yaml')
31+
trips_df = annotate_trips(trips, skim_dict, skim_stack, model_settings)
32+
33+
if bool(model_settings.get('SAVE_TRIPS_TABLE')):
34+
pipeline.replace_table('trips', trips_df)
35+
36+
logger.info('Aggregating trips...')
37+
aggregate_trips = trips_df.groupby(['origin', 'destination'], sort=False).sum()
38+
logger.info('Finished.')
39+
40+
orig_vals = aggregate_trips.index.get_level_values('origin')
41+
dest_vals = aggregate_trips.index.get_level_values('destination')
42+
43+
zone_index = pipeline.get_table('land_use').index
44+
assert all(zone in zone_index for zone in orig_vals)
45+
assert all(zone in zone_index for zone in dest_vals)
46+
47+
_, orig_index = zone_index.reindex(orig_vals)
48+
_, dest_index = zone_index.reindex(dest_vals)
49+
50+
write_matrices(aggregate_trips, zone_index, orig_index, dest_index, model_settings)
51+
52+
53+
def annotate_trips(trips, skim_dict, skim_stack, model_settings):
54+
"""
55+
Add columns to local trips table. The annotator has
56+
access to the origin/destination skims and everything
57+
defined in the model settings CONSTANTS.
58+
59+
Pipeline tables can also be accessed by listing them under
60+
TABLES in the preprocessor settings.
61+
"""
62+
63+
trips_df = trips.to_frame()
64+
65+
trace_label = 'trip_matrices'
66+
67+
# setup skim keys
68+
assert ('trip_period' not in trips_df)
69+
trips_df['trip_period'] = skim_time_period_label(trips_df.depart)
70+
od_skim_wrapper = skim_dict.wrap('origin', 'destination')
71+
odt_skim_stack_wrapper = skim_stack.wrap(left_key='origin', right_key='destination',
72+
skim_key='trip_period')
73+
skims = {
74+
'od_skims': od_skim_wrapper,
75+
"odt_skims": odt_skim_stack_wrapper
76+
}
77+
78+
locals_dict = {}
79+
constants = config.get_model_constants(model_settings)
80+
if constants is not None:
81+
locals_dict.update(constants)
82+
83+
expressions.annotate_preprocessors(
84+
trips_df, locals_dict, skims,
85+
model_settings, trace_label)
86+
87+
# Data will be expanded by an expansion weight column from
88+
# the households pipeline table, if specified in the model settings.
89+
hh_weight_col = model_settings.get('HH_EXPANSION_WEIGHT_COL')
90+
91+
if hh_weight_col and hh_weight_col not in trips_df:
92+
logger.info("adding '%s' from households to trips table" % hh_weight_col)
93+
household_weights = pipeline.get_table('households')[hh_weight_col]
94+
trips_df[hh_weight_col] = trips_df.household_id.map(household_weights)
95+
96+
return trips_df
97+
98+
99+
def write_matrices(aggregate_trips, zone_index, orig_index, dest_index, model_settings):
100+
"""
101+
Write aggregated trips to OMX format.
102+
103+
The MATRICES setting lists the new OMX files to write.
104+
Each file can contain any number of 'tables', each specified by a
105+
table key ('name') and a trips table column ('data_field') to use
106+
for aggregated counts.
107+
108+
Any data type may be used for columns added in the annotation phase,
109+
but the table 'data_field's must be summable types: ints, floats, bools.
110+
"""
111+
112+
matrix_settings = model_settings.get('MATRICES')
113+
114+
if not matrix_settings:
115+
logger.error('Missing MATRICES setting in write_trip_matrices.yaml')
116+
117+
for matrix in matrix_settings:
118+
filename = matrix.get('file_name')
119+
filepath = config.output_file_path(filename)
120+
logger.info('opening %s' % filepath)
121+
file = omx.open_file(filepath, 'w') # possibly overwrite existing file
122+
table_settings = matrix.get('tables')
123+
124+
for table in table_settings:
125+
table_name = table.get('name')
126+
col = table.get('data_field')
127+
128+
if col not in aggregate_trips:
129+
logger.error('missing %s column in %s DataFrame' % (col, aggregate_trips.name))
130+
return
131+
132+
hh_weight_col = model_settings.get('HH_EXPANSION_WEIGHT_COL')
133+
if hh_weight_col:
134+
aggregate_trips[col] = aggregate_trips[col] / aggregate_trips[hh_weight_col]
135+
136+
data = np.zeros((len(zone_index), len(zone_index)))
137+
data[orig_index, dest_index] = aggregate_trips[col]
138+
logger.info('writing %s' % table_name)
139+
file[table_name] = data # write to file
140+
141+
# include the index-to-zone map in the file
142+
logger.info('adding %s mapping for %s zones to %s' %
143+
(zone_index.name, zone_index.size, filename))
144+
file.create_mapping(zone_index.name, zone_index.to_numpy())
145+
146+
logger.info('closing %s' % filepath)
147+
file.close()

activitysim/abm/models/trip_scheduling.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ def schedule_trips_in_leg(
319319
trips = trips.sort_index()
320320
trips['next_trip_id'] = np.roll(trips.index, -1 if outbound else 1)
321321
is_final = (trips.trip_num == trips.trip_count) if outbound else (trips.trip_num == 1)
322-
trips.next_trip_id = trips.next_trip_id.where(is_final, NO_TRIP_ID)
322+
trips.next_trip_id = trips.next_trip_id.where(~is_final, NO_TRIP_ID)
323323

324324
# iterate over outbound trips in ascending trip_num order, skipping the initial trip
325325
# iterate over inbound trips in descending trip_num order, skipping the finial trip

activitysim/abm/tables/households.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,11 @@
2020
def households(households_sample_size, override_hh_ids, trace_hh_id):
2121

2222
df_full = read_input_table("households")
23-
households_sliced = False
23+
tot_households = df_full.shape[0]
24+
25+
logger.info("full household list contains %s households" % tot_households)
2426

25-
logger.info("full household list contains %s households" % df_full.shape[0])
27+
households_sliced = False
2628

2729
# only using households listed in override_hh_ids
2830
if override_hh_ids is not None:
@@ -48,9 +50,9 @@ def households(households_sample_size, override_hh_ids, trace_hh_id):
4850
households_sliced = True
4951

5052
# if we need a subset of full store
51-
elif households_sample_size > 0 and df_full.shape[0] > households_sample_size:
53+
elif tot_households > households_sample_size > 0:
5254

53-
logger.info("sampling %s of %s households" % (households_sample_size, df_full.shape[0]))
55+
logger.info("sampling %s of %s households" % (households_sample_size, tot_households))
5456

5557
"""
5658
Because random seed is set differently for each step, sampling of households using
@@ -80,6 +82,14 @@ def households(households_sample_size, override_hh_ids, trace_hh_id):
8082
# persons table
8183
inject.add_injectable('households_sliced', households_sliced)
8284

85+
if 'sample_rate' not in df.columns:
86+
if households_sample_size == 0:
87+
sample_rate = 1
88+
else:
89+
sample_rate = round(households_sample_size / tot_households, 3)
90+
91+
df['sample_rate'] = sample_rate
92+
8393
logger.info("loaded households %s" % (df.shape,))
8494

8595
# FIXME - pathological knowledge of name of chunk_id column used by chunked_choosers_by_chunk_id

activitysim/abm/test/test_pipeline.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,12 @@
44
import logging
55
import pkg_resources
66

7+
import openmatrix as omx
8+
import numpy as np
9+
import numpy.testing as npt
10+
711
import pandas as pd
8-
import pandas.util.testing as pdt
12+
import pandas.testing as pdt
913
import pytest
1014
import yaml
1115

@@ -17,6 +21,7 @@
1721

1822
# set the max households for all tests (this is to limit memory use on travis)
1923
HOUSEHOLDS_SAMPLE_SIZE = 100
24+
HOUSEHOLDS_SAMPLE_RATE = 0.02 # HOUSEHOLDS_SAMPLE_RATE / 5000 households
2025

2126
# household with mandatory, non mandatory, atwork_subtours, and joint tours
2227
HH_ID = 257341
@@ -52,6 +57,7 @@ def setup_dirs(configs_dir, data_dir=None):
5257
tracing.delete_output_files('csv')
5358
tracing.delete_output_files('txt')
5459
tracing.delete_output_files('yaml')
60+
tracing.delete_output_files('omx')
5561

5662

5763
def teardown_function(func):
@@ -433,6 +439,18 @@ def regress():
433439
# should be at least two tours per trip
434440
assert trips_df.shape[0] >= 2*tours_df.shape[0]
435441

442+
# write_trip_matrices
443+
trip_matrices_file = config.output_file_path('trips_md.omx')
444+
assert os.path.exists(trip_matrices_file)
445+
trip_matrices = omx.open_file(trip_matrices_file)
446+
assert trip_matrices.shape() == (25, 25)
447+
448+
assert 'WALK_MD' in trip_matrices.list_matrices()
449+
walk_trips = np.array(trip_matrices['WALK_MD'])
450+
assert walk_trips.dtype == np.dtype('float64')
451+
452+
trip_matrices.close()
453+
436454

437455
def test_full_run1():
438456

@@ -517,6 +535,7 @@ def test_full_run5_singleton():
517535

518536
if __name__ == "__main__":
519537

538+
from activitysim import abm # register injectables
520539
print("running test_full_run1")
521540
test_full_run1()
522541
# teardown_function(None)

activitysim/examples/example_mtc/configs/settings.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ models:
8686
- trip_mode_choice
8787
- write_data_dictionary
8888
- track_skim_usage
89+
- write_trip_matrices
8990
- write_tables
9091

9192
# to resume after last successful checkpoint, specify resume_after: _

0 commit comments

Comments
 (0)