🔄 Task Queue

Background job processing, task scheduling, and asynchronous operations

Overview

The Task Queue system handles all background job processing and task scheduling across the Appgain platform. Built on Redis with Bull/BullMQ, it provides reliable, scalable, and persistent job processing for time-consuming operations.

🏗️ Architecture

Core Responsibilities

  • Background Job Processing: Execute time-consuming tasks asynchronously
  • Task Scheduling: Schedule jobs to run at specific times or intervals
  • Job Prioritization: Handle different job priorities and processing order
  • Retry Logic: Automatic retry mechanisms for failed jobs
  • Progress Tracking: Real-time job progress monitoring
  • Job Cancellation: Ability to cancel running or queued jobs
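
As a concrete example of the scheduling responsibility, Bull supports repeatable jobs natively; a minimal sketch (the queue and job names are illustrative, not the platform's actual ones):

// Schedule a repeatable job with Bull's repeat option
const Queue = require('bull');
const reportQueue = new Queue('report-generation', 'redis://ovh-redis:6379');

// Run every day at 02:00 UTC; Bull persists the repeat schedule in Redis,
// so it survives worker restarts
await reportQueue.add('nightly-report', { scope: 'all-projects' }, {
  repeat: { cron: '0 2 * * *', tz: 'UTC' }
});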

Technology Stack

  • Redis: Job storage and queue management
  • Bull/BullMQ: Job queue library for Node.js
  • Node.js: Runtime environment
  • Prometheus: Metrics collection
  • Grafana: Job monitoring dashboards

🔧 Configuration

Server Details

  • Redis Server: ovh-redis (Central Redis cluster)
  • Queue Workers: Distributed across multiple servers
  • Monitoring: Prometheus + Grafana integration

Environment Variables

# Redis Configuration
REDIS_HOST=ovh-redis
REDIS_PORT=6379
REDIS_PASSWORD=<request access from your direct manager>
REDIS_DB=0

# Queue Configuration
QUEUE_PREFIX=appgain
QUEUE_CONCURRENCY=10
QUEUE_RETRY_ATTEMPTS=3
QUEUE_RETRY_DELAY=5000

# Monitoring
PROMETHEUS_ENABLED=true
METRICS_PORT=9090
LOG_LEVEL=info

📊 Job Types & Priorities

Job Priority Levels

| Priority | Level | Processing Time | Retry Policy | Use Cases |
|----------|-------|-----------------|--------------|-----------|
| Critical | 1 | < 30 seconds | 5 retries | Payment processing, critical notifications |
| High | 2 | < 2 minutes | 3 retries | Email campaigns, data synchronization |
| Normal | 3 | < 5 minutes | 5 retries | Analytics processing, report generation |
| Low | 4 | < 30 minutes | 10 retries | Data cleanup, maintenance tasks |
| Batch | 5 | 30+ minutes | 2 retries | Bulk operations, data migration |
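
Bull treats lower numbers as higher priority, so the Level column maps directly onto the `priority` job option. An illustrative mapping (`paymentQueue` and `payload` are placeholders):

// Illustrative constants mirroring the table above (lower = higher priority)
const PRIORITY = {
  CRITICAL: 1,
  HIGH: 2,
  NORMAL: 3,
  LOW: 4,
  BATCH: 5
};

// A critical job gets priority 1 and the table's 5 retry attempts
await paymentQueue.add('process-payment', payload, {
  priority: PRIORITY.CRITICAL,
  attempts: 5
});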

Job Categories

// Email Processing
const EMAIL_JOBS = {
  SEND_EMAIL: 'send-email',
  SEND_BULK_EMAIL: 'send-bulk-email',
  EMAIL_TEMPLATE_RENDER: 'email-template-render',
  EMAIL_BOUNCE_PROCESSING: 'email-bounce-processing'
};

// Analytics Processing
const ANALYTICS_JOBS = {
  PROCESS_USER_EVENTS: 'process-user-events',
  GENERATE_REPORTS: 'generate-reports',
  DATA_AGGREGATION: 'data-aggregation',
  CLEANUP_OLD_DATA: 'cleanup-old-data'
};

// Notification Processing
const NOTIFICATION_JOBS = {
  SEND_PUSH_NOTIFICATION: 'send-push-notification',
  SEND_SMS: 'send-sms',
  SEND_WHATSAPP: 'send-whatsapp',
  PROCESS_WEBHOOKS: 'process-webhooks'
};

// Data Synchronization
const SYNC_JOBS = {
  SYNC_SHOPIFY_DATA: 'sync-shopify-data',
  SYNC_ODOO_DATA: 'sync-odoo-data',
  SYNC_USER_DATA: 'sync-user-data',
  BACKUP_DATA: 'backup-data'
};

🔄 Job Processing Flow

Job Creation

// Create a job
const job = await emailQueue.add('send-email', {
  to: 'user@example.com',
  subject: 'Welcome to Appgain',
  template: 'welcome-email',
  data: { name: 'John Doe' }
}, {
  priority: 2,
  delay: 5000, // 5 seconds delay
  attempts: 3,
  backoff: {
    type: 'exponential',
    delay: 2000
  }
});
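
With this configuration, exponential backoff doubles the delay after each failure, so the retries fire roughly 2 and then 4 seconds after the first and second failures (attempts: 3 means three tries in total).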

Job Processing

// Process jobs
emailQueue.process('send-email', async (job) => {
  const { to, subject, template, data } = job.data;

  try {
    // Send email logic
    await sendEmail(to, subject, template, data);

    // Update job progress
    await job.progress(100);

    return { success: true, messageId: 'msg_123' };
  } catch (error) {
    // Log error and retry
    console.error('Email sending failed:', error);
    throw error;
  }
});

Job Monitoring

// Job event listeners
emailQueue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed:`, result);
});

emailQueue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed:`, err);
});

emailQueue.on('progress', (job, progress) => {
  console.log(`Job ${job.id} progress: ${progress}%`);
});

🗄️ Queue Configuration

Queue Setup

// Queue configuration
const Queue = require('bull');

const emailQueue = new Queue('email-processing', {
  redis: {
    host: process.env.REDIS_HOST,
    port: process.env.REDIS_PORT,
    password: process.env.REDIS_PASSWORD
  },
  defaultJobOptions: {
    removeOnComplete: 100,
    removeOnFail: 50,
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 2000
    }
  }
});

// Queue settings: in Bull, concurrency is passed alongside the processor
emailQueue.setMaxListeners(0); // lift the default EventEmitter listener cap
emailQueue.process(
  parseInt(process.env.QUEUE_CONCURRENCY, 10) || 10,
  require('./processors/email') // processor module (path is illustrative)
);
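
Note that QUEUE_CONCURRENCY applies per worker process: with the three replicas used in the deployment section below, each queue can run up to 3 × 10 jobs in parallel.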

Queue Management

// Queue operations
class QueueManager {
  constructor() {
    // Cache Queue instances so each queue name opens one Redis connection,
    // instead of a new connection on every call
    this.queues = new Map();
  }

  getQueue(queueName) {
    if (!this.queues.has(queueName)) {
      this.queues.set(queueName, new Queue(queueName));
    }
    return this.queues.get(queueName);
  }

  // Pause queue
  async pauseQueue(queueName) {
    await this.getQueue(queueName).pause();
  }

  // Resume queue
  async resumeQueue(queueName) {
    await this.getQueue(queueName).resume();
  }

  // Remove completed/failed jobs older than the grace period (default 24h)
  async cleanQueue(queueName, grace = 1000 * 60 * 60 * 24) {
    const queue = this.getQueue(queueName);
    await queue.clean(grace, 'completed');
    await queue.clean(grace, 'failed');
  }

  // Get queue stats without loading full job lists into memory
  async getQueueStats(queueName) {
    const { waiting, active, completed, failed } =
      await this.getQueue(queueName).getJobCounts();
    return { waiting, active, completed, failed };
  }
}
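
Illustrative usage of the manager:

// Pause, inspect, and resume a queue by name
const manager = new QueueManager();
await manager.pauseQueue('email-processing');
console.log(await manager.getQueueStats('email-processing'));
await manager.resumeQueue('email-processing');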

📈 Performance & Scaling

Worker Scaling

// Horizontal scaling with multiple workers
const cluster = require('cluster');
const numCPUs = require('os').cpus().length;

if (cluster.isPrimary) { // isMaster is deprecated since Node 16
  // Fork workers
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} died`);
    cluster.fork(); // Replace dead worker
  });
} else {
  // Worker process
  require('./worker');
}
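
Because Bull claims each job through atomic Redis operations, all forked workers can safely run the same processors; a job is only ever delivered to one worker.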

Redis Cluster Configuration

// Redis cluster setup
const Redis = require('ioredis');

const redis = new Redis.Cluster([
  { host: 'redis-node-1', port: 6379 },
  { host: 'redis-node-2', port: 6379 },
  { host: 'redis-node-3', port: 6379 }
], {
  // retryDelayOnFailover is a cluster-level option in ioredis,
  // not a per-node redisOptions setting
  retryDelayOnFailover: 100,
  redisOptions: {
    password: process.env.REDIS_PASSWORD,
    maxRetriesPerRequest: 3
  }
});

Performance Optimization

// Batch processing
const batchSize = 100;
const jobs = [];

for (let i = 0; i < 1000; i++) {
  jobs.push({
    name: 'process-data',
    data: { id: i },
    opts: { priority: 3 }
  });
}

// Add jobs in batches
for (let i = 0; i < jobs.length; i += batchSize) {
  const batch = jobs.slice(i, i + batchSize);
  await emailQueue.addBulk(batch);
}
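
addBulk submits the whole batch to Redis together, rather than paying one round-trip per add call, which is what makes batching worthwhile at this volume.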

🔍 Monitoring & Observability

Prometheus Metrics

// Custom metrics
const prometheus = require('prom-client');

const jobDuration = new prometheus.Histogram({
  name: 'job_duration_seconds',
  help: 'Job processing duration in seconds',
  labelNames: ['queue', 'job_type', 'status'],
  buckets: [0.1, 0.5, 1, 2, 5, 10, 30, 60]
});

const jobCounter = new prometheus.Counter({
  name: 'jobs_total',
  help: 'Total number of jobs processed',
  labelNames: ['queue', 'job_type', 'status']
});

// Record metrics
emailQueue.process('send-email', async (job) => {
  const startTime = Date.now();

  try {
    await processEmail(job.data);

    // Record success metrics
    jobDuration.labels('email', 'send-email', 'success')
      .observe((Date.now() - startTime) / 1000);
    jobCounter.labels('email', 'send-email', 'success').inc();

  } catch (error) {
    // Record failure metrics
    jobDuration.labels('email', 'send-email', 'failed')
      .observe((Date.now() - startTime) / 1000);
    jobCounter.labels('email', 'send-email', 'failed').inc();

    throw error;
  }
});
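
For Prometheus to scrape these metrics, the worker needs to expose them over HTTP; a minimal sketch using the METRICS_PORT variable from the configuration above (the Express server is an assumption):

// Expose prom-client's default registry on /metrics
const express = require('express');
const app = express();

app.get('/metrics', async (req, res) => {
  res.set('Content-Type', prometheus.register.contentType);
  res.end(await prometheus.register.metrics());
});

app.listen(process.env.METRICS_PORT || 9090);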

Health Checks

# Queue health check
GET /health/queues

# Response format
{
  "status": "healthy",
  "timestamp": "2024-01-01T00:00:00Z",
  "queues": {
    "email-processing": {
      "status": "active",
      "waiting": 10,
      "active": 5,
      "completed": 1000,
      "failed": 5
    },
    "analytics-processing": {
      "status": "active",
      "waiting": 2,
      "active": 3,
      "completed": 500,
      "failed": 1
    }
  }
}
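
One possible implementation of this endpoint, reusing the QueueManager from the queue-management section (the Express app and the hard-coded queue list are assumptions):

// Sketch of the /health/queues handler
app.get('/health/queues', async (req, res) => {
  const manager = new QueueManager();
  const queues = {};

  for (const name of ['email-processing', 'analytics-processing']) {
    queues[name] = { status: 'active', ...(await manager.getQueueStats(name)) };
  }

  res.json({
    status: 'healthy',
    timestamp: new Date().toISOString(),
    queues
  });
});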

Dashboard Metrics

// Queue dashboard data
const getQueueDashboard = async () => {
  const queues = ['email-processing', 'analytics-processing', 'notification-processing'];
  const stats = {};

  for (const queueName of queues) {
    const queue = new Queue(queueName);
    // getJobCounts returns { waiting, active, completed, failed, delayed }
    // in a single call, without loading the job payloads themselves
    stats[queueName] = await queue.getJobCounts();
  }

  return stats;
};

🚀 Deployment

Docker Configuration

# Dockerfile
FROM node:18-alpine

WORKDIR /app

COPY package*.json ./
RUN npm ci --omit=dev

COPY . .

EXPOSE 3000

CMD ["npm", "start"]

Docker Compose

# docker-compose.yml
version: '3.8'
services:
  task-queue-worker:
    build: .
    environment:
      - NODE_ENV=production
      - REDIS_HOST=redis
      - QUEUE_CONCURRENCY=10
    depends_on:
      - redis
    networks:
      - appgain-net
    deploy:
      replicas: 3

  redis:
    image: redis:7-alpine
    command: redis-server --requirepass ${REDIS_PASSWORD}
    volumes:
      - redis-data:/data
    networks:
      - appgain-net

volumes:
  redis-data:

networks:
  appgain-net:
    driver: bridge

Kubernetes Deployment

# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: task-queue-worker
spec:
  replicas: 3
  selector:
    matchLabels:
      app: task-queue-worker
  template:
    metadata:
      labels:
        app: task-queue-worker
    spec:
      containers:
      - name: worker
        image: appgain/task-queue-worker:latest
        env:
        - name: REDIS_HOST
          value: "redis-cluster"
        - name: QUEUE_CONCURRENCY
          value: "10"
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"

🔧 Development

Local Development Setup

# Install dependencies
npm install

# Start Redis (Docker)
docker run -d -p 6379:6379 redis:7-alpine

# Start worker
npm run worker

# Start monitoring
npm run monitor

Testing Jobs

// Test job creation
const testJob = await emailQueue.add('test-email', {
  to: 'test@example.com',
  subject: 'Test Email',
  body: 'This is a test email'
}, {
  priority: 1,
  attempts: 1
});

// Test job processing
emailQueue.process('test-email', async (job) => {
  console.log('Processing test job:', job.data);
  return { success: true };
});
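
A minimal Jest sketch for the same flow (the test queue name is illustrative, and a local Redis instance is assumed):

// Jest test for a queue round-trip
const Queue = require('bull');

describe('email queue', () => {
  let queue;

  beforeAll(() => {
    queue = new Queue('email-processing-test');
    queue.process('test-email', async () => ({ success: true }));
  });

  afterAll(async () => {
    await queue.close();
  });

  it('completes a test job', async () => {
    const job = await queue.add('test-email', { to: 'test@example.com' });
    // finished() resolves with the processor's return value
    await expect(job.finished()).resolves.toEqual({ success: true });
  });
});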

🔒 Security

Job Data Validation

// Job data validation
const Joi = require('joi');

const emailJobSchema = Joi.object({
  to: Joi.string().email().required(),
  subject: Joi.string().max(200).required(),
  template: Joi.string().required(),
  data: Joi.object().optional()
});

emailQueue.process('send-email', async (job) => {
  // Validate job data
  const { error, value } = emailJobSchema.validate(job.data);
  if (error) {
    throw new Error(`Invalid job data: ${error.message}`);
  }

  // Process validated data
  await sendEmail(value.to, value.subject, value.template, value.data);
});

Access Control

// Access control for the HTTP endpoints that enqueue jobs
const jwt = require('jsonwebtoken');

const authenticateJob = (req, res, next) => {
  const token = req.headers.authorization?.split(' ')[1];

  if (!token) {
    return res.status(401).json({ error: 'No token provided' });
  }

  try {
    const decoded = jwt.verify(token, process.env.JWT_SECRET);
    req.user = decoded;
    next();
  } catch (error) {
    return res.status(401).json({ error: 'Invalid token' });
  }
};
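
An illustrative route that gates job creation behind the middleware (the path and Express app are assumptions):

// Only authenticated callers may enqueue jobs
app.post('/api/jobs/email', authenticateJob, async (req, res) => {
  const job = await emailQueue.add('send-email', req.body, { priority: 2 });
  res.status(202).json({ jobId: job.id });
});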

📞 Support & Resources

Monitoring Tools

  • Bull Board: Web-based queue monitoring
  • Prometheus: Metrics collection
  • Grafana: Dashboard visualization
  • Redis Commander: Redis management interface
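
For Bull Board specifically, a minimal mounting sketch using the @bull-board packages (the base path and Express app are assumptions):

// Mount Bull Board on an existing Express app to inspect the queues
const { createBullBoard } = require('@bull-board/api');
const { BullAdapter } = require('@bull-board/api/bullAdapter');
const { ExpressAdapter } = require('@bull-board/express');

const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

createBullBoard({
  queues: [new BullAdapter(emailQueue)],
  serverAdapter
});

app.use('/admin/queues', serverAdapter.getRouter());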

Development Resources

  • GitHub Repository: Source code and issues
  • Postman Collection: API testing
  • Development Environment: Docker setup
  • Testing Framework: Jest + Bull testing utilities

Last updated: January 2024
