The content is my understanding of Hadoop.
What is Hadoop?
Hadoop is a MapReduce framework implemented in Java for processing large amounts of unstructured data.
Hadoop is designed around a scale-out architecture: capacity is added with more commodity nodes rather than bigger machines.
Hadoop Life Cycle
There are three phases in the Hadoop (MapReduce) life cycle:
Map
Shuffle & Sort
Reduce
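To make the phases concrete, here is a minimal word-count sketch using the org.apache.hadoop.mapreduce API; the class names are illustrative. The map phase and reduce phase are the two classes, and the shuffle & sort phase is what the framework does in between to bring all values for one key to the same reduce call.

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Map phase: emit (word, 1) for every token in the input line.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokens = new StringTokenizer(value.toString());
        while (tokens.hasMoreTokens()) {
            word.set(tokens.nextToken());
            context.write(word, ONE);
        }
    }
}

// Reduce phase: the framework has already shuffled and sorted by key,
// so each reduce call receives one word together with all of its counts.
class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        context.write(key, new IntWritable(sum));
    }
}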
Hadoop IO
Hadoop does not use the default Java serialization framework; instead it provides its own Writable-based IO classes.
The following classes and interfaces are provided by the framework for IO operations.
Writable (I) - should be implemented by types that are going to be serialized, i.e. by values.
WritableComparable (I) - should be implemented by types that are going to be compared, i.e. by keys.
ObjectWritable (C) - A polymorphic Writable that writes an instance along with its class name.
It handles arrays, strings, and primitive types without a Writable wrapper, but it is less efficient because it writes (appends) the class declaration for each object it serializes.
GenericWritable (C) - GenericWritable is a wrapper class for Writable instances. It implements the Configurable interface, so it can be configured by the framework. The configuration is passed to the wrapped objects implementing the Configurable interface before deserialization.
Usage - In a scenario where two sequence files emit the same key type but different value types.
Since we cannot have different value types coming out of the mappers, we can use ObjectWritable or GenericWritable.
If you know in advance which classes are going to be emitted by the mapper, go for GenericWritable. It has a protected method Class[] getTypes();
which needs to be overridden to return the array of class types, as in the sketch below.
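A minimal sketch of such a subclass, assuming the mappers emit Text and IntWritable values (the class name and value types are illustrative):

import org.apache.hadoop.io.GenericWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

// Wraps the two value types our (hypothetical) mappers emit.
public class MyGenericWritable extends GenericWritable {

    @SuppressWarnings("unchecked")
    private static final Class<? extends Writable>[] TYPES =
            (Class<? extends Writable>[]) new Class[] { Text.class, IntWritable.class };

    // No-arg constructor is required so the framework can instantiate it.
    public MyGenericWritable() {
    }

    public MyGenericWritable(Writable value) {
        set(value);
    }

    @Override
    protected Class<? extends Writable>[] getTypes() {
        return TYPES;
    }
}

The wrapped value is recovered on the reduce side with get(); the job would also declare MyGenericWritable as the map output value class.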
Text (C) - Text is a mutable variant of String in Hadoop.
This class stores text using standard UTF-8 encoding. It provides methods to serialize, deserialize, and compare texts at byte level. The length is an integer and is serialized using a zero-compressed format.
In addition, it provides methods for string traversal without converting the byte array to a String.
Apart from Text, there are other mutable flavors of the Java wrapper classes, such as IntWritable, LongWritable, ByteWritable, etc.
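A quick sketch of that mutability: a single Writable instance can be reused for many records instead of allocating a new object each time (class name and values are illustrative).

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class MutableWritableDemo {
    public static void main(String[] args) {
        // Writable objects are mutable, so one instance can be reused
        // rather than allocating a new object per record.
        Text word = new Text("hadoop");
        IntWritable count = new IntWritable(1);

        word.set("mapreduce");   // same Text instance, new contents
        count.set(2);

        System.out.println(word + " -> " + count.get());
    }
}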
RawComparator (I) - A comparator that operates directly on the byte representation of objects.
WritableComparator (C) - implements RawComparator and provides a compare method that compares the serialized bytes of its arguments directly, as sketched below.
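A minimal sketch of a byte-level comparator for IntWritable keys; the class name is illustrative. It would typically be registered on the job with job.setSortComparatorClass(IntRawComparator.class).

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparator;

// Compares serialized IntWritable keys directly on their bytes,
// avoiding object deserialization during the sort.
public class IntRawComparator extends WritableComparator {

    public IntRawComparator() {
        super(IntWritable.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // IntWritable serializes to 4 big-endian bytes; readInt is a
        // helper inherited from WritableComparator.
        int a = readInt(b1, s1);
        int b = readInt(b2, s2);
        return Integer.compare(a, b);
    }
}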
Hadoop provides the following input/output formats.
SequenceFileInputFormat
SequenceFileInputFormat is a Java-specific file input format, so sequence files can only be read using the Java API.
SequenceFileInputFormat reads special binary files that are specific to Hadoop. These files include many features designed to allow data to be read rapidly into Hadoop mappers. Sequence files are block-compressed and provide direct serialization and deserialization of arbitrary data types (not just text). Sequence files can be generated as the output of other MapReduce tasks and are an efficient intermediate representation for data passing from one MapReduce job to another.
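A minimal wiring sketch of that chaining pattern: one job writes its output as a sequence file and the next job reads it back directly. The class name, paths, and key/value types are illustrative assumptions.

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFileChaining {
    public static void main(String[] args) throws IOException {
        // Job A writes its (Text, IntWritable) output as a sequence file ...
        Job jobA = Job.getInstance();
        jobA.setOutputFormatClass(SequenceFileOutputFormat.class);
        jobA.setOutputKeyClass(Text.class);
        jobA.setOutputValueClass(IntWritable.class);
        FileOutputFormat.setOutputPath(jobA, new Path("/tmp/intermediate"));

        // ... and Job B reads it back with no text parsing in between.
        Job jobB = Job.getInstance();
        jobB.setInputFormatClass(SequenceFileInputFormat.class);
        FileInputFormat.addInputPath(jobB, new Path("/tmp/intermediate"));
    }
}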
MultiFileInputFormat/CombineFileInputFormat
Partitioner in Hadoop
Partitioner (AC) - A Hadoop Partitioner is used to distribute the keys emitted by the mappers across the reducers.
Hadoop provides three types of partitioners.
HashPartitioner - The default partitioner. Hash-based partitioning.
TotalOrderPartitioner - partitions keys by range and hence decides which key/value pair belongs to which partition.
A sampling technique is used to choose the partition boundaries.
Sampling is done in one of the following ways:
- SplitSampler - samples only the first n records in a split; not so good for sorted data, because it doesn't select keys from throughout the split.
- IntervalSampler - chooses keys at regular intervals through the split and is a better choice for sorted data.
- RandomSampler - a good general-purpose sampler.
One use of TotalOrderPartitioner is creating a globally sorted file, which is otherwise only possible with a single reducer and highly inefficient for large files (see the driver sketch after this list).
BinaryPartitioner - partitions BinaryComparable keys (such as Text) based on a configurable range of bytes of the key's binary representation.
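A minimal driver sketch of a globally sorted job using TotalOrderPartitioner with a RandomSampler; the input format, paths, sampling parameters, and reducer count are illustrative assumptions. The sampler estimates key ranges from the input, writes a partition file, and each reducer then receives one contiguous key range, so the concatenation of the reducer outputs is globally sorted.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class GlobalSortDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(GlobalSortDriver.class);
        // No mapper/reducer set: identity pass-through, keys just get re-partitioned and sorted.
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setNumReduceTasks(8);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Sample roughly 10% of the keys (up to 10000, from at most 10 splits)
        // to estimate the key ranges, then hand the partition file to
        // TotalOrderPartitioner so every reducer gets a contiguous range.
        InputSampler.Sampler<Text, IntWritable> sampler =
                new InputSampler.RandomSampler<>(0.1, 10000, 10);
        TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
                new Path("/tmp/partitions.lst"));
        job.setPartitionerClass(TotalOrderPartitioner.class);
        InputSampler.writePartitionFile(job, sampler);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}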
Combiners in Hadoop
Hadoop allows users to specify a combiner function to be run on the map output.
Combiners are like mini reducers that run on the mapper node.
The combiner function's output becomes the input to the reduce function.
Combiners are an optimization, and Hadoop makes no guarantee on how many times a combiner is applied, or whether it is applied at all; the framework simply has the option of using this optimization.
Because of this, the output types of the mapper and the combiner must be the same as the input types of the reducer.
Hence one key point for the programmer to take into account is that the correctness of a MapReduce job must not depend on whether (or how often) the combiner executes (see the driver sketch below).
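A minimal driver sketch wiring a combiner, reusing the word-count classes sketched earlier; because summation is associative and commutative, the reducer class can safely double as the combiner.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        // Summation is associative and commutative, so the reducer
        // class can be reused as the combiner without affecting correctness.
        job.setCombinerClass(WordCountReducer.class);
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}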
Combiners reduce the amount of intermediate data shuffled across the network, but they don't reduce the number of key-value pairs emitted by the mapper in the first place.
With Hadoop combiners, intermediate key-value pairs are materialized in an in-memory buffer and then "spilled" to local disk. Only in subsequent merge passes over the on-disk key-value pairs is the combiner executed.
This process involves unnecessary object creation and destruction, and furthermore object serialization and deserialization.
To avoid this, one can use the in-mapper combining pattern (sketched below).
However, in-mapper combining suffers from memory limitations when the distribution of key-values is skewed.
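A minimal sketch of the in-mapper combining pattern for word count; the class name is illustrative. Counts are aggregated in a HashMap and emitted only once per map task in cleanup(), which is also where the memory/skew caveat comes from: the map of partial counts must fit in the task's heap.

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class InMapperCombiningMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    // Partial counts held in memory for the lifetime of the map task.
    private final Map<String, Integer> counts = new HashMap<>();

    @Override
    protected void map(LongWritable key, Text value, Context context) {
        StringTokenizer tokens = new StringTokenizer(value.toString());
        while (tokens.hasMoreTokens()) {
            counts.merge(tokens.nextToken(), 1, Integer::sum);
        }
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Emit each word exactly once per task, already pre-aggregated.
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            context.write(new Text(e.getKey()), new IntWritable(e.getValue()));
        }
    }
}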
Distributed Cache
The distributed cache is a read-only cache available across the cluster.
Rather than serializing side data in the job configuration, it is preferable to distribute the data set using Hadoop's distributed cache mechanism.
This provides a service for copying files and archives to the task nodes in time for the tasks to use them when they run.
To save network bandwidth, files are normally copied to any particular node once per job.
The distributed cache can be used to distribute simple read-only data/text files and/or more complex types such as archives, jars, etc.
The distributed cache tracks the modification timestamps of the cached files. Cache files should not be modified by the application or externally while the job is executing.
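A minimal driver-side sketch, assuming the Hadoop 2.x Job.addCacheFile/addCacheArchive API (older releases used the DistributedCache class directly); the paths and the "#lookup" symlink name are illustrative.

import java.net.URI;

import org.apache.hadoop.mapreduce.Job;

public class CacheSetupExample {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        // Ship a small lookup file and an archive to every task node.
        // The "#lookup" fragment creates a symlink of that name in the
        // task's working directory.
        job.addCacheFile(new URI("hdfs:///data/lookup/countries.txt#lookup"));
        job.addCacheArchive(new URI("hdfs:///data/lookup/geo-db.zip"));
    }
}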
Joins in Hadoop
The following are popular join algorithms in Hadoop.
Map Side Join
Map-side joins are more efficient than reduce-side joins.
Hadoop provides a way of performing a map-side join through CompositeInputFormat.
However, this only works for data meeting the following conditions:
- All data sets must be sorted using the same comparator.
- All data sets must be partitioned using the same partitioner.
- The number of partitions in all data sets must be identical, since a given key has to be in the same partition (number) in each data set so that all partitions that can hold a key are joined together.
CompositeInputFormat ignores the block/split size. Because the input files need to be sorted and partitioned identically, Hadoop has no way to determine where to split the files while maintaining these properties.
The only way to get around this is to split and partition the files manually into smaller splits. You can do this by passing the data through a MapReduce job (probably just an identity mapper and identity reducer) with a larger number of reducers, as in the sketch below. Just be sure to pass both of your data sets through with the same number of reducers.
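A minimal driver sketch for such a pass-through job, assuming KeyValueTextInputFormat input; the class name and reducer count are illustrative. Run both data sets through it with the same number of reducers so they come out sorted and partitioned identically.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Repartition {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        job.setJarByClass(Repartition.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        // No mapper/reducer classes set: Hadoop falls back to the identity
        // Mapper and Reducer, so records pass through unchanged but get
        // re-partitioned (HashPartitioner) and sorted by key.
        job.setNumReduceTasks(16);  // use the same value for both data sets
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}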
A map-side join will then present all the key-value pairs of each partition to a single map task, in key order.
Path dir1 = the directory containing the part-xxxxx files for data set 1.
Path dir2 = the directory containing the part-xxxxx files for data set 2.
Use CompositeInputFormat.compose to build the join expression,
set the input format to CompositeInputFormat,
and also specify the actual format of the underlying files (here KeyValueTextInputFormat):
conf.setInputFormat(CompositeInputFormat.class);
String joinStatement = CompositeInputFormat.compose("inner",
        KeyValueTextInputFormat.class, dir1, dir2);
conf.set("mapred.join.expr", joinStatement);
The value class passed to the map method will be TupleWritable.
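A minimal mapper sketch for this map-side join, using the old mapred API to match the JobConf snippet above; the class name and the tab-separated output are illustrative. Each map() call receives one join key plus a tuple holding the matching value from every composed data set.

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.join.TupleWritable;

public class JoinMapper extends MapReduceBase
        implements Mapper<Text, TupleWritable, Text, Text> {

    @Override
    public void map(Text key, TupleWritable values,
                    OutputCollector<Text, Text> output, Reporter reporter)
            throws IOException {
        // values.get(0) came from dir1, values.get(1) from dir2.
        Text left = (Text) values.get(0);
        Text right = (Text) values.get(1);
        output.collect(key, new Text(left + "\t" + right));
    }
}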
For an example implementation, go through the following link:
http://www.congiu.com/joins-in-hadoop-using-compositeinputformat/
Broadcast (Replication) Join
A broadcast join is used when a smaller data set needs to be joined with a large data set,
with the assumption that the smaller data set fits easily into memory.
The smaller data set is pushed into the distributed cache and replicated among the cluster nodes.
In the setup (init) method of the mapper, a hash map (or other associative array) is built from the smaller data set.
The join is then performed in the map method and the output is emitted (see the sketch below).
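A minimal sketch of such a broadcast-join mapper, assuming the small data set was added to the distributed cache with the symlink name "small_table" and that both files are tab-delimited with the join key in the first column; the class name, file name, and delimiters are illustrative.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class BroadcastJoinMapper extends Mapper<LongWritable, Text, Text, Text> {

    // In-memory copy of the small data set, built once per map task.
    private final Map<String, String> smallTable = new HashMap<>();

    @Override
    protected void setup(Context context) throws IOException {
        // "small_table" is the symlink created by the distributed cache.
        try (BufferedReader reader = new BufferedReader(new FileReader("small_table"))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String[] parts = line.split("\t", 2);
                smallTable.put(parts[0], parts[1]);
            }
        }
    }

    @Override
    protected void map(LongWritable offset, Text value, Context context)
            throws IOException, InterruptedException {
        String[] parts = value.toString().split("\t", 2);
        String match = smallTable.get(parts[0]);
        if (match != null) {  // inner join: drop unmatched keys
            context.write(new Text(parts[0]), new Text(parts[1] + "\t" + match));
        }
    }
}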