Source code for cpau.watersmart_session

#!/usr/bin/env python3
"""
Watersmart.com session manager with automatic cookie handling.

This module provides a session manager that:
1. Authenticates using Playwright (headless)
2. Extracts and manages session cookies
3. Provides requests.Session for API calls
4. Automatically re-authenticates on 401 errors
"""

import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import requests
from playwright.sync_api import sync_playwright


logger = logging.getLogger(__name__)


[docs] class WatersmartSessionManager: """ Manages authenticated sessions for paloalto.watersmart.com. Handles SAML/SSO authentication via Playwright and provides a requests.Session with valid cookies for API access. Example: >>> manager = WatersmartSessionManager('username', 'password') >>> session = manager.get_session() >>> response = session.get('https://paloalto.watersmart.com/index.php/rest/v1/Chart/RealTimeChart') >>> data = response.json() """
[docs] def __init__(self, username: str, password: str, headless: bool = True, cache_dir: Optional[str] = None): """ Initialize session manager. Args: username: CPAU username password: CPAU password headless: Run Playwright in headless mode (default: True) cache_dir: Directory for caching cookies (default: ~/.cpau) """ self.username = username self.password = password self.headless = headless self.cache_dir = cache_dir self._cookies: Optional[list] = None self._authenticated_at: Optional[datetime] = None logger.debug(f"Initialized WatersmartSessionManager for user {username}")
[docs] def authenticate(self) -> None: """ Authenticate with watersmart.com using Playwright. Performs SAML/SSO login flow and extracts session cookies. Raises: Exception: If authentication fails """ logger.info("Authenticating with watersmart.com...") start_time = datetime.now() try: with sync_playwright() as p: browser = p.chromium.launch(headless=self.headless) context = browser.new_context() page = context.new_page() # Set longer timeout for authentication (60 seconds) page.set_default_timeout(60000) # Step 1: Login to CPAU portal logger.debug("Navigating to CPAU portal...") page.goto('https://mycpau.cityofpaloalto.org/Portal', timeout=60000) logger.debug("Filling in credentials...") page.fill('#txtLogin', self.username) page.fill('#txtpwd', self.password) logger.debug("Submitting login form...") # Wait for navigation after submitting the form with page.expect_navigation(timeout=60000): page.press('#txtpwd', 'Enter') logger.debug(f"Logged in, current URL: {page.url}") page.wait_for_load_state('domcontentloaded', timeout=60000) # Step 2: Navigate to watersmart (triggers SAML flow) logger.debug("Navigating to watersmart (SAML flow)...") try: # Navigate with a more lenient wait condition page.goto('https://paloalto.watersmart.com/index.php/trackUsage', wait_until='commit', timeout=60000) except Exception as e: # If navigation times out, check if we're still on a valid page logger.warning(f"Navigation completed with warning: {e}") # Wait a bit for any redirects and page rendering import time time.sleep(3) logger.debug(f"Final URL: {page.url}") # Verify authentication succeeded if 'login' in page.url.lower() or 'signin' in page.url.lower(): raise Exception("Authentication failed - redirected to login page") # Extract cookies self._cookies = context.cookies() self._authenticated_at = datetime.now() logger.debug(f"Extracted {len(self._cookies)} cookies from {page.url}") browser.close() except Exception as e: logger.error(f"Authentication failed: {e}") raise elapsed = (datetime.now() - start_time).total_seconds() logger.info(f"Authentication successful in {elapsed:.1f}s") logger.debug(f"Extracted {len(self._cookies)} cookies") # Save cookies to cache if cache directory is configured self._save_cookies_to_cache()
def _get_cache_path(self) -> Optional[str]: """ Get the path to the cookie cache file. Returns: Path to cache file, or None if caching is disabled """ if self.cache_dir is None: return None from pathlib import Path import os # Expand user home directory cache_dir = Path(self.cache_dir).expanduser() # Create directory if it doesn't exist cache_dir.mkdir(parents=True, exist_ok=True) # Set directory permissions to 0700 (user-only) os.chmod(cache_dir, 0o700) cache_file = cache_dir / 'watersmart_cookies.json' return str(cache_file) def _load_cached_cookies(self) -> bool: """ Try to load cookies from cache. Returns: True if cookies were loaded successfully, False otherwise """ cache_path = self._get_cache_path() if cache_path is None: logger.debug("Cookie caching disabled") return False from pathlib import Path import json import os cache_file = Path(cache_path) if not cache_file.exists(): logger.debug(f"No cookie cache found at {cache_path}") return False try: # Check file permissions stat_info = os.stat(cache_path) if stat_info.st_mode & 0o077: logger.warning(f"Cache file {cache_path} has insecure permissions, ignoring") return False # Load cache file with open(cache_path, 'r') as f: cache_data = json.load(f) # Validate cache data if cache_data.get('username') != self.username: logger.debug(f"Cache is for different user, ignoring") return False # Check cache age (max 10 minutes based on Phase 2 testing) auth_time_str = cache_data.get('authenticated_at') if auth_time_str: auth_time = datetime.fromisoformat(auth_time_str) age = datetime.now() - auth_time if age > timedelta(minutes=10): logger.debug(f"Cache is {age.total_seconds():.0f}s old (max 600s), ignoring") return False logger.debug(f"Cache is {age.total_seconds():.0f}s old, within valid window") # Load cookies self._cookies = cache_data.get('cookies', []) self._authenticated_at = datetime.fromisoformat(auth_time_str) if auth_time_str else None logger.info(f"Loaded {len(self._cookies)} cookies from cache (age: {age.total_seconds():.1f}s)") return True except Exception as e: logger.warning(f"Failed to load cookie cache: {e}") return False def _save_cookies_to_cache(self) -> None: """ Save current cookies to cache file. """ cache_path = self._get_cache_path() if cache_path is None or self._cookies is None: return import json import os try: cache_data = { 'username': self.username, 'authenticated_at': self._authenticated_at.isoformat() if self._authenticated_at else None, 'cookies': self._cookies } # Write cache file with open(cache_path, 'w') as f: json.dump(cache_data, f, indent=2) # Set file permissions to 0600 (user-only read/write) os.chmod(cache_path, 0o600) logger.debug(f"Saved {len(self._cookies)} cookies to cache: {cache_path}") except Exception as e: logger.warning(f"Failed to save cookie cache: {e}")
[docs] def is_authenticated(self) -> bool: """ Check if we have valid authentication cookies. Returns: bool: True if authenticated, False otherwise """ return self._cookies is not None
[docs] def get_session(self, force_refresh: bool = False) -> requests.Session: """ Get a requests.Session with authenticated cookies. Args: force_refresh: Force re-authentication even if already authenticated Returns: requests.Session: Session with valid cookies Raises: Exception: If authentication fails """ # Try loading cached cookies first (unless forcing refresh) if not force_refresh and not self.is_authenticated(): self._load_cached_cookies() # Authenticate if still not authenticated if force_refresh or not self.is_authenticated(): self.authenticate() # Create session with cookies session = requests.Session() for cookie in self._cookies: session.cookies.set( name=cookie['name'], value=cookie['value'], domain=cookie['domain'], path=cookie.get('path', '/') ) # Wrap session to handle 401 errors return _AutoRefreshSession(session, self)
[docs] def get_authentication_age(self) -> Optional[timedelta]: """ Get time since last authentication. Returns: timedelta: Time since authentication, or None if not authenticated """ if self._authenticated_at is None: return None return datetime.now() - self._authenticated_at
class _AutoRefreshSession: """ Wrapper around requests.Session that automatically re-authenticates on 401. This is an internal class - users should use WatersmartSessionManager.get_session(). """ def __init__(self, session: requests.Session, manager: WatersmartSessionManager): """ Initialize auto-refresh session wrapper. Args: session: Underlying requests.Session manager: WatersmartSessionManager for re-authentication """ self._session = session self._manager = manager def request(self, method: str, url: str, **kwargs) -> requests.Response: """ Make HTTP request with automatic re-authentication on 401. Args: method: HTTP method url: Request URL **kwargs: Additional arguments for requests Returns: requests.Response: Response object Raises: requests.exceptions.RequestException: On request failure (after retry) """ # Make request response = self._session.request(method, url, **kwargs) # Check if authentication expired if response.status_code == 401: logger.warning("Received 401 - re-authenticating...") # Re-authenticate self._manager.authenticate() # Get new session with fresh cookies new_session = requests.Session() for cookie in self._manager._cookies: new_session.cookies.set( name=cookie['name'], value=cookie['value'], domain=cookie['domain'], path=cookie.get('path', '/') ) # Update underlying session self._session = new_session # Retry request logger.debug(f"Retrying {method} {url}") response = self._session.request(method, url, **kwargs) if response.status_code == 401: logger.error("Still getting 401 after re-authentication") return response def get(self, url: str, **kwargs) -> requests.Response: """GET request with auto-refresh.""" return self.request('GET', url, **kwargs) def post(self, url: str, **kwargs) -> requests.Response: """POST request with auto-refresh.""" return self.request('POST', url, **kwargs) def put(self, url: str, **kwargs) -> requests.Response: """PUT request with auto-refresh.""" return self.request('PUT', url, **kwargs) def delete(self, url: str, **kwargs) -> requests.Response: """DELETE request with auto-refresh.""" return self.request('DELETE', url, **kwargs) def head(self, url: str, **kwargs) -> requests.Response: """HEAD request with auto-refresh.""" return self.request('HEAD', url, **kwargs) def options(self, url: str, **kwargs) -> requests.Response: """OPTIONS request with auto-refresh.""" return self.request('OPTIONS', url, **kwargs) def patch(self, url: str, **kwargs) -> requests.Response: """PATCH request with auto-refresh.""" return self.request('PATCH', url, **kwargs) # Expose underlying session attributes @property def cookies(self): """Access to session cookies.""" return self._session.cookies @property def headers(self): """Access to session headers.""" return self._session.headers # Example usage if __name__ == '__main__': import json from pathlib import Path # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) # Load credentials secrets_path = Path(__file__).parent.parent.parent / 'secrets.json' with open(secrets_path, 'r') as f: creds = json.load(f) # Create session manager manager = WatersmartSessionManager( username=creds['userid'], password=creds['password'], headless=True ) # Get session (authenticates automatically) session = manager.get_session() # Make API calls print("\nTesting API calls...") print("=" * 70) apis = [ 'https://paloalto.watersmart.com/index.php/rest/v1/Chart/RealTimeChart', 'https://paloalto.watersmart.com/index.php/rest/v1/Chart/annualChart?module=portal&commentary=full', ] for api_url in apis: print(f"\n{api_url}") response = session.get(api_url) print(f" Status: {response.status_code}") if response.status_code == 200: data = response.json() if 'data' in data: print(f" ✓ Success - data retrieved") else: print(f" ✗ Failed") # Check authentication age age = manager.get_authentication_age() if age: print(f"\nAuthenticated {age.total_seconds():.1f} seconds ago") print("\n" + "=" * 70) print("Session manager test complete!")