Skip to content

hourly_output_curves_pack

hourly_output_curves_pack

Hourly output curves packing utilities.

Classes

HourlyOutputCurvesPack

Bases: Packable

A packable for managing hourly output curves across scenarios.

HourlyOutputCurvesPack handles the extraction and export of hourly output curves from scenarios.

The class supports carrier-based organization of hourly output curves, allowing users to export specific carriers or all available carriers.

Functions
validate_curve_config classmethod
validate_curve_config(config_values)

Validate hourly curves configuration (carriers or curve names).

Users can specify either:

  • Carrier types: electricity, heat, hydrogen, methane
  • Specific curve names: merit_order, electricity_price, etc.

Parameters:

Name Type Description Default
config_values List[str]

List of carrier names and/or curve names to validate

required

Returns:

Type Description
Tuple[List[str], List[str]]

Tuple of (valid_values, warnings):
  • valid_values: List of validated entries (both carriers and curve names)
  • warnings: List of warning messages for invalid entries
Source code in src/pyetm/models/packables/hourly_output_curves_pack.py
@classmethod
def validate_curve_config(cls, config_values: List[str]) -> Tuple[List[str], List[str]]:
    """
    Check hourly-curve configuration entries against the known names.

    An entry is accepted when it is either a key of the carrier mappings
    (e.g. electricity, heat, hydrogen, methane) or one of the runner's curve
    types (e.g. merit_order, electricity_price). Each entry is converted with
    ``str`` and stripped first; entries that end up blank are ignored.

    Args:
        config_values: List of carrier names and/or curve names to validate

    Returns:
        Tuple of (valid_values, warnings)
        - valid_values: the accepted entries, stripped, in input order
        - warnings: one message per rejected entry, listing the valid
          carriers and curve names
    """
    if not config_values:
        return [], []

    # Carrier names are the keys of the carrier mapping table.
    known_carriers = set(HourlyOutputCurves._load_carrier_mappings().keys())

    # Curve names come from the runner; the import stays at call time.
    from pyetm.services.scenario_runners.fetch_hourly_output_curves import (
        FetchAllHourlyOutputCurvesRunner,
    )
    known_curves = set(FetchAllHourlyOutputCurvesRunner.CURVE_TYPES)

    accepted: List[str] = []
    problems: List[str] = []

    for raw in config_values:
        entry = str(raw).strip()
        if not entry:
            continue

        if entry not in known_carriers and entry not in known_curves:
            # Unknown entry: report it together with everything that is valid.
            problems.append(
                f"Invalid hourly curve entry '{entry}'. "
                f"Valid carriers: {', '.join(sorted(known_carriers))}. "
                f"Valid curve names: {', '.join(sorted(known_curves))}."
            )
            continue

        accepted.append(entry)

    return accepted, problems
to_dataframe
to_dataframe(columns='', curves=None, **kwargs)

Build a DataFrame with optional curve filtering.

Source code in src/pyetm/models/packables/hourly_output_curves_pack.py
def to_dataframe(
    self, columns: str = "", curves: Optional[Sequence[str]] = None, **kwargs: Any
) -> pd.DataFrame:
    """
    Assemble the pack's DataFrame, optionally restricted to given curves.

    Args:
        columns: Forwarded unchanged to ``_to_dataframe``.
        curves: Curve names to keep; ``None`` disables filtering.
        **kwargs: Forwarded unchanged to ``_to_dataframe``.

    Returns:
        The frame built by ``_to_dataframe``. When ``curves`` is given and the
        columns have at least two levels, only columns whose second level is
        one of ``curves`` are kept. An empty DataFrame is returned when there
        are no scenarios or none of the requested curves is present.
    """
    if len(self.scenarios) == 0:
        return pd.DataFrame()

    frame = self._to_dataframe(columns=columns, curves=curves, **kwargs)

    # Nothing to filter: empty result, no filter requested, or flat columns.
    if frame.empty or curves is None or frame.columns.nlevels < 2:
        return frame

    # The second column level carries the curve identifier.
    curve_level = frame.columns.get_level_values(1)
    present = curve_level.unique()
    wanted = [name for name in curves if name in present]
    if not wanted:
        # None of the requested curves exist in the frame.
        return pd.DataFrame()

    return frame.loc[:, curve_level.isin(wanted)]
build_pack_dataframe
build_pack_dataframe(columns='', curves=None, **kwargs)

Override base build_pack_dataframe to pass curves filter to _build_dataframe_for_scenario.

Source code in src/pyetm/models/packables/hourly_output_curves_pack.py
def build_pack_dataframe(
    self, columns: str = "", curves: Optional[Sequence[str]] = None, **kwargs: Any
) -> pd.DataFrame:
    """
    Build one frame per scenario and combine them, forwarding the curves filter.

    Args:
        columns: Passed through to ``_build_dataframe_for_scenario``.
        curves: Curve filter passed through to ``_build_dataframe_for_scenario``.
        **kwargs: Passed through to ``_build_dataframe_for_scenario``.

    Returns:
        The result of ``_concat_frames`` over the non-empty per-scenario
        frames and their keys. A scenario whose frame building (or warning
        logging) raises is reported with a warning and left out.
    """
    collected: list[tuple[Any, pd.DataFrame]] = []

    for scn in self.scenarios:
        try:
            frame = self._build_dataframe_for_scenario(
                scn, columns=columns, curves=curves, **kwargs
            )
            self.log_scenario_warnings(scn, self.key, self.sheet_name)
        except Exception as exc:
            logger.warning(
                "Failed building frame for scenario %s in %s: %s",
                scn.identifier(),
                self.__class__.__name__,
                exc,
            )
        else:
            # Keep only scenarios that actually produced data.
            if frame is not None and not frame.empty:
                collected.append((self._key_for(scn), frame))

    frames = [frame for _, frame in collected]
    keys = [key for key, _ in collected]
    return self._concat_frames(frames, keys)
to_dict_per_curve
to_dict_per_curve(curves=None)

Build a dict organized by curve type, then by scenario.

Returns:

Type Description
dict[str, dict[str, DataFrame]]

Dictionary with structure: dict[curve_name][scenario_id] = DataFrame, where curve_name is the curve type and scenario_id is the scenario identifier.

Source code in src/pyetm/models/packables/hourly_output_curves_pack.py
def to_dict_per_curve(
    self, curves: Optional[Sequence[str]] = None
) -> dict[str, dict[str, pd.DataFrame]]:
    """
    Group hourly output curves by curve name first, then by scenario.

    Args:
        curves: Curve names to fetch and export. When ``None``, the keys
            reported by ``scenario.hourly_output_curves.attached_keys()``
            are fetched instead (or nothing, if that call is unavailable).

    Returns:
        Nested mapping of curve name -> scenario key -> DataFrame, where the
        scenario key is whatever ``_key_for`` returns for the scenario.
        A scenario that fails is logged as a warning and skipped.
    """
    per_curve: dict[str, dict[str, pd.DataFrame]] = {}

    for scn in self.scenarios:
        try:
            scn_key = self._key_for(scn)

            # Decide which curves to request before reading the frame.
            if curves is None:
                try:
                    attached = scn.hourly_output_curves.attached_keys()
                    wanted = list(attached) if attached else []
                except (AttributeError, TypeError):
                    # attached_keys() missing or unusable: request nothing.
                    wanted = []
            else:
                wanted = curves

            # Request each curve individually; one failure must not stop the rest.
            if hasattr(scn, "get_hourly_curve") and wanted:
                try:
                    for name in wanted:
                        try:
                            scn.get_hourly_curve(name)
                        except Exception as exc:
                            logger.warning(
                                "Failed to fetch curve %s for scenario %s: %s",
                                name,
                                scn.identifier(),
                                exc,
                            )
                except (TypeError, AttributeError):
                    # `wanted` turned out not to be iterable; ignore.
                    pass

            frame = scn.hourly_output_curves.to_dataframe(curves=curves)
            if frame.empty:
                continue

            # Slice the multi-index frame once per distinct curve name.
            for name in frame.index.get_level_values("curve_name").unique():
                piece = frame.xs(name, level="curve_name")
                if not isinstance(piece, pd.DataFrame):
                    # xs() may hand back a Series; normalise to a DataFrame.
                    piece = piece.to_frame()
                per_curve.setdefault(name, {})[scn_key] = piece

            self.log_scenario_warnings(scn, self.key, self.sheet_name)

        except Exception as exc:
            logger.warning(
                "Failed building curves dict for scenario %s: %s",
                scn.identifier(),
                exc,
            )
            continue

    return per_curve
to_excel_per_carrier
to_excel_per_carrier(path, carriers=None)

Export hourly output curves to Excel file organized by carrier.

Source code in src/pyetm/models/packables/hourly_output_curves_pack.py
def to_excel_per_carrier(self, path: str, carriers: Optional[Sequence[str]] = None) -> None:
    """Export hourly output curves to Excel file organized by carrier.

    One sheet is added per carrier that yields data; the sheet is named after
    the upper-cased carrier and its columns form a two-level index
    (Scenario, Curve). Scenarios are processed in order of their ``id``.

    Args:
        path: Destination of the workbook (converted with ``str``).
        carriers: Carrier names to export. Names that are not keys of the
            carrier mappings are dropped and duplicates are collapsed; when
            nothing valid remains (or ``None`` is given) all carriers are
            exported.

    Returns:
        None. No workbook is created when there are no scenarios or no
        carrier produced any data.
    """
    # Determine carrier selection
    carrier_map = HourlyOutputCurves._load_carrier_mappings()
    valid_carriers = list(carrier_map.keys())
    requested = valid_carriers if carriers is None else carriers

    # Keep valid names only, in request order, and drop repeats so the same
    # sheet is never added twice (presumably a duplicate sheet name would be
    # rejected by the workbook writer - TODO confirm).
    selected: list[str] = []
    for name in requested:
        if name in valid_carriers and name not in selected:
            selected.append(name)
    if not selected:
        selected = valid_carriers

    # Nothing to do without scenarios
    if not self.scenarios:
        return

    wrote_any = False
    workbook = None
    try:
        # Sort scenarios for deterministic sheet layout
        scenarios_sorted = sorted(self.scenarios, key=lambda s: s.id)

        for carrier in selected:
            series_entries: list[Tuple[Tuple[str, str], pd.Series]] = []

            for scenario in scenarios_sorted:
                # Scenario label, falling back to the raw id if identifier() fails
                try:
                    scenario_name = str(scenario.identifier())
                except Exception:
                    scenario_name = str(getattr(scenario, "id", "scenario"))

                # Fetch curves by carrier type using the model's method
                try:
                    curves = scenario.hourly_output_curves.get_curves(
                        [carrier], scenario
                    )
                except Exception as e:
                    logger.warning(
                        "Failed to fetch %s curves for scenario '%s': %s",
                        carrier,
                        scenario_name,
                        e,
                    )
                    curves = None
                if not isinstance(curves, dict) or not curves:
                    continue

                for curve_name, df in curves.items():
                    if df is None:
                        continue
                    try:
                        if isinstance(df, pd.Series):
                            series_entries.append(((scenario_name, curve_name), df.copy()))
                        elif isinstance(df, pd.DataFrame):
                            if df.empty:
                                continue
                            if df.shape[1] == 1:
                                # Single column: keep the plain curve name
                                series_entries.append(
                                    ((scenario_name, curve_name), df.iloc[:, 0].copy())
                                )
                            else:
                                # Several columns: one entry per column,
                                # labelled "<curve>:<column>"
                                for col in df.columns:
                                    series_entries.append(
                                        ((scenario_name, f"{curve_name}:{col}"), df[col].copy())
                                    )
                    except Exception as e:
                        # Still best-effort, but no longer silent: a curve
                        # missing from the export must be traceable.
                        logger.warning(
                            "Skipping curve %s (%s) for scenario '%s': %s",
                            curve_name,
                            carrier,
                            scenario_name,
                            e,
                        )
                        continue

            if not series_entries:
                continue

            cols: list[Tuple[str, str]] = [key for key, _ in series_entries]
            frames = [s for _, s in series_entries]
            combined = pd.concat(frames, axis=1)
            combined.columns = pd.MultiIndex.from_tuples(cols, names=["Scenario", "Curve"])

            # Lazily create the workbook on first real data
            if workbook is None:
                workbook = Workbook(str(path))
            excel_utils.add_frame(
                name=carrier.upper(),
                frame=combined,
                workbook=workbook,
                column_width=18,
                scenario_styling=True,
            )
            wrote_any = True
    finally:
        # Close only once at least one sheet was added.
        # NOTE(review): if the very first add_frame raises, the workbook is
        # left unclosed - confirm this is intended.
        if workbook is not None and wrote_any:
            workbook.close()