Task Framework
Practical recipes for common task operations.
Create a New Task
Prerequisites
Python function that can be serialised (JSON-compatible arguments)
Understanding of what work the task will do
Steps
Define the task function in
gyrinx/core/tasks.py:
from django.tasks import task
@task
def send_notification(user_id: str, message: str):
"""Send a notification to a user."""
from gyrinx.core.models import User
user = User.objects.get(pk=user_id)
# ... send notification logicRegister the task in
gyrinx/tasks/registry.py:
Enqueue the task from your application code:
Notes
All arguments must be JSON-serialisable (strings, numbers, lists, dicts)
Use string UUIDs rather than UUID objects
Import the task lazily in
_get_tasks()to avoid circular imports
Schedule a Task to Run Periodically
Prerequisites
An existing registered task
Understanding of cron syntax
Steps
Add a schedule to the task registration:
Optionally specify a timezone:
Deploy - The Cloud Scheduler job is created automatically on startup.
Common Schedules
Every 5 minutes
*/5 * * * *
Every hour
0 * * * *
Daily at midnight
0 0 * * *
Weekly on Monday
0 0 * * 1
Monthly on the 1st
0 0 1 * *
Add a Kill Switch to a Task
Kill switches let you disable tasks at runtime without redeploying.
Steps
Add a setting in
settings.pythat reads from an environment variable:
Check the setting at the start of your task:
Document the setting in
docs/deployment-environment-variables.md.Disable in production by setting the environment variable:
Configure Retry Behaviour
Adjust how Pub/Sub retries failed tasks.
Steps
Set retry parameters in the task registration:
Guidelines
Scenario
ack_deadline
min_retry_delay
max_retry_delay
Quick task (<10s)
60
10
300
Normal task (<1m)
300
10
600
Long-running task
600
60
1800
Database-intensive
300
30
600
Notes
ack_deadlineis the time before Pub/Sub assumes the task failedRetry delay uses exponential backoff between min and max
For database-intensive tasks, longer delays help avoid overwhelming the database
Make a Task Idempotent
Tasks may be delivered more than once. Design for idempotency.
Pattern 1: Check Before Acting
Pattern 2: Upsert Operations
Pattern 3: Idempotency Keys
Test a Task Locally
Prerequisites
Development environment running
Task registered in registry
Steps
In development, tasks run immediately (no Pub/Sub):
To test the actual task function:
To test with Pub/Sub locally, you would need to:
Set up a local Pub/Sub emulator
Configure the backend to use it
This is not typically necessary for development
Remove a Scheduled Task
Steps
Remove the task from
gyrinx/tasks/registry.pyDeploy - The orphan cleanup will automatically delete the Cloud Scheduler job
Notes
The provisioning system detects and removes orphaned scheduler jobs
Orphan detection uses the
{env}--gyrinx-scheduler--prefix to identify managed jobsJobs are only deleted if they match the current environment
Debug a Failed Task
Steps
Check Cloud Logging for the task execution:
Filter by
task_nameortask_idLook for
task_started,task_failed,task_completedevents
Check Pub/Sub dead letter queue (if configured) for messages that exceeded retry limits
Reproduce locally:
Common failure causes:
429: Database connection pool exhausted (checkCONN_MAX_AGE, connection limits)500: Unhandled exception (check task code)400: Message format issues (check enqueue arguments)
Last updated