Introduction
MapReduce is a programming model and an associated implementation for processing and generating large data sets with a parallel, distributed algorithm on a cluster. It is a core component of the Hadoop ecosystem and is designed to handle vast amounts of data by dividing the work into a set of independent tasks.
Key Concepts
- Map Function (see the API sketch after this list)
  - Input: Takes a set of data and breaks each element down into key-value pairs, processing each input record independently.
  - Output: Produces intermediate key-value pairs.
- Reduce Function
  - Input: Takes the intermediate key-value pairs produced by the map function and merges the values that share the same key.
  - Output: Produces a smaller, aggregated set of key-value pairs.
- Job
  - A complete MapReduce program is referred to as a job. It consists of multiple tasks, including map tasks and reduce tasks.
- Task
  - A task is the execution of a single map or reduce function on a subset of the data.
- Job Tracker and Task Tracker (Hadoop 1 / MRv1 daemons; in Hadoop 2 and later, YARN's ResourceManager and NodeManagers take over these responsibilities)
  - Job Tracker: Manages the job by scheduling tasks, monitoring them, and re-executing failed tasks.
  - Task Tracker: Executes the tasks as directed by the Job Tracker.
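As a rough sketch of how these two roles look in the Hadoop Java API: Mapper and Reducer are generic classes whose four type parameters name the input and output key and value types. The class names SketchMapper and SketchReducer below are placeholders for illustration, not part of Hadoop, and the method bodies are intentionally left as comments.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>: map() is called once per input record.
class SketchMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // With the default TextInputFormat, key is the byte offset of the line
    // and value is the line itself. Intermediate pairs are emitted with
    // context.write(outputKey, outputValue).
  }
}

// Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>: reduce() is called once per distinct key,
// receiving every value that was emitted with that key.
class SketchReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    // Aggregate the values and write the final pair with context.write(key, aggregate).
  }
}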
MapReduce Workflow
- Input Splitting: The input data is split into fixed-size pieces called input splits.
- Mapping: Each split is processed by a map task, which produces intermediate key-value pairs.
- Shuffling and Sorting: The intermediate key-value pairs are shuffled across the cluster and sorted by key, so that all values for a given key arrive at the same reduce task (illustrated by the sketch after this list).
- Reducing: The sorted key-value pairs are processed by reduce tasks to produce the final output.
- Output: The final output is written to the distributed file system.
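The shuffle-and-sort step is easiest to see with a tiny, plain-Java sketch. The ShuffleSketch class below is illustrative only (it is not Hadoop code, and its sample pairs are made up); it groups intermediate (word, 1) pairs by key and orders the keys, which is essentially what the framework guarantees before each reduce() call.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

public class ShuffleSketch {
  public static void main(String[] args) {
    // Intermediate (word, 1) pairs as two map tasks might emit them.
    List<Map.Entry<String, Integer>> intermediate = List.of(
        Map.entry("hello", 1), Map.entry("world", 1),
        Map.entry("hello", 1), Map.entry("hadoop", 1));

    // Shuffle and sort: group the values by key, with the keys in sorted order.
    SortedMap<String, List<Integer>> grouped = new TreeMap<>();
    for (Map.Entry<String, Integer> pair : intermediate) {
      grouped.computeIfAbsent(pair.getKey(), k -> new ArrayList<>()).add(pair.getValue());
    }

    // Each entry is what a reduce() call would receive: a key and all of its values.
    grouped.forEach((word, counts) -> System.out.println(word + " -> " + counts));
  }
}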
Practical Example
Word Count Program
The word count program is a classic example of a MapReduce application. It counts the number of occurrences of each word in a given input set.
Code Example
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
      extends Mapper<Object, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
        throws IOException, InterruptedException {
      // Split the line into tokens and emit (word, 1) for each token.
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      // Sum all counts emitted for this word.
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class); // local pre-aggregation on the map side
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
Explanation
- TokenizerMapper: Extends the Mapper class. It tokenizes each input line and emits every word as a key with a value of 1.
- IntSumReducer: Extends the Reducer class. It sums the values for each key (word) and emits the word with its total count. It is also registered as the combiner, so partial sums are computed on the map side before the shuffle; this is safe because addition is commutative and associative.
- Main Method: Configures and runs the job. It sets the mapper, combiner, and reducer classes, the output key and value types, and the input and output paths taken from the command line (the output directory must not already exist when the job is submitted).
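For instance, given a two-line input file like the one below, the job writes one record per distinct word to the output directory (by default in a file named part-r-00000), with the keys in sorted order:

hello world
hello hadoop

Output:

hadoop	1
hello	2
world	1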
Practical Exercise
Exercise: Implement a MapReduce Program to Count the Number of Lines in a Text File
- Objective: Write a MapReduce program that counts the number of lines in a given text file.
- Steps:
  - Create a mapper class that emits a constant key-value pair for each line.
  - Create a reducer class that sums up the values for the constant key.
  - Configure and run the job in the main method.
Solution
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LineCount {

  public static class LineMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text lineKey = new Text("line");

    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // Emit one ("line", 1) pair per input line; the line's content is ignored.
      context.write(lineKey, one);
    }
  }

  public static class LineReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      // Sum the 1s (or partial sums from the combiner) for the single key "line".
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "line count");
    job.setJarByClass(LineCount.class);
    job.setMapperClass(LineMapper.class);
    job.setCombinerClass(LineReducer.class); // combines partial counts on the map side
    job.setReducerClass(LineReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
Explanation
- LineMapper: Emits a constant key-value pair ("line", 1) for each line in the input.
- LineReducer: Sums up the values for the constant key "line" to get the total number of lines.
- Main Method: Configures and runs the job similarly to the word count example.
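For example, running the solution on an input file containing three lines (of any content) produces a single output record:

line	3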
Conclusion
In this section, we covered the MapReduce framework, its key concepts, and the workflow. We also provided a practical example of a word count program and an exercise to count the number of lines in a text file. Understanding MapReduce is crucial for processing large datasets in a distributed environment, and it forms the backbone of many Hadoop applications. In the next module, we will delve deeper into the Hadoop ecosystem tools that complement the MapReduce framework.