Monday, July 23, 2012

Exploring Concurrency in Java


This article contains my learning notes about concurrency in Java.

ConcurrentHashMap

Use ConcurrentHashMap when multiple threads access a hash map and performance is critical, rather than wrapping a HashMap with
Collections.synchronizedMap(Map).
The synchronized wrapper locks the entire map object on every operation, so only one thread can access the map at a time (which also guarantees that an update by any thread is visible to all other threads).
ConcurrentHashMap, in contrast, locks only the portion of the data being updated, allowing many threads to update the map concurrently. Its iterators are weakly consistent, so they do not throw ConcurrentModificationException when the map is modified during iteration.

Synchronizing on the whole map fails to take advantage of a possible optimisation: because hash maps store their data in a series of separate buckets, it is in principle possible to lock only the portion of the map that is being accessed. This optimisation is generally called lock striping.
Java 5 introduced a hash map optimised in this way in the form of ConcurrentHashMap. A combination of lock striping and judicious use of volatile variables gives the class two highly concurrent properties:
  • Writing to a ConcurrentHashMap locks only a portion of the map;
  • Reads can generally occur without locking

Due to the concurrent nature of this collection, methods that operate on the entire map, such as size() and isEmpty(), return only approximate values: the map is expected to be modified continuously, possibly while these methods run.

Since a ConcurrentHashMap cannot be locked for exclusive access, it provides the following atomic compound operations (declared in the ConcurrentMap interface) for general use.



// Insert into map only if no value is mapped from K
V putIfAbsent(K key, V value);

// Remove only if K is mapped to V
boolean remove(K key, V value);

// Replace value only if K is mapped to oldValue
boolean replace(K key, V oldValue, V newValue);

// Replace value only if K is mapped to some value
V replace(K key, V newValue);
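Because each of these calls is atomic, they can be combined into a lock-free retry loop without ever locking the whole map. The sketch below assumes a hypothetical AtomicCounterMap class that keeps per-key counts using only putIfAbsent and the three-argument replace (a compare-and-set on the value):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical helper: a thread-safe per-key counter built only from the atomic ConcurrentMap methods
class AtomicCounterMap {
    private final ConcurrentMap<String, Integer> counts = new ConcurrentHashMap<String, Integer>();

    /** Atomically increments the count for key and returns the new value. */
    int increment(String key) {
        while (true) {
            // Insert 1 only if no value is mapped yet; returns the previous value otherwise
            Integer old = counts.putIfAbsent(key, 1);
            if (old == null) {
                return 1;
            }
            // Succeeds only if no other thread changed the value since we read it
            if (counts.replace(key, old, old + 1)) {
                return old + 1;
            }
            // Another thread won the race: loop and retry with the fresh value
        }
    }

    /** Returns the current count, or 0 if the key was never incremented. */
    int get(String key) {
        Integer value = counts.get(key);
        return value == null ? 0 : value;
    }
}
```

If another thread changes the value between the read and the replace, the replace fails and the loop simply retries. On Java 8 and later, ConcurrentHashMap.merge(key, 1, Integer::sum) performs the same atomic update in a single call.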
 
Unlike ConcurrentHashMap, there is no ConcurrentHashSet class in the JDK.
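A common workaround, assuming Java 6 or later, is to back a Set with a ConcurrentHashMap through Collections.newSetFromMap. The helper class name ConcurrentSets below is hypothetical:

```java
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper class
class ConcurrentSets {
    // Returns a thread-safe Set whose operations delegate to a ConcurrentHashMap's keys
    static <E> Set<E> newConcurrentSet() {
        return Collections.newSetFromMap(new ConcurrentHashMap<E, Boolean>());
    }
}
```

The resulting set inherits the concurrency properties of the backing map. On Java 8 and later, ConcurrentHashMap.newKeySet() returns an equivalent set directly.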


Thursday, July 19, 2012

Hadoop Notes

These notes summarize my understanding of Hadoop.

What is Hadoop?
Hadoop is a MapReduce framework implemented in Java for processing large amounts of unstructured data.

Hadoop is designed around a scale-out architecture: capacity grows by adding more machines to the cluster rather than by using bigger machines.


Hadoop Life Cycle
A Hadoop MapReduce job goes through the following phases:
Map
Shuffle & Sort
Reduce
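To make the phases concrete, here is a single-process word count in plain Java that runs them in order. It is only an illustration under simplifying assumptions: the class WordCountPhases is hypothetical, there is a single reducer, and in a real cluster the shuffle also moves each key's values across the network to the reducer chosen by the partitioner.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

// Single-process, plain-Java simulation of the phases (no Hadoop classes involved)
class WordCountPhases {

    // Map phase: turn each input line into (word, 1) pairs
    static List<Map.Entry<String, Integer>> map(List<String> lines) {
        List<Map.Entry<String, Integer>> pairs = new ArrayList<Map.Entry<String, Integer>>();
        for (String line : lines) {
            for (String word : line.split("\\s+")) {
                if (!word.isEmpty()) {
                    pairs.add(new AbstractMap.SimpleEntry<String, Integer>(word, 1));
                }
            }
        }
        return pairs;
    }

    // Shuffle & sort phase: group all values by key, with keys in sorted order
    static SortedMap<String, List<Integer>> shuffleAndSort(List<Map.Entry<String, Integer>> pairs) {
        SortedMap<String, List<Integer>> grouped = new TreeMap<String, List<Integer>>();
        for (Map.Entry<String, Integer> pair : pairs) {
            grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<Integer>()).add(pair.getValue());
        }
        return grouped;
    }

    // Reduce phase: one call per key, here summing the counts
    static SortedMap<String, Integer> reduce(SortedMap<String, List<Integer>> grouped) {
        SortedMap<String, Integer> result = new TreeMap<String, Integer>();
        for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
            int sum = 0;
            for (int count : entry.getValue()) {
                sum += count;
            }
            result.put(entry.getKey(), sum);
        }
        return result;
    }
}
```

Calling reduce(shuffleAndSort(map(lines))) on the lines "the cat" and "the dog" yields cat=1, dog=1, the=2, with keys arriving at the reducer in sorted order.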





Hadoop IO
Hadoop does not use Java's default serialization mechanism (java.io.Serializable).
Instead, the framework provides the following classes and interfaces for I/O operations.

Writable (I) - should be implemented by types that are going to be serialized, i.e. values.
WritableComparable (I) - should be implemented by types that are also compared, i.e. keys, since keys are sorted by the framework.
ObjectWritable (C) - a polymorphic Writable that writes an instance together with its class name. It handles arrays, strings and primitive types without a Writable wrapper, but it is less efficient because it writes the class name for every object it writes.
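As a sketch of how a key type implements these interfaces, the following defines a hypothetical composite key YearTempKey. To keep it compilable without the Hadoop jar, it declares local stand-in interfaces with the same method signatures as org.apache.hadoop.io.Writable and WritableComparable; in a real job you would import Hadoop's interfaces and drop the stand-ins.

```java
import java.io.*;

// Local stand-ins mirroring org.apache.hadoop.io.Writable / WritableComparable,
// so this sketch compiles without Hadoop on the classpath
interface Writable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

interface WritableComparable<T> extends Writable, Comparable<T> {}

// Hypothetical composite key: (year, temperature)
class YearTempKey implements WritableComparable<YearTempKey> {
    private int year;
    private int temperature;

    // Hadoop creates keys reflectively and then calls readFields(), so keep a no-arg constructor
    YearTempKey() {}

    YearTempKey(int year, int temperature) {
        this.year = year;
        this.temperature = temperature;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(temperature);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Fields must be read in the same order they were written
        year = in.readInt();
        temperature = in.readInt();
    }

    // Key sort order: by year, then by temperature
    @Override
    public int compareTo(YearTempKey other) {
        int c = Integer.compare(year, other.year);
        return c != 0 ? c : Integer.compare(temperature, other.temperature);
    }

    // Keep hashCode/equals consistent with compareTo; the default HashPartitioner uses hashCode()
    @Override
    public int hashCode() {
        return 31 * year + temperature;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof YearTempKey)) {
            return false;
        }
        YearTempKey k = (YearTempKey) o;
        return year == k.year && temperature == k.temperature;
    }

    // Demonstration helper: serialize to bytes and read back into a fresh instance
    static YearTempKey roundTrip(YearTempKey key) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            key.write(new DataOutputStream(bytes));
            YearTempKey copy = new YearTempKey();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A value-only type would implement just Writable; the extra compareTo is what lets the framework sort keys during the shuffle.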

GenericWritable (C) - a wrapper class for Writable instances. It implements the Configurable interface, so that it will be configured by the framework. The configuration is passed to the wrapped objects implementing the Configurable interface before deserialization.

Usage - Consider two sequence files that emit the same key type but different value types.
Since a job declares only one map output value class, the different value types can be wrapped in ObjectWritable or GenericWritable.
If you know in advance the classes the mapper is going to emit, use GenericWritable. It has a protected method Class<? extends Writable>[] getTypes(),
which subclasses override to return the array of class types.
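The efficiency difference between the two wrappers comes from how the type is recorded. The plain-Java illustration below (not Hadoop's actual code; TypeTagDemo and its TYPES list are hypothetical) contrasts writing a one-byte index into a list of types agreed in advance, the idea behind GenericWritable's getTypes(), with writing the full class name alongside every record, roughly as ObjectWritable does:

```java
import java.io.*;
import java.util.Arrays;
import java.util.List;

// Plain-Java illustration only, not Hadoop's GenericWritable/ObjectWritable code
class TypeTagDemo {
    // Hypothetical counterpart of getTypes(): the fixed list of value classes known in advance
    static final List<Class<?>> TYPES = Arrays.<Class<?>>asList(String.class, Integer.class);

    // GenericWritable-style: one byte holding the index into TYPES, then the payload
    static byte[] writeWithIndex(Object value) {
        int index = TYPES.indexOf(value.getClass());
        if (index < 0) {
            throw new IllegalArgumentException("unregistered type: " + value.getClass());
        }
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeByte(index);
            writePayload(out, value);
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // ObjectWritable-style: the class name travels with every record
    static byte[] writeWithClassName(Object value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(value.getClass().getName()); // 2-byte length + name bytes
            writePayload(out, value);
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reads a record written by writeWithIndex, using the index to pick the type
    static Object readWithIndex(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            Class<?> type = TYPES.get(in.readByte());
            return type == String.class ? in.readUTF() : Integer.valueOf(in.readInt());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void writePayload(DataOutputStream out, Object value) throws IOException {
        if (value instanceof String) {
            out.writeUTF((String) value);
        } else {
            out.writeInt((Integer) value);
        }
    }
}
```

In this sketch an Integer costs 5 bytes with the index scheme and 23 bytes with the class-name scheme. Hadoop's exact byte layout differs, but the per-record cost of the class name is the reason GenericWritable is preferred when the types are known up front.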

Text (C) - Text is a mutable variant of String in Hadoop.
This class stores text using standard UTF-8 encoding. It provides methods to serialize, deserialize, and compare texts at the byte level. The length is an integer and is serialized using a zero-compressed (variable-length) format.
In addition, it provides methods for string traversal without converting the byte array to a String.
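Because Text stores UTF-8 bytes, its length is a byte count rather than a count of Java chars, and characters can be located by inspecting the bytes. The plain-Java sketch below (the Utf8Demo class is hypothetical, not Hadoop's Text) shows both: the byte length that Text.getLength() would report, and counting code points directly over the byte array by skipping UTF-8 continuation bytes, assuming the input is valid UTF-8.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper illustrating byte-level handling of UTF-8 text
class Utf8Demo {
    // Number of UTF-8 bytes, i.e. what a Text holding s would report as its length
    static int utf8Length(String s) {
        return s.getBytes(StandardCharsets.UTF_8).length;
    }

    // Counts code points without building a String: continuation bytes look like 10xxxxxx,
    // and every other byte starts a new code point (valid UTF-8 assumed)
    static int countCodePoints(byte[] utf8, int length) {
        int count = 0;
        for (int i = 0; i < length; i++) {
            if ((utf8[i] & 0xC0) != 0x80) {
                count++;
            }
        }
        return count;
    }
}
```

For "h\u00e9llo" the String length is 5 chars, while the UTF-8 form is 6 bytes containing 5 code points.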

Apart from Text, Hadoop provides mutable Writable wrappers for the Java primitive types, such as
IntWritable, LongWritable, ByteWritable etc.

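RawComparator (I) - a comparator that operates directly on the serialized byte representation of objects, so keys can be compared without being deserialized.

WritableComparator (C) - an implementation of RawComparator for WritableComparable classes. Its default raw compare method deserializes both keys and compares the objects; optimized subclasses (such as the comparators registered for IntWritable and Text) override it to compare the bytes directly.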
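The point of a raw comparator is that keys can be ordered straight from their serialized bytes. A minimal plain-Java sketch for int keys serialized big-endian (the layout DataOutput.writeInt produces) follows; the class RawIntComparator is hypothetical, but its compare method takes the same parameters as RawComparator.compare(byte[], int, int, byte[], int, int):

```java
import java.nio.ByteBuffer;

// Hypothetical raw comparator for 4-byte big-endian ints
class RawIntComparator {
    // Decodes a big-endian int starting at offset s, without creating any objects
    static int readInt(byte[] b, int s) {
        return ((b[s] & 0xff) << 24) | ((b[s + 1] & 0xff) << 16)
                | ((b[s + 2] & 0xff) << 8) | (b[s + 3] & 0xff);
    }

    // Compares two serialized keys; l1 and l2 (the lengths) are always 4 here
    static int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return Integer.compare(readInt(b1, s1), readInt(b2, s2));
    }

    // Demonstration helper: same byte layout as DataOutput.writeInt (ByteBuffer is big-endian by default)
    static byte[] serialize(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }
}
```

Comparing the bytes as plain unsigned values would put negative numbers after positive ones, which is why the sketch decodes the int before comparing.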

Hadoop provides the following input/output formats.

SequenceFileInputFormat 

SequenceFileInputFormat reads sequence files, special binary files that are specific to Hadoop. Since the format is Java-specific, these files are normally read only through the Java API.
These files include many features designed to allow data to be rapidly read into Hadoop mappers.

Sequence files can be block-compressed and provide direct serialization and deserialization of arbitrary data types (not just text). They can be generated as the output of other MapReduce tasks and are an efficient intermediate representation for data passing from one MapReduce job to another.


MultiFileInputFormat/CombineFileInputFormat





Partitioner in Hadoop

Partitioner (AC) - A Hadoop Partitioner decides which reducer each key emitted by the mappers is sent to.
Hadoop provides several partitioners, including the following three.
HashPartitioner - the default partitioner, based on hashing the key.
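The default hash partitioning boils down to one line. The sketch below mirrors the formula used by HashPartitioner.getPartition, wrapped in a hypothetical helper class so it runs without Hadoop:

```java
// Hypothetical helper mirroring HashPartitioner's formula
class HashPartitionDemo {
    static int getPartition(Object key, int numReduceTasks) {
        // Masking off the sign bit keeps the index non-negative even for negative hash codes
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}
```

Because the result depends only on the key's hashCode(), every occurrence of a key goes to the same reducer, which is why key types need a hashCode() that is stable across JVMs.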

TotalOrderPartitioner - partitions keys by range: a set of split points decides which key/value pair belongs to which partition.
The split points are chosen by sampling the keys.
Sampling can be done in the following ways:
- SplitSampler - samples only the first n records in a split. It is not so good for sorted data, because it doesn't select keys from throughout the split.
- IntervalSampler - chooses keys at regular intervals through the split, which makes it a better choice for sorted data.
- RandomSampler - a good general-purpose sampler.

One use of TotalOrderPartitioner is producing a globally sorted output file, which would otherwise be possible only with a single reducer, highly inefficient for large files.
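Range partitioning with sampled split points can be sketched in plain Java. Under the stated simplifications (String keys, a hypothetical RangePartitionDemo class, every step-th key sampled in the spirit of IntervalSampler, and split points taken at evenly spaced positions of the sorted sample), partition i receives only keys smaller than those in partition i + 1, so concatenating the reducers' sorted outputs gives a globally sorted result:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical plain-Java sketch of sampled range partitioning
class RangePartitionDemo {
    // Interval-style sampling: take every step-th key
    static List<String> intervalSample(List<String> keys, int step) {
        List<String> sample = new ArrayList<String>();
        for (int i = 0; i < keys.size(); i += step) {
            sample.add(keys.get(i));
        }
        return sample;
    }

    // Picks numPartitions - 1 split points at evenly spaced positions of the sorted sample
    static String[] splitPoints(List<String> sample, int numPartitions) {
        List<String> sorted = new ArrayList<String>(sample);
        Collections.sort(sorted);
        String[] splits = new String[numPartitions - 1];
        for (int i = 1; i < numPartitions; i++) {
            splits[i - 1] = sorted.get(i * sorted.size() / numPartitions);
        }
        return splits;
    }

    // Binary search over the sorted split points; a key equal to a split point
    // goes to the partition that starts at that point
    static int getPartition(String key, String[] splits) {
        int pos = Arrays.binarySearch(splits, key);
        return pos >= 0 ? pos + 1 : -pos - 1;
    }
}
```

With the sample a to h and 4 partitions, the split points are c, e and g: keys below c go to partition 0, keys in [c, e) to partition 1, and so on.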

BinaryPartitioner-



Combiners in Hadoop

Hadoop allows the user to specify a combiner function to be run on the map output.
Combiners are like mini reducers that run on the mapper node.
The combiner function's output becomes the input to the reduce function.

The combiner is purely an optimization, so Hadoop makes no guarantee on how many times the combiner is applied, or that it is applied at all; the framework decides whether to use this optimization.

Because of this, the combiner's input and output key/value types must both be the same as the mapper's output types, which are also the reducer's input types.

Hence a key point for the programmer is that the correctness of a MapReduce job must not depend on whether, or how often, the combiner is executed.
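The sketch below illustrates why correctness must not depend on the combiner. It is plain Java with hypothetical names: runJob feeds each map task's values for one key to the reducer either directly or after a sum combiner, and both paths give the same total because addition is associative and commutative. The mean helper shows a function that is not safe to reuse as a combiner, since a mean of partial means generally differs from the overall mean.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical plain-Java illustration of combiner semantics for a single key
class CombinerDemo {
    // Safe combiner: summing partial sums gives the same result as summing everything
    static List<Integer> sumCombiner(List<Integer> values) {
        return Collections.singletonList(sumReducer(values));
    }

    static int sumReducer(List<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    // Simulates the framework either running the combiner on each map task's output or skipping it
    static int runJob(List<List<Integer>> perMapTaskValues, boolean useCombiner) {
        List<Integer> shuffled = new ArrayList<Integer>();
        for (List<Integer> values : perMapTaskValues) {
            shuffled.addAll(useCombiner ? sumCombiner(values) : values);
        }
        return sumReducer(shuffled);
    }

    // NOT safe as a combiner: mean(mean(1, 2), 3) = 2.25, but mean(1, 2, 3) = 2.0
    static double mean(List<Double> values) {
        double sum = 0;
        for (double v : values) {
            sum += v;
        }
        return sum / values.size();
    }
}
```

A common fix for averages is to have the combiner emit (sum, count) pairs and let the reducer divide at the end.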

Combiners reduce the amount of intermediate data shuffled across the network, but they don't reduce the number of key/value pairs emitted by the mapper in the first place.

With Hadoop combiners, intermediate key/value pairs are materialized in an in-memory buffer and then "spilled" to local disk. The combiner runs only on this materialized data, when the buffer is sorted and spilled (and possibly again when the on-disk spill files are merged).
This process involves unnecessary object creation and destruction, and furthermore object serialization and deserialization.

To avoid this, one can use in-mapper combining, where the mapper itself aggregates partial results in memory across calls to map and emits them at the end.
However, in-mapper combining is limited by memory: with a skewed distribution of keys and values, the in-memory aggregate can grow too large.
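In-mapper combining can be sketched as a mapper that keeps a HashMap of partial counts across map calls and emits them at cleanup. The class below is plain Java, not the Hadoop Mapper API: the emit callback stands in for the framework's output collector, and the maxEntries threshold is an assumed mitigation for the memory problem, flushing partial results early whenever the map grows too large.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical word-count mapper using in-mapper combining
class InMapperCombiningMapper {
    private final Map<String, Integer> partial = new HashMap<String, Integer>();
    private final int maxEntries;
    private final BiConsumer<String, Integer> emit; // stands in for the framework's output

    InMapperCombiningMapper(int maxEntries, BiConsumer<String, Integer> emit) {
        this.maxEntries = maxEntries;
        this.emit = emit;
    }

    // Called once per input record: aggregate in memory instead of emitting (word, 1)
    void map(String line) {
        for (String word : line.split("\\s+")) {
            if (!word.isEmpty()) {
                partial.merge(word, 1, Integer::sum);
            }
        }
        // Bound memory use: emit partial counts early if too many distinct keys accumulate
        if (partial.size() > maxEntries) {
            flush();
        }
    }

    // Called once after the last record: emit whatever is still buffered
    void cleanup() {
        flush();
    }

    private void flush() {
        for (Map.Entry<String, Integer> entry : partial.entrySet()) {
            emit.accept(entry.getKey(), entry.getValue());
        }
        partial.clear();
    }
}
```

Early flushes may emit the same key more than once, so the reducer must still sum its inputs; with a large enough threshold, each key is emitted once per map task.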

Distributed Cache

The distributed cache is a read-only cache available across the cluster.
Rather than serializing data in the job configuration, it is preferable to distribute the data set using Hadoop's distributed cache mechanism.

It provides a service for copying files and archives to the task nodes in time for the tasks to use them when they run.
To save network bandwidth, files are normally copied to any particular node once per job.

The distributed cache can be used to distribute simple, read-only data or text files and/or more complex types such as archives and jars.

The distributed cache tracks the modification timestamps of the cache files. Cache files should not be modified by the application or externally while the job is executing.

Joins in Hadoop

The following join algorithms are popular in Hadoop.

Map Side Join

Map-side joins are generally more efficient than reduce-side joins, because the joined data does not have to be shuffled to reducers.
Hadoop supports map-side joins through
CompositeInputFormat.
However, it works only for data sets that satisfy the following conditions:

- All data sets must be sorted using the same comparator.
- All data sets must be partitioned using the same partitioner.
- The number of partitions must be identical in all data sets, since a given key has to be in the same partition (number) in each data set so that all partitions that can hold a key are joined together.

CompositeInputFormat ignores the block/split size. Because the input files need to be sorted and partitioned identically, Hadoop has no way to determine where it could split a file while preserving this property, so each file is read as a whole.
The only way to get around this is to split and partition the files manually into smaller pieces. You can do this by passing the data through a MapReduce job (probably just an identity mapper and identity reducer) with a larger number of reducers. Just be sure to pass both of your data sets through with the same number of reducers.


A map-side join presents all the key/value pairs of each partition to a single map task, in key order.

Let dir1 be the directory containing the part-xxxxx files for data set 1,
and dir2 the directory containing the part-xxxxx files for data set 2.

Use CompositeInputFormat.compose to build the join expression,
set the job's InputFormat to CompositeInputFormat,
and specify the actual format of the underlying files (here KeyValueTextInputFormat). In the snippet, conf is the job's JobConf from the old org.apache.hadoop.mapred API.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.KeyValueTextInputFormat;
import org.apache.hadoop.mapred.join.CompositeInputFormat;

conf.setInputFormat(CompositeInputFormat.class);

String joinStatement = CompositeInputFormat.compose("inner", KeyValueTextInputFormat.class, dir1, dir2);

conf.set("mapred.join.expr", joinStatement);

The value class passed to the map method will be TupleWritable.
For an example implementation, see the following link:

http://www.congiu.com/joins-in-hadoop-using-compositeinputformat/
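Conceptually, the join performed over identically partitioned, sorted inputs is a sorted merge: both inputs are walked once in key order and no hash table is needed. The following plain-Java sketch of an inner merge join assumes a hypothetical SortedMergeJoin class, (key, value) records held as two-element String arrays, and unique keys within each input:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a sorted-merge inner join over two inputs sorted by the same key order
class SortedMergeJoin {
    static List<String> innerJoin(List<String[]> left, List<String[]> right) {
        List<String> out = new ArrayList<String>();
        int i = 0;
        int j = 0;
        while (i < left.size() && j < right.size()) {
            int c = left.get(i)[0].compareTo(right.get(j)[0]);
            if (c < 0) {
                i++;            // left key has no match on the right: skip it
            } else if (c > 0) {
                j++;            // right key has no match on the left: skip it
            } else {
                // Keys match: emit key, left value, right value
                out.add(left.get(i)[0] + "\t" + left.get(i)[1] + "\t" + right.get(j)[1]);
                i++;
                j++;
            }
        }
        return out;
    }
}
```

This only works because both inputs are sorted with the same comparator, which is exactly the precondition listed for CompositeInputFormat. Handling duplicate keys would require buffering the run of equal keys on one side.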


Broadcast(Replication) Join

Broadcast join is used when a smaller data set needs to be joined with a large data set,
under the assumption that the smaller data set fits easily into memory.
The smaller data set is pushed into the distributed cache and replicated among the cluster nodes.
In the mapper's initialization method (setup() in the new API, configure() in the old API), a hash map (or other associative array) is built from the smaller data set.
The join operation is then performed in the map method and the output is emitted.
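A minimal plain-Java sketch of the replicated join follows. It assumes a hypothetical BroadcastJoinMapper class, tab-separated key/value lines, and that setup() receives the lines of the small data set; in a real job those lines would be read from the local copy of the distributed-cache file, and map() would write through the framework's output instead of returning a list.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java sketch of a broadcast (replicated) inner join
class BroadcastJoinMapper {
    private final Map<String, String> smallTable = new HashMap<String, String>();

    // Initialization: build an in-memory hash table from the small data set ("key\tvalue" lines)
    void setup(List<String> smallDataSetLines) {
        for (String line : smallDataSetLines) {
            String[] parts = line.split("\t", 2);
            if (parts.length == 2) {
                smallTable.put(parts[0], parts[1]);
            }
        }
    }

    // Per record of the large data set: probe the hash table and emit the joined record;
    // records without a match are dropped (inner join semantics)
    List<String> map(String largeRecordLine) {
        String[] parts = largeRecordLine.split("\t", 2);
        if (parts.length < 2) {
            return Collections.emptyList();
        }
        String match = smallTable.get(parts[0]);
        if (match == null) {
            return Collections.emptyList();
        }
        return Collections.singletonList(parts[0] + "\t" + parts[1] + "\t" + match);
    }
}
```

Since every mapper holds its own copy of the small table, no shuffle or reduce phase is needed for the join itself, which is where the speed-up comes from.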