|
4 | 4 |
|
5 | 5 | from __future__ import annotations |
6 | 6 |
|
7 | | -from collections.abc import Mapping |
| 7 | +from collections.abc import Collection, Mapping |
8 | 8 | from typing import TYPE_CHECKING, Any, Callable, TypeVar |
9 | 9 |
|
10 | 10 | import numpy as np |
@@ -482,7 +482,7 @@ def update_levels( |
482 | 482 | ini = ini.remove_unused_levels() # type: ignore |
483 | 483 |
|
484 | 484 | levels: list[pd.Index[Any]] = list(ini.levels) |
485 | | - codes: list[list[int] | npt.NDArray[np.integer[Any]]] = list(ini.codes) |
| 485 | + codes: list[npt.NDArray[np.integer[Any]]] = list(ini.codes) |
486 | 486 |
|
487 | 487 | for level, updater in updates.items(): |
488 | 488 | if level not in ini.names: |
@@ -685,7 +685,7 @@ def update_levels_from_other( |
685 | 685 | ini = ini.remove_unused_levels() # type: ignore |
686 | 686 |
|
687 | 687 | levels: list[pd.Index[Any]] = list(ini.levels) |
688 | | - codes: list[list[int] | npt.NDArray[np.integer[Any]]] = list(ini.codes) |
| 688 | + codes: list[npt.NDArray[np.integer[Any]]] = list(ini.codes) |
689 | 689 | names: list[str] = list(ini.names) |
690 | 690 |
|
691 | 691 | for level, (source, updater) in update_sources.items(): |
@@ -714,3 +714,186 @@ def update_levels_from_other( |
714 | 714 | res = pd.MultiIndex(levels=levels, codes=codes, names=names) |
715 | 715 |
|
716 | 716 | return res |
| 717 | + |
| 718 | + |
def create_level_from_collection(
    level: str, value: Collection[Any]
) -> tuple[pd.Index[Any], npt.NDArray[np.integer[Any]]]:
    """
    Create a new level and the corresponding codes from a collection of values

    Parameters
    ----------
    level
        Name of the level to create

    value
        Values to use to create the level

    Returns
    -------
    :
        New level, in which each distinct value of `value` appears once,
        and the codes, which give the position in the new level
        of each element of `value`
    """
    all_values: pd.Index[Any] = pd.Index(value, name=level)
    if not all_values.has_duplicates:
        # Fast route: every value is distinct, so the values themselves
        # are the level and the codes are simply their positions.
        return all_values, np.arange(len(all_values))

    # Slow route: collapse the duplicates, then look up where each
    # original value sits in the de-duplicated level.
    new_level = all_values.unique()
    # Re-use the index built above rather than converting `value` a second time
    new_codes = new_level.get_indexer(all_values)

    return new_level, new_codes
| 748 | + |
| 749 | + |
def set_levels(
    ini: pd.MultiIndex, levels_to_set: dict[str, Any | Collection[Any]]
) -> pd.MultiIndex:
    """
    Set the levels of a MultiIndex to the provided values

    Levels that already exist in `ini` are replaced in place,
    levels that do not exist yet are appended after the existing levels.

    Parameters
    ----------
    ini
        Input MultiIndex

    levels_to_set
        Mapping of level names to values to set. If a value is a `Collection`
        (other than a string), it must be of the same length as the MultiIndex
        and provides one value per entry of the index.
        Otherwise, the same value is used for every entry of the index.

    Returns
    -------
    :
        New MultiIndex with the levels set to the provided values

    Raises
    ------
    TypeError
        If `ini` is not a MultiIndex
    ValueError
        If a value is a collection whose length is not equal to the
        length of the index

    Examples
    --------
    >>> start = pd.MultiIndex.from_tuples(
    ...     [
    ...         ("sa", "ma", "v1", "kg"),
    ...         ("sb", "ma", "v2", "m"),
    ...         ("sa", "mb", "v1", "kg"),
    ...         ("sa", "mb", "v2", "m"),
    ...     ],
    ...     names=["scenario", "model", "variable", "unit"],
    ... )
    >>> start
    MultiIndex([('sa', 'ma', 'v1', 'kg'),
                ('sb', 'ma', 'v2',  'm'),
                ('sa', 'mb', 'v1', 'kg'),
                ('sa', 'mb', 'v2',  'm')],
               names=['scenario', 'model', 'variable', 'unit'])
    >>>
    >>> # Set a new level with a single string
    >>> set_levels(
    ...     start,
    ...     {"new_variable": "xyz"},
    ... )
    MultiIndex([('sa', 'ma', 'v1', 'kg', 'xyz'),
                ('sb', 'ma', 'v2',  'm', 'xyz'),
                ('sa', 'mb', 'v1', 'kg', 'xyz'),
                ('sa', 'mb', 'v2',  'm', 'xyz')],
               names=['scenario', 'model', 'variable', 'unit', 'new_variable'])
    >>>
    >>> # Add a new level from a collection
    >>> set_levels(
    ...     start,
    ...     {"new_variable": [1, 2, 3, 4]},
    ... )
    MultiIndex([('sa', 'ma', 'v1', 'kg', 1),
                ('sb', 'ma', 'v2',  'm', 2),
                ('sa', 'mb', 'v1', 'kg', 3),
                ('sa', 'mb', 'v2',  'm', 4)],
               names=['scenario', 'model', 'variable', 'unit', 'new_variable'])
    >>>
    >>> # Replace a level with a single value and add a new level
    >>> set_levels(
    ...     start,
    ...     {"model": "new_model", "new_variable": ["xyz", "xyz", "x", "y"]},
    ... )
    MultiIndex([('sa', 'new_model', 'v1', 'kg', 'xyz'),
                ('sb', 'new_model', 'v2',  'm', 'xyz'),
                ('sa', 'new_model', 'v1', 'kg',   'x'),
                ('sa', 'new_model', 'v2',  'm',   'y')],
               names=['scenario', 'model', 'variable', 'unit', 'new_variable'])
    """
    if not isinstance(ini, pd.MultiIndex):
        # Fail with the documented error rather than an AttributeError
        # from accessing `.levels` on a plain index.
        msg = (
            "This function is only intended to be used "
            "when `ini` is an instance of `MultiIndex`. "
            f"Received {type(ini)=}"
        )
        raise TypeError(msg)

    n_entries = len(ini)

    levels: list[pd.Index[Any]] = list(ini.levels)
    codes: list[npt.NDArray[np.integer[Any]]] = list(ini.codes)
    names: list[str] = list(ini.names)

    for level, value in levels_to_set.items():
        # Strings are collections of characters,
        # but here they are treated as a single value.
        if isinstance(value, Collection) and not isinstance(value, str):
            if len(value) != n_entries:
                msg = (
                    f"Length of values for level '{level}' does not "
                    f"match index length: {len(value)} != {n_entries}"
                )
                raise ValueError(msg)
            new_level, new_codes = create_level_from_collection(level, value)
        else:
            # Single value: a one-element level which every entry points at
            new_level = pd.Index([value], name=level)
            new_codes = np.zeros(n_entries, dtype=int)

        if level in ini.names:
            # Existing level: overwrite it, keeping its position
            level_idx = ini.names.index(level)
            levels[level_idx] = new_level
            codes[level_idx] = new_codes
        else:
            # New level: append after the existing levels
            levels.append(new_level)
            codes.append(new_codes)
            names.append(level)

    res = pd.MultiIndex(levels=levels, codes=codes, names=names)

    return res
| 859 | + |
| 860 | + |
def set_index_levels_func(
    df: pd.DataFrame,
    levels_to_set: dict[str, Any | Collection[Any]],
    copy: bool = True,
) -> pd.DataFrame:
    """
    Set the index levels of a [pd.DataFrame][pandas.DataFrame]

    Parameters
    ----------
    df
        [pd.DataFrame][pandas.DataFrame] whose index should be updated

    levels_to_set
        Mapping of level names to the values to set for that level

    copy
        Should `df` be copied before its index is replaced?

        If `False`, the index of the `df` that was passed in is replaced.

    Returns
    -------
    :
        `df` with the requested levels set in its index

    Raises
    ------
    TypeError
        The index of `df` is not a [pd.MultiIndex][pandas.MultiIndex]
    """
    if isinstance(df.index, pd.MultiIndex):
        # Work on a copy unless the caller explicitly opted out
        out = df.copy() if copy else df
        out.index = set_levels(out.index, levels_to_set=levels_to_set)  # type: ignore

        return out

    msg = (
        "This function is only intended to be used "
        "when `df`'s index is an instance of `MultiIndex`. "
        f"Received {type(df.index)=}"
    )
    raise TypeError(msg)
0 commit comments