Skip to content

hourly_output_curves

hourly_output_curves

Hourly output curve data models.

Classes

HourlyOutputCurveError

Bases: Exception

Base hourly output curve error

HourlyOutputCurve

HourlyOutputCurve(**data)

Bases: Base

Wrapper around a single hourly output curve (an export in the front end). Curves are saved to the filesystem because bulk processing of scenarios can produce several hundred MB of curve data, which we don't want to keep in memory.

Source code in src/pyetm/models/base.py
def __init__(self, **data: Any) -> None:
    """
    Initialize the model, converting validation errors to warnings.

    Private attributes are first seeded with their declared defaults. Normal
    pydantic validation is then attempted. When validation fails for
    non-empty input, the instance is populated without validation (via
    ``model_construct``) and every validation error is recorded on
    ``self._warning_collector`` instead of being raised.

    Args:
        **data: Field values for the model.

    Raises:
        ValueError: If validation fails and no field values were supplied.
    """
    # Install the private-attribute store up front, using the __setattr__
    # that follows BaseModel in the MRO, so it exists before validation runs.
    super(BaseModel, self).__setattr__("__pydantic_private__", {})

    # Initialize all private attributes with their defaults
    private_dict: Dict[str, Any] = self.__pydantic_private__  # type: ignore[assignment]
    for attr_name, attr_info in self.__class__.__private_attributes__.items():
        if (
            hasattr(attr_info, "default_factory")
            and attr_info.default_factory is not None
        ):
            # Call factory - signature varies between pydantic versions
            private_dict[attr_name] = attr_info.default_factory()
        elif hasattr(attr_info, "default"):
            private_dict[attr_name] = attr_info.default
        else:
            private_dict[attr_name] = None

    try:
        super().__init__(**data)
    except ValidationError as e:
        # ``data`` is the **kwargs dict, so it can never be None; an empty
        # dict is the only "no data" case and usually indicates an API error.
        if not data:
            # Re-raise with clearer message
            raise ValueError(
                f"Cannot create {self.__class__.__name__} with empty data. "
                "This usually indicates an authentication or API error."
            ) from e

        # If validation fails, create model without validation and collect warnings
        # Use model_construct to bypass validation
        temp_instance = self.__class__.model_construct(**data)

        # Copy the constructed data to this instance (public names only;
        # the private store was already initialized above)
        for field_name, field_value in temp_instance.__dict__.items():
            if not field_name.startswith("_"):
                object.__setattr__(self, field_name, field_value)

        # Ensure required Pydantic slot attributes exist to prevent AttributeError
        for slot in ("__pydantic_fields_set__", "__pydantic_extra__"):
            try:
                value = object.__getattribute__(temp_instance, slot)
                object.__setattr__(self, slot, value)
            except AttributeError:
                # Initialize missing slot attributes with defaults
                if slot == "__pydantic_extra__":
                    object.__setattr__(self, slot, {})
                elif slot == "__pydantic_fields_set__":
                    object.__setattr__(self, slot, set())

        # Convert validation errors to warnings
        for error in e.errors():
            field_path = ".".join(str(part) for part in error.get("loc", []))
            message = error.get("msg", "Validation failed")
            self._warning_collector.add(field_path, message, "error")
Functions
retrieve
retrieve(client, scenario, force_refresh=False)

Process curve from client, save to file, set file_path

Source code in src/pyetm/models/hourly_output_curves.py
def retrieve(
    self, client: Any, scenario: Any, force_refresh: bool = False
) -> Optional[pd.DataFrame]:
    """Load this curve's data, preferring the cached CSV over a download.

    Args:
        client: Passed through to ``DownloadHourlyOutputCurveRunner.run``.
        scenario: Scenario or session object; it is normalized with
            ``_normalize_to_session`` and the resulting ``id`` selects the
            cache directory.
        force_refresh: When True, ignore an existing cached file and
            download the curve again.

    Returns:
        The curve as a DataFrame, or None when the download or the
        processing of the downloaded data failed. Failures are recorded as
        warnings on the model and logged.
    """
    session = self._normalize_to_session(scenario)

    cache_dir = get_settings().path_to_tmp(str(session.id))
    target = cache_dir / f"{self.key.replace('/', '-')}.csv"

    # Serve from the cached file when allowed and present.
    cache_usable = not force_refresh and target.is_file()
    if cache_usable:
        self.file_path = target
        if self.file_path is not None:
            try:
                return _read_csv_cached(self.file_path)
            except Exception as cache_error:
                # An unreadable cache file is not fatal: warn, then fall
                # through to the download below.
                self.add_warning(
                    "file_path",
                    f"Failed to read cached curve file for {self.key}: {cache_error}; refetching",
                )

    try:
        result = DownloadHourlyOutputCurveRunner.run(client, session, self.key)

        # API call failed or returned no data
        if not result.success or result.data is None:
            if hasattr(result, "errors"):
                errors = result.errors
            else:
                errors = ["Unknown error"]
            failure = f"Failed to download curve {self.key}: {errors}"
            self.add_warning("download", failure)
            logger.error(failure)
            return None

        try:
            result.data.seek(0)
            frame = pd.read_csv(result.data, index_col=0).dropna(how="all")
            frame.index = pd.RangeIndex(len(frame))
            # The cache directory may not exist yet.
            target.parent.mkdir(parents=True, exist_ok=True)
            frame.to_csv(target, index=True)
            # Record the path only once the file has been written.
            self.file_path = target
            return frame
        except Exception as processing_error:
            failure = f"Failed to process curve data for {self.key}: {processing_error}"
            self.add_warning("data", failure)
            logger.exception(failure)
            return None

    except Exception as unexpected:
        failure = f"Unexpected error retrieving curve {self.key}: {unexpected}"
        self.add_warning("base", failure)
        logger.exception(failure)
        return None
contents
contents()

Open file from path and return contents

Source code in src/pyetm/models/hourly_output_curves.py
def contents(self) -> Optional[pd.DataFrame]:
    """Read the curve's cached file and return it as a DataFrame.

    Returns:
        The file contents, or None when the curve is not available, has no
        file path, or the file could not be read. Each of those cases adds
        a ``file_path`` warning to the model.
    """
    if not self.available():
        self.add_warning(
            "file_path", f"Curve {self.key} not available - no file path set"
        )
        return None

    try:
        path = self.file_path
        if path is not None:
            return _read_csv_cached(path)
        self.add_warning("file_path", f"Curve {self.key} has no file path set")
    except Exception as read_error:
        self.add_warning(
            "file_path", f"Failed to read curve file for {self.key}: {read_error}"
        )
    return None
remove
remove()

Remove file and clear path

Source code in src/pyetm/models/hourly_output_curves.py
def remove(self) -> bool:
    """Delete the curve's cached file and forget its path.

    Returns:
        True when there was nothing to remove or the removal succeeded;
        False when removal raised, in which case a ``file_path`` warning is
        added to the model.
    """
    nothing_to_remove = not self.available() or self.file_path is None
    if nothing_to_remove:
        return True

    try:
        self.file_path.unlink(missing_ok=True)
        self.file_path = None
        # Drop memoized reads so the deleted file is not served from cache.
        _read_csv_cached_impl.cache_clear()
    except Exception as removal_error:
        self.add_warning(
            "file_path", f"Failed to remove curve file for {self.key}: {removal_error}"
        )
        return False
    return True
from_json classmethod
from_json(data)

Initialize a HourlyOutputCurve from JSON data

Source code in src/pyetm/models/hourly_output_curves.py
@classmethod
def from_json(cls, data: dict[str, Any]) -> "HourlyOutputCurve":
    """
    Initialize a HourlyOutputCurve from JSON data.

    Args:
        data: Mapping with the curve's fields (at least ``key`` and ``type``
            are read for the fallback).

    Returns:
        The validated curve. If validation raises, a curve built without
        validation from ``key`` and ``type`` (each defaulting to
        ``"unknown"``) that carries a ``base`` warning describing the failure.
    """
    try:
        return cls.model_validate(data)
    except Exception as e:
        # The payload may not be a mapping at all (e.g. None or a list), in
        # which case ``data.get`` would raise and defeat this fallback.
        source = data if isinstance(data, dict) else {}
        # Create basic curve with warning attached
        curve = cls.model_construct(
            key=source.get("key", "unknown"),
            type=source.get("type", "unknown"),
        )
        curve.add_warning("base", f"Failed to create curve from data: {e}")
        return curve

HourlyOutputCurves

HourlyOutputCurves(**data)

Bases: Base

Collection of Hourly Output Curves.

Source code in src/pyetm/models/base.py
def __init__(self, **data: Any) -> None:
    """
    Initialize the model, converting validation errors to warnings.

    Private attributes are first seeded with their declared defaults. Normal
    pydantic validation is then attempted. When validation fails for
    non-empty input, the instance is populated without validation (via
    ``model_construct``) and every validation error is recorded on
    ``self._warning_collector`` instead of being raised.

    Args:
        **data: Field values for the model.

    Raises:
        ValueError: If validation fails and no field values were supplied.
    """
    # Install the private-attribute store up front, using the __setattr__
    # that follows BaseModel in the MRO, so it exists before validation runs.
    super(BaseModel, self).__setattr__("__pydantic_private__", {})

    # Initialize all private attributes with their defaults
    private_dict: Dict[str, Any] = self.__pydantic_private__  # type: ignore[assignment]
    for attr_name, attr_info in self.__class__.__private_attributes__.items():
        if (
            hasattr(attr_info, "default_factory")
            and attr_info.default_factory is not None
        ):
            # Call factory - signature varies between pydantic versions
            private_dict[attr_name] = attr_info.default_factory()
        elif hasattr(attr_info, "default"):
            private_dict[attr_name] = attr_info.default
        else:
            private_dict[attr_name] = None

    try:
        super().__init__(**data)
    except ValidationError as e:
        # ``data`` is the **kwargs dict, so it can never be None; an empty
        # dict is the only "no data" case and usually indicates an API error.
        if not data:
            # Re-raise with clearer message
            raise ValueError(
                f"Cannot create {self.__class__.__name__} with empty data. "
                "This usually indicates an authentication or API error."
            ) from e

        # If validation fails, create model without validation and collect warnings
        # Use model_construct to bypass validation
        temp_instance = self.__class__.model_construct(**data)

        # Copy the constructed data to this instance (public names only;
        # the private store was already initialized above)
        for field_name, field_value in temp_instance.__dict__.items():
            if not field_name.startswith("_"):
                object.__setattr__(self, field_name, field_value)

        # Ensure required Pydantic slot attributes exist to prevent AttributeError
        for slot in ("__pydantic_fields_set__", "__pydantic_extra__"):
            try:
                value = object.__getattribute__(temp_instance, slot)
                object.__setattr__(self, slot, value)
            except AttributeError:
                # Initialize missing slot attributes with defaults
                if slot == "__pydantic_extra__":
                    object.__setattr__(self, slot, {})
                elif slot == "__pydantic_fields_set__":
                    object.__setattr__(self, slot, set())

        # Convert validation errors to warnings
        for error in e.errors():
            field_path = ".".join(str(part) for part in error.get("loc", []))
            message = error.get("msg", "Validation failed")
            self._warning_collector.add(field_path, message, "error")
Functions
is_attached
is_attached(curve_name)

Returns True if a curve with the given name is attached.

Source code in src/pyetm/models/hourly_output_curves.py
def is_attached(self, curve_name: str) -> bool:
    """Return True when ``curve_name`` is one of the attached curve keys."""
    return curve_name in self.attached_keys()
attached_keys
attached_keys()

Returns the keys of attached curves.

Source code in src/pyetm/models/hourly_output_curves.py
def attached_keys(self) -> list[str]:
    """Collect the ``key`` of every curve in ``self.curves``, in order."""
    keys = [attached.key for attached in self.curves]
    return keys
get_curve
get_curve(identifier, scenario)

Get a single hourly output curve by name or carrier type alias.

Carrier types ('electricity', 'heat', 'hydrogen', 'methane') are treated as convenient aliases for their primary curves.

Parameters:

Name Type Description Default
identifier str

Curve name (e.g., 'merit_order') or carrier type alias

required
scenario Any

The scenario or session object

required

Returns:

Type Description
Optional[DataFrame]

DataFrame with hourly data, or None if not found

Examples:

>>> curves.get_curve("merit_order", scenario)  # by name
>>> curves.get_curve("electricity", scenario)  # by carrier alias → merit_order
Source code in src/pyetm/models/hourly_output_curves.py
def get_curve(self, identifier: str, scenario: Any) -> Optional[pd.DataFrame]:
    """
    Fetch one hourly output curve, addressed by name or by carrier alias.

    A carrier type that appears in the carrier mapping is resolved to its
    curve name first; anything else is used as a curve name directly.

    Args:
        identifier: Curve name (e.g., 'merit_order') or carrier type alias
        scenario: The scenario or session object

    Returns:
        Whatever ``get_contents`` returns for the resolved curve, or None
        (with an ``identifier`` warning) when the alias maps to zero or to
        several curves.

    Examples:
        >>> curves.get_curve("merit_order", scenario)  # by name
        >>> curves.get_curve("electricity", scenario)  # by carrier alias → merit_order
    """
    aliases = self._load_carrier_mappings()

    # Not an alias: the identifier is the curve name itself.
    if identifier not in aliases:
        return self.get_contents(scenario, identifier)

    targets = aliases[identifier]
    target_count = len(targets)
    if target_count == 1:
        return self.get_contents(scenario, targets[0])

    if target_count > 1:
        # Future-proof: if carriers ever map to multiple curves
        problem = (
            f"Carrier '{identifier}' maps to multiple curves: {targets}. "
            f"Use get_curves(['{identifier}']) instead."
        )
    else:
        problem = f"Carrier '{identifier}' has no associated curves."

    self.add_warning("identifier", problem)
    return None
get_curves
get_curves(identifiers, scenario)

Get multiple hourly output curves by names or carrier type aliases.

Parameters:

Name Type Description Default
identifiers list[str]

List of curve names and/or carrier type aliases

required
scenario Any

The scenario or session object

required

Returns:

Type Description
dict[str, DataFrame]

Dictionary mapping curve names to DataFrames

Examples:

>>> curves.get_curves(["electricity", "heat"], scenario)
{'merit_order': DataFrame, 'heat_network': DataFrame}
>>> curves.get_curves(["merit_order", "electricity_price"], scenario)
{'merit_order': DataFrame, 'electricity_price': DataFrame}
Source code in src/pyetm/models/hourly_output_curves.py
def get_curves(
    self, identifiers: list[str], scenario: Any
) -> dict[str, pd.DataFrame]:
    """
    Fetch several hourly output curves by name and/or carrier alias.

    Each identifier found in the carrier mapping is expanded to the curve
    names it maps to; every other identifier is used as a curve name as-is.
    Curves for which ``get_contents`` returns None are left out.

    Args:
        identifiers: List of curve names and/or carrier type aliases
        scenario: The scenario or session object

    Returns:
        Dictionary mapping curve names to DataFrames

    Examples:
        >>> curves.get_curves(["electricity", "heat"], scenario)
        {'merit_order': DataFrame, 'heat_network': DataFrame}

        >>> curves.get_curves(["merit_order", "electricity_price"], scenario)
        {'merit_order': DataFrame, 'electricity_price': DataFrame}
    """
    aliases = self._load_carrier_mappings()
    found = {}

    for identifier in identifiers:
        # An alias expands to its curve names; a plain name stands for itself.
        if identifier in aliases:
            names = aliases[identifier]
        else:
            names = [identifier]

        for name in names:
            frame = self.get_contents(scenario, name)
            if frame is None:
                continue
            found[name] = frame

    return found
from_json classmethod
from_json(data)

Initialize HourlyOutputCurves collection from JSON data

Source code in src/pyetm/models/hourly_output_curves.py
@classmethod
def from_json(cls, data: list[dict[str, Any]]) -> "HourlyOutputCurves":
    """
    Initialize HourlyOutputCurves collection from JSON data.

    Args:
        data: Iterable of per-curve JSON objects.

    Returns:
        A collection containing one curve per entry. Entries that could not
        be turned into a curve are represented by a placeholder curve of
        type ``"unknown"`` carrying a warning; warnings of all curves are
        merged into the collection.
    """
    curves = []

    for curve_data in data:
        try:
            curve = HourlyOutputCurve.from_json(curve_data)
            curves.append(curve)
        except Exception as e:
            # Create a basic curve and continue processing. The entry may
            # not be a dict at all (e.g. None or a list), so only call
            # ``.get`` on it when it is one; otherwise this handler would
            # itself raise AttributeError.
            if isinstance(curve_data, dict):
                key = curve_data.get("key", "unknown")
            else:
                key = "unknown"
            basic_curve = HourlyOutputCurve.model_construct(key=key, type="unknown")
            basic_curve.add_warning(key, f"Skipped invalid curve data: {e}")
            curves.append(basic_curve)

    collection = cls.model_validate({"curves": curves})

    # Merge warnings from individual curves
    collection._merge_submodel_warnings(*curves, key_attr="key")

    return collection
from_service_result classmethod
from_service_result(service_result, scenario, cache_curves=True)

Create HourlyOutputCurves instance from service result

Source code in src/pyetm/models/hourly_output_curves.py
@classmethod
def from_service_result(
    cls, service_result: Any, scenario: Any, cache_curves: bool = True
) -> "HourlyOutputCurves":
    """Build a collection from the result of a bulk curve download.

    Args:
        service_result: Result object exposing ``success``, ``data`` (a
            mapping of curve name to a seekable CSV stream) and ``errors``.
        scenario: A ``Scenario`` (its ``session`` is used) or a session
            object; its ``id`` selects the cache directory.
        cache_curves: When True, each curve is written to a CSV file in the
            cache directory and the curve's ``file_path`` is set.

    Returns:
        The collection. For an unsuccessful or empty result it is empty and
        carries one "Service error" warning per reported error. Curves that
        fail to process are replaced by a placeholder of type ``"unknown"``
        with a warning.
    """
    # Imported inside the method (presumably to avoid a circular import).
    from pyetm.models.scenario import Scenario

    if isinstance(scenario, Scenario):
        session = scenario.session
    else:
        session = scenario

    if not (service_result.success and service_result.data):
        collection = cls(curves=[])
        for error in service_result.errors:
            collection.add_warning("base", f"Service error: {error}")
        return collection

    cache_dir = None
    if cache_curves:
        cache_dir = get_settings().path_to_tmp(str(session.id))
        cache_dir.mkdir(parents=True, exist_ok=True)

    collected = []
    for curve_name, raw in service_result.data.items():
        try:
            curve = HourlyOutputCurve.model_validate(
                {"key": curve_name, "type": cls._infer_curve_type(curve_name)}
            )

            if cache_curves:
                destination = cache_dir / f"{curve_name.replace('/', '-')}.csv"
                raw.seek(0)
                frame = pd.read_csv(raw, index_col=0).dropna(how="all")
                frame.to_csv(destination, index=True)
                curve.file_path = destination
        except Exception as processing_error:
            # Keep a placeholder so the curve key is still present in the collection.
            curve = HourlyOutputCurve.model_construct(key=curve_name, type="unknown")
            curve.add_warning(
                "base", f"Failed to process curve data: {processing_error}"
            )
        collected.append(curve)

    collection = cls(curves=collected)

    for error in service_result.errors:
        collection.add_warning("base", f"Download warning: {error}")

    collection._merge_submodel_warnings(*collected, key_attr="key")

    return collection
fetch_all classmethod
fetch_all(scenario, cache_curves=True)

Convenience method to fetch all carrier curves for a scenario.

Source code in src/pyetm/models/hourly_output_curves.py
@classmethod
def fetch_all(
    cls, scenario: Any, cache_curves: bool = True
) -> "HourlyOutputCurves":
    """
    Download every hourly output curve for a scenario and wrap the result.

    Args:
        scenario: A ``Scenario`` (its ``session`` is used) or a session object.
        cache_curves: Forwarded to ``from_service_result``.

    Returns:
        The collection built by ``from_service_result``.
    """
    from pyetm.models.scenario import Scenario

    # Work with the session, which is what the runner is given.
    if isinstance(scenario, Scenario):
        session = scenario.session
    else:
        session = scenario

    fetched = FetchAllHourlyOutputCurvesRunner.run(get_client(), session)
    return cls.from_service_result(fetched, session, cache_curves)
create_empty_collection classmethod
create_empty_collection()

Create a collection with all known hourly output curve types but no data. This allows is_attached() to work before data is retrieved.

Source code in src/pyetm/models/hourly_output_curves.py
@classmethod
def create_empty_collection(cls) -> "HourlyOutputCurves":
    """
    Build a collection holding one data-less curve per known curve type.

    The curve names come from ``FetchAllHourlyOutputCurvesRunner.CURVE_TYPES``.
    Having the keys present lets is_attached() work before any data has
    been retrieved.
    """
    from pyetm.services.scenario_runners.fetch_hourly_output_curves import (
        FetchAllHourlyOutputCurvesRunner,
    )

    placeholders = [
        HourlyOutputCurve.model_validate(
            {"key": name, "type": cls._infer_curve_type(name)}
        )
        for name in FetchAllHourlyOutputCurvesRunner.CURVE_TYPES
    ]
    return cls(curves=placeholders)
remove_curve
remove_curve(curve_name)

Remove a specific hourly output curve's cache file and clear LRU cache.

Parameters:

Name Type Description Default
curve_name str

Name of the curve to remove

required

Returns:

Type Description
bool

True if successfully removed, False otherwise

Source code in src/pyetm/models/hourly_output_curves.py
def remove_curve(self, curve_name: str) -> bool:
    """Delete one curve's cache file and reset the CSV read cache.

    Args:
        curve_name: Name of the curve to remove

    Returns:
        False when no curve with that name is found; otherwise the result
        of the curve's own ``remove()`` call.
    """
    target = self._find(curve_name)
    if target is None:
        return False

    removed = target.remove()
    if not removed:
        return removed

    _read_csv_cached_impl.cache_clear()
    return removed
clear_cache
clear_cache()

Clear all hourly output curve cache files and LRU cache.

Returns:

Type Description
int

Number of files successfully removed

Source code in src/pyetm/models/hourly_output_curves.py
def clear_cache(self) -> int:
    """Remove every curve's cache file, then reset the read cache and warnings.

    Returns:
        The number of curves whose ``remove()`` call reported success. Note
        that ``HourlyOutputCurve.remove()`` also reports success when there
        was no file to delete.
    """
    removed_count = sum(1 for curve in self.curves if curve.remove())

    _read_csv_cached_impl.cache_clear()
    self.warnings.clear()
    return removed_count

Functions