Delayed Jobs with Jesque
Jesque is an interesting project. It's a Java port of GitHub's Resque task queue library, and it works on top of Redis. It is fast, it is light on resources, and it doesn't need a lot of maintenance: as long as your Redis is okay, you are okay! We have been happily using it in production for a while.
Problem
And then we ran into a need for delayed jobs: the ability to execute a job in the future, after a deterministic delay. We had 2 options:
1. Schedule these delayed jobs in Quartz (that was okay, because we were already using Quartz) and, once a Quartz job fires, have it publish a task into Jesque and let the workers handle the rest. (This was too contrived to implement, and would have become a maintenance and reporting nightmare!)
2. Extend Jesque to support delayed jobs natively.
Solution
We decided to go with option 2, and started exploring Redis data types. It turned out that ZSET (the sorted set) was all we needed.
Jesque uses a Redis LIST for job storage: workers keep polling the list and LPOP tasks from it.
This is what we ended up doing:
When adding a delayed job,
1. Calculate the future timestamp at which the job should run,
2. Use that timestamp as the SCORE of the ZSET entry.
// Java
final long delay = 10; // seconds
final long future = System.currentTimeMillis() + (delay * 1000); // future timestamp, in milliseconds
jedis.zadd(QUEUE, future, jobInfo);

// Redis
// ZADD <queue> <future> <job-information>
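The score calculation above can be pulled into a small helper so that the unit of the delay is explicit. This is only a sketch: `DelayScore` and `dueAt` are hypothetical names for illustration, not part of Jesque or Jedis.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Jesque: computes the ZSET score for a delayed job.
class DelayScore {
    // The score is the epoch-millisecond instant at which the job becomes due.
    static long dueAt(long nowMillis, long delay, TimeUnit unit) {
        return nowMillis + unit.toMillis(delay);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long future = dueAt(now, 10, TimeUnit.SECONDS);
        System.out.println(future - now); // prints 10000
    }
}
```

Passing the current time in as a parameter also makes the calculation easy to test without waiting on a real clock.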
On the worker side, we updated the poll logic. For delayed queues:
1. Check if there are any items with a SCORE between -INF and now,

final long now = System.currentTimeMillis();
// A lower bound of -1 covers every entry, since timestamps are never negative
final Set<String> tasks = jedis.zrangeByScore(QUEUE, -1, now, 0, 1);

2. If tasks is non-empty, try to grab one to execute.

if (null != tasks && !tasks.isEmpty()) {
    String task = tasks.iterator().next();
    // try to acquire this task
    if (jedis.zrem(QUEUE, task) == 1) {
        return task; // Return
    }
}
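This way, we ensure that no 2 workers grab the same task to execute: ZREM is atomic and returns 1 only to the caller that actually removed the entry, so every other worker that saw the same task gets 0 and moves on.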
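To see the claim step in isolation, here is a minimal sketch that races several threads over a single due task. It assumes nothing about Jesque internals: `FakeZSet` is a hypothetical in-memory stand-in for a Redis ZSET (its `synchronized` methods model the atomicity of individual Redis commands), and `DelayedPollSketch` is an illustrative name, not the worker code from the fork.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory stand-in for a Redis ZSET; for illustration only.
class FakeZSet {
    private final Map<String, Long> scores = new HashMap<>();

    // Like ZADD: insert a member or update its score.
    synchronized void zadd(String member, long score) {
        scores.put(member, score);
    }

    // Like ZRANGEBYSCORE with LIMIT 0 1: lowest-scored member in [min, max], or null.
    synchronized String firstByScore(long min, long max) {
        String best = null;
        long bestScore = Long.MAX_VALUE;
        for (Map.Entry<String, Long> e : scores.entrySet()) {
            long s = e.getValue();
            if (s >= min && s <= max && (best == null || s < bestScore)) {
                best = e.getKey();
                bestScore = s;
            }
        }
        return best;
    }

    // Like ZREM: returns 1 if this call removed the member, 0 if it was already gone.
    synchronized long zrem(String member) {
        return scores.remove(member) != null ? 1 : 0;
    }
}

class DelayedPollSketch {
    // One poll attempt: look for a due task, then try to claim it by removing it.
    static String poll(FakeZSet queue, long now) {
        String task = queue.firstByScore(Long.MIN_VALUE, now);
        if (task != null && queue.zrem(task) == 1) {
            return task; // this caller won the claim
        }
        return null; // nothing due, or another worker claimed it first
    }

    // Starts `workers` threads that all poll at once; returns how many claimed a task.
    static int race(FakeZSet queue, long now, int workers) throws InterruptedException {
        AtomicInteger claimed = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (poll(queue, now) != null) {
                    claimed.incrementAndGet();
                }
            });
            threads.add(t);
            t.start();
        }
        start.countDown();
        for (Thread t : threads) {
            t.join();
        }
        return claimed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        FakeZSet queue = new FakeZSet();
        queue.zadd("job-due", 1_000L);   // due at t=1000
        queue.zadd("job-later", 9_000L); // not due until t=9000
        System.out.println("claimed at t=5000: " + race(queue, 5_000L, 8)); // prints 1
        System.out.println("poll at t=5000 again: " + poll(queue, 5_000L)); // prints null
    }
}
```

The read and the remove are two separate steps, so several workers may see the same task; only the remove decides who runs it.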
Also, an important point to note here: you don't have to change your existing workers or write new ones in any way. Just bind them to a delayed queue, and start publishing delayed tasks.
Example
// DelayedJobTest.java
package net.greghaines.jesque;

import static net.greghaines.jesque.utils.JesqueUtils.entry;
import static net.greghaines.jesque.utils.JesqueUtils.map;

import java.util.Arrays;

import net.greghaines.jesque.client.Client;
import net.greghaines.jesque.client.ClientPoolImpl;
import net.greghaines.jesque.worker.Worker;
import net.greghaines.jesque.worker.WorkerImpl;
import redis.clients.jedis.JedisPool;

public class DelayedJobTest {
    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws InterruptedException {
        // Queue name
        final String QUEUE = "fooDelayed";

        // Config
        final Config config = new ConfigBuilder().withHost("localhost").withPort(6379).withDatabase(0).build();

        // Client
        final Client client = new ClientPoolImpl(config, new JedisPool("localhost"));

        long delay = 10; // seconds
        long future = System.currentTimeMillis() + (delay * 1000); // Future timestamp

        // Enqueue job
        client.delayedEnqueue(QUEUE,
                new Job(
                        TestJob.class.getSimpleName(),
                        new Object[] {"HELLO", "WORLD"} // arguments
                ),
                future);

        // End
        client.end();

        // Worker
        final Worker worker = new WorkerImpl(config, Arrays.asList(QUEUE),
                map(entry(TestJob.class.getSimpleName(), TestJob.class)));
        final Thread workerThread = new Thread(worker);
        workerThread.start(); // start
    }
}
And, this is the TestJob,
// TestJob.java
package net.greghaines.jesque;

import java.util.Date;

public class TestJob implements Runnable {
    final String arg1;
    final String arg2;

    public TestJob(String arg1, String arg2) {
        this.arg1 = arg1;
        this.arg2 = arg2;
    }

    @Override
    public void run() {
        System.out.println("Job ran at=" + new Date() + ", with arg1=" + arg1 + " and arg2=" + arg2);
    }
}
We have been using this solution in production for quite some time, and it has been pretty stable.
BTW, you can grab the source here: https://github.com/anismiles/jesque and give it a try yourself.
Happy Hacking!
Dear Sir,
My company has been following your work and this website has helped us a lot.
You are a really smart person and your employer is a very lucky company to have you.
We will continue to follow your work closely… Continue with the good work…
Regards
Joao Pinto
September 10, 2013 at 2:34 am
Thanks Animesh, it looks awesome. I didn’t think of using ZSET scores to store the ETA, that’s smart 🙂
Yohan Launay
September 20, 2013 at 4:14 am
Hello,
Jesque is a good tool that I want to use, but first I need some more information. Is it also possible to retrieve all open jobs in Jesque? This info could be useful for my monitoring and for dirty restarts. After a dirty shutdown, it could be useful to check all remaining jobs against other distributed applications, to see whether any other application has already done the same job or also has it in its queue. Is this possible?
driokaja
January 8, 2015 at 1:18 pm