animesh kumar

Running water never grows stale. Keep flowing!

Posts Tagged ‘HPC’

Fiddling with Cassandra 0.7-beta2

with 11 comments


I have been dilly-dallying with Cassandra 0.7 for quite some time. My intention was to build Cassandra 0.7 support into Kundera (a JPA 1.0 compliant ORM library to work with Cassandra). I must admit that I was often very upset about the lack of documentation on Cassandra and on the libraries I had planned to use, Pelops and Hector. So I decided to post my findings in the hope that they help you.

Now since Cassandra 0.7 beta-2 has been released, I will concentrate my talk around this release.

Installing Cassandra 0.7

  • Download 0.7.0-beta2 (released on 2010-10-01) from here: http://cassandra.apache.org/download/
  • Extract the archive to some location, say D:\apache-cassandra-0.7.0-beta2
  • Set CASSANDRA_HOME environment variable to D:\apache-cassandra-0.7.0-beta2
  • You can also update your PATH variable to include %CASSANDRA_HOME%\bin
  • Now, to start the server you would need to run this command:
    > cassandra -start

That’s it.

Okay, now that you’ve gotten the basics right, I would like to tell you a few important things about this new Cassandra release.

  1. Unlike 0.6.x versions, 0.7.x employs YAML instead of XML, that is, you are going to find cassandra.yaml instead of storage-conf.xml.
  2. 0.7 allows you to manage the entire cluster, Keyspaces and Column Families, all from the Thrift API.
  3. There is also support for Apache Avro. (I haven’t explored this though, so no more comments.)
  4. 0.7 comes with secondary indexes. What does that mean? It means you can look up your data not just by Row Identifier, but also by Column Values. Interesting, huh?
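To make the secondary index idea from point 4 a bit more concrete, here is a toy in-memory sketch. It is not Cassandra code and says nothing about how Cassandra implements its indexes; the class and method names are made up for illustration. It only shows the principle: next to the rows, keep a second map from a column's value back to the row keys that hold it.

```java
import java.util.*;

/** Toy in-memory model of a secondary index; hypothetical names, not Cassandra code. */
class SecondaryIndexSketch {
    // Primary storage: row key -> (column name -> column value)
    private final Map<String, Map<String, String>> rows = new HashMap<>();
    // Secondary index on one column: column value -> row keys holding that value
    private final Map<String, Set<String>> index = new HashMap<>();
    private final String indexedColumn;

    SecondaryIndexSketch(String indexedColumn) {
        this.indexedColumn = indexedColumn;
    }

    void insert(String rowKey, String column, String value) {
        Map<String, String> row = rows.computeIfAbsent(rowKey, k -> new HashMap<>());
        String old = row.put(column, value);
        if (column.equals(indexedColumn)) {
            // Drop the stale index entry if the value was overwritten
            if (old != null && index.containsKey(old)) {
                index.get(old).remove(rowKey);
            }
            index.computeIfAbsent(value, v -> new TreeSet<>()).add(rowKey);
        }
    }

    /** Look up row keys by column value instead of by row identifier. */
    Set<String> findByValue(String value) {
        return index.getOrDefault(value, Collections.emptySet());
    }

    public static void main(String[] args) {
        SecondaryIndexSketch users = new SecondaryIndexSketch("country");
        users.insert("id-1", "country", "India");
        users.insert("id-2", "country", "UK");
        users.insert("id-3", "country", "India");
        System.out.println(users.findByValue("India")); // [id-1, id-3]
    }
}
```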

If you look into cassandra.yaml, you will find a default Keyspace1 and a few Column Families too, but Cassandra doesn’t load them. I am not sure why. Theoretically, everything defined in the yaml file should have been created at startup. I am going to dig into this. Anyway, for now, let’s create a Keyspace and a few Column Families ourselves. We can use the Thrift API (or the Cassandra client, which itself uses Thrift) or the JMX interface.

Dealing with Cassandra Client

Cassandra comes with a command line interface tool cassandra-cli. This tool is really really impressive. You should certainly spend some time with it.

  • Start the client,
    > cassandra-cli
  • Connect to server,
    > [default@unknown] connect localhost/9160
  • Create a new keyspace, (I picked this up from cassandra.yaml)
    > [default@unknown] create keyspace Keyspace1 with replication_factor=1
  • Create Column Families,
    > [default@unknown] use Keyspace1
    > [default@Keyspace1] create column family Standard1 with column_type = 'Standard' and comparator = 'BytesType'
  • Describe keyspace,
    > [default@Keyspace1] describe keyspace Keyspace1

And so on. Use ‘help’ to learn more about cassandra-cli.

JConsole

As I mentioned above, you can also use JMX to check what Keyspaces and Column Families exist in your server. But there is a little problem. Cassandra does not come with the mx4j-tools.jar, so you need to download and copy this jar to Cassandra’s lib folder. Download it from here:  http://www.java2s.com/Code/Jar/MNOPQR/Downloadmx4jtoolsjar.htm

Now, just run ‘jconsole’ and pick ‘org.apache.cassandra.thrift.CassandraDaemon’ process.

Java clientèle

Well, there are two serious contenders, Pelops and Hector. Both have released experimental support for version 0.7. I had worked with Pelops earlier, so I thought it was time to give Hector a chance.

  • Download Hector (Sync release with Cassandra 0.7.0-beta2) from here: http://github.com/rantav/hector/downloads
    You can also use ‘git clone‘ to download the latest source.
  • Hector is a maven project. To compile the source into ‘jar’, just extract the release and run,
    > mvn package

My first program

To start with Hector, I decided to write a very small program that inserts a Column and then fetches it back. If you remember, in the previous section we already created a keyspace ‘Keyspace1‘ and a Column Family ‘Standard1‘, and now we are going to make use of them.

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.exceptions.HectorException;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;
import me.prettyprint.hector.api.query.ColumnQuery;
import me.prettyprint.hector.api.query.QueryResult;

public class HectorFirstExample {

	public static void main(String[] args) throws Exception {

		String keyspaceName = "Keyspace1";
		String columnFamilyName = "Standard1";
		String serverAddress = "localhost:9160";

		// Create Cassandra cluster
		Cluster cluster = HFactory.getOrCreateCluster("Cluster-Name", serverAddress);
		// Create Keyspace
		Keyspace keyspace = HFactory.createKeyspace(keyspaceName, cluster);

		try {
			// Mutation
			Mutator<String> mutator = HFactory.createMutator(keyspace, StringSerializer.get());
			// Insert a new column with row-id 'id-1'
			mutator.insert("id-1", columnFamilyName, HFactory.createStringColumn("Animesh", "Kumar"));

			// Look up the same column
			ColumnQuery<String, String, String> columnQuery = HFactory.createStringColumnQuery(keyspace);
			columnQuery.setColumnFamily(columnFamilyName).setKey("id-1").setName("Animesh");
			QueryResult<HColumn<String, String>> result = columnQuery.execute();

			System.out.println("Read HColumn from cassandra: " + result.get());
		} catch (HectorException e) {
			e.printStackTrace();
		}
	}
}

That was simple. By the way, ‘Nate McCall‘ has written a set of example classes to help us understand Hector with Cassandra 0.7. Check it out here: http://github.com/zznate/hector-examples

I am working towards introducing Cassandra 0.7 support in Kundera, and will be publishing my findings intermittently.


Written by Animesh

October 14, 2010 at 9:26 pm

Posted in Technology


Kundera: now JPA 1.0 Compatible

with 82 comments


If you are new to Kundera, you should read Kundera: knight in the shining armor! to get a brief idea about it.

Kundera has reached a major milestone lately, so I thought I would sum up the developments here. First and foremost, Kundera is now JPA 1.0 compatible. Though it doesn’t support relationships yet, it does support easy JPA-style @Entity declarations and linear JPA queries. 🙂 Didn’t you always want to search over Cassandra?

To begin with let’s see what the changes are.

  1. Kundera does not have the @CassandraEntity annotation anymore. It now expects JPA @Entity.
  2. Kundera specific @Id has been replaced with JPA @Id.
  3. Kundera specific @Column has been replaced with JPA @Column.
  4. @ColumnFamily, @SuperColumnFamily and @SuperColumn are still there, and are expected to be there for a long time to come, because JPA doesn’t have any of these ideas.
  5. @Index is introduced to control indexing of an entity bean. You can safely ignore it and let Kundera do the defaults for you.

I recommend that you read the Entity annotation rules discussed in the earlier post. Apart from the points mentioned above, everything remains the same:  https://anismiles.wordpress.com/2010/06/30/kundera-knight-in-the-shining-armor/#general-rules

How to define an entity class?

@Entity						// makes it an entity class
@ColumnFamily("Authors")	// assign ColumnFamily type and name
public class Author {

	@Id	// row identifier
	String username;

	@Column(name = "email")	// override column-name
	String emailAddress;

	@Column
	String country;

	@Column(name = "registeredSince")
	Date registered;

	String name;

	public Author() { // must have a default constructor
	}

	// getters, setters etc.
}

There is an important deviation from JPA specification here.

  1. Unlike JPA you must explicitly annotate fields/properties you want to persist. Any field/property that is not @Column annotated will be ignored by Kundera.
  2. In short, the paradigm is reversed here. JPA assumes everything persist-able unless explicitly defined @Transient. Kundera expects everything transient unless explicitly defined @Column.
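To show what "transient unless annotated" means in practice, here is a rough reflection sketch. It is not Kundera's actual scanning code: the @Column annotation below is a local stand-in declared only to keep the example self-contained, and the helper name persistentColumns is made up. The idea is simply to walk the declared fields and keep the annotated ones.

```java
import java.lang.annotation.*;
import java.lang.reflect.Field;
import java.util.*;

class ColumnScanSketch {
    /** Local stand-in for the @Column annotation (assumption, not the real one). */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface Column {
        String name() default "";
    }

    static class Author {
        @Column(name = "email") String emailAddress;
        @Column String country;
        String name; // not annotated, so it is ignored
    }

    /** Returns the column names of all @Column fields, sorted for stable output. */
    static List<String> persistentColumns(Class<?> type) {
        List<String> names = new ArrayList<>();
        for (Field field : type.getDeclaredFields()) {
            Column column = field.getAnnotation(Column.class);
            if (column == null) {
                continue; // everything is transient unless explicitly annotated
            }
            names.add(column.name().isEmpty() ? field.getName() : column.name());
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) {
        System.out.println(persistentColumns(Author.class)); // [country, email]
    }
}
```

Note that the un-annotated name field never shows up, which is exactly the reverse of what a JPA provider would do by default.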

How to instantiate EntityManager?

Kundera expects some properties to be provided before you can bootstrap it.

# kundera.properties
# Cassandra nodes to which Kundera will connect
kundera.nodes=localhost

#Cassandra port
kundera.port=9160

#Cassandra keyspace which Kundera will use
kundera.keyspace=Blog

#Whether or not EntityManager can have sessions, that is L1 cache.
sessionless=false

#Cassandra client implementation. It must implement com.impetus.kundera.CassandraClient
kundera.client=com.impetus.kundera.client.PelopsClient

You can define these properties in a java Map object, or in JPA persistence.xml or in a property file “kundera.properties” kept in the classpath.

  1. Instantiating with persistence.xml > Just replace the provider with com.impetus.kundera.ejb.KunderaPersistence which extends JPA PersistenceProvider. And either provide Kundera specific properties in the xml file or keep “kundera.properties” in the classpath.
  2. Instantiating in standard J2SE environment, with explicit Map object.
    Map map = new HashMap();
    map.put("kundera.nodes", "localhost");
    map.put("kundera.port", "9160");
    map.put("kundera.keyspace", "Blog");
    map.put("sessionless", "false");
    map.put("kundera.client", "com.impetus.kundera.client.PelopsClient");
    
    EntityManagerFactory factory = new EntityManagerFactoryImpl("test", map);
    EntityManager manager = factory.createEntityManager();
    
  3. Instantiating in standard J2SE environment, with “kundera.properties” file. Pass null to EntityManagerFactoryImpl and it will automatically look for the property file.
    EntityManagerFactory factory = new EntityManagerFactoryImpl("test", null);
    EntityManager manager = factory.createEntityManager();
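
If you keep the settings in a properties file but still want to hand an explicit Map to the factory, a small helper like the one below can bridge the two. This is only a sketch using java.util.Properties; the class and method names are hypothetical and it is not part of Kundera.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class KunderaPropsSketch {
    /** Reads key=value pairs (e.g. the contents of kundera.properties) into a Map. */
    static Map<String, String> toMap(Reader source) {
        Properties props = new Properties();
        try {
            props.load(source);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> map = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            map.put(key, props.getProperty(key));
        }
        return map;
    }

    public static void main(String[] args) {
        String text = "kundera.nodes=localhost\nkundera.port=9160\nkundera.keyspace=Blog\n";
        Map<String, String> map = toMap(new StringReader(text));
        System.out.println(map.get("kundera.port")); // 9160
    }
}
```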
    

Entity Operations

Once you have EntityManager object you are good to go, applying all your JPA skills. For example, if you want to find an Entity object by key,

	try {
		Author author = manager.find(Author.class, "smile.animesh");
	} catch (PersistenceException pe) {
		pe.printStackTrace();
	}

Similarly, there are other JPA methods for various operations: merge, remove etc.

JPA Query

Note: Kundera uses Lucene to index your Entities. Beneath Lucene, Kundera uses Lucandra to store the indexes in Cassandra itself. One fun implication of using Lucene is that apart from regular JPA queries, you can also run Lucene queries. 😉

Here are some indexing fundamentals:

  1. By default, all entities are indexed along with all @Column properties.
  2. If you do not want to index an entity, annotate it like, @Index (index=false)
  3. If you do not want to index a @Column property of an entity, annotate it like, @Index (index=false)

That’s it. Here is an example of JPA query:

	// write a JPA Query
	String jpaQuery = "SELECT a from Author a";

	// create Query object
	Query query = manager.createQuery(jpaQuery);

	// get results
	List<Author> list = query.getResultList();
	for (Author a : list) {
		System.out.println(a.getUsername());
	}

Kundera also supports multiple “where” clauses with “AND”, “OR”, “=” and “like” operations.

	// find all Authors with email like anismiles
	String jpaQuery_for_emails_like = "SELECT a from Author a WHERE a.emailAddress like anismiles";

	// find all Authors with email like anismiles or username like anim
	String jpaQuery_for_email_or_name = "SELECT a from Author a WHERE a.emailAddress like anismiles OR a.username like anim";

I think this will enable you to play around with Kundera. I will be writing up more on how Kundera indexes various entities and how you can execute Lucene Queries in subsequent posts.

Kundera’s next milestones will be:

  1. Implementation of JPA listeners, @PrePersist @PostPersist etc.
  2. Implementation of Relationships, @OneToMany, @ManyToMany etc.
  3. Implementation of Transactional support, @Transactional

Written by Animesh

July 14, 2010 at 9:51 am

Posted in Technology


Kundera: knight in the shining armor!

with 37 comments


The idea behind Kundera is to make working with Cassandra drop-dead simple, and fun. Kundera does not reinvent the wheel by making another client library; rather, it leverages the existing libraries and builds a wrap-around API on top of them, so that developers can do away with unnecessary boilerplate and write neater, cleaner code that reduces complexity and improves quality. And above all, improves productivity.

Download Kundera here: http://code.google.com/p/kundera/

Note: Kundera is now JPA 1.0 compatible, and there are some ensuing changes. You should read about it here: https://anismiles.wordpress.com/2010/07/14/kundera-now-jpa-1-0-compatible/

Objectives:

  • To completely remove unnecessary details, such as Column lists, SuperColumn lists, byte arrays, Data encoding etc.
  • To be able to work directly with Domain models just with the help of annotations
  • To eliminate “code plumbing”, so as to keep the flow of data processing clear and obvious
  • To completely separate out Cassandra and its obvious concerns from application-level logics for robust application development
  • To include the latest Cassandra developments without breaking anything, anywhere in the business layer

Cassandra Data Models

At the very basic level, Cassandra has Column and SuperColumn to hold your data. Column is a tuple with a name, value and a timestamp; while SuperColumn is Column of Columns. Columns are stored in a ColumnFamily, and SuperColumns in SuperColumnFamily. The most important thing to note is that Cassandra is not your old relational database, it is a flat system. No joins, No foreign keys, nothing. Everything you store here is 100% de-normalized.
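
If it helps, you can picture these structures as nested maps. The sketch below is a mental model only, with made-up class names, and it is not how Cassandra stores anything internally: a ColumnFamily maps a row key to named Columns, and a SuperColumnFamily adds one more level of nesting.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

class DataModelSketch {
    /** A Column is a (name, value, timestamp) tuple. */
    static final class Column {
        final String name;
        final String value;
        final long timestamp;

        Column(String name, String value, long timestamp) {
            this.name = name;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // ColumnFamily: row key -> (column name -> Column)
    final Map<String, SortedMap<String, Column>> columnFamily = new HashMap<>();
    // SuperColumnFamily: row key -> (super column name -> (column name -> Column))
    final Map<String, SortedMap<String, SortedMap<String, Column>>> superColumnFamily = new HashMap<>();

    void insert(String rowKey, Column column) {
        columnFamily.computeIfAbsent(rowKey, k -> new TreeMap<>()).put(column.name, column);
    }

    void insert(String rowKey, String superColumn, Column column) {
        superColumnFamily.computeIfAbsent(rowKey, k -> new TreeMap<>())
                .computeIfAbsent(superColumn, k -> new TreeMap<>())
                .put(column.name, column);
    }

    public static void main(String[] args) {
        DataModelSketch db = new DataModelSketch();
        long now = System.currentTimeMillis();
        db.insert("Eric Long", new Column("country", "United Kingdom", now));
        db.insert("cats-are-funny-animals", "post", new Column("title", "Cats are funny animals", now));
        System.out.println(db.columnFamily.get("Eric Long").get("country").value);
        System.out.println(db.superColumnFamily.get("cats-are-funny-animals").get("post").keySet());
    }
}
```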

Read more details here: https://anismiles.wordpress.com/2010/05/18/cassandra-data-model/

Using Kundera

Kundera defines a range of annotations to describe your Entity objects. Kundera is now JPA 1.0 compatible and builds its own annotations on top of the JPA annotations to suit its needs. Here are the basic rules:

General Rules

  • Entity classes must have a default no-argument constructor.
  • Entity classes must be annotated with @Entity (the @CassandraEntity annotation has been dropped in favor of JPA @Entity)
  • Entity classes for ColumnFamily must be annotated with @ColumnFamily(“column-family-name”)
  • Entity classes for SuperColumnFamily must be annotated with @SuperColumnFamily(“super-column-family-name”)
  • Each entity must have a field annotated with @Id
    • The @Id field must be of String type. (Since you can define sorting strategies in Cassandra’s storage-conf file, keeping @Id of String type makes life simpler, as you will see later)
    • There must be one and only one @Id per entity.

Note: Kundera works only at property level for now, so all method level annotations are ignored. Idea: keep life simple. 🙂

ColumnFamily Rules

  1. You must define the name of the column family in @ColumnFamily, like @ColumnFamily (“Authors”). Kundera will link this entity class with the “Authors” column family.
  2. Entities annotated with @ColumnFamily are scanned for properties with @Column annotations.
  3. Each such field will qualify to become a Cassandra Column with
    1. Name: name of the property.
    2. Value: value of the property
  4. By default the name of the column will be the name of the property. However, if you fancy changing the name, you can override it like, @Column (name=”fancy-name”)
    @Column (name="email")          // override column-name
    String emailAddress;
    
  5. Properties of type Integer, String, Long and Date are inherently supported; all other types are serialized before they are saved and de-serialized when they are read. Serialization has some inherent limitations; that is why Kundera discourages you from using custom objects as Cassandra Column properties. However, you are free to do as you want. Just read the serialization tweaks before insanity reigns over you. 😉
  6. Kundera also supports Collection and Map properties. However, there are a few things you must take care of:
    • You must initialize any Collection or Map properties, like
      List<String> list = new ArrayList<String>();
      Set<String> set = new HashSet<String>();
      Map<String, String> map = new HashMap<String, String>();
      
    • Type parameters follow the same rule, described in #5.
    • If you don’t explicitly define the type parameter, elements will be serialized/de-serialized before saving and retrieving.
    • There is no guarantee that the Collection element order will be maintained.
    • Both Collection and Map create as many columns as they have elements.
    • Collection will break into Columns like,
      1. Name~0: Element at index 0
      2. Name~1: Element at index 1 and so on.

      Name follows rule #4.

    • Map will break into Columns like,
      1. Name~key1: Element at key1
      2. Name~key2: Element at key2 and so on.
    • Again, name follows rule #4.
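
The Name~index and Name~key naming described in rule 6 can be sketched in a few lines. This is my illustration of the naming scheme only, with hypothetical method names; it is not taken from Kundera's source, and it skips serialization entirely by sticking to String elements.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CollectionColumnsSketch {
    /** A List property "name" becomes columns name~0, name~1, ... */
    static Map<String, String> flatten(String name, List<String> values) {
        Map<String, String> columns = new LinkedHashMap<>();
        for (int i = 0; i < values.size(); i++) {
            columns.put(name + "~" + i, values.get(i));
        }
        return columns;
    }

    /** A Map property "name" becomes columns name~key1, name~key2, ... */
    static Map<String, String> flatten(String name, Map<String, String> values) {
        Map<String, String> columns = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : values.entrySet()) {
            columns.put(name + "~" + entry.getKey(), entry.getValue());
        }
        return columns;
    }

    public static void main(String[] args) {
        System.out.println(flatten("tags", Arrays.asList("cats", "animals")));
        // {tags~0=cats, tags~1=animals}
        System.out.println(flatten("meta", Collections.singletonMap("lang", "en")));
        // {meta~lang=en}
    }
}
```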

SuperColumnFamily Rules

  1. You must define the name of the super column family in @SuperColumnFamily, like @SuperColumnFamily (“Posts”). Kundera will link this entity class with the “Posts” super column family.
  2. Entities annotated with @SuperColumnFamily are scanned for properties for 2 annotations:
    1. @Column and
    2. @SuperColumn
  3. Only properties annotated with both annotations are picked up, and each such property qualifies to become a Column and fall under SuperColumn.
  4. You can define the name of the column like you did for ColumnFamily.
  5. However, you must define the name of the SuperColumn a particular Column must fall under like, @SuperColumn(column = “super-column-name”)
    @Column
    @SuperColumn(column = "post")  // column 'title' will fall under super-column 'post'
    String title;
    
  6. Everything else is the same as above.

Up and running in 5 minutes

Let’s learn by example. We will create a simple Blog application. We will have Posts, Tags and Authors.

Cassandra data model for “Authors” might be like,

ColumnFamily: Authors = {
    “Eric Long”:{		// row 1
        “email”:{
            name:“email”,
            value:“eric (at) long.com”
        },
        “country”:{
            name:“country”,
            value:“United Kingdom”
        },
        “registeredSince”:{
            name:“registeredSince”,
            value:“01/01/2002”
        }
    },
    ...
}

And data model for “Posts” might be like,

SuperColumnFamily: Posts = {
	“cats-are-funny-animals”:{		// row 1
		“post” :{		// super-column
			“title”:{
				“Cats are funny animals”
			},
			“body”:{
				“Bla bla bla… long story…”
			},
			“author”:{
				“Ronald Mathies”
			},
			“created”:{
				“01/02/2010”
			}
		},
		“tags” :{
			“0”:{
				“cats”
			},
			“1”:{
				“animals”
			}
		}
	},
	// row 2
}

Create a new Cassandra Keyspace: “Blog”

<Keyspace Name="Blog">
<!-- family definitions -->

<!-- Necessary for Cassandra -->
<ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
<ReplicationFactor>1</ReplicationFactor>
<EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
</Keyspace>

Create 2 column families: SuperColumnFamily for “Posts” and ColumnFamily for “Authors”

<Keyspace Name="Blog">
<!-- family definitions -->
<ColumnFamily CompareWith="UTF8Type" Name="Authors"/>
<ColumnFamily ColumnType="Super" CompareWith="UTF8Type" CompareSubcolumnsWith="UTF8Type" Name="Posts"/>

<!-- Necessary for Cassandra -->
<ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
<ReplicationFactor>1</ReplicationFactor>
<EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
</Keyspace>

Create entity classes

Author.java

@Entity			// makes it an entity class
@ColumnFamily ("Authors")	// assign ColumnFamily type and name
public class Author {

    @Id						// row identifier
    String username;

    @Column (name="email")	// override column-name
    String emailAddress;

    @Column
    String country;

    @Column (name="registeredSince")
    Date registered;

    String name;

    public Author () {		// must have a default constructor
    }

    ... // getters/setters etc.
}

Post.java

@Entity					// makes it an entity class
@SuperColumnFamily("Posts")			// assign column-family type and name
public class Post {

	@Id								// row identifier
	String permalink;

	@Column
	@SuperColumn(column = "post")	// column 'title' will be stored under super-column 'post'
	String title;

	@Column
	@SuperColumn(column = "post")
	String body;

	@Column
	@SuperColumn(column = "post")
	String author;

	@Column
	@SuperColumn(column = "post")
	Date created;

	@Column
	@SuperColumn(column = "tags")	// column 'tag' will be stored under super-column 'tags'
	List<String> tags = new ArrayList<String>();

	public Post () {		// must have a default constructor
	}

       ... // getters/setters etc.
}

Note the annotations, match them against the rules described above. Please see how “tags” property has been initialized. This becomes very important because Kundera uses Java Reflection to read and populate the entity classes. Anyways, once we have entity classes in place…

Instantiate EntityManager

Kundera now works as a JPA provider, and here is how you can instantiate EntityManager. https://anismiles.wordpress.com/2010/07/14/kundera-now-jpa-1-0-compatible/#entity-manager

EntityManager manager = new EntityManagerImpl();
manager.setClient(new PelopsClient());
manager.getClient().setKeySpace("Blog");

And that’s about it. You are ready to rock-and-roll like a football. Sorry, I just got swayed with FIFA fever. 😉

Supported Operations

Kundera supports JPA EntityManager based operations, along with JPA queries. Read more here: https://anismiles.wordpress.com/2010/07/14/kundera-now-jpa-1-0-compatible/#entity-operations


Save entities

Post post = ... // new post object
try {
manager.save(post);
} catch (IllegalEntityException e) { e.printStackTrace(); }
catch (EntityNotFoundException e) { e.printStackTrace(); }

If the entity is already saved in Cassandra database, it will be updated, else a new entity will be saved.

Load entity

try {
Post post = manager.load(Post.class, key); // key is the identifier, for our case, "permalink"
} catch (IllegalEntityException e) { e.printStackTrace(); }
catch (EntityNotFoundException e) { e.printStackTrace(); }

Load multiple entities

try {
List posts = manager.load(Post.class, key1, key2, key3...); // key is the identifier, "permalink"
} catch (IllegalEntityException e) { e.printStackTrace(); }
catch (EntityNotFoundException e) { e.printStackTrace(); }

Delete entity

try {
manager.delete(Post.class, key); // key is the identifier, "permalink"
} catch (IllegalEntityException e) { e.printStackTrace(); }
catch (EntityNotFoundException e) { e.printStackTrace(); }


Wow! Was it fun? Was it easy? I’m sure it was. Keep an eye on Kundera, we will be rolling out sooner-than-you-imagine more features like,

  1. Transaction support
  2. More fine-grained methods for better control
  3. Lazy-Loading/Selective-Loading of entity properties and many more.

Written by Animesh

June 30, 2010 at 7:12 pm

Posted in Technology


ZooKeeper – Primer (contd.)

with 12 comments


>> continued from here.

Watches

The power of ZooKeeper comes from Watches. Watches allow clients to get notified when a znode changes in some way. Watches are set by operations, and are triggered by ZooKeeper when anything gets changed. For example, a watch can be placed on a znode which will be triggered when the znode data changes or the znode itself gets deleted.

The catch here is that Watches are triggered only ONCE. This might look pretty restrictive at first, but it helps keep ZooKeeper simple, and if our client insists on more notifications it can always re-register the watch.
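
The one-shot behaviour is easy to picture with a toy node that forgets its watchers as soon as it has fired them. This is a plain-Java sketch with made-up names, not the ZooKeeper client API; a String stands in for the watch event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class OneShotWatchSketch {
    private String data;
    private final List<Consumer<String>> watchers = new ArrayList<>();

    /** Read: returns the data and, if a watcher is given, leaves a one-time watch. */
    String getData(Consumer<String> watcher) {
        if (watcher != null) {
            watchers.add(watcher);
        }
        return data;
    }

    /** Write: stores the data, then fires and forgets every pending watch. */
    void setData(String newData) {
        data = newData;
        List<Consumer<String>> toFire = new ArrayList<>(watchers);
        watchers.clear(); // a watch is spent once it has been triggered
        for (Consumer<String> watcher : toFire) {
            watcher.accept("NodeDataChanged");
        }
    }

    public static void main(String[] args) {
        OneShotWatchSketch node = new OneShotWatchSketch();
        List<String> events = new ArrayList<>();
        node.getData(events::add); // register the watch
        node.setData("v1");        // fires the watch
        node.setData("v2");        // nothing fires, the watch is gone
        System.out.println(events.size()); // 1
    }
}
```

A client that wants to keep listening would simply call getData with the watcher again from inside the callback.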

There are 9 basic operations in ZooKeeper.

Operation          Type         Description
create             Write        Creates a znode (parent must already exist)
delete             Write        Deletes a znode (must not have any children)
exists             Read         Tests whether a znode exists and retrieves its metadata
getACL, setACL     -            Gets/Sets the ACL for a znode
getChildren        Read         Gets a list of the children of a znode
getData, setData   Read/Write   Gets/Sets the data associated with a znode
sync               -            Synchronizes a client’s view of a znode

The rule is: Watches are set by read operations, and triggered by write operations. Isn’t it very intuitive?

As stated in the previous post, znodes maintain version numbers for data changes, ACL changes, and timestamps, to allow cache validations and coordinated updates. Every time you want to change a znode’s data or its ACL information, you need to provide the correct version, and after a successful operation the version number gets incremented. You can relate it to Hibernate’s optimistic locking methodology, where every row is assigned a version to resolve concurrent modification conflicts. Anyway, we are talking about Watches here.
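
Here is a minimal sketch of that version check, written as a plain Java class rather than against the ZooKeeper API. The class name is made up. The one detail borrowed from ZooKeeper itself is that passing -1 as the expected version skips the check.

```java
class VersionedNodeSketch {
    private byte[] data = new byte[0];
    private int version = 0;

    synchronized int getVersion() {
        return version;
    }

    synchronized byte[] getData() {
        return data.clone();
    }

    /**
     * Applies the update only if expectedVersion matches the current version
     * (or is -1, meaning "any version"), and returns the new version.
     */
    synchronized int setData(byte[] newData, int expectedVersion) {
        if (expectedVersion != -1 && expectedVersion != version) {
            throw new IllegalStateException(
                    "bad version: expected " + expectedVersion + ", actual " + version);
        }
        data = newData.clone();
        return ++version;
    }

    public static void main(String[] args) {
        VersionedNodeSketch node = new VersionedNodeSketch();
        int v = node.getVersion();                 // 0
        node.setData("first".getBytes(), v);       // succeeds, version becomes 1
        try {
            node.setData("stale".getBytes(), v);   // still passing 0, so it is rejected
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The writer that lost the race has to re-read the znode, pick up the new version and decide whether its update still makes sense.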

Read operations like exists, getChildren and getData set the Watches, and these Watches are triggered by write operations like create, delete and setData. An important point to note here is that ACL operations do not trigger or register any Watches, though they do change version numbers. When a Watch is triggered, a watch event is generated and passed to the Watcher, which can do whatever it wishes with it. Let us now find out when and how various watch events are triggered.

  1. Watch set with exists operation gets triggered when the znode is created, deleted or someone updates its data.
  2. Watch set on getData gets triggered when the znode is deleted or someone updates its data.
  3. Watch set on getChildren gets triggered when a child is added or removed or when the znode itself gets deleted.

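Let’s summarize it in a table (each cell is the event type the Watcher receives):

Watch set by   znode created   znode deleted   znode data changed   child added or removed
exists         NodeCreated     NodeDeleted     NodeDataChanged      -
getData        -               NodeDeleted     NodeDataChanged      -
getChildren    -               NodeDeleted     -                    NodeChildrenChanged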

ZooKeeper Watches operate on two levels. You can specify a Watcher while instantiating the ZooKeeper object; it will be notified about changes in ZooKeeper’s connection state. The same Watcher also gets notified of znode changes if you ask a read operation to set a watch without passing an explicit Watcher.

Let’s now try to connect to ZooKeeper.

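import java.io.IOException;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooKeeper.States;

public class ZkConnector {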

    // ZooKeeper Object
    ZooKeeper zooKeeper;

    // To block any operation until ZooKeeper is connected. It's initialized
    // with count 1, that is, ZooKeeper connect state.
    java.util.concurrent.CountDownLatch connectedSignal = new java.util.concurrent.CountDownLatch(1);

    /**
     * Connects to ZooKeeper servers specified by hosts.
     *
     * @param hosts
     * @throws IOException
     * @throws InterruptedException
     */
    public void connect(String hosts) throws IOException, InterruptedException {
        zooKeeper = new ZooKeeper(
                hosts, // ZooKeeper service hosts
                5000,  // Session timeout in milliseconds
                // Anonymous Watcher Object
                new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        // release lock if ZooKeeper is connected.
                        if (event.getState() == KeeperState.SyncConnected) {
                            connectedSignal.countDown();
                        }
                    }
                }
        );
        connectedSignal.await();
    }

    /**
     * Closes connection with ZooKeeper
     *
     * @throws InterruptedException
     */
    public void close() throws InterruptedException {
	zooKeeper.close();
    }

    /**
     * @return the zooKeeper
     */
    public ZooKeeper getZooKeeper() {
        // Verify ZooKeeper's validity
        if (null == zooKeeper || !zooKeeper.getState().equals(States.CONNECTED)){
	    throw new IllegalStateException ("ZooKeeper is not connected.");
        }
        return zooKeeper;
    }

}

The above class connects to the ZooKeeper service. When a ZooKeeper instance is created inside the connect() method, it starts a thread to connect to the service and returns immediately. Since the constructor accepts a Watcher that is notified about ZooKeeper state changes, we can use it to wait for the connection to get established before running any operation on the ZooKeeper object.

In this example, I have used the CountDownLatch class, which blocks the calling thread after the ZooKeeper constructor has returned and holds it until the count drops to zero. When the client has changed its status, our anonymous Watcher receives a call to its process() method with a WatchedEvent object; it verifies the client’s state and reduces the CountDownLatch counter by 1. And our object is ready to use.
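
If you want to see the latch handshake in isolation, here is a sketch that needs nothing but the JDK. A background thread stands in for the ZooKeeper connection thread (that part is an assumption made for the demo), and the class and method names are made up. Unlike connect() above, it waits with a timeout so that a server which never answers cannot block the caller forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchSketch {
    /** Starts a pretend "connection" thread and waits until it signals, or until the timeout. */
    static boolean connectAndWait(long timeoutMillis) {
        CountDownLatch connectedSignal = new CountDownLatch(1);
        Thread connector = new Thread(() -> {
            try {
                Thread.sleep(50); // stands in for the network handshake
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            connectedSignal.countDown(); // what the Watcher does on SyncConnected
        });
        connector.start();
        try {
            // Returns true once the count reaches zero, false if the timeout elapses first
            return connectedSignal.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + connectAndWait(2000));
    }
}
```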

This diagram captures ZooKeeper’s state transitions:

Okay. Since we are connected to ZooKeeper service, let’s try to do something meaningful.

Let’s say we have two processes, pA and pB. The process pA picks up a chunk of data and performs some sort of operation on it, while process pB waits for pA to finish and then sends an email notifying about the data changes.

Simple, huh? Sure, it can be solved by using Java’s concurrent package. But we will do it using ZooKeeper for obvious gains like scalability. Here are the steps:

  1. Define a znode say, /game_is_over
    final String myPath = "/game_is_over";
    
  2. Get ZooKeeper object
    ZkConnector zkc = new ZkConnector();
    zkc.connect("localhost");
    ZooKeeper zk = zkc.getZooKeeper();
    
  3. pB registers a Watch with ZooKeeper service with exists operation. This Watch will receive a call once the znode becomes available.
    zk.exists(myPath, new Watcher() {		// Anonymous Watcher
    	@Override
    	public void process(WatchedEvent event) {
    	   // check for event type NodeCreated
    	   boolean isNodeCreated = event.getType().equals(EventType.NodeCreated);
    	   // verify if this is the defined znode (getPath() is null for session events)
    	   boolean isMyPath = myPath.equals(event.getPath());
    
    	   if (isNodeCreated && isMyPath) {
    		// TODO: send an email or whatever
    	   }
    	}
    });
    
  4. pA, after finishing its job, creates the znode, effectively alerting pB to start working.
    zk.create(
    	myPath, 		// Path of znode
    	null,			// Data not needed.
    	Ids.OPEN_ACL_UNSAFE, 	// ACL, set to Completely Open.
    	CreateMode.PERSISTENT	// Znode type, set to Persistent.
    );
    

That was easy. As soon as pA finishes its job, it creates a znode (ideally this should have been an ephemeral znode) on which pB has already registered a Watch. The Watch gets triggered immediately, notifying pB to do its job.

With similar models, you can implement various distributed data structures, locks, barriers etc. on top of ZooKeeper. I will write a few more posts on this, but for now you can refer to ZooKeeper’s recipes.

So, this is the ZooKeeper start-up primer. It should get you kick-started immediately. However, there are still some fundamentals left to cover, like sessions, ACLs, consistency models etc. Keep checking this space; I will write more on these in the near future.

Written by Animesh

June 13, 2010 at 4:10 pm

Posted in Technology


ZooKeeper – Primer

with one comment

Distributed collaborative applications involve a set of processes or agents interacting with one another to accomplish a common goal. They execute on Wide Area environments with little or no knowledge of the infrastructure and almost no control over the resources available. Besides, they need to sequence and order events, and ensure atomicity of actions. Above all, the application needs to keep itself from nightmarish bugs like race conditions, deadlocks and partial failures.

ZooKeeper helps to build a distributed application by working as a coordination service provider.

It’s reliable and highly available. It exposes a simple set of primitives upon which distributed applications can build higher level services for

  • Synchronization,
  • Configuration Maintenance,
  • Groups,
  • Naming,
  • Leader elections and other niche needs.

What lies beneath?

ZooKeeper maintains a shared hierarchical namespace modeled after standard file systems. The namespace consists of data registers, called znodes. They are similar to files and directories.

Note: Znodes keep their data primarily in memory, with a logged backup on disk for reliability. This means that whatever data znodes hold must fit into memory, so it must be small: at most 1MB per znode. On the other hand, it means high throughput and low latency.

Znodes are identified by unique absolute paths which are “/” delimited Unicode strings. To help achieve uniqueness, ZooKeeper provides sequential znodes, where ZooKeeper appends a monotonically increasing counter, maintained per parent znode, to the path as a zero-padded ten-digit number. For example, if the path “/zoo-1/tiger/white-” is assigned the sequence 5, it becomes “/zoo-1/tiger/white-0000000005”.

  1. A client can create a znode, store up to 1MB of data in it, and attach as many child znodes as it wants.
  2. Reads and writes on a znode are always atomic. The data is either read or written in its entirety, or the operation fails.
  3. There are no renames and no append semantics available.
  4. Each znode has an Access Control List (ACL) that restricts who can do what.
  5. Znodes maintain version numbers for data changes, ACL changes, and timestamps, to allow cache validations and coordinated updates.

Znodes can be one of two types: ephemeral and persistent. Once set, the type can’t be changed.

  1. Ephemeral znodes are deleted by ZooKeeper when the creating client’s session is closed, while persistent znodes stay until they are explicitly deleted.
  2. Ephemeral znodes can’t have children.
  3. Both types of znodes are visible to all clients permitted by the ACL policy.
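The rules above are easy to play with in a toy model. What follows is a sketch under loud assumptions: ZnodeModel and its methods are hypothetical names, not the ZooKeeper client API, and a session is reduced to a plain number. It only illustrates sequence suffixes, the ban on children under ephemeral znodes, and clean-up when a session closes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of the znode rules; NOT the ZooKeeper client API.
class ZnodeModel {
    static final long PERSISTENT = -1L; // marker meaning "no owning session"

    // path -> owning session id (PERSISTENT for persistent znodes)
    private final Map<String, Long> ownerByPath = new TreeMap<>();
    // parent path -> next sequence number handed out under that parent
    private final Map<String, Integer> nextSeqByParent = new HashMap<>();

    ZnodeModel() {
        ownerByPath.put("/", PERSISTENT);
    }

    // Pass PERSISTENT as sessionId for a persistent znode, or a session id
    // (>= 0) for an ephemeral one. Returns the path that was actually created.
    String create(String path, boolean sequential, long sessionId) {
        String parent = parentOf(path);
        Long parentOwner = ownerByPath.get(parent);
        if (parentOwner == null) {
            throw new IllegalArgumentException("parent does not exist: " + parent);
        }
        if (parentOwner != PERSISTENT) {
            throw new IllegalArgumentException("ephemeral znodes can't have children: " + parent);
        }
        String actual = path;
        if (sequential) {
            int seq = nextSeqByParent.merge(parent, 1, Integer::sum) - 1;
            actual = path + String.format("%010d", seq); // zero-padded counter
        }
        if (ownerByPath.putIfAbsent(actual, sessionId) != null) {
            throw new IllegalStateException("znode already exists: " + actual);
        }
        return actual;
    }

    boolean exists(String path) {
        return ownerByPath.containsKey(path);
    }

    // Closing a session removes every ephemeral znode that it created.
    void closeSession(long sessionId) {
        ownerByPath.values().removeIf(owner -> owner == sessionId);
    }

    private static String parentOf(String path) {
        int i = path.lastIndexOf('/');
        return i <= 0 ? "/" : path.substring(0, i);
    }

    public static void main(String[] args) {
        ZnodeModel zk = new ZnodeModel();
        zk.create("/zoo-1", false, PERSISTENT);
        zk.create("/zoo-1/tiger", false, PERSISTENT);
        System.out.println(zk.create("/zoo-1/tiger/white-", true, PERSISTENT));

        String lock = zk.create("/zoo-1/lock", false, 42L); // ephemeral, session 42
        zk.closeSession(42L);
        System.out.println("lock still there? " + zk.exists(lock));
    }
}
```

The model keeps the sequence counter per parent path, which is why two sequential znodes under different parents can end up with the same suffix.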

Up and Running

There is already enough literature on installing ZooKeeper on Linux machines, so I am going to focus on how to install ZooKeeper on Windows machines.

  1. Download and install Cygwin. http://www.cygwin.com/
  2. Download stable release of ZooKeeper. http://hadoop.apache.org/zookeeper/releases.html
  3. Unzip ZooKeeper to some directory, say, D:/iLabs/zookeeper-3.3.1
  4. Add a new environment variable ZOOKEEPER_INSTALL and point it to D:/iLabs/zookeeper-3.3.1
  5. Edit PATH variable and append $ZOOKEEPER_INSTALL/bin to it.
  6. Now start Cygwin.

Now, start ZooKeeper server.

$ zkServer.sh start

Ouch! It threw an error.

ZooKeeper exited abnormally because it could not find the configuration file, zoo.cfg, which it expects in the $ZOOKEEPER_INSTALL/conf directory. This is a standard Java properties file.

Go ahead and create a zoo.cfg file in the conf directory. Open it up, and add the properties below:

# The number of milliseconds of each tick
tickTime=2000

# The directory where the snapshot is stored.
dataDir=D:/iLabs/zoo-data/

# The port at which the clients will connect
clientPort=2181
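Since zoo.cfg is a standard Java properties file, you can sanity-check it with nothing more than java.util.Properties before starting the server. This is only a sketch: ZooCfgCheck is a hypothetical helper name, and it says nothing about how ZooKeeper itself validates the file.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: parses zoo.cfg-style text with the standard Properties loader.
class ZooCfgCheck {
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        String cfg = String.join("\n",
                "# The number of milliseconds of each tick",
                "tickTime=2000",
                "# The directory where the snapshot is stored.",
                "dataDir=D:/iLabs/zoo-data/",
                "# The port at which the clients will connect",
                "clientPort=2181");

        Properties props = parse(cfg);
        int tickTime = Integer.parseInt(props.getProperty("tickTime"));
        int clientPort = Integer.parseInt(props.getProperty("clientPort"));
        System.out.println("tickTime=" + tickTime + " ms, clientPort=" + clientPort
                + ", dataDir=" + props.getProperty("dataDir"));
    }
}
```

One practical consequence of the properties format: a backslash is an escape character, so a Windows path written with backslashes would lose them when loaded. That is a good reason to write dataDir with forward slashes, as above.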

Go back to Cygwin, and issue the same command again. This time ZooKeeper should load properly.

Now, connect to ZooKeeper. You should probably open a new Cygwin window, and issue the following command.

$ zkCli.sh

This will connect to your ZooKeeper server running at localhost:2181 by default, and will open zk console.

Let’s create a znode, say /zoo-1

[zk: localhost:2181<CONNECTED> 1] create /zoo-1 "Hello World!" null

With no flags, create makes a persistent znode (the -s flag would make it sequential and -e would make it ephemeral). Hello World! is the data you assign to the znode (/zoo-1) and null is its ACL.

To see all znodes,

[zk: localhost:2181<CONNECTED> 2] ls /
[zoo-1, zookeeper]

This means, there are 2 nodes at the root level, /zoo-1 and /zookeeper. ZooKeeper uses the /zookeeper sub-tree to store management information, such as information on quotas.

For more commands, type help. To explore the command line tools further, refer to: http://hadoop.apache.org/zookeeper/docs/current/zookeeperStarted.html

continue reading the primer >>

Written by Animesh

June 8, 2010 at 3:08 pm

Posted in Technology


Lucandra – an inside story!

with 14 comments


Lucene works with

  1. Index,
  2. Document,
  3. Field and
  4. Term.

An index contains a sequence of documents. A document is a sequence of fields. A field is a named sequence of terms. A term is a string that represents a word from text. This is the unit of search. It is composed of two elements: the text of the word, as a string, and the name of the field that the text occurred in, an interned string. Note that terms may represent not only words from text fields, but also things like dates, email addresses, URLs, etc.

Lucene’s index is an inverted index. In a normal (forward) index, you look up a document to learn which terms it contains. In an inverted index, you look up a term to learn which documents it appears in. It’s a kind of upside-down view of the world, but it makes searching blazingly fast.
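To make the idea concrete, here is a minimal inverted index in plain Java. It is only a sketch of the concept and has nothing to do with Lucene’s actual file format or classes: InvertedIndexSketch is a hypothetical name, and the “analyzer” is just lower-casing plus a split on non-word characters.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical toy index: maps each term to the ids of the documents containing it.
class InvertedIndexSketch {
    private final Map<String, Set<Integer>> postings = new TreeMap<>();

    // Splits the text into lower-cased terms and records docId under each term.
    void add(int docId, String text) {
        for (String term : text.toLowerCase().split("\\W+")) {
            if (!term.isEmpty()) {
                postings.computeIfAbsent(term, t -> new TreeSet<>()).add(docId);
            }
        }
    }

    // A search is a single map lookup: term in, matching document ids out.
    Set<Integer> search(String term) {
        return postings.getOrDefault(term.toLowerCase(), Collections.emptySet());
    }

    public static void main(String[] args) {
        InvertedIndexSketch index = new InvertedIndexSketch();
        index.add(1, "Running water never grows stale");
        index.add(2, "Keep flowing, water!");
        System.out.println("water -> " + index.search("water"));
        System.out.println("stale -> " + index.search("stale"));
    }
}
```

Notice that search never scans the documents themselves, which is where the speed comes from.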

Read More: http://lucene.apache.org/java/3_0_1/fileformats.html

At a very high level, you can think of Lucene indexes as 2 buckets:

  1. Bucket-1 keeps all the Terms (with additional info like term frequency, position etc.) and it knows which documents have these terms.
  2. Bucket-2 stores all the leftover field info, mostly non-indexed info.

How does Lucandra do it?

Lucandra needs 2 column families, one for each bucket described above.

  1. Family-1 to store Term info. We call it “TermInfo”
  2. Family-2 to store leftover info. We call it “Documents”

“TermInfo” family is a SuperColumnFamily. Each term gets stored in a separate row identified with TermKey (“index_name/field/term”) and stores SuperColumns containing Columns of various term information like, term frequency, position, offset, norms etc. This is how it looks:

"TermInfo" => {
    TermKey1:{                                        // Row 1
        docId:{
            name:docId,
            value:{
                Frequencies:{
                    name: Frequencies,
                    value: Byte[] of List[Number]
                },
                Position:{
                    name: Position,
                    value: Byte[] of List[Number]
                },
                Offsets:{
                    name: Offsets,
                    value: Byte[] of List[Number]
                },
                Norms:{
                    name: Norms,
                    value: Byte[] of List[Number]
                }
            }
        }
    },
    TermKey2:{                                        // Row 2
    }
    ...
}

“Documents” family is a StandardColumnFamily. Each document gets stored in a separate row identified with DocId (“index_name/document_id”) and stores Columns of various storable fields. This looks like,

"Documents" => {
        DocId1: {                        // Row 1
            field1:{
                name: field1,
                value: binary storable content
            },
            field2:{
                name: field2,
                value: binary storable content
            }
        },
        DocId2: {                        // Row 2
            field1:{
                name: field1,
                value: binary storable content
            },
        ...
        },
        ...
    }
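If you prefer reading code to reading braces, the two layouts can be mimicked with nested maps. This is a sketch under assumptions: LucandraLayoutSketch and its methods are hypothetical, the “/” separator simply follows the notation used in this post rather than Lucandra’s source, and the frequency is stored as a plain string instead of the serialized number lists Lucandra uses.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the two column families using nested maps.
class LucandraLayoutSketch {
    // TermInfo: row key -> super column (docId) -> column name -> value
    final Map<String, Map<String, Map<String, byte[]>>> termInfo = new TreeMap<>();
    // Documents: row key -> column (field name) -> stored value
    final Map<String, Map<String, byte[]>> documents = new TreeMap<>();

    static String termKey(String index, String field, String term) {
        return index + "/" + field + "/" + term;
    }

    static String docKey(String index, String docId) {
        return index + "/" + docId;
    }

    // Records that docId contains the term, with a simplified frequency column.
    void indexTerm(String index, String field, String term, String docId, int frequency) {
        termInfo.computeIfAbsent(termKey(index, field, term), k -> new TreeMap<>())
                .computeIfAbsent(docId, k -> new TreeMap<>())
                .put("Frequencies", Integer.toString(frequency).getBytes(StandardCharsets.UTF_8));
    }

    // Stores the raw content of a field under the document's row.
    void storeField(String index, String docId, String field, String value) {
        documents.computeIfAbsent(docKey(index, docId), k -> new TreeMap<>())
                .put(field, value.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        LucandraLayoutSketch store = new LucandraLayoutSketch();
        store.indexTerm("blog", "title", "cassandra", "doc-1", 2);
        store.storeField("blog", "doc-1", "title", "Fiddling with Cassandra");
        System.out.println("TermInfo rows:  " + store.termInfo.keySet());
        System.out.println("Documents rows: " + store.documents.keySet());
    }
}
```

Because a term’s row key starts with the index name and the field, all terms of one field sit next to each other when the rows are kept in key order, which matters for the range queries discussed next.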

The Lucandra Cassandra Keyspace looks like this:

<Keyspace Name="Lucandra">
    <ColumnFamily Name="TermInfo"
        CompareWith="BytesType"
        ColumnType="Super"
        CompareSubcolumnsWith="BytesType"
        KeysCached="10%" />
    <ColumnFamily Name="Documents"
        CompareWith="BytesType"
        KeysCached="10%" />

    <ReplicaPlacementStrategy>
        org.apache.cassandra.locator.RackUnawareStrategy
    </ReplicaPlacementStrategy>
    <ReplicationFactor>1</ReplicationFactor>
    <EndPointSnitch>
        org.apache.cassandra.locator.EndPointSnitch
    </EndPointSnitch>
</Keyspace>

Lucene has many powerful features, like wildcard queries, result sorting, range queries etc. For Lucandra to have these features enabled, you must configure Cassandra with OrderPreservingPartitioner, i.e. OPP.

Cassandra comes with RandomPartitioner, i.e. RP by default, but

  1. RP does NOT support Range Slices, and
  2. If you scan through your keys, they will NOT come in order.
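A quick way to see why is to compare key order with hash order. The sketch below is not Cassandra code: PartitionerOrderSketch is a hypothetical name, and md5Token only approximates what a hash-based partitioner does by turning the MD5 digest of a key into a number.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical demo: rows sorted by key versus rows sorted by the MD5 of the key.
class PartitionerOrderSketch {
    // Approximation of a hash-based placement: the MD5 digest read as a positive number.
    static BigInteger md5Token(String key) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            return new BigInteger(1, md.digest(key.getBytes(StandardCharsets.UTF_8)));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 not available", e);
        }
    }

    // The order an order-preserving partitioner would keep rows in.
    static List<String> orderedByKey(List<String> keys) {
        List<String> copy = new ArrayList<>(keys);
        copy.sort(Comparator.naturalOrder());
        return copy;
    }

    // The order rows end up in when they are placed by hash.
    static List<String> orderedByToken(List<String> keys) {
        List<String> copy = new ArrayList<>(keys);
        copy.sort(Comparator.comparing(PartitionerOrderSketch::md5Token));
        return copy;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList(
                "blog/title/apple", "blog/title/banana", "blog/title/cherry", "blog/title/date");
        System.out.println("by key:   " + orderedByKey(keys));
        System.out.println("by token: " + orderedByToken(keys));
    }
}
```

A range query such as “all terms from apple to cherry” needs the first ordering. Under hash ordering the start and end keys land at unrelated positions, which is exactly what the exception below complains about.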

If you still insist on using RP, you might encounter some exceptions, and you might need to go to Lucandra source to amend range query sections.

java.lang.RuntimeException: InvalidRequestException(why:start key's md5 sorts after
end key's md5.this is not allowed; you probably should not specify end key at all,
under RandomPartitioner)
    at lucandra.LucandraTermEnum.loadTerms(LucandraTermEnum.java:217)
    at lucandra.LucandraTermEnum.skipTo(LucandraTermEnum.java:88)
    at lucandra.IndexReader.docFreq(IndexReader.java:163)
    at org.apache.lucene.search.IndexSearcher.docFreq(IndexSearcher.java:138)

This is what you need to change in Cassandra config:

<Partitioner>org.apache.cassandra.dht.OrderPreservingPartitioner</Partitioner>

Benefits

  1. Since you can pull ranges of keys and groups of columns in Cassandra, you can really tune the performance of reads and minimize network IO for each query.
  2. Since writes are indexed in Cassandra, and Cassandra replicates itself, you don’t need to worry about optimizing the indexes or reopening the index to see new writes. With Lucene you need to take care of optimizing your indexes from time to time, and you need to re-instantiate your Searcher object to see new writes.
  3. So, with Cassandra underlying Lucene, you get a real-time distributed search engine.

Caveats

As we discussed in an earlier post, you can extend Lucene either by implementing your own Directory class, or by writing your own IndexReader and IndexWriter classes. Lucandra uses the latter approach, and that makes much more sense.

Read here: Apache Lucene and Cassandra

Benefits that Lucandra gets are because of Cassandra’s amazing capability to store and scale the key-value pairs. Directory class works in close proximity with IndexReader and IndexWriter to store and read indexes from some storage (filesystem and/or database). It generally receives huge chunks of sequential bytes, not a key-value pair, which would be difficult to store in Cassandra, and even if stored, it would not make optimum use of Cassandra.

Anyhow, given that Lucene is not very object oriented and almost never uses interfaces, using Lucandra’s IndexWriter and IndexReader seamlessly with your legacy code will NOT be possible.

Lucandra’s IndexReader extends org.apache.lucene.index.IndexReader, which makes this class fit for your legacy code. You just need to instantiate it and then you can pass it around to your native code without much thought:

IndexReader indexReader = new IndexReader(INDEX_NAME, cassandraClient);
// Notice that the constructor is different.
IndexSearcher indexSearcher = new IndexSearcher(indexReader);

But mind you, Lucandra’s IndexReader will NOT help you walk through the indexed documents. Who needs it anyway? 😉

However, Lucandra’s IndexWriter is an independent class, and does not extend or relate to org.apache.lucene.index.IndexWriter in any way. That makes it impossible to use this class in your legacy code without refactoring. But, to ease your pain, it does implement methods with the same signatures as the native ones, e.g. addDocument, deleteDocuments etc. If that makes you a little happy. 🙂

Also, Lucandra attempts to re-write all related logic inside its IndexWriter, for example, the logic to invoke the analyzer to fetch terms, to calculate term frequencies, offsets etc. This too makes Lucandra a bit weird for future portability. Whenever Lucene introduces a new thing, or changes its logic in any way, Lucandra will need to re-implement it. For example, Lucene recently introduced Payloads, which add weights to specific terms, much like spans. It works by extending the Similarity class with additional logic. Lucandra doesn’t support it, and to support it, Lucandra would need to amend its code.

In short, I am trying to say that the way Lucandra is implemented it would make it difficult to inherently use any future Lucene enhancements, but – God forbid! – there is no other way around. Wish Lucene had a better structure!

Anyways, right now, Lucandra supports:

  1. Real-Time indexing
  2. Zero optimization
  3. Search
  4. Sort
  5. Range Queries
  6. Delete
  7. Wildcards and other Lucene magic
  8. Faceting/Highlighting

Apart from this, the way Lucandra uses Cassandra can also have some scalability issues with large data. You can find some clue here:
http://ria101.wordpress.com/2010/02/22/cassandra-randompartitioner-vs-orderpreservingpartitioner/

Performance

Lucandra claims that it’s slower than Lucene: indexing is ~10% slower, and so is reading. However, I found it much better and faster than Lucene. I wrote comparative tests to index 15K documents and search over the index. I ran the tests on my Dell Latitude D520 with 3GB RAM, and Lucandra (single Cassandra node) was ~35% faster than Lucene during indexing and ~20% faster for search. Maybe I should try with a bigger set of data.

Is Lucandra production ready?

There is a Twitter search app http://sparse.ly which is built on Lucandra. This service uses Lucandra exclusively, without any relational or other sort of databases. Given the depth and breadth of twitter data, and that sparse.ly is pretty popular and stable, Lucandra does seem to be production ready.

🙂 But maybe you should read the Caveats once more and see if you are okay with them.

Written by Animesh

May 27, 2010 at 8:03 am

Posted in Technology


Connecting to Cassandra – 1

with 13 comments

Cassandra uses the Apache Thrift framework as its client API. Apache Thrift is a remote procedure call framework for “scalable cross-language services development”. You define data types and service interfaces in a Thrift definition file, from which the compiler generates code in your chosen languages. Effectively, it combines a software stack with a code generation engine to build services that work efficiently and seamlessly between a number of languages.

Apache Thrift, though a state-of-the-art engineering feat, is not the best choice for a client API, especially for Cassandra.

  1. Cassandra supports multiple nodes, and you can connect to any node at any time. This is an amazing thing, because if a node goes down, a client can connect to any other available node without bringing the system down. Alas! Apache Thrift doesn’t support this inherently: you need to make your client aware of node failures and write a strategy to pick the next live node.
  2. Thrift doesn’t support connection pooling. So, either you connect to the server every time, or keep a connection alive for a longer period of time. Or, perhaps, write a connection pool engine. Sad!
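The first point, picking the next live node, is the kind of strategy you would have to write yourself. Below is a minimal round-robin sketch. NodePicker is a hypothetical class, not part of Thrift or of any Cassandra client, and the canConnect predicate stands in for whatever “try to open a connection to this node” means in your code. Connection pooling is not covered here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical failover helper: round-robin over nodes, skipping the dead ones.
class NodePicker {
    private final List<String> nodes;
    private int next = 0;

    NodePicker(List<String> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("need at least one node");
        }
        this.nodes = new ArrayList<>(nodes);
    }

    // Tries every node once, starting after the last node handed out, and
    // returns the first one that canConnect accepts (empty if none does).
    synchronized Optional<String> pick(Predicate<String> canConnect) {
        for (int i = 0; i < nodes.size(); i++) {
            String candidate = nodes.get((next + i) % nodes.size());
            if (canConnect.test(candidate)) {
                next = (next + i + 1) % nodes.size();
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Hypothetical node addresses, using Cassandra's default Thrift port.
        NodePicker picker = new NodePicker(Arrays.asList("node1:9160", "node2:9160", "node3:9160"));
        Set<String> down = new HashSet<>(Arrays.asList("node1:9160")); // pretend node1 is dead

        for (int i = 0; i < 3; i++) {
            System.out.println("connecting to " + picker.pick(n -> !down.contains(n)).orElse("nobody"));
        }
    }
}
```

In real code the predicate would attempt to open a transport to the host and return false on failure, so a dead node costs one failed attempt and is then skipped.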

There are a few clients available which make these things easier for you. They are wrappers over Thrift that save you from a lot of nuisance. Anyhow, since even those clients work on top of Thrift, it makes sense to learn Thrift to make our foundation strong.

Let’s first create a dummy Keyspace for ourselves:

<Keyspace Name="AddressBook">
<ColumnFamily CompareWith="UTF8Type" Name="Users" />

<!-- Necessary for Cassandra -->
<ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy
</ReplicaPlacementStrategy>
<ReplicationFactor>1</ReplicationFactor>
<EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
</Keyspace>

We created a new Keyspace “AddressBook” which has a ColumnFamily “Users” with sorting policy of “UTF8Type” type.

Connect to Cassandra Server:

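import org.apache.cassandra.thrift.Cassandra;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;

private TTransport transport = null;
private Cassandra.Client client = null;

public Cassandra.Client connect(String host, int port) {
    try {
        transport = new TSocket(host, port);
        TProtocol protocol = new TBinaryProtocol(transport);
        // Assign the field instead of declaring a local variable that shadows it.
        client = new Cassandra.Client(protocol);
        transport.open();
        return client;
    } catch (TTransportException e) {
        e.printStackTrace();
    }
    return null;
}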

The above code is pretty fundamental:

  1. Opens up a Socket at the given host and port.
  2. Defines a protocol, in this case, it’s binary.
  3. And instantiates the client object.
  4. Returns client object for further operations.

Note: Cassandra uses “9160” as its default port.

Disconnect from Cassandra Server:

public void disconnect() {
    try {
        if (null != transport) {
            transport.flush();
            transport.close();
        }
    } catch (TTransportException e) {
        e.printStackTrace();
    }
}

To close the connection in a decent way, you should invoke “flush” to take care of any data that might still be in the transport buffer.

Store a data object:

Let’s say, our User object is something like below:

public class User {
    // unique
    private String username;
    private String email;
    private String phone;
    private String zip;

    // getter and setter here.
}

To model one User in Cassandra, we need 3 columns to store email, phone and zip, and the row key would be the username. Right? Let’s create a list to store these columns.

List<ColumnOrSuperColumn> columns = new ArrayList<ColumnOrSuperColumn>();

The List contains ColumnOrSuperColumn objects. Cassandra gives us an aggregate object which can contain either a Column or a SuperColumn. You wonder why? Because Apache Thrift doesn’t support inheritance. Anyway, now we will create the columns and store them in this list.

// generate a timestamp.
long timestamp = new Date().getTime();
ColumnOrSuperColumn c = null;

// add email
c = new ColumnOrSuperColumn();
c.setColumn(new Column("email".getBytes("utf-8"), user.getEmail().getBytes("utf-8"), timestamp));
columns.add(c);

// add phone
c = new ColumnOrSuperColumn();
c.setColumn(new Column("phone".getBytes("utf-8"), user.getPhone().getBytes("utf-8"), timestamp));
columns.add(c);

// add zip
c = new ColumnOrSuperColumn();
c.setColumn(new Column("zip".getBytes("utf-8"), user.getZip().getBytes("utf-8"), timestamp));
columns.add(c);

Okay, so we have the list of columns populated. Now, we need a Map which will hold the row, that is, the list of columns. The key to this map will be the name of the ColumnFamily.


Map<String, List<ColumnOrSuperColumn>> data = new HashMap<String, List<ColumnOrSuperColumn>>();
data.put("Users", columns); // “Users” is our ColumnFamily Name.

Great. We have everything in place. Now, we will use client.batch_insert to store everything at once. This will create a row in the ColumnFamily identified by the given key.


client.batch_insert( "AddressBook",          // Keyspace
                      user.getUsername(),    // Row identifier key
                      data,                  // Map which contains the list of columns.
                      ConsistencyLevel.ANY   // Consistency level. Explained below.
);

The ConsistencyLevel parameter is used for both read and write operations to determine when the request made by the client counts as successful. ConsistencyLevel.ANY means that a write is successful once it has been written to at least one node. See the Cassandra Wiki for details.
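To get a feel for what the other levels ask for, here is a small sketch that works out how many replica acknowledgements a write waits for. Treat it as an illustration only: the Level enum below is a local, simplified stand-in covering four levels and is not Cassandra’s ConsistencyLevel class, and requiredAcks is a hypothetical helper.

```java
// Hypothetical helper: acknowledgements a write waits for at each level.
class ConsistencySketch {
    // Simplified local enum; NOT org.apache.cassandra.thrift.ConsistencyLevel.
    enum Level { ANY, ONE, QUORUM, ALL }

    static int requiredAcks(Level level, int replicationFactor) {
        if (replicationFactor < 1) {
            throw new IllegalArgumentException("replication factor must be at least 1");
        }
        switch (level) {
            case ANY:    // one acknowledgement from any node is enough
            case ONE:    // one replica must acknowledge
                return 1;
            case QUORUM: // a majority of the replicas
                return replicationFactor / 2 + 1;
            case ALL:    // every replica
                return replicationFactor;
            default:
                throw new IllegalArgumentException("unknown level: " + level);
        }
    }

    public static void main(String[] args) {
        int replicationFactor = 3;
        for (Level level : Level.values()) {
            System.out.println(level + " waits for " + requiredAcks(level, replicationFactor)
                    + " of " + replicationFactor + " replicas");
        }
    }
}
```

With the ReplicationFactor of 1 used in our AddressBook Keyspace every level boils down to a single node, so the differences only start to matter once you replicate.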

In the next post, we will see how to delete and update a record in Cassandra.

Written by Animesh

May 24, 2010 at 10:42 am

Posted in Technology
