Source code for breos.weather

"""
Weather data fetching and processing module.

This module handles:
- Fetching TMY (Typical Meteorological Year) data from PVGIS
- Fetching historical weather data from Open-Meteo
- Converting between hourly and 15-minute resolutions using Makima interpolation
"""

import logging
import os
import re
from datetime import timedelta
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import pvlib
from pvlib.location import Location
from scipy.interpolate import Akima1DInterpolator

from breos.utils import safe_path_slug

logger = logging.getLogger(__name__)

# Optional imports for API calls
try:
    import openmeteo_requests
    import requests_cache
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry

    HAS_OPENMETEO = True
except ImportError:
    HAS_OPENMETEO = False


def parse_weather_filename(filename: str) -> Optional[Dict[str, str]]:
    """
    Split a weather CSV filename into the fields of the naming convention.

    Convention: ``{location}_{type}_{yearstart}_{yearend}_{source}.csv``.
    A single-year form, ``{location}_{type}_{year}_{source}.csv``, is also
    accepted; the one year is then reported as both start and end.

    Examples:
        porto_tmy_2005_2023_pvgis-sarah3.csv
        porto_historical_2005_2024_openmeteo.csv
        lisbon_tmy_2014_nsrdb.csv

    Args:
        filename: File name or path; only the basename is inspected.

    Returns:
        Dict with keys: location, type, year_start, year_end, source
        (all values are strings).
        Returns None if the filename doesn't match the convention.
    """
    suffix = ".csv"
    basename = os.path.basename(filename)
    if not basename.endswith(suffix):
        return None
    stem = basename[: -len(suffix)]

    # Two-year form is tried first. The source may contain hyphens
    # (e.g., pvgis-sarah3).
    two_year = re.match(r"^(.+)_(tmy|historical)_(\d{4})_(\d{4})_([\w-]+)$", stem)
    if two_year is not None:
        place, kind, first_year, last_year, origin = two_year.groups()
        return {
            "location": place,
            "type": kind,
            "year_start": first_year,
            "year_end": last_year,
            "source": origin,
        }

    # Single-year form (e.g., lisbon_tmy_2014_nsrdb).
    one_year = re.match(r"^(.+)_(tmy|historical)_(\d{4})_([\w-]+)$", stem)
    if one_year is None:
        return None
    place, kind, only_year, origin = one_year.groups()
    return {
        "location": place,
        "type": kind,
        "year_start": only_year,
        "year_end": only_year,
        "source": origin,
    }
def load_weather(
    location: str,
    data_type: Optional[str] = None,
    start_year: Optional[int] = None,
    end_year: Optional[int] = None,
    source: Optional[str] = None,
    weather_dir: str = "weather/",
) -> Optional[pd.DataFrame]:
    """
    Smart weather loading: scan local files for matching weather data.

    Searches the weather directory for files matching the naming convention,
    filters by location/type/source, and checks date coverage. If a file
    covers the requested range (e.g., requesting 2008-2010 and a 2005-2024
    file exists), subsets it automatically.

    Files are considered in sorted name order so that the chosen file is
    reproducible across platforms. If a year range is requested but no file
    covers it, the search falls back to the uncovered candidates (best effort).

    Args:
        location: Location name (e.g., 'porto', 'lisbon'); compared verbatim
            against the location part of the filename
        data_type: 'tmy' or 'historical' (None = any)
        start_year: Start year for date coverage check
        end_year: End year for date coverage check (the coverage check and the
            subsetting only run when both years are given)
        source: Data source filter (e.g., 'openmeteo', 'pvgis-sarah3')
        weather_dir: Directory to scan for weather files

    Returns:
        DataFrame if a matching file is found, None otherwise.
    """
    if not os.path.isdir(weather_dir):
        return None

    candidates = []
    # os.listdir() returns entries in arbitrary order; sort for determinism.
    for fname in sorted(os.listdir(weather_dir)):
        parsed = parse_weather_filename(fname)
        if parsed is None or parsed["location"] != location:
            continue
        if data_type is not None and parsed["type"] != data_type:
            continue
        if source is not None and parsed["source"] != source:
            continue
        # Build a new dict instead of mutating the parser's return value.
        candidates.append({**parsed, "filepath": os.path.join(weather_dir, fname)})

    if not candidates:
        return None

    range_requested = start_year is not None and end_year is not None

    if range_requested:
        # TMY files don't need date coverage — they represent a typical year.
        covered = [
            c
            for c in candidates
            if c["type"] == "tmy" or (int(c["year_start"]) <= start_year and int(c["year_end"]) >= end_year)
        ]
        candidates = covered or candidates

    # Prefer the first match (could be refined with priority logic)
    best = candidates[0]
    filepath = best["filepath"]
    logger.info("Found local weather file: %s", filepath)

    df = pd.read_csv(filepath, index_col=0, parse_dates=True)

    # Parse datetime index if it didn't work from index_col=0
    if not isinstance(df.index, pd.DatetimeIndex):
        try:
            # Try converting the existing index (handles timezone-aware strings)
            df.index = pd.to_datetime(df.index, utc=True)
        except (ValueError, TypeError):
            # Fall back to looking for named datetime columns
            df = pd.read_csv(filepath)
            for col_name in ("date", "time", "Datetime"):
                if col_name in df.columns:
                    df[col_name] = pd.to_datetime(df[col_name])
                    df = df.set_index(col_name)
                    break

    # Subset by year range for historical data
    if best["type"] == "historical" and range_requested:
        file_start = int(best["year_start"])
        file_end = int(best["year_end"])
        if file_start < start_year or file_end > end_year:
            if isinstance(df.index, pd.DatetimeIndex):
                years = df.index.year
                df = df.loc[(years >= start_year) & (years <= end_year)]
                logger.info("Subset to %s-%s (%d rows)", start_year, end_year, len(df))
            else:
                # Without a datetime index there is no year to filter on;
                # return the file unsubset rather than crash on `.year`.
                logger.warning(
                    "Could not subset %s to %s-%s: no datetime index could be parsed",
                    filepath,
                    start_year,
                    end_year,
                )

    return df
def fetch_tmy_weather_data(
    latitude: float,
    longitude: float,
    sample_year: Optional[int] = 2025,
    freq: str = "h",
    timezone: Optional[str] = None,
    save_to_file: bool = False,
) -> Tuple[pd.DataFrame, dict]:
    """
    Fetch Typical Meteorological Year (TMY) weather data from PVGIS.

    Args:
        latitude: Latitude of the location
        longitude: Longitude of the location
        sample_year: Year to use for index (default: 2025). Set to None to keep original TMY index.
        freq: Frequency for output data ('h' for hourly, '15min' for 15-minute)
        timezone: Timezone string used to determine the location's whole-hour
            UTC offset (offset taken at Jan 1 of sample_year, i.e. standard
            time for northern-hemisphere locations). Auto-detected if None.
            Only used when sample_year is set.
        save_to_file: Whether to save the data to CSV (under ``weather/``)

    Returns:
        Tuple of (tmy_data DataFrame, metadata dict). When sample_year is set,
        the index is fixed-offset local time starting at local midnight of
        Jan 1; rows are rolled (not relabeled) so each timestamp remains the
        correct UTC instant for its irradiance values.

    Raises:
        ValueError: If sample_year is a leap year (TMY has 8760 hours), if
            freq is not a supported value, or if no timezone could be
            determined for the coordinates.
    """
    if sample_year is not None:
        # Check for leap year
        if sample_year % 4 == 0 and (sample_year % 100 != 0 or sample_year % 400 == 0):
            raise ValueError(f"Sample year {sample_year} is a leap year. TMY has 8760 hours. Use non-leap year.")

    # Validate freq before the (slow) network request rather than after it.
    want_15min = freq in ("15min", "15T", "15m")
    if not want_15min and freq not in ("h", "H", "1h", "1H"):
        raise ValueError("freq must be 'h' or '15min'")

    roll_utc_offset = None
    if sample_year is not None:
        # Auto-detect timezone if not provided
        if timezone is None:
            from timezonefinder import TimezoneFinder

            timezone = TimezoneFinder().timezone_at(lat=latitude, lng=longitude)
            if timezone is None:
                # A None timezone would make utcoffset() return None below and
                # fail with an opaque AttributeError.
                raise ValueError(
                    f"Could not determine a timezone for ({latitude}, {longitude}); pass timezone explicitly."
                )
        utc_offset = pd.Timestamp(f"{sample_year}-01-01", tz=timezone).utcoffset()
        # Whole-hour offset; fractional offsets are rounded by round().
        roll_utc_offset = round(utc_offset.total_seconds() / 3600)

    # PVGIS returns UTC-ordered rows. roll_utc_offset/coerce_year make pvlib
    # roll the data so the series starts at local midnight of sample_year
    # while keeping each row's timestamp the correct UTC instant — never
    # relabel the UTC-ordered rows with local-time labels.
    tmy_data, metadata = pvlib.iotools.get_pvgis_tmy(
        latitude,
        longitude,
        outputformat="json",
        usehorizon=True,
        map_variables=True,
        url="https://re.jrc.ec.europa.eu/api/v5_3/",
        timeout=120,
        roll_utc_offset=roll_utc_offset,
        coerce_year=sample_year,
    )

    # Resample to 15-min if requested
    if want_15min:
        tmy_data = resample_tmy_to_15min(tmy_data, metadata)

    if save_to_file:
        # Encode metadata in filename: {location}_tmy_{year_min}_{year_max}_{db}.csv
        try:
            inputs = metadata.get("inputs", {})
            meta_loc = inputs.get("location", {})
            meteo = inputs.get("meteo_data", {})
            rad_db = meteo.get("radiation_db", "unknown")
            year_min = meteo.get("year_min", "unknown")
            year_max = meteo.get("year_max", "unknown")
            # Derive location name from coordinates (fallback)
            loc_name = f"lat{meta_loc.get('latitude', latitude):.0f}_lon{meta_loc.get('longitude', longitude):.0f}"
            db_slug = f"pvgis-{rad_db.lower()}" if rad_db != "unknown" else "pvgis"
            filename = f"weather/{loc_name}_tmy_{year_min}_{year_max}_{db_slug}.csv"
        except (KeyError, AttributeError, TypeError, ValueError):
            # Unexpected metadata shape or non-numeric coordinates: fall back
            # to a generic name instead of failing after a successful fetch.
            filename = f"weather/tmy_data_{sample_year if sample_year else 'original'}_{freq}.csv"

        os.makedirs(os.path.dirname(filename), exist_ok=True)
        tmy_data.to_csv(filename)
        logger.info("Saved TMY data to %s", filename)

    return tmy_data, metadata
def fetch_weather_data(
    latitude: float,
    longitude: float,
    start_date: str,
    end_date: str,
    tilt: float,
    azimuth: float,
    freq: str = "h",
    save_to_file: bool = True,
    location_name: Optional[str] = None,
    output_dir: str = "weather",
) -> pd.DataFrame:
    """
    Fetch historical weather data from the Open-Meteo API.

    Args:
        latitude: Latitude of the location
        longitude: Longitude of the location
        start_date: Start date in format 'YYYY-MM-DD'
        end_date: End date in format 'YYYY-MM-DD'
        tilt: Tilt of the PV panel (degrees)
        azimuth: Azimuth of the PV system (0° S, -90° E, 90° W, 180° N)
        freq: Output frequency ('h' for hourly, '15min' for 15-minute).
            Any value other than the 15-minute aliases returns hourly data.
        save_to_file: Whether to save the data to CSV
        location_name: Location name for filename (e.g., 'porto'). If None, uses lat/lon.
        output_dir: Directory to save the file (default: 'weather')

    Returns:
        DataFrame with weather variables, indexed by 'date'. The request asks
        for the GMT timezone; the index is built from the epoch seconds in
        the response.

    Raises:
        ImportError: If openmeteo_requests is not installed

    Note:
        Responses are cached in a ``.cache.sqlite`` file created in the
        current working directory (30-day expiry). Delete it to force fresh
        API responses.
    """
    if not HAS_OPENMETEO:
        raise ImportError(
            "openmeteo_requests is required for historical weather data. "
            "Install with: uv add openmeteo-requests requests-cache"
        )

    # Single source of truth for the requested variables: the response exposes
    # them by position, in the same order as they were requested.
    hourly_variables = (
        "temperature_2m",
        "wind_speed_10m",
        "shortwave_radiation",
        "direct_radiation",
        "diffuse_radiation",
        "direct_normal_irradiance",
        "global_tilted_irradiance",
        "terrestrial_radiation",
    )

    # Setup the Open-Meteo API client with cache and retry. Cache expires
    # after 30 days so we don't serve indefinitely-stale entries if a single
    # bad response was ever written.
    cache_session = requests_cache.CachedSession(".cache", expire_after=timedelta(days=30))
    try:
        retries = Retry(total=5, backoff_factor=0.2, status_forcelist=[500, 502, 503, 504])
        cache_session.mount("https://", HTTPAdapter(max_retries=retries))
        cache_session.mount("http://", HTTPAdapter(max_retries=retries))
        openmeteo = openmeteo_requests.Client(session=cache_session)

        url = "https://archive-api.open-meteo.com/v1/archive"
        params = {
            "latitude": latitude,
            "longitude": longitude,
            "start_date": start_date,
            "end_date": end_date,
            "hourly": list(hourly_variables),
            "wind_speed_unit": "ms",
            "timezone": "GMT",
            "tilt": tilt,
            "azimuth": azimuth,
        }

        responses = openmeteo.weather_api(url, params=params)
        response = responses[0]

        # Process hourly data
        hourly = response.Hourly()
        hourly_data = {
            "date": pd.date_range(
                start=pd.to_datetime(hourly.Time(), unit="s"),
                end=pd.to_datetime(hourly.TimeEnd(), unit="s"),
                freq=pd.Timedelta(seconds=hourly.Interval()),
                inclusive="left",
            )
        }
        for position, variable in enumerate(hourly_variables):
            hourly_data[variable] = hourly.Variables(position).ValuesAsNumpy()
    finally:
        # Release the connection pool and the cache backend even on failure.
        cache_session.close()

    hourly_dataframe = pd.DataFrame(data=hourly_data)
    hourly_dataframe.set_index("date", inplace=True)

    # Resample to 15-min if requested (pass location for clear-sky scaling)
    if freq in ("15min", "15T", "15m"):
        hourly_dataframe = resample_to_15min(hourly_dataframe, method="makima", latitude=latitude, longitude=longitude)

    if save_to_file:
        start_year = start_date[:4]
        end_year = end_date[:4]
        if location_name:
            loc_slug = safe_path_slug(location_name)
        else:
            loc_slug = f"lat{latitude:.0f}_lon{longitude:.0f}"
        filename = os.path.join(output_dir, f"{loc_slug}_historical_{start_year}_{end_year}_openmeteo.csv")
        os.makedirs(output_dir, exist_ok=True)
        hourly_dataframe.to_csv(filename)
        logger.info("Saved weather data to %s", filename)

    return hourly_dataframe
def resample_tmy_to_15min(tmy_data: pd.DataFrame, metadata: dict) -> pd.DataFrame:
    """
    Resample TMY data from hourly to 15-minute intervals using Makima interpolation.

    Irradiance columns (ghi, dni, dhi) are interpolated as clearness indices
    (value / clear-sky value) and multiplied back by the 15-minute clear-sky
    profile. The columns temp_air, relative_humidity and wind_speed are
    interpolated directly. Any other input column is not carried over.

    The output grid runs 45 minutes past the last hourly sample so that the
    final hour also has four quarter-hour steps. Those three steps lie outside
    the interpolation range, where the interpolator yields NaN, so the last
    hourly value (clearness index for irradiance) is held for them.

    Args:
        tmy_data: DataFrame with hourly TMY data and a sorted DatetimeIndex
        metadata: Metadata dict from PVGIS containing location info; reads
            ``metadata["inputs"]["location"]`` keys latitude, longitude, elevation

    Returns:
        DataFrame with 15-minute intervals
    """
    # Location setup for clear-sky model
    loc = metadata["inputs"]["location"]
    site = Location(loc["latitude"], loc["longitude"], altitude=loc["elevation"])

    # Time handling
    df_60 = tmy_data.copy()
    start = df_60.index[0]
    end = df_60.index[-1] + pd.Timedelta(minutes=45)
    index_15 = pd.date_range(start, end, freq="15min", tz=df_60.index.tz)

    # Seconds elapsed since the first sample. Computed from timedeltas so the
    # result does not depend on the index's internal datetime resolution.
    one_second = pd.Timedelta(seconds=1)
    x_60 = np.asarray((df_60.index - start) / one_second, dtype=float)
    x_15 = np.asarray((index_15 - start) / one_second, dtype=float)
    beyond_last = x_15 > x_60[-1]

    def _makima(values: np.ndarray) -> np.ndarray:
        """Makima-interpolate *values* onto the 15-min grid, holding the last value past the end."""
        values = np.asarray(values, dtype=float)
        out = np.array(Akima1DInterpolator(x_60, values, method="makima")(x_15), dtype=float)
        out[beyond_last] = values[-1]
        return out

    # Clear-sky scaling for irradiance (GHI, DNI, DHI)
    # Interpolate clearness indices instead of raw irradiance to preserve
    # sunrise/sunset transitions and physical consistency between components.
    cs_60 = site.get_clearsky(df_60.index)
    cs_15 = site.get_clearsky(index_15)

    df_15 = pd.DataFrame(index=index_15)
    epsilon = 5.0  # Increased epsilon to avoid divide-by-zero spikes near dawn/dusk

    for comp in ("ghi", "dni", "dhi"):
        if comp in df_60.columns:
            k_60 = (df_60[comp] / (cs_60[comp] + epsilon)).values
            # Clip K multiplier to physically reasonable max (e.g. 1.5x) to
            # avoid massive dawn/dusk spikes
            k_60 = np.clip(k_60, 0, 1.5)
            k_15 = _makima(k_60)
            df_15[comp] = np.clip(k_15 * cs_15[comp], 0, None)

    # Interpolate non-irradiance columns directly with Makima
    for col in ("temp_air", "relative_humidity", "wind_speed"):
        if col in df_60.columns:
            df_15[col] = _makima(df_60[col].values)

    # Physical clipping
    if "relative_humidity" in df_15:
        df_15["relative_humidity"] = df_15["relative_humidity"].clip(0, 100)
    if "wind_speed" in df_15:
        df_15["wind_speed"] = np.clip(df_15["wind_speed"], 0, None)

    return df_15
[docs] def resample_to_15min( df_hourly: pd.DataFrame, method: str = "makima", non_negative_cols: Optional[List[str]] = None, latitude: Optional[float] = None, longitude: Optional[float] = None, ) -> pd.DataFrame: """ Resample hourly DataFrame to 15-minute intervals. When latitude/longitude are provided, uses clear-sky scaling for irradiance columns (GHI, DNI, DHI) to preserve solar physics at sunrise/sunset transitions. Otherwise falls back to direct interpolation. Supports both TMY column names (ghi, dni, dhi) and Open-Meteo column names (shortwave_radiation, direct_normal_irradiance, diffuse_radiation). Args: df_hourly: DataFrame with hourly DatetimeIndex method: Interpolation method ('makima', 'linear', 'cubic') non_negative_cols: Columns to clip at zero (auto-detected for solar/wind) latitude: Location latitude for clear-sky scaling (optional) longitude: Location longitude for clear-sky scaling (optional) Returns: DataFrame with 15-minute intervals Raises: ValueError: If DataFrame doesn't have DatetimeIndex """ # Ensure DatetimeIndex if not isinstance(df_hourly.index, pd.DatetimeIndex): raise ValueError("DataFrame must have a DatetimeIndex") df_hourly = df_hourly.sort_index() # Create target 15-min index target_index = pd.date_range( start=df_hourly.index[0], end=df_hourly.index[-1] + pd.Timedelta(minutes=45), freq="15min", ) # Convert timestamps to seconds for interpolation x_original = df_hourly.index.astype("int64") // 10**9 x_target = target_index.astype("int64") // 10**9 # Map column names to irradiance type (supports TMY and Open-Meteo conventions) irrad_col_map = {} # column_name -> clear-sky component ('ghi', 'dni', 'dhi') for col in df_hourly.columns: col_lower = col.lower() if col_lower in ("ghi", "shortwave_radiation", "global_horizontal_irradiance"): irrad_col_map[col] = "ghi" elif col_lower in ("dni", "direct_normal_irradiance"): irrad_col_map[col] = "dni" elif col_lower in ("dhi", "diffuse_radiation", "diffuse_horizontal_irradiance"): 
irrad_col_map[col] = "dhi" # Use clear-sky scaling if location is provided and we found irradiance columns use_clearsky = latitude is not None and longitude is not None and len(irrad_col_map) > 0 df_15min = pd.DataFrame(index=target_index) epsilon = 5.0 # Increased epsilon to avoid divide-by-zero spikes near dawn/dusk if use_clearsky: site = Location(latitude, longitude) cs_hourly = site.get_clearsky(df_hourly.index) cs_15min = site.get_clearsky(target_index) # Get numeric columns only numeric_df = df_hourly.select_dtypes(include=[np.number]) for col in numeric_df.columns: y_original = numeric_df[col].values if use_clearsky and col in irrad_col_map: # Clear-sky scaling: interpolate clearness index, not raw irradiance cs_comp = irrad_col_map[col] k_hourly = y_original / (cs_hourly[cs_comp].values + epsilon) # Clip K multiplier to physically reasonable max (e.g. 1.5x) to avoid massive dawn/dusk spikes k_hourly = np.clip(k_hourly, 0, 1.5) if method == "makima": interp_k = Akima1DInterpolator(x_original, k_hourly, method="makima") else: from scipy.interpolate import interp1d interp_k = interp1d(x_original, k_hourly, kind=method, fill_value="extrapolate") k_15min = interp_k(x_target) df_15min[col] = np.clip(k_15min * cs_15min[cs_comp].values, 0, None) else: # Direct interpolation for non-irradiance columns if method == "makima": interp = Akima1DInterpolator(x_original, y_original, method="makima") else: from scipy.interpolate import interp1d interp = interp1d(x_original, y_original, kind=method, fill_value="extrapolate") df_15min[col] = interp(x_target) # Auto-detect non-negative columns (solar/wind) — applies to columns not # already handled by clear-sky scaling if non_negative_cols is None: non_negative_cols = [] for col in df_15min.columns: if col in irrad_col_map and use_clearsky: continue # already clipped via clear-sky scaling if any(x in col.lower() for x in ["irrad", "radiation", "tilted", "terrestrial", "wind", "speed"]): non_negative_cols.append(col) # Clip 
negative values for col in non_negative_cols: if col in df_15min.columns: df_15min[col] = np.clip(df_15min[col], 0, None) return df_15min
def resample_to_hourly(df_15min: pd.DataFrame, agg_method: str = "mean") -> pd.DataFrame:
    """
    Aggregate a 15-minute DataFrame into hourly bins.

    Args:
        df_15min: DataFrame with 15-minute DatetimeIndex
        agg_method: Aggregation method ('mean', 'sum', 'first', 'last')

    Returns:
        DataFrame with hourly intervals

    Raises:
        ValueError: If agg_method is not one of the supported names
    """
    supported = ("mean", "sum", "first", "last")
    if agg_method not in supported:
        raise ValueError(f"Unknown aggregation method: {agg_method}")

    # Each supported name is also the name of the resampler method to call.
    hourly_bins = df_15min.resample("h")
    aggregate = getattr(hourly_bins, agg_method)
    return aggregate()
def csv_15min_to_hourly(
    input_file_name: str,
    output_file_name: str,
    datetime_column: str = "Datetime",
    datetime_format: str = "%d/%m/%Y %H:%M",
) -> Optional[pd.DataFrame]:
    """
    Convert 15-minute interval CSV data to hourly data.

    Values are summed per hour. Any failure (missing file, missing column,
    unparsable dates, write error) is logged and reported as None rather
    than raised.

    Args:
        input_file_name: Path to input CSV file with 15-minute data
        output_file_name: Path for output CSV file with hourly data
        datetime_column: Name of the datetime column
        datetime_format: Format of the datetime string

    Returns:
        DataFrame with hourly aggregated data, or None on error
    """
    try:
        df = pd.read_csv(input_file_name)
        df[datetime_column] = pd.to_datetime(df[datetime_column], format=datetime_format)
        df.set_index(datetime_column, inplace=True)

        hourly_df = df.resample("h").sum()
        hourly_df = hourly_df.reset_index()

        hourly_df.to_csv(output_file_name, index=False)
        logger.info(
            "Converted %s to hourly data at %s (%s -> %s)",
            input_file_name,
            output_file_name,
            df.shape,
            hourly_df.shape,
        )

        return hourly_df

    except Exception as e:
        # Deliberate best-effort contract: callers receive None on any error.
        logger.error("Error processing file: %s", e)
        return None


def csv_hourly_to_15min(
    input_file_name: str,
    output_file_name: str,
    datetime_column: str = "Datetime",
    datetime_format: str = "%d/%m/%Y %H:%M",
    non_negative_cols: Optional[List[str]] = None,
    latitude: Optional[float] = None,
    longitude: Optional[float] = None,
) -> Optional[pd.DataFrame]:
    """
    Convert hourly CSV data to 15-minute intervals using Makima interpolation.

    Any failure is logged and reported as None rather than raised.

    Args:
        input_file_name: Path to input CSV file with hourly data
        output_file_name: Path for output CSV file with 15-minute data
        datetime_column: Name of the datetime column
        datetime_format: Format of the datetime string
        non_negative_cols: Columns to force >= 0
        latitude: Location latitude for clear-sky scaling of irradiance (optional)
        longitude: Location longitude for clear-sky scaling of irradiance (optional)

    Returns:
        DataFrame with 15-minute interpolated data, or None on error
    """
    try:
        df = pd.read_csv(input_file_name)
        df[datetime_column] = pd.to_datetime(df[datetime_column], format=datetime_format)
        df.set_index(datetime_column, inplace=True)
        df = df.sort_index()

        # Use the resample function
        df_15min = resample_to_15min(
            df, method="makima", non_negative_cols=non_negative_cols, latitude=latitude, longitude=longitude
        )

        # Reset index for saving; the unnamed index comes back as "index".
        df_15min = df_15min.reset_index().rename(columns={"index": datetime_column})

        df_15min.to_csv(output_file_name, index=False)
        logger.info(
            "Converted %s to 15-min data (Makima) at %s (%s -> %s)",
            input_file_name,
            output_file_name,
            df.shape,
            df_15min.shape,
        )

        return df_15min

    except Exception as e:
        # Deliberate best-effort contract: callers receive None on any error.
        logger.error("Error processing file: %s", e)
        return None


def _derive_steps_per_year(dates: pd.Series) -> int:
    """Derive expected rows per non-leap year from the median timestep.

    Falls back to 8760 (hourly) when fewer than two timestamps are given or
    the median step is missing or not positive.
    """
    if len(dates) < 2:
        return 8760
    step = dates.diff().median()
    if pd.isna(step) or step.total_seconds() <= 0:
        return 8760
    hours_per_step = step.total_seconds() / 3600.0
    return int(round(8760.0 / hours_per_step))


def _parse_weather_dates(raw_dates: pd.Series) -> pd.Series:
    """Parse a date column as ISO8601, then day-first ``%d/%m/%Y %H:%M``, then mixed formats."""
    for fmt in ("ISO8601", "%d/%m/%Y %H:%M"):
        try:
            return pd.to_datetime(raw_dates, format=fmt)
        except ValueError:
            continue
    return pd.to_datetime(raw_dates, format="mixed")


def select_random_year_and_replace_datetime(csv_file_path: str, target_year: int = 2025) -> Tuple[pd.DataFrame, int]:
    """
    Load weather data, randomly select a year, and replace datetime with target year.

    Feb 29 rows of the selected year are dropped. If the remaining row count
    does not match the count implied by the data's median timestep, a warning
    is logged and the data is still returned.

    Args:
        csv_file_path: Path to the CSV file; must contain a 'date' column
        target_year: Year to replace the selected year's datetime with

    Returns:
        Tuple of (DataFrame with target year dates, selected_year). The frame
        keeps 'date' as a column and has a fresh 0..n-1 index.
    """
    df = pd.read_csv(csv_file_path)

    # Parse datetime with format detection
    df["date"] = _parse_weather_dates(df["date"])

    # Extract year and get available years
    df["year"] = df["date"].dt.year
    available_years = df["year"].unique()

    # Use numpy RNG so Monte Carlo's np.random.seed(...) controls this choice.
    selected_year = int(np.random.choice(available_years))

    # Keep the selected year and drop Feb 29 by date, not by row count, so
    # this works at any resolution. Taking one explicit copy of the final
    # selection avoids assigning into a slice (SettingWithCopyWarning) below.
    in_year = df["year"] == selected_year
    is_feb_29 = (df["date"].dt.month == 2) & (df["date"].dt.day == 29)
    selected_year_data = df.loc[in_year & ~is_feb_29].copy()

    # Validate against the data's own step size instead of assuming hourly data.
    expected_rows = _derive_steps_per_year(selected_year_data["date"])
    if len(selected_year_data) != expected_rows:
        logger.warning("Year %s has %d rows, expected %d", selected_year, len(selected_year_data), expected_rows)

    # Replace year in datetime
    year_diff = target_year - selected_year
    selected_year_data["date"] = selected_year_data["date"] + pd.DateOffset(years=year_diff)

    # Cleanup
    selected_year_data = selected_year_data.drop("year", axis=1)
    selected_year_data = selected_year_data.reset_index(drop=True)

    return selected_year_data, selected_year
def preload_weather_by_year(
    csv_file_path: str,
    target_year: int = 2025,
) -> Dict[int, pd.DataFrame]:
    """
    Pre-load weather CSV once and split into per-year DataFrames.

    Each year's dates are remapped to *target_year* so the resulting
    DataFrames can be used directly in simulation (same datetime grid as
    ``select_random_year_and_replace_datetime`` would produce).

    Feb 29 rows are dropped. Years whose remaining row count does not match
    the count implied by their own median timestep are treated as incomplete
    and left out of the result.

    Args:
        csv_file_path: Path to the multi-year weather CSV; must contain a
            'date' column
        target_year: Calendar year to remap all dates to

    Returns:
        Dict mapping original year → DataFrame with target-year dates. Each
        frame keeps 'date' as a regular column and has a fresh 0..n-1 index.
    """
    df = pd.read_csv(csv_file_path)

    # Parse datetime once
    try:
        df["date"] = pd.to_datetime(df["date"], format="ISO8601")
    except ValueError:
        try:
            df["date"] = pd.to_datetime(df["date"], format="%d/%m/%Y %H:%M")
        except ValueError:
            df["date"] = pd.to_datetime(df["date"], format="mixed")

    df["year"] = df["date"].dt.year

    result: Dict[int, pd.DataFrame] = {}

    # One pass over the data; sort=False keeps years in first-appearance order.
    for yr, yr_data in df.groupby("year", sort=False):
        original_year = int(yr)

        # Drop Feb 29 by date, not by row count, so this works at any resolution.
        is_feb_29 = (yr_data["date"].dt.month == 2) & (yr_data["date"].dt.day == 29)
        yr_data = yr_data.loc[~is_feb_29]

        expected_rows = _derive_steps_per_year(yr_data["date"])
        if len(yr_data) != expected_rows:
            continue  # skip incomplete years

        # Remap to target year. assign() builds a new frame, so nothing is
        # written into a slice of the source data.
        year_diff = target_year - original_year
        remapped = yr_data.assign(date=yr_data["date"] + pd.DateOffset(years=year_diff))
        result[original_year] = remapped.drop("year", axis=1).reset_index(drop=True)

    return result
def fetch_tmy_nsrdb(
    latitude: float,
    longitude: float,
    api_key: Optional[str] = None,
    email: Optional[str] = None,
    year: str = "tmy",
    location_name: Optional[str] = None,
    freq: str = "h",
    save_to_file: bool = False,
) -> Tuple[pd.DataFrame, dict]:
    """
    Fetch TMY data from NREL's NSRDB (National Solar Radiation Database) via pvlib.

    Uses pvlib.iotools.get_nsrdb_psm4_tmy (PSM4 API). Requires NREL_API_KEY and
    NREL_EMAIL environment variables, or pass them directly.
    Get a free key at https://developer.nrel.gov/signup/

    Args:
        latitude: Latitude of the location
        longitude: Longitude of the location
        api_key: NREL API key (falls back to NREL_API_KEY env var)
        email: Email for NREL API (falls back to NREL_EMAIL env var)
        year: TMY variant — 'tmy' for TMY, or a specific year string like '2020'
        location_name: Location name for the output filename. If None, the
            filename is derived from the rounded latitude/longitude.
        freq: Output frequency ('h' for hourly, '15min' for 15-minute).
            Any value other than the 15-minute aliases returns the data as fetched.
        save_to_file: Whether to save the data to CSV (under ``weather/``)

    Returns:
        Tuple of (weather DataFrame, metadata dict). The DataFrame keeps only
        the columns ghi, dni, dhi, temp_air and wind_speed that are present.

    Raises:
        ValueError: If API key or email not provided
    """
    api_key = api_key or os.environ.get("NREL_API_KEY")
    email = email or os.environ.get("NREL_EMAIL")

    if not api_key or not email:
        raise ValueError(
            "NREL API key and email are required. Set NREL_API_KEY and NREL_EMAIL "
            "environment variables or pass them directly."
        )

    df, metadata = pvlib.iotools.get_nsrdb_psm4_tmy(
        latitude=latitude,
        longitude=longitude,
        api_key=api_key,
        email=email,
        year=year,
        map_variables=True,
    )

    # Keep only the columns we need (map_variables=True gives pvlib standard names)
    wanted = [c for c in ("ghi", "dni", "dhi", "temp_air", "wind_speed") if c in df.columns]
    df = df[wanted].copy()

    # Resample to 15-min if requested
    if freq in ("15min", "15T", "15m"):
        df = resample_to_15min(df, method="makima", latitude=latitude, longitude=longitude)

    if save_to_file:
        # Honour save_to_file even without a location name by falling back to
        # a coordinate-based slug, as the Open-Meteo fetcher does.
        if location_name:
            loc_slug = safe_path_slug(location_name)
        else:
            loc_slug = f"lat{latitude:.0f}_lon{longitude:.0f}"
        vintage = safe_path_slug(year) if year != "tmy" else "tmy"
        # NOTE(review): with year == "tmy" the name becomes "<loc>_tmy_tmy_nsrdb.csv",
        # which carries no 4-digit year — confirm whether local file discovery
        # is expected to pick these files up.
        filename = f"weather/{loc_slug}_tmy_{vintage}_nsrdb.csv"
        os.makedirs(os.path.dirname(filename), exist_ok=True)
        df.to_csv(filename)
        logger.info("Saved NSRDB data to %s", filename)

    return df, metadata
def read_epw_file(
    filepath: str, freq: str = "h", latitude: Optional[float] = None, longitude: Optional[float] = None
) -> pd.DataFrame:
    """
    Load an EPW (EnergyPlus Weather) file as a standardized weather DataFrame.

    EPW files can be downloaded from https://climate.onebuilding.org/

    Args:
        filepath: Path to the .epw file
        freq: Output frequency ('h' for hourly, '15min' for 15-minute)
        latitude: Override latitude for clear-sky scaling (taken from the EPW
            metadata if None)
        longitude: Override longitude for clear-sky scaling (taken from the
            EPW metadata if None)

    Returns:
        DataFrame with standardized column names (ghi, dni, dhi, temp_air,
        wind_speed); columns missing from the file are omitted.
    """
    epw_frame, epw_meta = pvlib.iotools.read_epw(filepath)

    # The reader's names for these columns already are the standard names,
    # so it is enough to select the ones that are present.
    standard_cols = ("ghi", "dni", "dhi", "temp_air", "wind_speed")
    present = [name for name in standard_cols if name in epw_frame.columns]
    weather = epw_frame[present].copy()

    # Use EPW metadata for coordinates if not provided
    lat = epw_meta.get("latitude") if latitude is None else latitude
    lon = epw_meta.get("longitude") if longitude is None else longitude

    # Resample to 15-min if requested
    if freq not in ("15min", "15T", "15m"):
        return weather
    return resample_to_15min(weather, method="makima", latitude=lat, longitude=lon)
def extract_ambient_temperature(weather_df: pd.DataFrame) -> Optional[pd.Series]:
    """
    Pick the ambient-temperature column out of a weather DataFrame.

    Candidate column names, in order of preference:
      - 'temp_air'        — PVGIS TMY (pvlib standard name)
      - 'temperature_2m'  — Open-Meteo historical
      - 'temp'            — generic fallback
      - 'air_temperature' — alternative naming

    Returns:
        pd.Series of temperatures (the first candidate column present),
        or None if no recognised column found.
    """
    preference_order = ("temp_air", "temperature_2m", "temp", "air_temperature")
    chosen = next((name for name in preference_order if name in weather_df.columns), None)
    if chosen is None:
        return None
    return weather_df[chosen]
def build_battery_temperature_series(
    temp_config: Any = None,
    index: Optional[pd.DatetimeIndex] = None,
    *,
    start_time: Optional[pd.Timestamp] = None,
    end_time: Optional[pd.Timestamp] = None,
    freq: str = "h",
    default_temp: float = 25.0,
    weather_df: Optional[pd.DataFrame] = None,
    indoor_model: Optional[Dict[str, Any]] = None,
) -> pd.Series:
    """Build the battery-temperature series used by degradation models.

    ``temp_config`` accepts the same forms used by internal runners:
    ``None``/``"weather"`` uses weather data, a number is a fixed temperature,
    and a string is treated as a CSV path. The indoor buffering model is
    applied by default and can be disabled with ``indoor_model={"enabled": False}``.

    Args:
        temp_config: ``None`` or ``"weather"`` (case-insensitive), a real
            number (Python or NumPy scalar), or a path to a CSV file with a
            date column (date/datetime/time) and a value column
            (temp/temperature/t_cell/t_amb). Anything else yields
            ``default_temp``.
        index: Target index for the series. If None, it is built from
            ``start_time``/``end_time``/``freq``.
        start_time: Start of the generated index (used only when index is None)
        end_time: End of the generated index (used only when index is None)
        freq: Frequency of the generated index (used only when index is None)
        default_temp: Temperature used wherever no data is available
        weather_df: Weather data to take the ambient temperature from; a
            date/datetime/time column is used as index if the frame does not
            already have a DatetimeIndex
        indoor_model: Overrides for the indoor model (keys: enabled,
            setpoint_c, coupling_alpha, floor_c, ceiling_c); missing keys use
            the package defaults

    Returns:
        Temperature series aligned to the target index.

    Raises:
        ValueError: If neither index nor both start_time and end_time are given
    """
    import numbers

    if index is None:
        if start_time is None or end_time is None:
            raise ValueError("Either index or start_time/end_time must be provided.")
        index = pd.date_range(start=start_time, end=end_time, freq=freq)
    else:
        index = pd.DatetimeIndex(index)

    result: Optional[pd.Series] = None

    weather_indexed = weather_df
    if weather_indexed is not None and not isinstance(weather_indexed.index, pd.DatetimeIndex):
        date_col = next(
            (c for c in weather_indexed.columns if str(c).lower() in {"date", "datetime", "time"}),
            None,
        )
        if date_col is not None:
            # Work on a copy so the caller's frame is left untouched.
            weather_indexed = weather_indexed.copy()
            weather_indexed[date_col] = pd.to_datetime(weather_indexed[date_col])
            weather_indexed = weather_indexed.set_index(date_col)

    if temp_config is None or (isinstance(temp_config, str) and temp_config.lower() == "weather"):
        ambient = extract_ambient_temperature(weather_indexed) if weather_indexed is not None else None
        if ambient is not None:
            ambient = ambient.copy()
            # Positional alignment when the weather has no timestamps but
            # exactly as many rows as the target index.
            if not isinstance(ambient.index, pd.DatetimeIndex) and len(ambient) == len(index):
                ambient.index = index
            result = ambient.reindex(index).ffill().fillna(default_temp)
        else:
            result = pd.Series(default_temp, index=index)
    elif isinstance(temp_config, numbers.Real):
        # numbers.Real also covers NumPy scalars (np.int64, np.float32, ...),
        # which plain (int, float) checks would send to the default branch.
        result = pd.Series(float(temp_config), index=index)
    elif isinstance(temp_config, str):
        if os.path.exists(temp_config):
            try:
                df = pd.read_csv(temp_config)
                date_col = next((c for c in df.columns if c.lower() in ["date", "datetime", "time"]), None)
                val_col = next((c for c in df.columns if c.lower() in ["temp", "temperature", "t_cell", "t_amb"]), None)
                if date_col and val_col:
                    df[date_col] = pd.to_datetime(df[date_col])
                    df.set_index(date_col, inplace=True)
                    result = df[val_col].reindex(index).ffill().fillna(default_temp)
            except Exception as exc:
                # Best effort: fall back to the default, but leave a trace.
                logger.warning("Could not read temperature CSV %s: %s", temp_config, exc)
                result = None
        else:
            logger.warning("Temperature CSV %s not found; using default temperature", temp_config)
        if result is None:
            result = pd.Series(default_temp, index=index)
    else:
        result = pd.Series(default_temp, index=index)

    indoor_model = indoor_model or {}
    from breos.constants import (
        DEFAULT_INDOOR_CEILING_C,
        DEFAULT_INDOOR_COUPLING_ALPHA,
        DEFAULT_INDOOR_FLOOR_C,
        DEFAULT_INDOOR_MODEL_ENABLED,
        DEFAULT_INDOOR_SETPOINT_C,
    )

    if indoor_model.get("enabled", DEFAULT_INDOOR_MODEL_ENABLED):
        from breos.battery import apply_indoor_temperature_model

        result = apply_indoor_temperature_model(
            result,
            setpoint_c=indoor_model.get("setpoint_c", DEFAULT_INDOOR_SETPOINT_C),
            coupling_alpha=indoor_model.get("coupling_alpha", DEFAULT_INDOOR_COUPLING_ALPHA),
            floor_c=indoor_model.get("floor_c", DEFAULT_INDOOR_FLOOR_C),
            ceiling_c=indoor_model.get("ceiling_c", DEFAULT_INDOOR_CEILING_C),
        )

    return result