Introduction

MapReduce is a programming model and an associated implementation for processing and generating large data sets with a parallel, distributed algorithm on a cluster. It is a core component of the Hadoop ecosystem and is designed to handle vast amounts of data by dividing the work into a set of independent tasks.

Key Concepts

  1. Map Function

  • Input: Processes the input one record at a time, transforming each record into zero or more key-value pairs (see the signature sketch after this list).
  • Output: Produces intermediate key-value pairs.

  2. Reduce Function

  • Input: Takes the intermediate key-value pairs produced by the map function, grouped by key, and merges the values associated with each key.
  • Output: Produces a smaller set of final key-value pairs.

  3. Job

  • A complete MapReduce program is referred to as a job. It consists of multiple tasks, including map tasks and reduce tasks.

  4. Task

  • A task is the execution of a single map or reduce function on a subset of the data.

  5. Job Tracker and Task Tracker

  • Job Tracker: Manages the job by scheduling tasks, monitoring them, and re-executing failed tasks.
  • Task Tracker: Executes the tasks as directed by the Job Tracker.
  • Note: these daemons belong to classic MapReduce (MRv1); in Hadoop 2 and later their responsibilities are handled by YARN (the ResourceManager, NodeManagers, and a per-job ApplicationMaster), but the division of labor is the same.
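
To make the shapes of these two functions concrete, the following minimal skeleton shows how a mapper and a reducer are declared against the Hadoop Java API. The class names and the Text/IntWritable type choices here are illustrative placeholders, not part of any standard program:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class MapReduceSkeleton {

  // The four generic parameters are <input key, input value, output key, output value>.
  public static class MyMapper
       extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      // Emit zero or more intermediate (key, value) pairs per input record.
    }
  }

  public static class MyReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      // Called once per distinct key with every value emitted for that key;
      // merge the values and write the final (key, value) pair.
    }
  }
}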

MapReduce Workflow

  1. Input Splitting: The input data is divided into chunks called input splits, typically one split per HDFS block.
  2. Mapping: Each split is processed by a map task, which produces intermediate key-value pairs.
  3. Shuffling and Sorting: The intermediate key-value pairs are shuffled and sorted by key.
  4. Reducing: The sorted key-value pairs are processed by reduce tasks to produce the final output.
  5. Output: The final output is written to the distributed file system.
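
For example, assuming the word count job described below and a tiny two-line input, the data moves through these stages roughly as follows:

Input splits:       "hello world"          |  "hello hadoop"
Map output:         (hello,1) (world,1)    |  (hello,1) (hadoop,1)
Shuffle and sort:   (hadoop,[1])  (hello,[1,1])  (world,[1])
Reduce output:      (hadoop,1)  (hello,2)  (world,1)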

Practical Example

Word Count Program

The word count program is a classic example of a MapReduce application. It counts the number of occurrences of each word in a given input set.

Code Example

import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    // Called once per input line: tokenize it and emit (word, 1) for each token.
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    // Called once per distinct word, with all of its emitted counts: sum them.
    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Explanation

  • TokenizerMapper: This class extends the Mapper class. It tokenizes the input text and emits each word as a key with a value of 1.
  • IntSumReducer: This class extends the Reducer class. It sums up the values for each key (word) and emits the word with its total count.
  • Main Method: Configures and runs the job. It sets the mapper, combiner, and reducer classes, the output key and value types, and the input and output paths. Using IntSumReducer as the combiner lets each map task pre-aggregate its counts locally, reducing the data shuffled across the network.
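
To try the program on a running Hadoop installation, one possible workflow is sketched below; the jar name and HDFS paths are placeholders to adapt to your environment.

# Compile against the Hadoop classpath and package the classes into a jar
javac -classpath "$(hadoop classpath)" WordCount.java
jar cf wc.jar WordCount*.class

# Copy sample text into HDFS and run the job
hdfs dfs -mkdir -p /user/hadoop/wordcount/input
hdfs dfs -put sample.txt /user/hadoop/wordcount/input
hadoop jar wc.jar WordCount /user/hadoop/wordcount/input /user/hadoop/wordcount/output

# Inspect the result (one word per line, with a tab-separated count)
hdfs dfs -cat /user/hadoop/wordcount/output/part-r-00000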

Practical Exercise

Exercise: Implement a MapReduce Program to Count the Number of Lines in a Text File

  1. Objective: Write a MapReduce program that counts the number of lines in a given text file.
  2. Steps:
    • Create a mapper class that emits a constant key-value pair for each line.
    • Create a reducer class that sums up the values for the constant key.
    • Configure and run the job in the main method.

Solution

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class LineCount {

  public static class LineMapper
       extends Mapper<LongWritable, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text lineKey = new Text("line");

    // Emit a constant ("line", 1) pair for every input line; the byte-offset key is ignored.
    public void map(LongWritable key, Text value, Context context
                    ) throws IOException, InterruptedException {
      context.write(lineKey, one);
    }
  }

  public static class LineReducer
       extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "line count");
    job.setJarByClass(LineCount.class);
    job.setMapperClass(LineMapper.class);
    job.setCombinerClass(LineReducer.class);
    job.setReducerClass(LineReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

Explanation

  • LineMapper: Emits a constant key-value pair ("line", 1) for each line in the input.
  • LineReducer: Sums up the values for the constant key "line" to get the total number of lines.
  • Main Method: Configures and runs the job similarly to the word count example.
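
For example, given an input file containing three lines, the job's output directory would hold a single part file with one tab-separated record:

line    3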

Conclusion

In this section, we covered the MapReduce framework, its key concepts, and the workflow. We also provided a practical example of a word count program and an exercise to count the number of lines in a text file. Understanding MapReduce is crucial for processing large datasets in a distributed environment, and it forms the backbone of many Hadoop applications. In the next module, we will delve deeper into the Hadoop ecosystem tools that complement the MapReduce framework.
