Jobs in Orionjs enable you to execute background tasks, schedule recurring operations, and handle asynchronous processing. Powered by the @orion-js/dogs package, jobs provide a robust way to manage both recurring and event-driven background operations.

Installation

pnpm add @orion-js/dogs

Defining Jobs Controllers

The recommended way to define jobs in Orionjs is to use the @Jobs() decorator on a class and the job-specific decorators on methods:

import {Jobs, RecurrentJob, EventJob, createEventJob, createRecurrentJob} from '@orion-js/dogs'
import {logger} from '@orion-js/logger'
import {Inject} from '@orion-js/services'
import {ExampleService} from '../services/ExampleService'

@Jobs()
export default class ExampleJobs {
  @Inject(() => ExampleService)
  private exampleService: ExampleService

  // Define a recurrent job using the method decorator
  @RecurrentJob()
  createExamples = createRecurrentJob({
    runEvery: '1m', // Run every minute
    resolve: async () => {
      logger.info('Creating example...')
      await this.exampleService.makeExample()
    }
  })

  // Define an event job using the method decorator
  @EventJob()
  sendWelcomeEmail = createEventJob({
    params: {
      email: { type: String },
      name: { type: String }
    },
    resolve: async (params) => {
      const {email, name} = params
      logger.info('Sending welcome email...', {email})
      await this.exampleService.sendWelcomeEmail(email, name)
    }
  })

  // Regular method to schedule an event job
  async registerUser(userData) {
    // Create user account
    const user = await this.exampleService.createUser(userData)

    // Schedule welcome email job
    await this.sendWelcomeEmail.schedule({
      params: {
        email: user.email,
        name: user.name
      }
    })

    return user
  }
}

With this approach:

  • The class is decorated with @Jobs() to mark it as a job controller
  • Each job is defined as a class property using createRecurrentJob or createEventJob with its specific decorator (@RecurrentJob or @EventJob)
  • Job parameters are validated based on the schema provided in the params option
  • You can use dependency injection with @Inject(() => Service)
  • The class can also include regular methods that aren’t jobs

Types of Jobs

Orionjs supports two types of jobs:

1. Recurrent Jobs

Jobs that run at specified intervals or on a schedule:

@Jobs()
export default class MaintenanceJobs {
  @Inject(() => DatabaseService)
  private databaseService: DatabaseService

  @RecurrentJob()
  dailyCleanup = createRecurrentJob({
    // Run every 24 hours
    runEvery: '24h',
    // Job priority (higher is more important, default: 100)
    priority: 100,
    // Optional: specify error handling
    onError: async function(error, params, context) {
      logger.error('Daily cleanup failed', {error: error.message})
      if (context.tries < 3) {
        return {action: 'retry', runIn: 60000} // Retry in 1 minute
      }
      return {action: 'dismiss'}
    },
    resolve: async () => {
      // Your job logic here
      await this.databaseService.cleanupOldRecords()
      return {status: 'success'}
    }
  })

  // Alternative: use getNextRun instead of runEvery
  @RecurrentJob()
  nightlyBackup = createRecurrentJob({
    getNextRun: () => {
      const nextRun = new Date()
      nextRun.setHours(3, 0, 0, 0) // Run at 3:00 AM
      if (nextRun < new Date()) {
        nextRun.setDate(nextRun.getDate() + 1) // Tomorrow if already past 3 AM
      }
      return nextRun
    },
    resolve: async () => {
      // Backup logic
    }
  })
}

2. Event Jobs

Jobs that run on-demand when triggered by an event:

@Jobs()
export default class NotificationJobs {
  @Inject(() => EmailService)
  private emailService: EmailService

  @EventJob()
  sendEmail = createEventJob({
    params: {
      to: { type: String },
      subject: { type: String },
      body: { type: String }
    },
    // Optional: configure error handling
    onError: async function(error, params, context) {
      if (context.tries < 3) {
        return {
          action: 'retry',
          runIn: Math.pow(2, context.tries) * 1000 // Exponential backoff
        }
      }
      return {action: 'dismiss'}
    },
    // Optional: configure stale job handling
    onStale: async function(params, context) {
      logger.warn('Email send job became stale', {params})
    },
    resolve: async (params) => {
      const {to, subject, body} = params
      
      // Use injected services
      await this.emailService.send(to, subject, body)
      
      return {sent: true}
    }
  })
}

Job Creation Options

RecurrentJob Options

OptionTypeDescription
runEverynumber | stringMilliseconds between runs or a string like ‘1d’, ‘4h’, etc. Ignored if getNextRun is provided.
getNextRunFunctionFunction returning a Date for the next execution. Takes precedence over runEvery.
prioritynumberJob priority. Higher numbers get executed first (default: 100).
onErrorFunctionCalled if the job fails. Can be used to retry or dismiss the job.
onStaleFunctionCalled if the job locktime is expired.
saveExecutionsFornumberTime in milliseconds to keep job execution history (default: 1 week). Set to 0 to disable.
resolveFunctionThe function that contains the job’s logic.

EventJob Options

OptionTypeDescription
paramsObject | SchemaSchema for validating job parameters.
onErrorFunctionCalled if the job fails. Can be used to retry or dismiss the job.
onStaleFunctionCalled if the job locktime is expired.
saveExecutionsFornumberTime in milliseconds to keep job execution history (default: 1 week). Set to 0 to disable.
resolveFunctionThe function that contains the job’s logic and receives validated params.

Starting Workers

In your application setup (typically in your main file), you start workers to process jobs:

import {startWorkers} from '@orion-js/dogs'

// Workers will automatically discover jobs defined using the @Jobs() decorator
const workersInstance = startWorkers({
  workersCount: 2, // Number of concurrent workers (default: 1)
  pollInterval: 3000, // Milliseconds between polling for new jobs (default: 3000)
  cooldownPeriod: 100, // Milliseconds to wait after job execution (default: 100)
  lockTime: 30000 // Lock time in milliseconds (default: 30000 = 30 seconds)
})

// To stop workers (e.g., when shutting down the application)
process.on('SIGTERM', async () => {
  await workersInstance.stop()
  process.exit(0)
})

Scheduling Event Jobs

Trigger an event job to run:

// Assuming you have this job defined
@EventJob()
sendEmail = createEventJob({
  params: {
    to: { type: String },
    subject: { type: String },
    body: { type: String }
  },
  resolve: async (params) => {
    // Job logic
  }
})

// Run immediately
await this.sendEmail.schedule({
  params: {
    to: 'user@example.com',
    subject: 'Welcome!',
    body: 'Welcome to our platform.'
  }
})

// Run after 5 minutes
await this.sendEmail.schedule({
  params: {
    to: 'user@example.com',
    subject: 'Reminder',
    body: 'Don\'t forget to complete your profile.'
  },
  runIn: 5 * 60 * 1000 // 5 minutes in milliseconds
})

// Run at a specific time
await this.sendEmail.schedule({
  params: {
    to: 'user@example.com',
    subject: 'Scheduled Announcement',
    body: 'New features are now available!'
  },
  runAt: new Date('2023-12-31T00:00:00')
})

// Ensure uniqueness
await this.sendEmail.schedule({
  params: {
    to: 'user@example.com',
    subject: 'Daily Update',
    body: 'Daily update content'
  },
  uniqueIdentifier: 'daily-update-user123', // Won't schedule duplicates with this identifier
  replace: true // Replace existing job with this identifier if it exists
})

Job Execution Context

The job’s resolve function receives additional parameters:

@RecurrentJob()
exampleJob = createRecurrentJob({
  runEvery: '1h',
  resolve: async (_, context) => {
    // context contains information about the job execution
    console.log('Job ID:', context.jobId)
    console.log('Execution attempt:', context.tries)
    console.log('Scheduled at:', context.createdAt)
    
    // Your job logic here
  }
})

Error Handling

Jobs support sophisticated error handling:

@EventJob()
riskyJob = createEventJob({
  params: {
    userId: { type: String }
  },
  onError: async (error, params, context) => {
    logger.error('Job failed', {
      error: error.message,
      userId: params.userId,
      attempt: context.tries
    })
    
    if (context.tries < 3) {
      // Retry with exponential backoff
      return {
        action: 'retry',
        runIn: Math.pow(2, context.tries) * 1000
      }
    }
    
    // After 3 failed attempts, send an alert and dismiss the job
    await sendAlertToAdmin(`Job failed for user ${params.userId}: ${error.message}`)
    return { action: 'dismiss' }
  },
  resolve: async (params) => {
    // Job implementation
  }
})

Monitoring Jobs

You can monitor job status and history:

import {getJobStatus, getJobHistory} from '@orion-js/dogs'

// Get status of all jobs
const status = await getJobStatus()
console.log('Active jobs:', status.active)
console.log('Scheduled jobs:', status.scheduled)

// Get history of a specific job
const history = await getJobHistory('sendEmail')
console.log('Recent executions:', history.executions)
console.log('Success rate:', history.stats.successRate)

Best Practices

  1. Keep Jobs Idempotent: Design jobs so they can be safely re-executed without side effects.

  2. Use Parameters Validation: Define schemas for job parameters to ensure they’re valid.

  3. Handle Errors Properly: Implement robust error handling with appropriate retry strategies.

  4. Set Realistic Lock Times: Ensure the lockTime is longer than a job’s expected execution time.

  5. Monitor Job Performance: Keep track of job durations to optimize processing.

  6. Organize by Domain: Group related jobs in domain-specific controller classes.

  7. Test Jobs Thoroughly: Write unit tests for job logic to ensure reliability.

  8. Check Job Status: Implement health checks to monitor job processing.