
Make CEMS extraction handle new listed year_quarter partitions #3187

Merged
merged 30 commits on Jan 8, 2024

Commits (30)
4fb33e2
WIP: add list partitions to _matches
e-belfer Dec 22, 2023
a4cbf89
Fix csv file name
e-belfer Dec 22, 2023
aaecdfa
revert fast etl settings
e-belfer Dec 22, 2023
ee9b3bf
update the 860m doi
cmgosnell Dec 22, 2023
61bc757
Fix docs build
e-belfer Dec 26, 2023
542ce85
Merge pull request #3189 from catalyst-cooperative/eia860m-extraction
e-belfer Dec 26, 2023
cf0454d
Merge branch 'dev' into cems-extraction
e-belfer Dec 26, 2023
d26b4aa
Update to non-broken CEMS archive
e-belfer Dec 26, 2023
0b08162
Try adding datastore to CI
e-belfer Dec 29, 2023
e1215fe
Update docker to point at actually right year
e-belfer Dec 29, 2023
4f50183
Actually fix in GH action
e-belfer Dec 29, 2023
65af95d
Move pudl_datastore call
e-belfer Dec 29, 2023
2cbd7dd
Fix typo
e-belfer Dec 29, 2023
5587b2e
Fix partition option
e-belfer Dec 29, 2023
36752c3
Merge branch 'dev' into cems-extraction
e-belfer Jan 2, 2024
19dfb7b
Add so many logs to ID CI failure
e-belfer Jan 3, 2024
b8782cf
Add gcs cache to gh workflow
e-belfer Jan 3, 2024
71f548f
Merge branch 'dev' into cems-extraction
e-belfer Jan 3, 2024
35211db
fix gcs flag
e-belfer Jan 3, 2024
cc0ebc9
Remove gcs cache from GHA
e-belfer Jan 4, 2024
e2c77bc
Add even more logs
e-belfer Jan 4, 2024
28d50df
Switch debug logs to info
e-belfer Jan 4, 2024
36b823d
Add dtypes on readin
e-belfer Jan 4, 2024
5c01dd4
Try to reduce memory usage when reading EPACEMS CSVs.
zaneselvans Jan 5, 2024
f3833c3
Merge branch 'main' into cems-extraction
zaneselvans Jan 5, 2024
fdffa26
Reduce record linkage test threshold to 80%
zaneselvans Jan 5, 2024
69d40d2
Merge branch 'main' into cems-extraction
zaneselvans Jan 5, 2024
b472667
Clean up logging statements
e-belfer Jan 8, 2024
efb8ac3
Merge branch 'main' into cems-extraction
e-belfer Jan 8, 2024
cf8e64c
Merge branch 'main' into cems-extraction
e-belfer Jan 8, 2024
40 changes: 23 additions & 17 deletions src/pudl/extract/epacems.py
@@ -100,7 +100,7 @@
API_DTYPE_DICT = {
"State": pd.CategoricalDtype(),
"Facility Name": pd.StringDtype(), # Not reading from CSV
"Facility ID": pd.Int16Dtype(), # unique facility id for internal EPA database management (ORIS code)
"Facility ID": pd.Int32Dtype(), # unique facility id for internal EPA database management (ORIS code)
Member:

16-bit integers aren't actually large enough to hold these IDs, so some of them were wrapping around and becoming negative numbers.
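
A quick illustration of the wrap-around, using made-up facility ID values (not real ORIS codes):

```python
import numpy as np

# Hypothetical facility IDs: anything above the int16 maximum of 32767
# silently wraps around to a negative value when cast down to 16 bits.
ids = np.array([3, 54, 60000], dtype=np.int64)
print(ids.astype(np.int16))  # [    3    54 -5536] -- wrapped around, negative
print(ids.astype(np.int32))  # [    3    54 60000] -- fits comfortably in 32 bits
```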

"Unit ID": pd.StringDtype(),
"Associated Stacks": pd.StringDtype(),
# These op_date, op_hour, and op_time variables get converted to
@@ -109,21 +109,21 @@
"Date": pd.StringDtype(),
"Hour": pd.Int16Dtype(),
"Operating Time": pd.Float32Dtype(),
"Gross Load (MW)": pd.Float64Dtype(),
"Steam Load (1000 lb/hr)": pd.Float64Dtype(),
"SO2 Mass (lbs)": pd.Float64Dtype(),
"Gross Load (MW)": pd.Float32Dtype(),
"Steam Load (1000 lb/hr)": pd.Float32Dtype(),
"SO2 Mass (lbs)": pd.Float32Dtype(),
Comment on lines +112 to +114
Member:

Not sure whether switching these to 32-bit floats was necessary.
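
For a rough sense of the difference, a sketch with made-up data comparing the nullable 64-bit and 32-bit float dtypes:

```python
import numpy as np
import pandas as pd

# One million made-up values: nullable Float64 stores 8 bytes per value plus a
# one-byte null mask, while nullable Float32 stores 4 bytes per value plus the
# same mask, so the column takes roughly half the space.
n = 1_000_000
f64 = pd.Series(np.random.rand(n), dtype="Float64")
f32 = f64.astype("Float32")
print(f64.memory_usage(deep=True))  # ~9 MB
print(f32.memory_usage(deep=True))  # ~5 MB
```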

"SO2 Mass Measure Indicator": pd.CategoricalDtype(),
"SO2 Rate (lbs/mmBtu)": pd.Float64Dtype(), # Not reading from CSV
"SO2 Rate (lbs/mmBtu)": pd.Float32Dtype(), # Not reading from CSV
"SO2 Rate Measure Indicator": pd.CategoricalDtype(), # Not reading from CSV
"NOx Rate (lbs/mmBtu)": pd.Float64Dtype(), # Not reading from CSV
"NOx Rate (lbs/mmBtu)": pd.Float32Dtype(), # Not reading from CSV
"NOx Rate Measure Indicator": pd.CategoricalDtype(), # Not reading from CSV
"NOx Mass (lbs)": pd.Float64Dtype(),
"NOx Mass (lbs)": pd.Float32Dtype(),
"NOx Mass Measure Indicator": pd.CategoricalDtype(),
"CO2 Mass (short tons)": pd.Float64Dtype(),
"CO2 Mass (short tons)": pd.Float32Dtype(),
"CO2 Mass Measure Indicator": pd.CategoricalDtype(),
"CO2 Rate (short tons/mmBtu)": pd.Float64Dtype(), # Not reading from CSV
"CO2 Rate (short tons/mmBtu)": pd.Float32Dtype(), # Not reading from CSV
"CO2 Rate Measure Indicator": pd.CategoricalDtype(), # Not reading from CSV
"Heat Input (mmBtu)": pd.Float64Dtype(),
"Heat Input (mmBtu)": pd.Float32Dtype(),
"Heat Input Measure Indicator": pd.CategoricalDtype(),
"Primary Fuel Type": pd.CategoricalDtype(),
"Secondary Fuel Type": pd.CategoricalDtype(),
@@ -191,26 +191,32 @@ def get_data_frame(self, partition: EpaCemsPartition) -> pd.DataFrame:

def _csv_to_dataframe(
self,
- csv_file: Path,
+ csv_path: Path,
ignore_cols: dict[str, str],
rename_dict: dict[str, str],
dtype_dict: dict[str, type],
+ chunksize: int = 100_000,
) -> pd.DataFrame:
"""Convert a CEMS csv file into a :class:`pandas.DataFrame`.

Args:
- csv (file-like object): data to be read
+ csv_path: Path to CSV file containing data to read.

Returns:
- A DataFrame containing the contents of the CSV file.
+ A DataFrame containing the filtered and dtyped contents of the CSV file.
"""
- return pd.read_csv(
- csv_file,
+ chunk_iter = pd.read_csv(
+ csv_path,
index_col=False,
usecols=lambda col: col not in ignore_cols,
dtype=dtype_dict,
- low_memory=False,
- ).rename(columns=rename_dict)
+ chunksize=chunksize,
+ low_memory=True,
Member:

I don't know what it does internally, but low_memory=True tries to be more memory-efficient, and chunksize reads records in batches and processes them one at a time, returning an iterator of dataframes (one per chunk) rather than a single dataframe.
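
A toy illustration of that chunked-reading behavior, using made-up CSV contents:

```python
from io import StringIO

import pandas as pd

# With chunksize set, read_csv returns an iterator of DataFrames, one per
# chunk, instead of a single DataFrame.
reader = pd.read_csv(StringIO("a,b\n1,2\n3,4\n5,6\n"), chunksize=2)
for chunk in reader:
    print(type(chunk).__name__, len(chunk))  # DataFrame 2, then DataFrame 1
```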

Member Author:

"Internally process the file in chunks, resulting in lower memory use while parsing, but possibly mixed type inference. To ensure no mixed types either set False, or specify the type with the dtype parameter. Note that the entire file is read into a single DataFrame regardless, use the chunksize or iterator parameter to return the data in chunks. (Only valid with C parser)."

Member:

If all of the CategoricalDtype() columns are well-behaved and we actually do know what the values will be ahead of time, the right thing to do here is probably to define the categories up front; then this chunking will use low-memory dtypes that can also be concatenated without being converted to object.
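
A minimal sketch of that idea, assuming (hypothetically) that the category values are known up front:

```python
from io import StringIO

import pandas as pd

# Assumed/made-up category values for illustration. Because every chunk is read
# with the same explicit CategoricalDtype, pd.concat keeps the column
# categorical instead of falling back to object.
state_dtype = pd.CategoricalDtype(categories=["CO", "NM", "UT"])
csv = StringIO("State,Gross Load (MW)\nCO,1.0\nNM,2.0\nUT,3.0\nCO,4.0\n")
chunks = pd.read_csv(csv, dtype={"State": state_dtype}, chunksize=2)
df = pd.concat(chunks)
print(df["State"].dtype)  # category
```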

Member Author:

That would make sense, but to be consistent with how we're handling other datasets I'd probably want to map all the column dtypes in fields.py and codes.py, as there are currently no column-by-column type enforcements on EPA CEMS data the way there are for EIA/FERC data. This seems out of scope for this PR, and I'm tempted to move it into a separate issue and handle it there.

parse_dates=["Date"],
)
df = pd.concat(chunk_iter)
dtypes = {k: v for k, v in dtype_dict.items() if k in df.columns}
return df.astype(dtypes).rename(columns=rename_dict)
Comment on lines +213 to +215
Member:

This apparently worked, but I'm not sure why. I didn't expect it to, for several reasons:

  • The CSV is in a zipfile, and we're asking pandas to read directly from the zipfile. Does that mean the entire zipfile has to be decompressed before you can get at any of the data? Or can you just read the first 100,000 rows of a zipfile (which seems unlikely)?
  • When you concatenate dataframes with automatically generated categorical columns, the categoricals become objects, because each dataframe uses a different internal dictionary mapping for its categories, so there are no peak memory savings here from using categorical columns. You could squeeze that savings out (and I thought we would have to) either by explicitly stating the categories in the CategoricalDtype(), ensuring that all the dataframes share the same categories, or by iteratively concatenating the per-chunk dataframes one at a time, dynamically converting the categoricals in both the new and already-concatenated dataframes to match, using union categoricals (see the sketch below).
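
A rough sketch, with made-up categories, of the union_categoricals approach described in the second point:

```python
import pandas as pd
from pandas.api.types import union_categoricals

# Two chunks whose categorical columns learned different category dictionaries;
# taking the union lets them be combined while staying categorical rather than
# being converted to object.
a = pd.Categorical(["coal", "gas"])
b = pd.Categorical(["gas", "oil"])
merged = union_categoricals([a, b])
print(pd.Series(merged).dtype)  # category
print(list(merged.categories))  # ['coal', 'gas', 'oil']
```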



def extract(year_quarter: str, ds: Datastore) -> pd.DataFrame: