Secretary Job Worker - Detailed Documentation¶
Complete documentation of the Secretary Job Worker system for asynchronous job processing.
Overview¶
The Secretary Job Worker is a generic background worker system that processes jobs asynchronously. It polls MongoDB for pending jobs and routes them to appropriate handlers based on job_type.
Architecture¶
Components¶
- SecretaryWorkerManager: Main worker manager that polls and routes jobs
- Processor Registry: Maps job_type strings to handler functions
- Handlers: Async functions that process specific job types
- SecretaryJobRepository: MongoDB repository for job management
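How these components interact can be sketched as a small polling loop. This is only an illustration, not the actual SecretaryWorkerManager: the in-memory `pending` list stands in for the MongoDB query, and `HANDLERS`, `run_worker`, `echo_handler`, and the `max_polls` parameter are hypothetical names introduced for the example.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

# Hypothetical registry: job_type -> async handler (stand-in for the Processor Registry).
HANDLERS: Dict[str, Callable[[Dict[str, Any]], Awaitable[None]]] = {}


async def echo_handler(job: Dict[str, Any]) -> None:
    """Toy handler that simply marks the job as completed."""
    job["status"] = "completed"


async def run_worker(pending: List[Dict[str, Any]], max_concurrent: int = 3,
                     poll_interval_sec: float = 0.01, max_polls: int = 3) -> None:
    """Poll the in-memory queue and dispatch jobs with bounded concurrency."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def process(job: Dict[str, Any]) -> None:
        async with semaphore:  # at most max_concurrent jobs run at once
            job["status"] = "processing"
            handler = HANDLERS.get(job["job_type"])
            if handler is None:
                job["status"] = "failed"
                job["error"] = f"no handler for job_type {job['job_type']!r}"
                return
            try:
                await handler(job)
            except Exception as exc:  # a failing handler fails the job, not the worker
                job["status"] = "failed"
                job["error"] = str(exc)

    for _ in range(max_polls):
        batch = [j for j in pending if j["status"] == "pending"]
        await asyncio.gather(*(process(j) for j in batch))
        await asyncio.sleep(poll_interval_sec)


HANDLERS["echo"] = echo_handler
jobs = [
    {"job_id": "job-1", "job_type": "echo", "status": "pending"},
    {"job_id": "job-2", "job_type": "unknown", "status": "pending"},
]
asyncio.run(run_worker(jobs))
```

After the run, the first job ends up completed and the second one failed because no handler is registered for its job_type.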
Job Lifecycle¶
PENDING → PROCESSING → COMPLETED/FAILED
- PENDING: Job is created and waiting to be processed
- PROCESSING: Job is currently being processed by a handler
- COMPLETED: Job completed successfully with results
- FAILED: Job failed with error information
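The lifecycle can be read as a small state machine. The sketch below encodes the transitions from the diagram; the enum values and the `can_transition` helper are assumptions made for illustration, and the project's own JobStatus type may be defined differently.

```python
from enum import Enum


class JobStatus(str, Enum):
    # Assumed string values, chosen to match the lowercase "status" fields in the JSON examples.
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"


# Allowed transitions: PENDING -> PROCESSING -> COMPLETED or FAILED.
ALLOWED = {
    JobStatus.PENDING: {JobStatus.PROCESSING},
    JobStatus.PROCESSING: {JobStatus.COMPLETED, JobStatus.FAILED},
    JobStatus.COMPLETED: set(),  # terminal
    JobStatus.FAILED: set(),     # terminal
}


def can_transition(current: JobStatus, new: JobStatus) -> bool:
    """Return True if moving from `current` to `new` follows the lifecycle."""
    return new in ALLOWED[current]
```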
Configuration¶
Configure the worker in config.yaml:
generic_worker:
  active: true
  max_concurrent: 3
  poll_interval_sec: 5
Parameters¶
- active: Enables or disables the worker
- max_concurrent: Maximum number of jobs processed concurrently
- poll_interval_sec: Interval, in seconds, between polls for pending jobs
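One way to consume these settings is to load them into a typed object with basic validation. The sketch below assumes the YAML has already been parsed into a dict; `WorkerConfig` and `load_worker_config` are hypothetical names, and the defaults simply mirror the example values rather than any documented defaults.

```python
from dataclasses import dataclass
from typing import Any, Mapping


@dataclass(frozen=True)
class WorkerConfig:
    active: bool = True
    max_concurrent: int = 3
    poll_interval_sec: float = 5.0


def load_worker_config(config: Mapping[str, Any]) -> WorkerConfig:
    """Build a WorkerConfig from the parsed config.yaml content."""
    section = config.get("generic_worker", {})
    cfg = WorkerConfig(
        active=bool(section.get("active", True)),
        max_concurrent=int(section.get("max_concurrent", 3)),
        poll_interval_sec=float(section.get("poll_interval_sec", 5)),
    )
    if cfg.max_concurrent < 1:
        raise ValueError("max_concurrent must be at least 1")
    if cfg.poll_interval_sec <= 0:
        raise ValueError("poll_interval_sec must be positive")
    return cfg


# The dict stands in for the parsed YAML shown above.
parsed = {"generic_worker": {"active": True, "max_concurrent": 3, "poll_interval_sec": 5}}
config = load_worker_config(parsed)
```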
Job Creation¶
Via API¶
curl -X POST "http://localhost:5001/api/jobs/" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "job_type": "pdf",
    "parameters": {
      "filename": "/path/to/file.pdf",
      "extraction_method": "combined"
    }
  }'
Job Structure¶
{
  "job_id": "job-123",
  "job_type": "pdf",
  "status": "pending",
  "parameters": {
    "filename": "/path/to/file.pdf",
    "extraction_method": "combined",
    "template": "MeetingMinutes"
  },
  "created_at": "2024-01-01T00:00:00Z"
}
Handler Registry¶
Handlers are registered at module import time:
from src.core.processing.registry import register
from src.core.processing.handlers.pdf_handler import handle_pdf_job
register("pdf", handle_pdf_job)
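A registry of this kind can be as small as a module-level dictionary. The sketch below is not the contents of src.core.processing.registry; it only illustrates the idea, and `get_handler` is a hypothetical lookup helper.

```python
from typing import Awaitable, Callable, Dict

# Any async callable can act as a handler in this sketch.
Handler = Callable[..., Awaitable[None]]

_registry: Dict[str, Handler] = {}


def register(job_type: str, handler: Handler) -> None:
    """Associate a job_type string with its handler (a later call overwrites an earlier one)."""
    _registry[job_type] = handler


def get_handler(job_type: str) -> Handler:
    """Look up the handler for a job_type, failing loudly if none is registered."""
    try:
        return _registry[job_type]
    except KeyError:
        raise LookupError(f"no handler registered for job_type {job_type!r}") from None
```

Because registration runs at import time, a handler only becomes available once its module has been imported somewhere during worker startup.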
Handler Signature¶
async def handle_job(
    job: Job,
    repo: SecretaryJobRepository,
    resource_calculator: ResourceCalculator
) -> None:
    # Process job
    pass
Available Handlers¶
PDF Handler (pdf)¶
Processes PDF files with text extraction and OCR.
Parameters:
- filename or url: PDF file path or URL
- extraction_method: native, ocr, mistral_ocr, openai_vision, combined
- template: Optional template name
- context: Optional context for template
- use_cache: Whether to use cache
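Checking these parameters before a job is submitted can catch mistakes early. The helper below is a hypothetical client-side check, not part of the PDF handler; it assumes that extraction_method may be omitted and that one of filename or url must be present.

```python
from typing import Any, Dict, List

# The extraction methods listed above.
EXTRACTION_METHODS = {"native", "ocr", "mistral_ocr", "openai_vision", "combined"}


def validate_pdf_parameters(params: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the parameters look valid."""
    problems: List[str] = []
    if not params.get("filename") and not params.get("url"):
        problems.append("either 'filename' or 'url' is required")
    method = params.get("extraction_method")
    if method is not None and method not in EXTRACTION_METHODS:
        problems.append(f"unknown extraction_method: {method!r}")
    return problems
```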
Session Handler (session)¶
Processes session information with associated media.
Parameters:
- event: Event name
- session: Session name
- url: Session URL
- filename: Output filename
- track: Track name
- Additional session parameters
Transformer Handler (transformer)¶
Processes text transformation with templates.
Parameters:
- text: Text to transform
- url: URL to fetch text from (alternative to text)
- template: Template name
- context: Context for template
- source_language: Source language
- target_language: Target language
Progress Tracking¶
Jobs support progress tracking:
repo.update_job_progress(
    job_id="job-123",
    progress=JobProgress(
        step="extracting_text",
        percent=50,
        message="Extracting text from PDF..."
    )
)
Progress Updates¶
Progress is updated during processing:
- 0%: Initialization
- 25%: Reading file
- 50%: Processing
- 75%: Finalizing
- 100%: Completed
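The milestones above can be reported from a handler as it moves through its stages. The sketch uses stand-ins throughout: `JobProgress` here is a local dataclass with the fields shown earlier, `InMemoryRepo` replaces the MongoDB repository, and the step names are hypothetical.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class JobProgress:
    # Local stand-in with the fields used in this document.
    step: str
    percent: int
    message: Optional[str] = None


class InMemoryRepo:
    """Records progress updates instead of writing them to MongoDB."""

    def __init__(self) -> None:
        self.updates: List[Tuple[str, JobProgress]] = []

    def update_job_progress(self, job_id: str, progress: JobProgress) -> None:
        self.updates.append((job_id, progress))


# Hypothetical step names paired with the percentages listed above.
MILESTONES = [
    ("initializing", 0),
    ("reading_file", 25),
    ("processing", 50),
    ("finalizing", 75),
    ("completed", 100),
]


def run_with_progress(repo: InMemoryRepo, job_id: str) -> None:
    for step, percent in MILESTONES:
        # The real work for each stage would happen here.
        repo.update_job_progress(job_id, JobProgress(step=step, percent=percent))


repo = InMemoryRepo()
run_with_progress(repo, "job-123")
```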
Webhook Integration¶
Jobs can be configured with webhooks that receive progress updates and completion notifications.
Webhook Configuration¶
Include webhook in job parameters:
{
  "job_type": "pdf",
  "parameters": {
    "filename": "/path/to/file.pdf",
    "webhook": {
      "url": "https://example.com/webhook",
      "token": "webhook_auth_token",
      "jobId": "client_job_id"
    }
  }
}
Webhook Payload¶
When the job completes, a POST request is sent to the configured webhook URL:
{
  "phase": "completed",
  "message": "Processing completed",
  "data": {
    "extracted_text": "...",
    "metadata": {...}
  }
}
Progress Webhooks¶
Progress updates are sent during processing:
{
  "phase": "processing",
  "progress": 50,
  "message": "Extracting text from PDF...",
  "process": {
    "id": "job-123"
  }
}
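A webhook delivery of this shape could be assembled with the standard library as sketched below. How the worker actually authenticates is not specified here; sending the token as a Bearer header is an assumption, and `build_webhook_request` is a hypothetical helper.

```python
import json
import urllib.request
from typing import Any, Dict


def build_webhook_request(webhook: Dict[str, str],
                          payload: Dict[str, Any]) -> urllib.request.Request:
    """Prepare (but do not send) a JSON POST request for a webhook payload."""
    body = json.dumps(payload).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    if webhook.get("token"):
        # Assumption: the token from the webhook configuration is sent as a Bearer token.
        headers["Authorization"] = f"Bearer {webhook['token']}"
    return urllib.request.Request(webhook["url"], data=body, headers=headers, method="POST")


webhook = {"url": "https://example.com/webhook", "token": "webhook_auth_token"}
payload = {
    "phase": "processing",
    "progress": 50,
    "message": "Extracting text from PDF...",
    "process": {"id": "job-123"},
}
request = build_webhook_request(webhook, payload)
```

To deliver it, the request can be passed to `urllib.request.urlopen(request, timeout=10)`; a production sender would likely also add retries and error logging.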
Error Handling¶
Jobs track errors:
repo.update_job_error(
    job_id="job-123",
    error=JobError(
        code="PROCESSING_ERROR",
        message="Failed to extract text",
        details={"error_type": "OCR_ERROR"}
    )
)
Error Structure¶
{
  "code": "PROCESSING_ERROR",
  "message": "Error message",
  "details": {
    "error_type": "OCR_ERROR",
    "traceback": "..."
  }
}
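An error record of this shape could be derived from a caught exception as sketched below. `error_from_exception` is a hypothetical helper, and using the exception class name as error_type is an assumption; the handlers may use their own error codes such as OCR_ERROR.

```python
import traceback
from typing import Any, Dict


def error_from_exception(exc: BaseException,
                         code: str = "PROCESSING_ERROR") -> Dict[str, Any]:
    """Convert an exception into a dict following the error structure above."""
    return {
        "code": code,
        "message": str(exc),
        "details": {
            # Assumption: the exception class name serves as the error type.
            "error_type": type(exc).__name__,
            "traceback": "".join(
                traceback.format_exception(type(exc), exc, exc.__traceback__)
            ),
        },
    }


try:
    raise ValueError("Failed to extract text")
except ValueError as caught:
    error = error_from_exception(caught)
```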
Logging¶
Jobs maintain log entries:
repo.add_log_entry(
    job_id="job-123",
    level="info",
    message="Processing started"
)
Log Levels¶
- debug: Debug information
- info: General information
- warning: Warnings
- error: Errors
- critical: Critical errors
Job Status Query¶
Query job status via API:
curl -X GET "http://localhost:5001/api/jobs/job-123" \
  -H "Authorization: Bearer YOUR_API_KEY"
Response¶
{
  "status": "success",
  "data": {
    "job_id": "job-123",
    "job_type": "pdf",
    "status": "completed",
    "progress": {
      "step": "completed",
      "percent": 100,
      "message": "Processing completed"
    },
    "results": {
      "extracted_text": "...",
      "metadata": {...}
    },
    "logs": [
      {
        "timestamp": "2024-01-01T00:00:00Z",
        "level": "info",
        "message": "Job started"
      }
    ]
  }
}
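A client that needs the final result can poll this endpoint until the job reaches a terminal status. The sketch below keeps the HTTP call out of the picture: `fetch` is any callable returning a response shaped like the one above, and `wait_for_job` is a hypothetical helper.

```python
import time
from typing import Any, Callable, Dict

TERMINAL_STATUSES = {"completed", "failed"}


def wait_for_job(fetch: Callable[[], Dict[str, Any]],
                 interval_sec: float = 2.0,
                 timeout_sec: float = 60.0) -> Dict[str, Any]:
    """Call `fetch` repeatedly until the job is completed or failed, then return the job data."""
    deadline = time.monotonic() + timeout_sec
    while True:
        job = fetch()["data"]
        if job["status"] in TERMINAL_STATUSES:
            return job
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job {job.get('job_id')} did not finish in {timeout_sec}s")
        time.sleep(interval_sec)


# Canned responses stand in for successive GET /api/jobs/job-123 calls.
canned = iter([
    {"status": "success", "data": {"job_id": "job-123", "status": "processing"}},
    {"status": "success", "data": {"job_id": "job-123", "status": "completed"}},
])
job = wait_for_job(lambda: next(canned), interval_sec=0)
```

In real use, `fetch` would wrap the authenticated GET request shown above and parse its JSON body.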
Creating Custom Handlers¶
1. Create Handler Function¶
from typing import Any

# JobStatus is assumed to live alongside Job and JobProgress.
from src.core.models.job_models import Job, JobProgress, JobStatus
from src.core.resource_tracking import ResourceCalculator


async def handle_custom_job(
    job: Job,
    repo: Any,
    resource_calculator: ResourceCalculator
) -> None:
    # Update progress
    repo.update_job_progress(
        job.job_id,
        JobProgress(step="processing", percent=0)
    )
    # Process job (process_custom is a placeholder for your own logic)
    params = job.parameters
    result = process_custom(params)
    # Store results
    repo.update_job_results(
        job.job_id,
        {"result": result}
    )
    # Mark as completed
    repo.update_job_status(
        job.job_id,
        JobStatus.COMPLETED
    )
2. Register Handler¶
from src.core.processing.registry import register
register("custom", handle_custom_job)
3. Create Job¶
curl -X POST "http://localhost:5001/api/jobs/" \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "job_type": "custom",
    "parameters": {
      "input": "data"
    }
  }'
Related Documentation¶
- Handlers - Detailed handler documentation
- Generic Worker - Worker overview
- Job API Endpoints - API documentation