animesh kumar

Running water never grows stale. Keep flowing!

Posts Tagged ‘Delayed Jobs

Delayed Jobs with Jesque

with 4 comments

Jesque is an interesting project. It’s a Java port of Github’s Resque task queue library. Works on top of Redis. It is fast. It eats less. And doesn’t need a lot of maintenance, as long long your Redis is okay, you are okay! We have been happily using this in production for a while.

Problem

And then, we encountered a need for delayed jobs. We needed an ability to execute jobs in future with deterministic delay. We had 2 options:

1. Either, schedule these delayed jobs in Quartz – that was okay because we already had been using Quartz – and then, once Quartz jobs get fired, they publish a Task into Jesque, and let the workers handle the rest. (This was too contrived to implement, and would have become maintenance/reporting nightmare!)

2. We extend Jesque to support delayed jobs inherently.

Solution

We decided to go with option-2, and started exploring Redis datasets. Turned out, ZSET was all that we needed.

Jesque uses Redis LIST for job storage, workers keep polling the list, and LPOPs tasks from the LIST.

This is what we ended up doing:

When adding a delayed job,

1. Calculate future timestamp when the job should run,
2. Use that timestamp as SCORE to ZSET entry.

// Java
final long delay = 10; //sec
final long future = System.currentTimeMillis() + (delay * 1000); // future
jedis.zadd(QUEUE, future, jobInfo); 

// Redis
// ZADD <queue> <future> <job-information>

On the other hand, Workers’ poll logic was updated. For Delayed queues,

1. Check if there are any items with SCORE between -INF and now,

final long now = System.currentTimeMillis();
final Set<String> tasks = jedis.zrangeByScore(QUEUE, -1, now, 0, 1);

If tasks are non-empty, try to grab one to execute.

if (null != tasks && !tasks.isEmpty()) {
    String task = tasks.iterator().next();
    // try to acquire this task
    if (jedis.zrem(QUEUE, task) == 1) {
         return task; // Return
    }
}

This way, we ensure that no 2 workers would grab the same task to execute.

Also, an important point to note here is that – You don’t have to change your existing workers or redo new workers in any way. Just bind them to a Delayed Queue, and start publishing delayed tasks.

Example

// DelayedJobTest.java
package net.greghaines.jesque;

import static net.greghaines.jesque.utils.JesqueUtils.entry;
import static net.greghaines.jesque.utils.JesqueUtils.map;

import java.util.Arrays;

import net.greghaines.jesque.client.Client;
import net.greghaines.jesque.client.ClientPoolImpl;
import net.greghaines.jesque.worker.Worker;
import net.greghaines.jesque.worker.WorkerImpl;
import redis.clients.jedis.JedisPool;

public class DelayedJobTest {

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws InterruptedException {
        // Queue name
        final String QUEUE = "fooDelayed";
        
        // Config
        final Config config = new ConfigBuilder().withHost("localhost").withPort(6379).withDatabase(0).build();
        
        // Client
        final Client client = new ClientPoolImpl(config, new JedisPool("localhost"));
        long delay = 10; // seconds
        long future = System.currentTimeMillis() + (delay * 1000); // Future timestamp
        
        // Enqueue job
        client.delayedEnqueue(QUEUE, 
                new Job(
                        TestJob.class.getSimpleName(), 
                        new Object[] {"HELLO", "WORLD" } // arguments
                ), 
                future);
        // End
        client.end();

        // Worker
        final Worker worker = new WorkerImpl(config, Arrays.asList(QUEUE), map(entry(TestJob.class.getSimpleName(), TestJob.class)));
        final Thread workerThread = new Thread(worker);
        workerThread.start(); // start
    }
}

And, this is the TestJob,

// TestJob.java
package net.greghaines.jesque;

import java.util.Date;

public class TestJob implements Runnable {
    final String arg1;
    final String arg2;

    public TestJob(String arg1, String arg2) {
        this.arg1 = arg1;
        this.arg2 = arg2;
    }

    @Override
    public void run() {
        System.out.println("Job ran at=" + new Date() + ", with arg1=" + arg1 + " and arg2=" + arg2);
    }
}

We have using this solution in production for quite sometime, and this has been pretty stable.

BTW, you can grab the source here: https://github.com/anismiles/jesque and give it a try yourself.

Happy Hacking!

Advertisements

Written by Animesh

September 2, 2013 at 5:06 pm