animesh kumar

Running water never grows stale. Keep flowing!

Jesque-Guice binding

In our Java stack, we use the Guice IoC library quite heavily. We are kind of smitten with it. So when we needed a task queue to process our background jobs, we settled on Jesque, a Java port of GitHub’s Resque project. It runs on Redis, which was already part of our stack. Along the way, we also added delayed-task functionality to it.

Problem

The problem we faced was injecting Guice dependencies into Jesque workers. Our workers performed heavy operations and at times had to read from and write to the DB, or talk to other services.

Jesque uses the Java reflection APIs to instantiate and run its workers (which are Runnable classes), so it becomes very difficult to inject Guice-managed objects and services into them.
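To make the difficulty concrete, here is a minimal, stdlib-only sketch of reflection-based instantiation. It is not Jesque's actual code: GreetingService, ReflectiveWorker and ReflectionDemo.materialize are hypothetical names invented for illustration. The point is only that an object built through a reflective constructor call never passes through a DI container, so its service field stays null.

```java
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a service that a DI container would normally supply.
class GreetingService {
    String greet(String name) {
        return "hello " + name;
    }
}

// A worker in the style described above: job arguments arrive via the constructor.
class ReflectiveWorker implements Runnable {
    GreetingService service; // nobody sets this when the class is built reflectively
    final String arg1;
    String result;

    public ReflectiveWorker(String arg1) {
        this.arg1 = arg1;
    }

    @Override
    public void run() {
        result = (service == null) ? "no service available" : service.greet(arg1);
    }
}

class ReflectionDemo {
    // Look up the job class by name and build it from the job arguments.
    // This is an assumption about how a reflection-based queue might work.
    static Runnable materialize(Map<String, Class<? extends Runnable>> jobTypes,
                                String jobName, Object... args) {
        Class<? extends Runnable> type = jobTypes.get(jobName);
        if (type == null) {
            throw new IllegalStateException("Unknown job " + jobName);
        }
        Class<?>[] argTypes = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) {
            argTypes[i] = args[i].getClass();
        }
        try {
            Constructor<? extends Runnable> ctor = type.getConstructor(argTypes);
            return ctor.newInstance(args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot build job " + jobName, e);
        }
    }

    public static void main(String[] args) {
        Map<String, Class<? extends Runnable>> jobTypes = new HashMap<>();
        jobTypes.put("ExampleJob", ReflectiveWorker.class);

        ReflectiveWorker worker = (ReflectiveWorker) materialize(jobTypes, "ExampleJob", "job1");
        worker.run();
        System.out.println(worker.result); // prints "no service available"
    }
}
```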

We started the hacky-and-ugly way, by creating static references to the services our workers needed. Had it been just a few services, this would have worked, but our workers kept growing in features and reach, and after a while the whole code base was stinking. We had to do something about it. Something elegant!

Solution

So, we created the Jesque-Guice binding project. You can annotate your worker classes, and Guice will then discover, register and start them. Let me show you some code.

First, let’s create an ExampleWorker. Remember that the worker must implement the Runnable interface, and it must be annotated with @Worker, which Guice uses for discovery and binding.

// ExampleWorker

@Worker(job = "ExampleJob",                 // job name
        queues = { "EXP_QUEUE" },           // queue names
        enabled = true,                     // enabled
        count = 1,                          // 1 instance of this worker running
        events = { WorkerEvent.JOB_SUCCESS, WorkerEvent.WORKER_START }, // Events to listen to
        listener = EchoListener.class      // WorkerEventListener
)
public class ExampleWorker implements Runnable {
    // LOG
    private static final Logger LOG = LoggerFactory.getLogger(ExampleWorker.class);

    // Note: Only field level injection would work!
    @Inject
    ExampleService service;

    String arg1;
    String arg2;

    // Must keep an empty constructor for Guice to discover this
    public ExampleWorker() {
    }

    public ExampleWorker(String arg1, String arg2) {
        this.arg1 = arg1;
        this.arg2 = arg2;
    }

    @Override
    public void run() {
        LOG.info("Running worker={}, with arg1={}, arg2={}", new Object[] { getClass(), arg1, arg2 });
        // calling service
        service.serve();
    }

}

@Worker attributes let you control the behavior. The above worker, ExampleWorker, listens to the EXP_QUEUE queue, accepts jobs named ExampleJob, and has a WorkerEventListener defined by the Guice-managed class EchoListener, which listens to the WorkerEvents JOB_SUCCESS and WORKER_START. There is only ONE instance of this worker running.
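For readers wondering how annotation attributes like these can drive registration, the following is a rough sketch of reading such metadata via reflection. WorkerSpec, AnnotatedWorker and WorkerSpecReader are hypothetical names; this is not the @Worker annotation from Jesque-Guice, only an illustration of the general technique under the assumption that discovery relies on runtime-retained annotations.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Hypothetical annotation, modelled loosely on the attributes shown above.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface WorkerSpec {
    String job();
    String[] queues();
    boolean enabled() default true;
    int count() default 1;
}

@WorkerSpec(job = "ExampleJob", queues = { "EXP_QUEUE" }, count = 2)
class AnnotatedWorker implements Runnable {
    @Override
    public void run() {
        // no-op; only the metadata matters in this sketch
    }
}

class WorkerSpecReader {
    // Summarise what a registrar would need to know.
    // Returns null when the class is not annotated or is disabled.
    static String describe(Class<?> type) {
        WorkerSpec spec = type.getAnnotation(WorkerSpec.class);
        if (spec == null || !spec.enabled()) {
            return null;
        }
        return spec.job() + " on " + String.join(",", spec.queues()) + " x" + spec.count();
    }

    public static void main(String[] args) {
        System.out.println(describe(AnnotatedWorker.class)); // prints "ExampleJob on EXP_QUEUE x2"
        System.out.println(describe(String.class));          // prints "null"
    }
}
```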

Note: ExampleWorker has been field-@Injected with ExampleService. Please remember:
– Only field @Inject will work with workers, because Jesque uses constructors to pass job arguments.
– You must also keep an empty constructor around so that Guice can discover the worker.
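The field-only restriction follows from the order of events: the constructor is already taken by the job arguments, so dependencies can only be filled in once the instance exists. Guice offers Injector.injectMembers for injecting into an already-constructed object. The stdlib-only sketch below imitates that idea; the Wire annotation, TinyInjector, EchoService and WiredWorker are all hypothetical and are not part of Guice or Jesque-Guice.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;

// Hypothetical marker standing in for @Inject.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface Wire { }

class EchoService {
    String serve(String a, String b) {
        return a + ":" + b;
    }
}

class WiredWorker implements Runnable {
    @Wire
    EchoService service;      // filled in after construction
    final String arg1;
    final String arg2;
    String result;

    WiredWorker(String arg1, String arg2) { // constructor carries job arguments only
        this.arg1 = arg1;
        this.arg2 = arg2;
    }

    @Override
    public void run() {
        result = service.serve(arg1, arg2);
    }
}

// A toy injector: maps a type to one instance and sets @Wire fields reflectively.
class TinyInjector {
    private final Map<Class<?>, Object> bindings = new HashMap<>();

    <T> TinyInjector bind(Class<T> type, T instance) {
        bindings.put(type, instance);
        return this;
    }

    void injectFields(Object target) {
        for (Field field : target.getClass().getDeclaredFields()) {
            if (!field.isAnnotationPresent(Wire.class)) {
                continue;
            }
            Object value = bindings.get(field.getType());
            if (value == null) {
                throw new IllegalStateException("No binding for " + field.getType().getName());
            }
            try {
                field.setAccessible(true);
                field.set(target, value);
            } catch (IllegalAccessException e) {
                throw new IllegalStateException(e);
            }
        }
    }
}

class FieldInjectionDemo {
    static String runJob(String a, String b) {
        TinyInjector injector = new TinyInjector().bind(EchoService.class, new EchoService());
        WiredWorker worker = new WiredWorker(a, b); // step 1: job arguments via constructor
        injector.injectFields(worker);              // step 2: dependencies via fields
        worker.run();                               // step 3: delegate to the worker
        return worker.result;
    }

    public static void main(String[] args) {
        System.out.println(runJob("hello", "job1")); // prints "hello:job1"
    }
}
```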

Now, since we have added a Listener, we must define it.

// EchoListener

public class EchoListener implements WorkerListener {
    // LOG
    private static final Logger LOG = LoggerFactory.getLogger(EchoListener.class);

    @Override
    public void onEvent(WorkerEvent event, Worker worker, String queue, Job job, Object runner,
            Object result, Exception ex) {
        LOG.info("onEvent ==>> queue={}, event={}", new Object[] { queue, event });
    }

}

For the sake of demonstration, this has been kept very minimal. But mind you, you can @Inject any Guice-managed objects into it, using constructor and/or field injection.

Now, let’s define the ExampleService that we want to @Inject into our worker.

// ExampleService

public class ExampleService {
    /** The Constant LOG. */
    private static final Logger LOG = LoggerFactory.getLogger(ExampleService.class);

    public void serve() {
        LOG.info("Heya! I am not here to serve, just to show you how injection works.");
    }
}

Wonderful! Let’s now bind all of these together in a Guice module.

// ExampleModule

public class ExampleModule extends AbstractModule {

    @Override
    protected void configure() {
        // Jesque Guice
        install(new JesqueModule());

        // Jesque Client
        Config config = new ConfigBuilder().withHost("localhost").withPort(6379).withDatabase(0).build();
        bind(Config.class).toInstance(config);
        bind(Client.class).toInstance(new ClientImpl(config));

        // Worker
        bind(ExampleWorker.class).asEagerSingleton(); // Must be singleton
        // WorkerEventListener
        bind(EchoListener.class).in(Scopes.SINGLETON);
        // Worker Executor (This is where they actually run)
        bind(WorkerExecutor.class).to(SimpleThreadBasedWorkerExecutor.class);
        // Service (will be injected into workers)
        bind(ExampleService.class).asEagerSingleton();
    }

}

Here, we first install JesqueModule, and then bind the other objects:
– Bind the Jesque config and client.
– Bind the worker, listener, service etc.

You will notice we have also bound WorkerExecutor. This interface accepts a net.greghaines.jesque.worker.Worker instance and runs it on a thread. Jesque-Guice comes with 2 simple implementations:

1. SimpleThreadBasedWorkerExecutor, which runs each net.greghaines.jesque.worker.Worker on a separate, unmanaged thread, and
2. CachedThreadPoolBasedWorkerExecutor, which creates a CachedThreadPool on which each net.greghaines.jesque.worker.Worker is run.

You can implement your own strategy or provide your own ExecutorService.
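As a rough illustration of the two strategies, here is a stdlib-only sketch. RunnableExecutor and both implementations are hypothetical; the real WorkerExecutor interface in Jesque-Guice may have a different shape, and plain Runnable is used here as a stand-in for a Jesque Worker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical interface; the real WorkerExecutor may look different.
interface RunnableExecutor {
    void execute(Runnable worker);
    void shutdown();
}

// Strategy 1: one plain, unmanaged thread per worker.
class ThreadPerWorkerExecutor implements RunnableExecutor {
    @Override
    public void execute(Runnable worker) {
        new Thread(worker, "worker-thread").start();
    }

    @Override
    public void shutdown() {
        // nothing to release; threads end when their workers return
    }
}

// Strategy 2: workers share a cached thread pool.
class CachedPoolExecutor implements RunnableExecutor {
    private final ExecutorService pool = Executors.newCachedThreadPool();

    @Override
    public void execute(Runnable worker) {
        pool.execute(worker);
    }

    @Override
    public void shutdown() {
        pool.shutdown(); // lets already-submitted workers finish
    }
}

class ExecutorDemo {
    // Starts the given number of trivial workers and waits for all of them.
    // Returns true if every worker finished within five seconds.
    static boolean runAll(RunnableExecutor executor, int workers) {
        CountDownLatch done = new CountDownLatch(workers);
        for (int i = 0; i < workers; i++) {
            executor.execute(done::countDown);
        }
        try {
            return done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runAll(new ThreadPerWorkerExecutor(), 3));
        System.out.println(runAll(new CachedPoolExecutor(), 3));
    }
}
```

The pool-based variant reuses idle threads, which matters more when workers are started and stopped frequently; for a handful of long-lived workers, a thread per worker is usually simpler.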

Run

Now that we have everything, let’s run it.

// Main

public class Main {
    /** The Constant LOG. */
    private static final Logger LOG = LoggerFactory.getLogger(Main.class);

    /**
     * @param args
     */
    public static void main(String[] args) {
        Injector injector = Guice.createInjector(Stage.DEVELOPMENT, new Module[] { new ExampleModule() });

        // Get Jesque client
        Client client = (Client) injector.getInstance(Client.class);
        LOG.info("Publish jobs");
        // Push jobs
        client.enqueue("EXP_QUEUE", new Job("ExampleJob", "hello", "job1"));
        client.enqueue("EXP_QUEUE", new Job("ExampleJob", "hello", "job2"));
    }
}

You should see,

// Job - 1
DEBUG c.s.commons.jesque.GuiceAwareWorker - Injecting dependencies into worker instance = com.strumsoft.commons.jesque.example.ExampleWorker@6b754699
DEBUG c.s.commons.jesque.GuiceAwareWorker - Delegating to run worker instance = com.strumsoft.commons.jesque.example.ExampleWorker@6b754699
INFO  c.s.c.jesque.example.ExampleWorker - Running worker=class com.strumsoft.commons.jesque.example.ExampleWorker, with arg1=hello, arg2=job1
INFO  c.s.c.jesque.example.ExampleService - Heya! I am not here to serve, just to show you how injection works.
INFO  c.s.c.jesque.example.EchoListener - onEvent ==>> queue=EXP_QUEUE, event=JOB_SUCCESS

// Job -2
DEBUG c.s.commons.jesque.GuiceAwareWorker - Injecting dependencies into worker instance = com.strumsoft.commons.jesque.example.ExampleWorker@6602e323
DEBUG c.s.commons.jesque.GuiceAwareWorker - Delegating to run worker instance = com.strumsoft.commons.jesque.example.ExampleWorker@6602e323
INFO  c.s.c.jesque.example.ExampleWorker - Running worker=class com.strumsoft.commons.jesque.example.ExampleWorker, with arg1=hello, arg2=job2
INFO  c.s.c.jesque.example.ExampleService - Heya! I am not here to serve, just to show you how injection works.
INFO  c.s.c.jesque.example.EchoListener - onEvent ==>> queue=EXP_QUEUE, event=JOB_SUCCESS

Works… eh? 🙂

You can grab the project source on GitHub at https://github.com/anismiles/jesque-guice and give it a try. I hope this will help some of you folks. Share your thoughts with me.

Happy hacking!

Written by Animesh

September 18, 2013 at 4:44 pm

Delayed Jobs with Jesque

Jesque is an interesting project. It’s a Java port of GitHub’s Resque task queue library, and it works on top of Redis. It is fast. It eats less. And it doesn’t need a lot of maintenance: as long as your Redis is okay, you are okay! We have been happily using it in production for a while.

Problem

And then, we encountered a need for delayed jobs. We needed the ability to execute jobs in the future with a deterministic delay. We had 2 options:

1. Either schedule these delayed jobs in Quartz, which was feasible because we were already using Quartz, and have the Quartz jobs publish a task into Jesque once they fire, letting the workers handle the rest. (This was too contrived to implement, and would have become a maintenance/reporting nightmare!)

2. Or extend Jesque to support delayed jobs natively.

Solution

We decided to go with option 2, and started exploring Redis data types. It turned out that ZSET was all we needed.

Jesque uses a Redis LIST for job storage: workers keep polling the list and LPOP tasks from it.

This is what we ended up doing:

When adding a delayed job,

1. Calculate future timestamp when the job should run,
2. Use that timestamp as the SCORE of the ZSET entry.

// Java
final long delay = 10; //sec
final long future = System.currentTimeMillis() + (delay * 1000); // future
jedis.zadd(QUEUE, future, jobInfo); 

// Redis
// ZADD <queue> <future> <job-information>

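On the worker side, the poll logic was updated. For delayed queues:

1. Check if there are any items with a SCORE between -INF and now (the code passes -1 as the lower bound, which has the same effect because timestamps are never negative):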

final long now = System.currentTimeMillis();
final Set<String> tasks = jedis.zrangeByScore(QUEUE, -1, now, 0, 1);

2. If any tasks come back, try to grab one to execute:

if (null != tasks && !tasks.isEmpty()) {
    String task = tasks.iterator().next();
    // try to acquire this task
    if (jedis.zrem(QUEUE, task) == 1) {
         return task; // Return
    }
}

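This way, we ensure that no two workers grab the same task. ZREM returns the number of members it actually removed, so only the worker whose ZREM returns 1 has acquired the task; any other worker that saw the same entry gets 0 and moves on.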
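The enqueue-then-claim logic above can be tried without a Redis server. The sketch below is an in-memory imitation, not the Jesque implementation: InMemoryDelayedQueue is a hypothetical class in which a ConcurrentHashMap stands in for the ZSET, put plays the role of ZADD, and the atomic remove plays the role of ZREM.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for a Redis ZSET keyed by task, scored by run time.
class InMemoryDelayedQueue {
    private final Map<String, Long> scores = new ConcurrentHashMap<>();

    // Equivalent in spirit to: ZADD <queue> <runAtMillis> <task>
    void delayedEnqueue(String task, long runAtMillis) {
        scores.put(task, runAtMillis);
    }

    // Equivalent in spirit to: ZRANGEBYSCORE <queue> -inf <now> LIMIT 0 1
    private Optional<String> firstDue(long now) {
        return scores.entrySet().stream()
                .filter(entry -> entry.getValue() <= now)
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }

    // Returns a due task that this caller has claimed, or null if there is none.
    String poll(long now) {
        Optional<String> candidate = firstDue(now);
        // remove() is atomic: only one caller gets a non-null result for a given
        // task, which mirrors the "ZREM returned 1" check.
        if (candidate.isPresent() && scores.remove(candidate.get()) != null) {
            return candidate.get();
        }
        return null;
    }
}

class DelayedQueueDemo {
    public static void main(String[] args) {
        InMemoryDelayedQueue queue = new InMemoryDelayedQueue();
        long now = 1_000_000L; // fixed clock so the run is repeatable

        queue.delayedEnqueue("late", now + 10_000);
        queue.delayedEnqueue("soon", now + 1_000);

        System.out.println(queue.poll(now));          // prints "null" (nothing is due yet)
        System.out.println(queue.poll(now + 1_000));  // prints "soon"
        System.out.println(queue.poll(now + 1_000));  // prints "null" ("soon" was already claimed)
        System.out.println(queue.poll(now + 10_000)); // prints "late"
    }
}
```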

Also, an important point to note: you don’t have to change your existing workers or write new ones in any way. Just bind them to a delayed queue, and start publishing delayed tasks.

Example

// DelayedJobTest.java
package net.greghaines.jesque;

import static net.greghaines.jesque.utils.JesqueUtils.entry;
import static net.greghaines.jesque.utils.JesqueUtils.map;

import java.util.Arrays;

import net.greghaines.jesque.client.Client;
import net.greghaines.jesque.client.ClientPoolImpl;
import net.greghaines.jesque.worker.Worker;
import net.greghaines.jesque.worker.WorkerImpl;
import redis.clients.jedis.JedisPool;

public class DelayedJobTest {

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws InterruptedException {
        // Queue name
        final String QUEUE = "fooDelayed";
        
        // Config
        final Config config = new ConfigBuilder().withHost("localhost").withPort(6379).withDatabase(0).build();
        
        // Client
        final Client client = new ClientPoolImpl(config, new JedisPool("localhost"));
        long delay = 10; // seconds
        long future = System.currentTimeMillis() + (delay * 1000); // Future timestamp
        
        // Enqueue job
        client.delayedEnqueue(QUEUE, 
                new Job(
                        TestJob.class.getSimpleName(), 
                        new Object[] {"HELLO", "WORLD" } // arguments
                ), 
                future);
        // End
        client.end();

        // Worker
        final Worker worker = new WorkerImpl(config, Arrays.asList(QUEUE), map(entry(TestJob.class.getSimpleName(), TestJob.class)));
        final Thread workerThread = new Thread(worker);
        workerThread.start(); // start
    }
}

And, this is the TestJob,

// TestJob.java
package net.greghaines.jesque;

import java.util.Date;

public class TestJob implements Runnable {
    final String arg1;
    final String arg2;

    public TestJob(String arg1, String arg2) {
        this.arg1 = arg1;
        this.arg2 = arg2;
    }

    @Override
    public void run() {
        System.out.println("Job ran at=" + new Date() + ", with arg1=" + arg1 + " and arg2=" + arg2);
    }
}

We have been using this solution in production for quite some time, and it has been pretty stable.

BTW, you can grab the source here: https://github.com/anismiles/jesque and give it a try yourself.

Happy Hacking!

Written by Animesh

September 2, 2013 at 5:06 pm