"""
CPAU Electric Meter Implementation
This module provides the CpauElectricMeter class for retrieving electric
meter usage data from the CPAU portal.
"""
import calendar
import json
import logging
from datetime import date, datetime, timedelta
from typing import Optional, Iterator
from .meter import CpauMeter, UsageRecord
from .exceptions import CpauApiError
logger = logging.getLogger(__name__)
[docs]
class CpauElectricMeter(CpauMeter):
    """
    Represents a CPAU electric meter and provides methods to retrieve usage data.

    Supports five interval types:
    - billing: Billing period data (CPAU's billing periods, roughly monthly)
    - monthly: Calendar month aggregation (sum of daily data by month)
    - daily: Daily aggregated usage
    - hourly: Hourly usage data
    - 15min: 15-minute interval usage data
    """
    # Map interval names to the 'Mode' code sent in LoadUsage requests.
    # Note: 'monthly' is special - it has no API mode of its own (None) because
    # it is aggregated from daily data. The key order here is the order in
    # which get_available_intervals() reports the interval names.
    _INTERVAL_MODE_MAP = {
        'billing': 'M',
        'monthly': None,  # Aggregated from daily data
        'daily': 'D',
        'hourly': 'H',
        '15min': 'MI'
    }
[docs]
def get_available_intervals(self) -> list[str]:
"""
Get list of supported interval types for electric meters.
Returns:
['billing', 'monthly', 'daily', 'hourly', '15min']
"""
return list(self._INTERVAL_MODE_MAP.keys())
@property
def rate_category(self) -> str:
"""Get the rate category/schedule for this meter."""
return self._meter_info.get('MeterAttribute2', '')
[docs]
def get_usage(
    self,
    interval: str,
    start_date: date,
    end_date: Optional[date] = None
) -> list[UsageRecord]:
    """
    Fetch usage records for one interval type over a date range.

    Args:
        interval: One of 'billing', 'monthly', 'daily', 'hourly', '15min'
        start_date: First day of the range (inclusive)
        end_date: Last day of the range (inclusive). Defaults to 2 days ago
            when None.

    Returns:
        List of UsageRecord objects. An empty list is returned when, after
        clamping the end date to 2 days ago, nothing remains of the range.

    Raises:
        ValueError: If interval is unknown or end_date precedes start_date
        CpauApiError: If API request fails

    Notes:
        - billing: billing periods overlapping the range are returned
        - monthly: daily data is aggregated per calendar month
        - an end_date later than 2 days ago is pulled back to 2 days ago
          (a warning is logged), since newer data is typically unavailable
    """
    # Reject unknown interval names before doing anything else.
    if interval not in self._INTERVAL_MODE_MAP:
        logger.error(f"Invalid interval: {interval}")
        valid_names = ', '.join(self.get_available_intervals())
        raise ValueError(
            f"Invalid interval '{interval}'. Must be one of: {valid_names}"
        )

    # An omitted end date means "as recent as the data usually goes".
    if end_date is None:
        end_date = date.today() - timedelta(days=2)
    logger.info(f"Fetching {interval} usage data from {start_date} to {end_date}")

    # The caller's range must not be inverted.
    if end_date < start_date:
        logger.error(f"Invalid date range: end_date ({end_date}) < start_date ({start_date})")
        raise ValueError(f"end_date ({end_date}) must be >= start_date ({start_date})")

    # Clamp the end of the range to the availability cutoff (2 days ago).
    availability_cutoff = date.today() - timedelta(days=2)
    requested_end = end_date
    if requested_end > availability_cutoff:
        logger.warning(f"Requested end_date ({requested_end}) is beyond data availability limit ({availability_cutoff}). Adjusting to {availability_cutoff}.")
        end_date = availability_cutoff
        # Clamping may have emptied the range entirely.
        if end_date < start_date:
            logger.warning(f"After adjusting for data availability, no data is available in the requested range ({start_date} to {requested_end})")
            return []

    # Calendar-month aggregation has no API mode; it is built from daily data.
    if interval == 'monthly':
        logger.debug("Aggregating daily data into calendar months")
        return self._aggregate_monthly(start_date, end_date)

    api_mode = self._INTERVAL_MODE_MAP[interval]
    logger.debug(f"Using API mode: {api_mode}")

    # Pick the fetch strategy that matches the API mode.
    if api_mode == 'M':
        logger.debug("Fetching billing period data")
        fetched = self._fetch_monthly_data()
    elif api_mode == 'D':
        logger.debug("Fetching daily data")
        fetched = self._fetch_daily_data(start_date, end_date)
    else:
        # Hourly or 15min
        logger.debug(f"Fetching {interval} data")
        fetched = self._fetch_hourly_or_15min_data(api_mode, start_date, end_date)
    logger.debug(f"Retrieved {len(fetched)} raw records from API")

    # Turn raw API rows into UsageRecord objects (filtering by range).
    parsed = self._parse_records(fetched, interval, start_date, end_date)
    logger.info(f"Retrieved {len(parsed)} {interval} usage records")
    return parsed
[docs]
def get_billing_usage(
self,
start_date: date,
end_date: Optional[date] = None
) -> list[UsageRecord]:
"""
Retrieve billing period data.
Convenience method equivalent to get_usage(interval='billing', ...)
Returns:
List of UsageRecord objects with billing_period attribute populated
"""
return self.get_usage('billing', start_date, end_date)
[docs]
def get_monthly_usage(
self,
start_date: date,
end_date: Optional[date] = None
) -> list[UsageRecord]:
"""
Retrieve calendar month aggregated usage data.
Convenience method equivalent to get_usage(interval='monthly', ...)
Aggregates daily data into calendar months.
Returns:
List of UsageRecord objects, one per calendar month
"""
return self.get_usage('monthly', start_date, end_date)
[docs]
def get_daily_usage(
self,
start_date: date,
end_date: Optional[date] = None
) -> list[UsageRecord]:
"""
Retrieve daily usage data.
Convenience method equivalent to get_usage(interval='daily', ...)
"""
return self.get_usage('daily', start_date, end_date)
[docs]
def get_hourly_usage(
self,
start_date: date,
end_date: Optional[date] = None
) -> list[UsageRecord]:
"""
Retrieve hourly usage data.
Convenience method equivalent to get_usage(interval='hourly', ...)
Note: For large date ranges, this makes one API call per day and may be slow.
"""
return self.get_usage('hourly', start_date, end_date)
[docs]
def get_15min_usage(
self,
start_date: date,
end_date: Optional[date] = None
) -> list[UsageRecord]:
"""
Retrieve 15-minute interval usage data.
Convenience method equivalent to get_usage(interval='15min', ...)
Note: For large date ranges, this makes one API call per day and may be slow.
"""
return self.get_usage('15min', start_date, end_date)
[docs]
def get_availability_window(self, interval: str) -> tuple[Optional[date], Optional[date]]:
    """
    Determine the earliest and latest dates that have data for an interval.

    Args:
        interval: One of 'billing', 'monthly', 'daily', 'hourly', '15min'

    Returns:
        Tuple of (earliest_date, latest_date); either element is None when
        the corresponding search found nothing.

    Notes:
        - 'billing': all billing periods are fetched and scanned
        - 'monthly': delegates to the 'daily' window, because monthly
          figures are aggregated from daily data
        - other intervals: one binary search per boundary, each probe
          being an API call

    Raises:
        ValueError: If interval is invalid
    """
    # Reject unknown interval names.
    if interval not in self._INTERVAL_MODE_MAP:
        logger.error(f"Invalid interval: {interval}")
        valid_names = ', '.join(self.get_available_intervals())
        raise ValueError(
            f"Invalid interval '{interval}'. Must be one of: {valid_names}"
        )
    logger.info(f"Finding data availability window for {interval} interval")

    # Monthly aggregation is only as available as the daily data under it.
    if interval == 'monthly':
        logger.debug("Monthly interval: using daily data availability window")
        return self.get_availability_window('daily')

    api_mode = self._INTERVAL_MODE_MAP[interval]

    # Billing periods are scanned directly.
    if api_mode == 'M':
        logger.debug("Billing interval: fetching all billing periods")
        return self._find_billing_window()

    # Daily/hourly/15min: probe for each boundary, earliest first.
    logger.debug(f"Using binary search for {interval} interval (mode={api_mode})")
    window = (
        self._binary_search_earliest(api_mode, interval),
        self._binary_search_latest(api_mode, interval),
    )
    if window[0]:
        logger.info(f"Availability window for {interval}: {window[0]} to {window[1]}")
    else:
        logger.info(f"No data found for {interval} interval")
    return window
[docs]
def iter_usage(
self,
interval: str,
start_date: date,
end_date: Optional[date] = None,
chunk_days: int = 30
) -> Iterator[UsageRecord]:
"""
Iterate over usage data in chunks to avoid loading large datasets into memory.
Args:
interval: One of 'billing', 'monthly', 'daily', 'hourly', '15min'
start_date: Start date (inclusive)
end_date: End date (inclusive). If None, defaults to 2 days ago.
chunk_days: Number of days to fetch per API request (default 30)
Yields:
UsageRecord objects one at a time
Notes:
- Useful for processing large date ranges without loading all data into memory
- Billing and monthly intervals don't benefit from chunking (billing returns all periods, monthly aggregates daily data)
"""
if end_date is None:
end_date = date.today() - timedelta(days=2)
if interval in ['billing', 'monthly']:
# Billing/monthly data: just yield from get_usage (no chunking benefit)
for record in self.get_usage(interval, start_date, end_date):
yield record
return
# For other intervals, process in chunks
current_start = start_date
while current_start <= end_date:
current_end = min(current_start + timedelta(days=chunk_days - 1), end_date)
chunk_records = self.get_usage(interval, current_start, current_end)
for record in chunk_records:
yield record
current_start = current_end + timedelta(days=1)
# Private methods for fetching data
def _fetch_monthly_data(self) -> list[dict]:
    """
    Request billing-period rows for this meter (LoadUsage, Mode 'M').

    No date is sent with the request. Returns the list found under
    'objUsageGenerationResultSetTwo' in the response, or [] when that key
    is missing.
    """
    request_body = dict(
        UsageOrGeneration='1',
        Type='K',
        Mode='M',
        strDate='',
        hourlyType='H',
        SeasonId='',
        weatherOverlay=0,
        usageyear='',
        MeterNumber=self.meter_number,
        DateFromDaily='',
        DateToDaily='',
        IsTier=True,
        IsTou=False,
    )
    response = self._session._make_api_request('LoadUsage', request_body)
    return response.get('objUsageGenerationResultSetTwo', [])
def _fetch_daily_data(self, start_date: date, end_date: date) -> list[dict]:
    """
    Fetch raw daily rows covering start_date..end_date.

    Each request asks for the window ending on 'strDate' (a 30-day window,
    per the original author's note), so ranges longer than 30 days are
    covered by several requests stepping backwards from end_date in 30-day
    strides. Rows may fall outside the requested range; callers filter.

    Args:
        start_date: First day that must be covered
        end_date: Last day that must be covered

    Returns:
        Raw API rows. For multi-request ranges, rows repeated across
        windows (same UsageDate and UsageType) are kept only once and rows
        without a UsageDate are dropped; a single-request result is
        returned as received.
    """
    def _load_window(window_end: date) -> list[dict]:
        # One LoadUsage request for the daily window ending on window_end.
        request_body = {
            'UsageOrGeneration': '1',
            'Type': 'K',
            'Mode': 'D',
            'strDate': window_end.strftime('%m/%d/%y'),
            'hourlyType': 'H',
            'SeasonId': 0,
            'weatherOverlay': 0,
            'usageyear': '',
            'MeterNumber': self.meter_number,
            'DateFromDaily': '',
            'DateToDaily': '',
            'IsTier': True,
            'IsTou': False
        }
        response = self._session._make_api_request('LoadUsage', request_body)
        return response.get('objUsageGenerationResultSetTwo', [])

    span_days = (end_date - start_date).days + 1

    # Short ranges fit in one window.
    if span_days <= 30:
        logger.debug(f"Fetching daily data with single API call ({span_days} days)")
        return _load_window(end_date)

    # Longer ranges: walk backwards from end_date, one window per request.
    expected_calls = (span_days + 29) // 30  # Ceiling division
    logger.debug(f"Fetching daily data with multiple API calls ({expected_calls} calls for {span_days} days)")
    collected = []
    seen_keys = set()  # "<UsageDate>_<UsageType>" of rows already kept
    window_end = end_date
    calls_made = 0
    while window_end >= start_date:
        calls_made += 1
        logger.debug(f"Daily data API call {calls_made}/{expected_calls} for date {window_end}")
        for row in _load_window(window_end):
            # Each date carries one row per usage type (import/export), so
            # the de-duplication key needs both fields.
            usage_date = row.get('UsageDate')
            usage_type = row.get('UsageType')
            row_key = f"{usage_date}_{usage_type}"
            if not usage_date or row_key in seen_keys:
                continue
            collected.append(row)
            seen_keys.add(row_key)
        # Step back one window for the next request.
        window_end = window_end - timedelta(days=30)
    return collected
def _fetch_hourly_or_15min_data(self, mode: str, start_date: date, end_date: date) -> list[dict]:
    """
    Fetch raw hourly or 15-minute rows, one request per day.

    Args:
        mode: API mode code passed through as 'Mode' ('H' or 'MI')
        start_date: First day to request
        end_date: Last day to request (inclusive)

    Returns:
        The rows of every day's response concatenated in date order; empty
        when end_date is before start_date.
    """
    total_days = (end_date - start_date).days + 1
    logger.debug(f"Fetching hourly/15min data: {total_days} API calls (one per day)")
    collected = []
    # range() is empty for a non-positive day count, matching an inverted range.
    for call_number, offset in enumerate(range(total_days), start=1):
        day = start_date + timedelta(days=offset)
        logger.debug(f"Hourly/15min data API call {call_number}/{total_days} for date {day}")
        request_body = {
            'UsageOrGeneration': '1',
            'Type': 'K',
            'Mode': mode,
            'strDate': day.strftime('%m/%d/%y'),
            'hourlyType': 'H',
            'SeasonId': 0,
            'weatherOverlay': 0,
            'usageyear': '',
            'MeterNumber': self.meter_number,
            'DateFromDaily': '',
            'DateToDaily': '',
            'IsTier': True,
            'IsTou': False
        }
        response = self._session._make_api_request('LoadUsage', request_body)
        collected.extend(response.get('objUsageGenerationResultSetTwo', []))
    return collected
def _aggregate_monthly(self, start_date: date, end_date: date) -> list[UsageRecord]:
    """
    Sum daily usage into one record per calendar month.

    The requested range is widened to whole months: from the first day of
    start_date's month to the last day of end_date's month. If that last
    day is later than 2 days ago, the end is moved back month by month to
    the most recent month end that is not, so a month that cannot be
    complete yet is left out.

    Example: 2025-08-15..2025-10-15 becomes 2025-08-01..2025-10-31,
    provided 2025-10-31 is at least 2 days in the past.

    Args:
        start_date: Start date for aggregation
        end_date: End date for aggregation

    Returns:
        List of UsageRecord objects in month order, each dated the first of
        its month; empty when no complete month remains.
    """
    availability_limit = date.today() - timedelta(days=2)

    # Widen to full calendar months.
    range_start = start_date.replace(day=1)
    days_in_end_month = calendar.monthrange(end_date.year, end_date.month)[1]
    range_end = end_date.replace(day=days_in_end_month)

    if range_end > availability_limit:
        logger.debug(f"Expanded end date {range_end} exceeds data availability limit {availability_limit}")
        # Step back to the previous month end at least once, and keep going
        # until that month end is within the availability limit.
        while True:
            range_end = range_end.replace(day=1) - timedelta(days=1)
            if range_end <= availability_limit:
                break
        logger.debug(f"Adjusted to last complete month ending: {range_end}")
        # Nothing left once the end has moved before the start.
        if range_end < range_start:
            logger.warning(f"No complete months available in range {start_date} to {end_date}")
            return []

    logger.debug(f"Expanding date range for monthly aggregation: {start_date} to {end_date} -> {range_start} to {range_end}")
    logger.debug(f"Fetching daily data to aggregate into months: {range_start} to {range_end}")
    daily_records = self.get_usage('daily', range_start, range_end)

    # Running totals keyed by 'YYYY-MM'.
    totals_by_month = {}
    for daily in daily_records:
        bucket = totals_by_month.setdefault(
            daily.date.strftime('%Y-%m'),
            {'import_kwh': 0.0, 'export_kwh': 0.0},
        )
        bucket['import_kwh'] += daily.import_kwh
        bucket['export_kwh'] += daily.export_kwh

    # 'YYYY-MM' keys sort chronologically as plain strings.
    monthly_records = []
    for month_key in sorted(totals_by_month):
        bucket = totals_by_month[month_key]
        year_text, month_text = month_key.split('-')
        monthly_records.append(
            UsageRecord(
                date=datetime(int(year_text), int(month_text), 1),
                import_kwh=bucket['import_kwh'],
                export_kwh=bucket['export_kwh'],
                net_kwh=bucket['import_kwh'] - bucket['export_kwh']
                # No billing period fields for calendar months
            )
        )
    logger.info(f"Aggregated daily data into {len(monthly_records)} calendar months")
    return monthly_records
def _parse_records(
    self,
    raw_records: list[dict],
    interval: str,
    start_date: date,
    end_date: date
) -> list[UsageRecord]:
    """
    Parse raw API records into UsageRecord objects.

    Rows are grouped so that the import ('IUsage') and export ('Eusage')
    rows of the same period end up in one UsageRecord:
    - billing: grouped by the row's Year/Month; periods that do not
      overlap start_date..end_date are skipped
    - daily: grouped by UsageDate; dates outside start_date..end_date are
      skipped
    - hourly/15min: grouped by UsageDate plus the 'Hourly' time; no date
      filtering is applied here

    Args:
        raw_records: Rows as returned by the LoadUsage API
        interval: 'billing', 'daily', 'hourly' or '15min'
        start_date: Start of the requested range (inclusive)
        end_date: End of the requested range (inclusive)

    Returns:
        UsageRecord objects in chronological order of their ``date``.

    Raises:
        KeyError: If a row lacks a required field ('Year'/'Month' for
            billing rows, 'UsageDate' otherwise)
        ValueError: If a UsageDate is not in MM/DD/YY format or a
            UsageValue is not numeric
    """
    grouped_data = {}
    is_billing = (interval == 'billing')
    for record in raw_records:
        if is_billing:
            period_start_dt, period_end_dt = self._parse_billing_period(record)
            # Filter to billing periods that overlap the requested range.
            # If the period boundaries can't be parsed, include the record
            # rather than silently dropping it.
            if period_start_dt and period_end_dt:
                if period_end_dt.date() < start_date or period_start_dt.date() > end_date:
                    continue
            # Group by Year-Month
            key = f"{record['Year']}-{record['Month']:02d}"
            if key not in grouped_data:
                if period_start_dt and period_end_dt:
                    billing_start = period_start_dt.strftime('%Y-%m-%d')
                    billing_end = period_end_dt.strftime('%Y-%m-%d')
                    billing_length = (period_end_dt.date() - period_start_dt.date()).days + 1
                    period_datetime = period_start_dt
                else:
                    # Unknown boundaries: date the period at the first of
                    # its month and leave the billing fields empty.
                    billing_start = None
                    billing_end = None
                    billing_length = None
                    period_datetime = datetime(record['Year'], record['Month'], 1)
                grouped_data[key] = {
                    'date': period_datetime,
                    'billing_period_start': billing_start,
                    'billing_period_end': billing_end,
                    'billing_period_length': billing_length,
                    'export_kwh': 0.0,
                    'import_kwh': 0.0,
                }
        else:
            # Daily/Hourly/15min data: group by UsageDate (and time for hourly/15min)
            # Parse the usage date from API format (MM/DD/YY)
            record_dt = datetime.strptime(record['UsageDate'], '%m/%d/%y')
            record_date = record_dt.date()
            # For daily mode, filter to requested date range
            if interval == 'daily':
                if record_date < start_date or record_date > end_date:
                    continue  # Skip records outside the requested range
            if interval in ['hourly', '15min'] and record.get('Hourly'):
                # Hourly/15min: combine date and time
                time_str = record['Hourly']  # Format: "HH:MM"
                key = f"{record['UsageDate']} {time_str}"
                try:
                    record_datetime = datetime.strptime(
                        f"{record_dt.strftime('%Y-%m-%d')} {time_str}:00",
                        '%Y-%m-%d %H:%M:%S'
                    )
                except ValueError:
                    # Unparseable time: fall back to midnight of that day.
                    record_datetime = record_dt
            else:
                # Daily: just the date
                key = record['UsageDate']
                record_datetime = record_dt
            if key not in grouped_data:
                grouped_data[key] = {
                    'date': record_datetime,
                    'export_kwh': 0.0,
                    'import_kwh': 0.0,
                }

        # Store the usage value on its side of the group. A later row of
        # the same type for the same key overwrites the earlier one (values
        # are not summed); rows of any other usage type are ignored.
        usage_type = record.get('UsageType', '')
        # Treat a missing or null UsageValue as zero.
        usage_value = float(record.get('UsageValue') or 0)
        if usage_type == 'Eusage':  # Export (generation)
            grouped_data[key]['export_kwh'] = abs(usage_value)
        elif usage_type == 'IUsage':  # Import (consumption)
            grouped_data[key]['import_kwh'] = usage_value

    # Order by the parsed datetime, not by the grouping key: daily/hourly
    # keys are 'MM/DD/YY' strings, which sort January 2025 before December
    # 2024. The key only breaks ties, keeping the order deterministic.
    ordered_groups = sorted(
        grouped_data.items(),
        key=lambda item: (item[1]['date'], item[0]),
    )

    # Convert to UsageRecord objects
    usage_records = []
    for _key, period_data in ordered_groups:
        usage_records.append(
            UsageRecord(
                date=period_data['date'],
                import_kwh=period_data['import_kwh'],
                export_kwh=period_data['export_kwh'],
                net_kwh=period_data['import_kwh'] - period_data['export_kwh'],
                billing_period_start=period_data.get('billing_period_start'),
                billing_period_end=period_data.get('billing_period_end'),
                billing_period_length=period_data.get('billing_period_length')
            )
        )
    return usage_records
@staticmethod
def _parse_billing_period(record: dict) -> tuple[Optional[datetime], Optional[datetime]]:
"""Return (start, end) datetimes for a billing record.
Tries the legacy ``BillPeriod`` field (formatted as
``"MM/DD/YY to MM/DD/YY"``) first, then falls back to the separate
``FromDate`` / ``ToDate`` fields the API now populates instead.
Returns ``(None, None)`` if neither yields parseable dates.
"""
bill_period = record.get('BillPeriod') or ''
if ' to ' in bill_period:
try:
start_str, end_str = bill_period.split(' to ')
return (
datetime.strptime(start_str.strip(), '%m/%d/%y'),
datetime.strptime(end_str.strip(), '%m/%d/%y'),
)
except ValueError:
pass
from_date = record.get('FromDate')
to_date = record.get('ToDate')
if from_date and to_date:
try:
return (
datetime.strptime(from_date, '%m/%d/%y'),
datetime.strptime(to_date, '%m/%d/%y'),
)
except ValueError:
pass
return (None, None)
def _find_billing_window(self) -> tuple[Optional[date], Optional[date]]:
    """
    Find the earliest start and latest end across all billing periods.

    Fetches every billing row and scans the periods whose boundaries can
    be parsed. Any exception is logged and reported as (None, None).

    Returns:
        (earliest_start, latest_end), or (None, None) when there are no
        rows, no parseable periods, or an error occurred.
    """
    try:
        billing_rows = self._fetch_monthly_data()
        if not billing_rows:
            logger.debug("No billing period data found")
            return (None, None)

        period_starts = []
        period_ends = []
        for row in billing_rows:
            start_dt, end_dt = self._parse_billing_period(row)
            # Rows without usable boundaries cannot inform the window.
            if start_dt is None or end_dt is None:
                continue
            period_starts.append(start_dt.date())
            period_ends.append(end_dt.date())

        window = (
            min(period_starts, default=None),
            max(period_ends, default=None),
        )
        logger.debug(f"Found billing window: {window[0]} to {window[1]}")
        return window
    except Exception as e:
        logger.error(f"Error finding billing window: {e}")
        return (None, None)
def _check_data_exists(self, mode: str, check_date: date) -> bool:
    """
    Report whether the API has a row for exactly check_date in this mode.

    Args:
        mode: API mode code ('D', 'H', 'MI')
        check_date: Date to check

    Returns:
        True if some returned row's UsageDate equals check_date; False if
        none does or if the request fails (failures are logged at debug
        level, not raised).
    """
    try:
        wanted = check_date.strftime('%m/%d/%y')
        request_body = {
            'UsageOrGeneration': '1',
            'Type': 'K',
            'Mode': mode,
            'strDate': wanted,
            'hourlyType': 'H',
            'SeasonId': 0,
            'weatherOverlay': 0,
            'usageyear': '',
            'MeterNumber': self.meter_number,
            'DateFromDaily': '',
            'DateToDaily': '',
            'IsTier': True,
            'IsTou': False
        }
        response = self._session._make_api_request('LoadUsage', request_body)
        rows = response.get('objUsageGenerationResultSetTwo', [])
        # The API may answer with a window of dates, so look for the exact
        # date rather than treating any row as a hit.
        found = any(row.get('UsageDate') == wanted for row in rows)
        if found:
            logger.debug(f"Check {check_date}: data found")
        else:
            logger.debug(f"Check {check_date}: no data")
        return found
    except Exception as e:
        logger.debug(f"Error checking data for {check_date}: {e}")
        return False
def _binary_search_earliest(self, mode: str, interval_name: str) -> Optional[date]:
    """
    Binary-search for the earliest date that has data.

    The search covers 3650 days ago through 2 days ago and relies on
    _check_data_exists for each probe. It assumes that once data starts it
    is present on every later date in the range.

    Args:
        mode: API mode code ('D', 'H', 'MI')
        interval_name: Interval name for logging (e.g., 'daily', 'hourly')

    Returns:
        Earliest date with data, or None if no probe found any
    """
    today = date.today()
    range_start = today - timedelta(days=3650)  # 10 years ago
    range_end = today - timedelta(days=2)  # 2 days ago (data availability)
    logger.debug(f"Binary search for earliest {interval_name} data: {range_start} to {range_end}")

    # Work on proleptic ordinals so the midpoint is plain integer maths.
    low = range_start.toordinal()
    high = range_end.toordinal()
    best = None
    probes = 0
    while low <= high:
        probes += 1
        middle = low + (high - low) // 2
        candidate = date.fromordinal(middle)
        if self._check_data_exists(mode, candidate):
            # Hit: remember it and keep looking further back.
            best = candidate
            high = middle - 1
        else:
            # Miss: data, if any, starts later.
            low = middle + 1
    logger.debug(f"Earliest {interval_name} search completed in {probes} iterations")
    return best
def _binary_search_latest(self, mode: str, interval_name: str) -> Optional[date]:
    """
    Binary-search for the latest date that has data.

    The search covers 30 days ago through tomorrow and relies on
    _check_data_exists for each probe. It assumes data is present on every
    date in the range up to the latest one.

    Args:
        mode: API mode code ('D', 'H', 'MI')
        interval_name: Interval name for logging (e.g., 'daily', 'hourly')

    Returns:
        Latest date with data, or None if no probe found any
    """
    today = date.today()
    range_start = today - timedelta(days=30)  # Start far enough back
    range_end = today + timedelta(days=1)  # Check if data is near-real-time
    logger.debug(f"Binary search for latest {interval_name} data: {range_start} to {range_end}")

    # Work on proleptic ordinals so the midpoint is plain integer maths.
    low = range_start.toordinal()
    high = range_end.toordinal()
    best = None
    probes = 0
    while low <= high:
        probes += 1
        middle = low + (high - low) // 2
        candidate = date.fromordinal(middle)
        if self._check_data_exists(mode, candidate):
            # Hit: remember it and keep looking further forward.
            best = candidate
            low = middle + 1
        else:
            # Miss: data, if any, ends earlier.
            high = middle - 1
    logger.debug(f"Latest {interval_name} search completed in {probes} iterations")
    return best