#!/usr/bin/env python3
"""
Watersmart.com session manager with automatic cookie handling.
This module provides a session manager that:
1. Authenticates using Playwright (headless)
2. Extracts and manages session cookies
3. Provides requests.Session for API calls
4. Automatically re-authenticates on 401 errors
"""
import logging
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
import requests
from playwright.sync_api import sync_playwright
logger = logging.getLogger(__name__)
[docs]
class WatersmartSessionManager:
"""
Manages authenticated sessions for paloalto.watersmart.com.
Handles SAML/SSO authentication via Playwright and provides
a requests.Session with valid cookies for API access.
Example:
>>> manager = WatersmartSessionManager('username', 'password')
>>> session = manager.get_session()
>>> response = session.get('https://paloalto.watersmart.com/index.php/rest/v1/Chart/RealTimeChart')
>>> data = response.json()
"""
[docs]
def __init__(self, username: str, password: str, headless: bool = True, cache_dir: Optional[str] = None):
"""
Initialize session manager.
Args:
username: CPAU username
password: CPAU password
headless: Run Playwright in headless mode (default: True)
cache_dir: Directory for caching cookies (default: ~/.cpau)
"""
self.username = username
self.password = password
self.headless = headless
self.cache_dir = cache_dir
self._cookies: Optional[list] = None
self._authenticated_at: Optional[datetime] = None
logger.debug(f"Initialized WatersmartSessionManager for user {username}")
[docs]
def authenticate(self) -> None:
"""
Authenticate with watersmart.com using Playwright.
Performs SAML/SSO login flow and extracts session cookies.
Raises:
Exception: If authentication fails
"""
logger.info("Authenticating with watersmart.com...")
start_time = datetime.now()
try:
with sync_playwright() as p:
browser = p.chromium.launch(headless=self.headless)
context = browser.new_context()
page = context.new_page()
# Set longer timeout for authentication (60 seconds)
page.set_default_timeout(60000)
# Step 1: Login to CPAU portal
logger.debug("Navigating to CPAU portal...")
page.goto('https://mycpau.cityofpaloalto.org/Portal', timeout=60000)
logger.debug("Filling in credentials...")
page.fill('#txtLogin', self.username)
page.fill('#txtpwd', self.password)
logger.debug("Submitting login form...")
# Wait for navigation after submitting the form
with page.expect_navigation(timeout=60000):
page.press('#txtpwd', 'Enter')
logger.debug(f"Logged in, current URL: {page.url}")
page.wait_for_load_state('domcontentloaded', timeout=60000)
# Step 2: Navigate to watersmart (triggers SAML flow)
logger.debug("Navigating to watersmart (SAML flow)...")
try:
# Navigate with a more lenient wait condition
page.goto('https://paloalto.watersmart.com/index.php/trackUsage', wait_until='commit', timeout=60000)
except Exception as e:
# If navigation times out, check if we're still on a valid page
logger.warning(f"Navigation completed with warning: {e}")
# Wait a bit for any redirects and page rendering
import time
time.sleep(3)
logger.debug(f"Final URL: {page.url}")
# Verify authentication succeeded
if 'login' in page.url.lower() or 'signin' in page.url.lower():
raise Exception("Authentication failed - redirected to login page")
# Extract cookies
self._cookies = context.cookies()
self._authenticated_at = datetime.now()
logger.debug(f"Extracted {len(self._cookies)} cookies from {page.url}")
browser.close()
except Exception as e:
logger.error(f"Authentication failed: {e}")
raise
elapsed = (datetime.now() - start_time).total_seconds()
logger.info(f"Authentication successful in {elapsed:.1f}s")
logger.debug(f"Extracted {len(self._cookies)} cookies")
# Save cookies to cache if cache directory is configured
self._save_cookies_to_cache()
def _get_cache_path(self) -> Optional[str]:
"""
Get the path to the cookie cache file.
Returns:
Path to cache file, or None if caching is disabled
"""
if self.cache_dir is None:
return None
from pathlib import Path
import os
# Expand user home directory
cache_dir = Path(self.cache_dir).expanduser()
# Create directory if it doesn't exist
cache_dir.mkdir(parents=True, exist_ok=True)
# Set directory permissions to 0700 (user-only)
os.chmod(cache_dir, 0o700)
cache_file = cache_dir / 'watersmart_cookies.json'
return str(cache_file)
def _load_cached_cookies(self) -> bool:
"""
Try to load cookies from cache.
Returns:
True if cookies were loaded successfully, False otherwise
"""
cache_path = self._get_cache_path()
if cache_path is None:
logger.debug("Cookie caching disabled")
return False
from pathlib import Path
import json
import os
cache_file = Path(cache_path)
if not cache_file.exists():
logger.debug(f"No cookie cache found at {cache_path}")
return False
try:
# Check file permissions
stat_info = os.stat(cache_path)
if stat_info.st_mode & 0o077:
logger.warning(f"Cache file {cache_path} has insecure permissions, ignoring")
return False
# Load cache file
with open(cache_path, 'r') as f:
cache_data = json.load(f)
# Validate cache data
if cache_data.get('username') != self.username:
logger.debug(f"Cache is for different user, ignoring")
return False
# Check cache age (max 10 minutes based on Phase 2 testing)
auth_time_str = cache_data.get('authenticated_at')
if auth_time_str:
auth_time = datetime.fromisoformat(auth_time_str)
age = datetime.now() - auth_time
if age > timedelta(minutes=10):
logger.debug(f"Cache is {age.total_seconds():.0f}s old (max 600s), ignoring")
return False
logger.debug(f"Cache is {age.total_seconds():.0f}s old, within valid window")
# Load cookies
self._cookies = cache_data.get('cookies', [])
self._authenticated_at = datetime.fromisoformat(auth_time_str) if auth_time_str else None
logger.info(f"Loaded {len(self._cookies)} cookies from cache (age: {age.total_seconds():.1f}s)")
return True
except Exception as e:
logger.warning(f"Failed to load cookie cache: {e}")
return False
def _save_cookies_to_cache(self) -> None:
"""
Save current cookies to cache file.
"""
cache_path = self._get_cache_path()
if cache_path is None or self._cookies is None:
return
import json
import os
try:
cache_data = {
'username': self.username,
'authenticated_at': self._authenticated_at.isoformat() if self._authenticated_at else None,
'cookies': self._cookies
}
# Write cache file
with open(cache_path, 'w') as f:
json.dump(cache_data, f, indent=2)
# Set file permissions to 0600 (user-only read/write)
os.chmod(cache_path, 0o600)
logger.debug(f"Saved {len(self._cookies)} cookies to cache: {cache_path}")
except Exception as e:
logger.warning(f"Failed to save cookie cache: {e}")
[docs]
def is_authenticated(self) -> bool:
"""
Check if we have valid authentication cookies.
Returns:
bool: True if authenticated, False otherwise
"""
return self._cookies is not None
[docs]
def get_session(self, force_refresh: bool = False) -> requests.Session:
"""
Get a requests.Session with authenticated cookies.
Args:
force_refresh: Force re-authentication even if already authenticated
Returns:
requests.Session: Session with valid cookies
Raises:
Exception: If authentication fails
"""
# Try loading cached cookies first (unless forcing refresh)
if not force_refresh and not self.is_authenticated():
self._load_cached_cookies()
# Authenticate if still not authenticated
if force_refresh or not self.is_authenticated():
self.authenticate()
# Create session with cookies
session = requests.Session()
for cookie in self._cookies:
session.cookies.set(
name=cookie['name'],
value=cookie['value'],
domain=cookie['domain'],
path=cookie.get('path', '/')
)
# Wrap session to handle 401 errors
return _AutoRefreshSession(session, self)
[docs]
def get_authentication_age(self) -> Optional[timedelta]:
"""
Get time since last authentication.
Returns:
timedelta: Time since authentication, or None if not authenticated
"""
if self._authenticated_at is None:
return None
return datetime.now() - self._authenticated_at
class _AutoRefreshSession:
"""
Wrapper around requests.Session that automatically re-authenticates on 401.
This is an internal class - users should use WatersmartSessionManager.get_session().
"""
def __init__(self, session: requests.Session, manager: WatersmartSessionManager):
"""
Initialize auto-refresh session wrapper.
Args:
session: Underlying requests.Session
manager: WatersmartSessionManager for re-authentication
"""
self._session = session
self._manager = manager
def request(self, method: str, url: str, **kwargs) -> requests.Response:
"""
Make HTTP request with automatic re-authentication on 401.
Args:
method: HTTP method
url: Request URL
**kwargs: Additional arguments for requests
Returns:
requests.Response: Response object
Raises:
requests.exceptions.RequestException: On request failure (after retry)
"""
# Make request
response = self._session.request(method, url, **kwargs)
# Check if authentication expired
if response.status_code == 401:
logger.warning("Received 401 - re-authenticating...")
# Re-authenticate
self._manager.authenticate()
# Get new session with fresh cookies
new_session = requests.Session()
for cookie in self._manager._cookies:
new_session.cookies.set(
name=cookie['name'],
value=cookie['value'],
domain=cookie['domain'],
path=cookie.get('path', '/')
)
# Update underlying session
self._session = new_session
# Retry request
logger.debug(f"Retrying {method} {url}")
response = self._session.request(method, url, **kwargs)
if response.status_code == 401:
logger.error("Still getting 401 after re-authentication")
return response
def get(self, url: str, **kwargs) -> requests.Response:
"""GET request with auto-refresh."""
return self.request('GET', url, **kwargs)
def post(self, url: str, **kwargs) -> requests.Response:
"""POST request with auto-refresh."""
return self.request('POST', url, **kwargs)
def put(self, url: str, **kwargs) -> requests.Response:
"""PUT request with auto-refresh."""
return self.request('PUT', url, **kwargs)
def delete(self, url: str, **kwargs) -> requests.Response:
"""DELETE request with auto-refresh."""
return self.request('DELETE', url, **kwargs)
def head(self, url: str, **kwargs) -> requests.Response:
"""HEAD request with auto-refresh."""
return self.request('HEAD', url, **kwargs)
def options(self, url: str, **kwargs) -> requests.Response:
"""OPTIONS request with auto-refresh."""
return self.request('OPTIONS', url, **kwargs)
def patch(self, url: str, **kwargs) -> requests.Response:
"""PATCH request with auto-refresh."""
return self.request('PATCH', url, **kwargs)
# Expose underlying session attributes
@property
def cookies(self):
"""Access to session cookies."""
return self._session.cookies
@property
def headers(self):
"""Access to session headers."""
return self._session.headers
# Example usage
if __name__ == '__main__':
import json
from pathlib import Path
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Load credentials
secrets_path = Path(__file__).parent.parent.parent / 'secrets.json'
with open(secrets_path, 'r') as f:
creds = json.load(f)
# Create session manager
manager = WatersmartSessionManager(
username=creds['userid'],
password=creds['password'],
headless=True
)
# Get session (authenticates automatically)
session = manager.get_session()
# Make API calls
print("\nTesting API calls...")
print("=" * 70)
apis = [
'https://paloalto.watersmart.com/index.php/rest/v1/Chart/RealTimeChart',
'https://paloalto.watersmart.com/index.php/rest/v1/Chart/annualChart?module=portal&commentary=full',
]
for api_url in apis:
print(f"\n{api_url}")
response = session.get(api_url)
print(f" Status: {response.status_code}")
if response.status_code == 200:
data = response.json()
if 'data' in data:
print(f" ✓ Success - data retrieved")
else:
print(f" ✗ Failed")
# Check authentication age
age = manager.get_authentication_age()
if age:
print(f"\nAuthenticated {age.total_seconds():.1f} seconds ago")
print("\n" + "=" * 70)
print("Session manager test complete!")