🔄 Task Queue¶
Background job processing, task scheduling, and asynchronous operations
Overview¶
The Task Queue system handles all background job processing and task scheduling across the Appgain platform. Built on Redis with Bull/BullMQ, it provides reliable, scalable, and persistent job processing for time-consuming operations.
🏗️ Architecture¶
Core Responsibilities¶
- Background Job Processing: Execute time-consuming tasks asynchronously
- Task Scheduling: Schedule jobs to run at specific times or intervals
- Job Prioritization: Handle different job priorities and processing order
- Retry Logic: Automatic retry mechanisms for failed jobs
- Progress Tracking: Real-time job progress monitoring
- Job Cancellation: Ability to cancel running or queued jobs
Technology Stack¶
- Redis: Job storage and queue management
- Bull/BullMQ: Job queue library for Node.js
- Node.js: Runtime environment
- Prometheus: Metrics collection
- Grafana: Job monitoring dashboards
🔧 Configuration¶
Server Details¶
- Redis Server:
ovh-redis(Central Redis cluster) - Queue Workers: Distributed across multiple servers
- Monitoring: Prometheus + Grafana integration
Environment Variables¶
# Redis Configuration
REDIS_HOST=ovh-redis
REDIS_PORT=6379
REDIS_PASSWORD=ask your direct manager for the access
REDIS_DB=0
# Queue Configuration
QUEUE_PREFIX=appgain
QUEUE_CONCURRENCY=10
QUEUE_RETRY_ATTEMPTS=3
QUEUE_RETRY_DELAY=5000
# Monitoring
PROMETHEUS_ENABLED=true
METRICS_PORT=9090
LOG_LEVEL=info
📊 Job Types & Priorities¶
Job Priority Levels¶
| Priority | Level | Processing Time | Retry Policy | Use Cases |
|---|---|---|---|---|
| Critical | 1 | < 30 seconds | 5 retries | Payment processing, critical notifications |
| High | 2 | < 2 minutes | 3 retries | Email campaigns, data synchronization |
| Normal | 3 | < 5 minutes | 5 retries | Analytics processing, report generation |
| Low | 4 | < 30 minutes | 10 retries | Data cleanup, maintenance tasks |
| Batch | 5 | 30+ minutes | 2 retries | Bulk operations, data migration |
Job Categories¶
// Email Processing
const EMAIL_JOBS = {
SEND_EMAIL: 'send-email',
SEND_BULK_EMAIL: 'send-bulk-email',
EMAIL_TEMPLATE_RENDER: 'email-template-render',
EMAIL_BOUNCE_PROCESSING: 'email-bounce-processing'
};
// Analytics Processing
const ANALYTICS_JOBS = {
PROCESS_USER_EVENTS: 'process-user-events',
GENERATE_REPORTS: 'generate-reports',
DATA_AGGREGATION: 'data-aggregation',
CLEANUP_OLD_DATA: 'cleanup-old-data'
};
// Notification Processing
const NOTIFICATION_JOBS = {
SEND_PUSH_NOTIFICATION: 'send-push-notification',
SEND_SMS: 'send-sms',
SEND_WHATSAPP: 'send-whatsapp',
PROCESS_WEBHOOKS: 'process-webhooks'
};
// Data Synchronization
const SYNC_JOBS = {
SYNC_SHOPIFY_DATA: 'sync-shopify-data',
SYNC_ODOO_DATA: 'sync-odoo-data',
SYNC_USER_DATA: 'sync-user-data',
BACKUP_DATA: 'backup-data'
};
🔄 Job Processing Flow¶
Job Creation¶
// Create a job
const job = await emailQueue.add('send-email', {
to: 'user@example.com',
subject: 'Welcome to Appgain',
template: 'welcome-email',
data: { name: 'John Doe' }
}, {
priority: 2,
delay: 5000, // 5 seconds delay
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000
}
});
Job Processing¶
// Process jobs
emailQueue.process('send-email', async (job) => {
const { to, subject, template, data } = job.data;
try {
// Send email logic
await sendEmail(to, subject, template, data);
// Update job progress
await job.progress(100);
return { success: true, messageId: 'msg_123' };
} catch (error) {
// Log error and retry
console.error('Email sending failed:', error);
throw error;
}
});
Job Monitoring¶
// Job event listeners
emailQueue.on('completed', (job, result) => {
console.log(`Job ${job.id} completed:`, result);
});
emailQueue.on('failed', (job, err) => {
console.error(`Job ${job.id} failed:`, err);
});
emailQueue.on('progress', (job, progress) => {
console.log(`Job ${job.id} progress: ${progress}%`);
});
🗄️ Queue Configuration¶
Queue Setup¶
// Queue configuration
const Queue = require('bull');
const emailQueue = new Queue('email-processing', {
redis: {
host: process.env.REDIS_HOST,
port: process.env.REDIS_PORT,
password: process.env.REDIS_PASSWORD
},
defaultJobOptions: {
removeOnComplete: 100,
removeOnFail: 50,
attempts: 3,
backoff: {
type: 'exponential',
delay: 2000
}
}
});
// Queue settings
emailQueue.setMaxListeners(0);
emailQueue.process(process.env.QUEUE_CONCURRENCY || 10);
Queue Management¶
// Queue operations
class QueueManager {
// Pause queue
async pauseQueue(queueName) {
const queue = new Queue(queueName);
await queue.pause();
}
// Resume queue
async resumeQueue(queueName) {
const queue = new Queue(queueName);
await queue.resume();
}
// Clean queue
async cleanQueue(queueName, grace = 1000 * 60 * 60 * 24) {
const queue = new Queue(queueName);
await queue.clean(grace, 'completed');
await queue.clean(grace, 'failed');
}
// Get queue stats
async getQueueStats(queueName) {
const queue = new Queue(queueName);
const [waiting, active, completed, failed] = await Promise.all([
queue.getWaiting(),
queue.getActive(),
queue.getCompleted(),
queue.getFailed()
]);
return {
waiting: waiting.length,
active: active.length,
completed: completed.length,
failed: failed.length
};
}
}
📈 Performance & Scaling¶
Worker Scaling¶
// Horizontal scaling with multiple workers
const cluster = require('cluster');
const numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
// Fork workers
for (let i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('exit', (worker, code, signal) => {
console.log(`Worker ${worker.process.pid} died`);
cluster.fork(); // Replace dead worker
});
} else {
// Worker process
require('./worker');
}
Redis Cluster Configuration¶
// Redis cluster setup
const Redis = require('ioredis');
const redis = new Redis.Cluster([
{ host: 'redis-node-1', port: 6379 },
{ host: 'redis-node-2', port: 6379 },
{ host: 'redis-node-3', port: 6379 }
], {
redisOptions: {
password: process.env.REDIS_PASSWORD,
retryDelayOnFailover: 100,
maxRetriesPerRequest: 3
}
});
Performance Optimization¶
// Batch processing
const batchSize = 100;
const jobs = [];
for (let i = 0; i < 1000; i++) {
jobs.push({
name: 'process-data',
data: { id: i },
opts: { priority: 3 }
});
}
// Add jobs in batches
for (let i = 0; i < jobs.length; i += batchSize) {
const batch = jobs.slice(i, i + batchSize);
await emailQueue.addBulk(batch);
}
🔍 Monitoring & Observability¶
Prometheus Metrics¶
// Custom metrics
const prometheus = require('prom-client');
const jobDuration = new prometheus.Histogram({
name: 'job_duration_seconds',
help: 'Job processing duration in seconds',
labelNames: ['queue', 'job_type', 'status'],
buckets: [0.1, 0.5, 1, 2, 5, 10, 30, 60]
});
const jobCounter = new prometheus.Counter({
name: 'jobs_total',
help: 'Total number of jobs processed',
labelNames: ['queue', 'job_type', 'status']
});
// Record metrics
emailQueue.process('send-email', async (job) => {
const startTime = Date.now();
try {
await processEmail(job.data);
// Record success metrics
jobDuration.labels('email', 'send-email', 'success')
.observe((Date.now() - startTime) / 1000);
jobCounter.labels('email', 'send-email', 'success').inc();
} catch (error) {
// Record failure metrics
jobDuration.labels('email', 'send-email', 'failed')
.observe((Date.now() - startTime) / 1000);
jobCounter.labels('email', 'send-email', 'failed').inc();
throw error;
}
});
Health Checks¶
# Queue health check
GET /health/queues
# Response format
{
"status": "healthy",
"timestamp": "2024-01-01T00:00:00Z",
"queues": {
"email-processing": {
"status": "active",
"waiting": 10,
"active": 5,
"completed": 1000,
"failed": 5
},
"analytics-processing": {
"status": "active",
"waiting": 2,
"active": 3,
"completed": 500,
"failed": 1
}
}
}
Dashboard Metrics¶
// Queue dashboard data
const getQueueDashboard = async () => {
const queues = ['email-processing', 'analytics-processing', 'notification-processing'];
const stats = {};
for (const queueName of queues) {
const queue = new Queue(queueName);
const [waiting, active, completed, failed] = await Promise.all([
queue.getWaiting(),
queue.getActive(),
queue.getCompleted(),
queue.getFailed()
]);
stats[queueName] = {
waiting: waiting.length,
active: active.length,
completed: completed.length,
failed: failed.length,
throughput: await queue.getJobCounts()
};
}
return stats;
};
🚀 Deployment¶
Docker Configuration¶
# Dockerfile
FROM node:18-alpine
WORKDIR /app
COPY package*.json ./
RUN npm ci --only=production
COPY . .
EXPOSE 3000
CMD ["npm", "start"]
Docker Compose¶
# docker-compose.yml
version: '3.8'
services:
task-queue-worker:
build: .
environment:
- NODE_ENV=production
- REDIS_HOST=redis
- QUEUE_CONCURRENCY=10
depends_on:
- redis
networks:
- appgain-net
deploy:
replicas: 3
redis:
image: redis:7-alpine
command: redis-server --requirepass ${REDIS_PASSWORD}
volumes:
- redis-data:/data
networks:
- appgain-net
volumes:
redis-data:
networks:
appgain-net:
driver: bridge
Kubernetes Deployment¶
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: task-queue-worker
spec:
replicas: 3
selector:
matchLabels:
app: task-queue-worker
template:
metadata:
labels:
app: task-queue-worker
spec:
containers:
- name: worker
image: appgain/task-queue-worker:latest
env:
- name: REDIS_HOST
value: "redis-cluster"
- name: QUEUE_CONCURRENCY
value: "10"
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
🔧 Development¶
Local Development Setup¶
# Install dependencies
npm install
# Start Redis (Docker)
docker run -d -p 6379:6379 redis:7-alpine
# Start worker
npm run worker
# Start monitoring
npm run monitor
Testing Jobs¶
// Test job creation
const testJob = await emailQueue.add('test-email', {
to: 'test@example.com',
subject: 'Test Email',
body: 'This is a test email'
}, {
priority: 1,
attempts: 1
});
// Test job processing
emailQueue.process('test-email', async (job) => {
console.log('Processing test job:', job.data);
return { success: true };
});
🔒 Security¶
Job Data Validation¶
// Job data validation
const Joi = require('joi');
const emailJobSchema = Joi.object({
to: Joi.string().email().required(),
subject: Joi.string().max(200).required(),
template: Joi.string().required(),
data: Joi.object().optional()
});
emailQueue.process('send-email', async (job) => {
// Validate job data
const { error, value } = emailJobSchema.validate(job.data);
if (error) {
throw new Error(`Invalid job data: ${error.message}`);
}
// Process validated data
await sendEmail(value.to, value.subject, value.template, value.data);
});
Access Control¶
// Queue access control
const authenticateJob = (req, res, next) => {
const token = req.headers.authorization?.split(' ')[1];
if (!token) {
return res.status(401).json({ error: 'No token provided' });
}
try {
const decoded = jwt.verify(token, process.env.JWT_SECRET);
req.user = decoded;
next();
} catch (error) {
return res.status(401).json({ error: 'Invalid token' });
}
};
📞 Support & Resources¶
Documentation¶
Monitoring Tools¶
- Bull Board: Web-based queue monitoring
- Prometheus: Metrics collection
- Grafana: Dashboard visualization
- Redis Commander: Redis management interface
Development Resources¶
- GitHub Repository: Source code and issues
- Postman Collection: API testing
- Development Environment: Docker setup
- Testing Framework: Jest + Bull testing utilities
Last updated: January 2024
Ask Chehab GPT