# APScheduler Integration for Automated Alert Execution

## Overview

This implementation adds automated alert execution to the existing alert system using APScheduler. The scheduler automatically schedules and executes alerts based on the frequency configurations stored in the database.
## Architecture

### Components

- **AlertScheduler Class** (`services/alerts/scheduler.py`)
  - Singleton pattern for managing the scheduler instance
  - Handles job creation, scheduling, and execution
  - Provides an API for job management
- **Application Integration** (`main.py`)
  - Lifespan management for startup/shutdown
  - Automatic scheduler initialization
  - Graceful shutdown handling
- **API Endpoints** (`api/api_v1/alerts.py`)
  - Scheduler status monitoring
  - Manual scheduler control
  - Job reloading capabilities
- **Service Integration** (`services/alerts.py`)
  - Automatic scheduler updates on alert CRUD operations
  - Job addition/removal on alert changes
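The singleton pattern used by `AlertScheduler` can be sketched as follows; the class internals here are illustrative, not the actual implementation:

```python
# Minimal singleton sketch (illustrative, not the project's actual class).
class AlertScheduler:
    _instance = None

    def __new__(cls):
        # Reuse the single shared instance across all call sites
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

a = AlertScheduler()
b = AlertScheduler()
```

Because every import site gets the same instance, job state stays consistent across the API layer, the service layer, and the lifespan hooks.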
## Features

### ✅ Implemented Features

- **BackgroundScheduler**: Non-blocking scheduler running in background threads
- **CronTrigger Support**: Converts frequency configurations to cron expressions
- **Job Persistence**: Uses `MemoryJobStore` (can be upgraded to a SQLAlchemy store)
- **Event Monitoring**: Job execution and error event listeners
- **Automatic Integration**: Scheduler updates when alerts are created, updated, or deleted
- **API Management**: REST endpoints for scheduler control
- **Graceful Shutdown**: Proper cleanup on application termination
### 🔧 Configuration Options

- **Job Coalescing**: Prevents duplicate executions
- **Max Instances**: Limits concurrent executions of the same job
- **Misfire Grace Time**: Handles missed executions
- **Thread Pool**: Configurable worker threads
## Usage

### Starting the Application

The scheduler is initialized automatically when the FastAPI application starts:

```python
# In main.py - the lifespan manager handles scheduler startup/shutdown
# (the alert_scheduler method names here are illustrative)
from contextlib import asynccontextmanager
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    alert_scheduler.start()     # startup: initialize and start the scheduler
    yield
    alert_scheduler.shutdown()  # shutdown: stop the scheduler gracefully

app = FastAPI(lifespan=lifespan)
```
### API Endpoints

#### Get Scheduler Status

```
GET /api/v1/alerts/scheduler/status
```

Response:

```json
{
  "status": "running",
  "job_count": 5,
  "jobs": [
    {
      "id": "alert_123e4567-e89b-12d3-a456-426614174000",
      "name": "Alert: High CPLPV Alert",
      "next_run_time": "2025-10-14T09:00:00+00:00",
      "trigger": "cron[0 9 * * *]"
    }
  ]
}
```
#### Reload Scheduler Jobs

```
POST /api/v1/alerts/scheduler/reload
```

#### Start/Stop Scheduler

```
POST /api/v1/alerts/scheduler/start
POST /api/v1/alerts/scheduler/stop
```
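The reload endpoint drops all scheduled jobs and re-creates them from the active alerts in the database. A simplified sketch of that flow, using stand-in stubs (`fetch_active_alerts`, `FakeScheduler`) in place of the real database query and scheduler:

```python
def fetch_active_alerts():
    # Stand-in for the real database query of active alerts
    return [{"id": "a1", "name": "High CPLPV Alert", "cron": "0 9 * * *"}]

class FakeScheduler:
    # Stand-in exposing the two scheduler operations the reload needs
    def __init__(self):
        self.jobs = {}

    def remove_all_jobs(self):
        self.jobs.clear()

    def add_job(self, job_id, cron):
        self.jobs[job_id] = cron

def reload_jobs(scheduler):
    # Clear every existing job, then re-add one job per active alert
    scheduler.remove_all_jobs()
    alerts = fetch_active_alerts()
    for alert in alerts:
        scheduler.add_job(f"alert_{alert['id']}", alert["cron"])
    return len(alerts)
```

This makes reload idempotent: running it twice leaves the same job set as running it once.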
## Frequency Configuration

The system supports several frequency types:

### Daily Alerts

```json
{
  "type": "daily",
  "time": "09:00",
  "timezone": "America/Los_Angeles"
}
```

→ Cron: `0 9 * * *`

### Weekly Alerts

```json
{
  "type": "weekly",
  "time": "10:00",
  "dayOfWeek": [1, 3, 5],
  "timezone": "America/Los_Angeles"
}
```

→ Cron: `0 10 * * 1` (only the first day is used)

### Monthly Alerts

```json
{
  "type": "monthly",
  "time": "08:00",
  "dayOfMonth": [1, 15],
  "timezone": "America/Los_Angeles"
}
```

→ Cron: `0 8 1 * *` (only the first day is used)

### Hourly Alerts

```json
{
  "type": "hourly",
  "time": "00:30",
  "timezone": "America/Los_Angeles"
}
```

→ Cron: `30 * * * *`
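The mapping above can be sketched as a small conversion function. The function name and exact behavior are inferred from the examples (including the "first day only" limitation), not taken from the actual implementation:

```python
def frequency_to_cron(freq):
    """Convert a frequency config dict to a 5-field cron expression."""
    hour_s, minute_s = freq["time"].split(":")
    hour, minute = int(hour_s), int(minute_s)
    ftype = freq["type"]
    if ftype == "daily":
        return f"{minute} {hour} * * *"
    if ftype == "weekly":
        day = freq["dayOfWeek"][0]   # current behavior: only the first day is used
        return f"{minute} {hour} * * {day}"
    if ftype == "monthly":
        dom = freq["dayOfMonth"][0]  # current behavior: only the first day is used
        return f"{minute} {hour} {dom} * *"
    if ftype == "hourly":
        return f"{minute} * * * *"   # only the minute component is relevant
    raise ValueError(f"unknown frequency type: {ftype}")
```

Note that for hourly alerts the hour component of `time` is ignored; only the minute is carried into the cron expression.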
## Implementation Details

### Database Schema

The existing `alerts` table is used, with the `frequency` JSONB field driving the schedule:

```sql
CREATE TABLE app.alerts (
    id UUID PRIMARY KEY,
    client_id UUID NOT NULL,
    name VARCHAR NOT NULL,
    frequency JSONB DEFAULT '{"type": "daily", "time": "00:00", "timezone": "America/Los_Angeles"}',
    is_active BOOLEAN DEFAULT true,
    -- ... other fields
);
```
### Job Execution

When a scheduled alert executes:

1. **Job Triggered**: APScheduler calls `_execute_alert_job()`
2. **Database Session**: A new async session is created for the job
3. **Alert Execution**: The existing `trigger_alert_service()` is called
4. **System User**: The system UUID is used for automated triggers
5. **Error Handling**: Errors are logged without failing the job
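The flow above can be sketched like this, with stand-in stubs for the real session factory and `trigger_alert_service` (the stub signatures are assumptions):

```python
import asyncio
import logging

logger = logging.getLogger("alert_scheduler")
SYSTEM_USER_ID = "00000000-0000-0000-0000-000000000000"

async def trigger_alert_service(session, alert_id, user_id):
    # Stub for the real alert trigger service
    return {"alert_id": alert_id, "triggered_by": user_id}

class FakeSession:
    # Stand-in for an async SQLAlchemy session context manager
    async def __aenter__(self):
        return self
    async def __aexit__(self, *exc):
        return False

async def execute_alert_job(alert_id):
    # 1) open a fresh session, 2) trigger the alert as the system user,
    # 3) log any failure instead of letting the exception escape the job
    try:
        async with FakeSession() as session:
            return await trigger_alert_service(session, alert_id, SYSTEM_USER_ID)
    except Exception:
        logger.exception("Scheduled alert %s failed", alert_id)
        return None

result = asyncio.run(execute_alert_job("alert-123"))
```

Catching and logging inside the job is what keeps one failing alert from poisoning the scheduler's other jobs.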
### Error Handling

- **Scheduler Errors**: Logged, but do not fail application startup
- **Job Errors**: Logged via event listeners
- **Database Errors**: Handled gracefully with a rollback
- **Alert Creation**: Scheduler errors do not fail alert creation
## Security

### Permission Checks

- **Scheduler Management**: Admin-only access via role permissions
- **Alert Execution**: Uses the system user for automated triggers
- **API Access**: Protected by authentication middleware

### System User

Automated alert execution uses a fixed system UUID:

```python
system_user_id = "00000000-0000-0000-0000-000000000000"
```
## Monitoring

### Logging

The system provides comprehensive logging:

```
INFO  - Alert scheduler initialized successfully
INFO  - Scheduled alert 'High CPLPV Alert' (ID: 123...) with cron: 0 9 * * *
INFO  - Job alert_123... executed successfully
ERROR - Job alert_456... failed: Database connection error
```

### Event Listeners

- **Job Executed**: Logs successful executions
- **Job Error**: Logs failures with exception details
## Dependencies

### Required Packages

```
APScheduler==3.11.0
sqlalchemy==1.4.52
asyncpg==0.30.0
```

### Optional Upgrades

For production environments, consider:

- **SQLAlchemyJobStore**: Persistent job storage
- **RedisJobStore**: Distributed job storage
- **MongoDBJobStore**: Document-based storage
## Deployment

### Production Considerations

- **Job Persistence**: Upgrade to `SQLAlchemyJobStore` so jobs survive restarts
- **Monitoring**: Add health checks and metrics collection
- **Scaling**: Consider a distributed job store when running multiple instances
- **Timezone**: Ensure correct timezone handling for global deployments

### Environment Variables

```
DATABASE_URL=postgresql+asyncpg://user:pass@host:port/db
```
## Testing

### Manual Testing

1. **Create Alert**: Create an alert with a frequency configuration
2. **Check Scheduler**: Verify the job appears in the scheduler status
3. **Wait for Execution**: Monitor the logs for the scheduled execution
4. **Verify Results**: Check the alert instances and tasks that were created

### API Testing

```bash
# Check scheduler status
curl -H "Authorization: Bearer <token>" \
  http://localhost:8000/api/v1/alerts/scheduler/status

# Reload jobs
curl -X POST -H "Authorization: Bearer <token>" \
  http://localhost:8000/api/v1/alerts/scheduler/reload
```
## Troubleshooting

### Common Issues

- **Scheduler Not Starting**: Check the database connection and permissions
- **Jobs Not Executing**: Verify the frequency configuration and cron parsing
- **Permission Errors**: Ensure the system user has the required alert permissions
- **Database Errors**: Check the alert data and client configurations
### Debug Commands

```python
# Check scheduler status (e.g. from an interactive shell)
from services import alert_scheduler

status = alert_scheduler.get_job_status()
print(status)

# Manually execute a job; top-level await requires an async REPL (e.g. ipython)
await alert_scheduler._execute_alert_job(alert_id, client_id)
```
## Future Enhancements

### Planned Features

- **Job Persistence**: `SQLAlchemyJobStore` for production
- **Distributed Scheduling**: Redis/MongoDB job stores
- **Advanced Scheduling**: More complex frequency patterns
- **Metrics Collection**: Performance monitoring
- **Web UI**: Scheduler management interface

### Configuration Options

- **Retry Logic**: Retry mechanisms for failed jobs
- **Dead Letter Queue**: Handling for permanently failed jobs
- **Job Dependencies**: Chained alert execution
- **Dynamic Scheduling**: Runtime frequency changes
## Conclusion

The APScheduler integration provides a robust foundation for automated alert execution. The system is designed to be:

- **Reliable**: Graceful error handling and recovery
- **Scalable**: Configurable for different deployment sizes
- **Maintainable**: Clean separation of concerns
- **Monitorable**: Comprehensive logging and status reporting

The implementation follows best practices for background job processing and integrates cleanly with the existing alert system architecture.