animesh kumar

Running water never grows stale. Keep flowing!

Delayed Jobs with Jesque

with 4 comments

Jesque is an interesting project. It’s a Java port of Github’s Resque task queue library. Works on top of Redis. It is fast. It eats less. And doesn’t need a lot of maintenance, as long long your Redis is okay, you are okay! We have been happily using this in production for a while.

Problem

And then, we encountered a need for delayed jobs. We needed an ability to execute jobs in future with deterministic delay. We had 2 options:

1. Either, schedule these delayed jobs in Quartz – that was okay because we already had been using Quartz – and then, once Quartz jobs get fired, they publish a Task into Jesque, and let the workers handle the rest. (This was too contrived to implement, and would have become maintenance/reporting nightmare!)

2. We extend Jesque to support delayed jobs inherently.

Solution

We decided to go with option-2, and started exploring Redis datasets. Turned out, ZSET was all that we needed.

Jesque uses Redis LIST for job storage, workers keep polling the list, and LPOPs tasks from the LIST.

This is what we ended up doing:

When adding a delayed job,

1. Calculate future timestamp when the job should run,
2. Use that timestamp as SCORE to ZSET entry.

// Java
final long delay = 10; //sec
final long future = System.currentTimeMillis() + (delay * 1000); // future
jedis.zadd(QUEUE, future, jobInfo); 

// Redis
// ZADD <queue> <future> <job-information>

On the other hand, Workers’ poll logic was updated. For Delayed queues,

1. Check if there are any items with SCORE between -INF and now,

final long now = System.currentTimeMillis();
final Set<String> tasks = jedis.zrangeByScore(QUEUE, -1, now, 0, 1);

If tasks are non-empty, try to grab one to execute.

if (null != tasks && !tasks.isEmpty()) {
    String task = tasks.iterator().next();
    // try to acquire this task
    if (jedis.zrem(QUEUE, task) == 1) {
         return task; // Return
    }
}

This way, we ensure that no 2 workers would grab the same task to execute.

Also, an important point to note here is that – You don’t have to change your existing workers or redo new workers in any way. Just bind them to a Delayed Queue, and start publishing delayed tasks.

Example

// DelayedJobTest.java
package net.greghaines.jesque;

import static net.greghaines.jesque.utils.JesqueUtils.entry;
import static net.greghaines.jesque.utils.JesqueUtils.map;

import java.util.Arrays;

import net.greghaines.jesque.client.Client;
import net.greghaines.jesque.client.ClientPoolImpl;
import net.greghaines.jesque.worker.Worker;
import net.greghaines.jesque.worker.WorkerImpl;
import redis.clients.jedis.JedisPool;

public class DelayedJobTest {

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws InterruptedException {
        // Queue name
        final String QUEUE = "fooDelayed";
        
        // Config
        final Config config = new ConfigBuilder().withHost("localhost").withPort(6379).withDatabase(0).build();
        
        // Client
        final Client client = new ClientPoolImpl(config, new JedisPool("localhost"));
        long delay = 10; // seconds
        long future = System.currentTimeMillis() + (delay * 1000); // Future timestamp
        
        // Enqueue job
        client.delayedEnqueue(QUEUE, 
                new Job(
                        TestJob.class.getSimpleName(), 
                        new Object[] {"HELLO", "WORLD" } // arguments
                ), 
                future);
        // End
        client.end();

        // Worker
        final Worker worker = new WorkerImpl(config, Arrays.asList(QUEUE), map(entry(TestJob.class.getSimpleName(), TestJob.class)));
        final Thread workerThread = new Thread(worker);
        workerThread.start(); // start
    }
}

And, this is the TestJob,

// TestJob.java
package net.greghaines.jesque;

import java.util.Date;

public class TestJob implements Runnable {
    final String arg1;
    final String arg2;

    public TestJob(String arg1, String arg2) {
        this.arg1 = arg1;
        this.arg2 = arg2;
    }

    @Override
    public void run() {
        System.out.println("Job ran at=" + new Date() + ", with arg1=" + arg1 + " and arg2=" + arg2);
    }
}

We have using this solution in production for quite sometime, and this has been pretty stable.

BTW, you can grab the source here: https://github.com/anismiles/jesque and give it a try yourself.

Happy Hacking!

Advertisements

Written by Animesh

September 2, 2013 at 5:06 pm

4 Responses

Subscribe to comments with RSS.

  1. Dear Sir,

    My company is been following your work and this website has helped us a lot.
    You are a really smart person and your employer is a very lucky company to have you.
    We will continue follow your work closely… Continue with the good work…

    Regards

    Joao Pinto

    September 10, 2013 at 2:34 am

  2. […] project, and runs on Redis which was already part of our stack. In the process, we also added delayed task functionality to […]

  3. Thanks Animesh, it looks awesome. I didn’t think of using Zset scores to store the ETA that’s smart 🙂

    Yohan Launay

    September 20, 2013 at 4:14 am

  4. Hello,

    Jesque is a good tool I want to use. But before, I need some more informations.Is it also possible to retrieve all open jobs in Jesque? This info could be useful for my monitioring and for dirty restarts. After dirty shutdown it could be useful to check all remaining Jobs with other distributed applications, if any other application has the same job already done or also in queue. Is this possible?

    driokaja

    January 8, 2015 at 1:18 pm


Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: