
MapReduce Overview

Experimental!

Mapreduce is an experimental, innovative, and rapidly changing new feature for App Engine. Unfortunately, being on the bleeding edge means that we may make backwards-incompatible changes to Mapreduce. We will inform the community when this feature is no longer experimental.

This document presents an overview of the MapReduce Python API. It consists of the following sections:

  • Introduction
  • Downloading the Mapreduce Library
  • Features and Capabilities
  • Instantiating a MapReduce Pipeline
  • Starting a MapReduce Job
  • Showing the MapReduce Status Monitor
  • Determining When a MapreducePipeline Job is Complete

Introduction

Note: App Engine Mapreduce is Google's MapReduce with performance limits added so that developers can control their app's runtime costs. The programming design considerations, however, are the same, which means you can take advantage of existing MapReduce design ideas when designing your app. For more information on programming with MapReduce, see the Google Code University MapReduce course. You can also view Mike Aizatsky's 2011 Google I/O conference presentation.

MapReduce is a computing model developed by Google to do efficient distributed computing over large data sets. As shown below, input data values in the MapReduce model are processed in three stages:

  1. Values are mapped (assigned lookup keys) in parallel and stored in intermediate storage.
  2. The resulting key-value pairs are shuffled (collated by key).
  3. Finally, the collated values are reduced (manipulated to yield desired results).
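
To make the three stages concrete, here is a minimal, framework-free sketch of a word count in plain Python; the function names and sample input are purely illustrative and do not use the MapReduce API:

import collections

# Stage 1: map. Assign a lookup key to each input value.
def map_stage(lines):
  for line in lines:
    for word in line.split():
      yield (word.lower(), 1)

# Stage 2: shuffle. Collate the mapped values by key.
def shuffle_stage(pairs):
  collated = collections.defaultdict(list)
  for key, value in pairs:
    collated[key].append(value)
  return collated

# Stage 3: reduce. Combine each key's values into a final result.
def reduce_stage(collated):
  return dict((key, sum(values)) for key, values in collated.items())

counts = reduce_stage(shuffle_stage(map_stage(["the quick fox", "the dog"])))
# counts == {'the': 2, 'quick': 1, 'fox': 1, 'dog': 1}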

In the MapReduce API, all three of these logical steps are tied together in a single class, MapreducePipeline, as shown in the following figure:

As the figure shows, a MapReduce job proceeds as follows:

  1. The job starts off with input data in some location (possibly Blobstore, as in the MapReduce Made Easy demo).
  2. MapreducePipeline runs the input reader to feed the data into the mapper.
  3. MapreducePipeline runs its mapper function over the data, processing the data into discrete values and assigning keys.
  4. The output of the mapper is shuffled for the reducer function.
  5. The reducer function grabs the shuffled data and does its own processing. (MapreducePipeline knows where to get that shuffled data; your reducer code needn't worry about that.)
  6. Finally, the output is written in the format generated by the output writer. However, you are responsible for saving the key of the output (in the datastore, for example) so you can retrieve that data later. The MapReduce Made Easy demo shows one way to do this, using another pipeline for storing the output; a sketch of such a pipeline follows this list.
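
For illustration, the following is a rough sketch of what such a storage pipeline might look like, loosely modeled on the demo's StoreOutput pipeline; the OutputRecord model and its fields are assumptions made for this example, not part of the MapReduce API:

from google.appengine.ext import db

from mapreduce import base_handler


class OutputRecord(db.Model):
  """Hypothetical datastore model that remembers where a job's output lives."""
  job_name = db.StringProperty()
  output_link = db.StringProperty()


class StoreOutput(base_handler.PipelineBase):
  """Sketch of a pipeline that records the location of the MapReduce output.

  'output' is the value yielded by MapreducePipeline; with
  BlobstoreOutputWriter it is assumed here to be a list of Blobstore
  locations, as in the demo.
  """

  def run(self, mr_type, filekey, output):
    OutputRecord(key_name=filekey,
                 job_name=mr_type,
                 output_link=output[0]).put()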

Downloading the Mapreduce Library

Currently, to use the MapReduce API, you must download the MapReduce library and unzip it into your application directory. You can get the library by downloading the MapReduce Bundle zip file from the App Engine Downloads page.

Note: Alternatively, you can check the mapreduce folder out into your application directory with Subversion: svn checkout http://appengine-mapreduce.googlecode.com/svn/trunk/python/src/mapreduce

To enable the MapReduce framework in your app, add the following include to your app.yaml configuration file:

includes:
- mapreduce/include.yaml

Features and Capabilities

The App Engine adaptation of Google's MapReduce model is optimized for the needs of the App Engine environment, where resource quota management is a key consideration. This release of the MapReduce API provides the following features and capabilities:

  • Processing rate limiting to slow down your mapper functions and space out the work, helping you avoid exceeding your resource quotas
  • Automatic sharding for faster execution, allowing you to use as many workers as you need to get your results faster
  • Standard data input readers for iterating over blob and datastore data
  • Standard output writers for writing job results (to the Blobstore, for example)
  • Status pages to let you see how your jobs are running

Instantiating a MapReduce Pipeline

In your code, you instantiate a MapreducePipeline object inside the run method of a PipelineBase object as follows:

class WordCountPipeline(base_handler.PipelineBase):

  def run(self, filekey, blobkey):
    logging.debug("filename is %s" % filekey)
    output = yield mapreduce_pipeline.MapreducePipeline(
        "word_count",
        "main.word_count_map",
        "main.word_count_reduce",
        "mapreduce.input_readers.BlobstoreZipInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params={
            "blob_key": blobkey,
        },
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=16)
    yield StoreOutput("WordCount", filekey, output)

The following arguments are supplied to the MapreducePipeline constructor (and passed through to its run method):

  • The name of the MapReduce job, for display in the user interface and in any logs
  • The mapper function to use
  • The reducer function to use
  • The input reader to use to supply the mapper function with data
  • The output writer for the reducer function to use
  • The parameters (if any) to supply to the input reader
  • The parameters (if any) to supply to the output writer
  • The number of shards (workers) to use for the MapReduce job

You must write your own mapper and reducer functions. (The shuffler feature is built in and you don't invoke it explicitly.) You can use the standard data input readers and output writers (BlobstoreZipInputReader and BlobstoreOutputWriter in the example).
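
The mapper and reducer named in the example above (main.word_count_map and main.word_count_reduce) are not shown in this document; the following is a hedged sketch of what they might look like for the word-count case, assuming that BlobstoreZipInputReader hands each mapper call a (zipinfo, contents_function) pair, as in the MapReduce Made Easy demo:

def word_count_map(data):
  """Mapper: emit a (word, "") pair for every word in one file of the zip."""
  (entry, text_fn) = data
  text = text_fn()
  for word in text.lower().split():
    yield (word, "")


def word_count_reduce(key, values):
  """Reducer: emit one output line containing the word and its total count."""
  yield "%s: %d\n" % (key, len(values))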

Starting a MapReduce Job

To start a MapReduce job using the MapreducePipeline object, you invoke the Pipeline base class's start method on it, as shown below:

  def post(self):
    filekey = self.request.get("filekey")
    blob_key = self.request.get("blobkey")

    if self.request.get("word_count"):
      pipeline = WordCountPipeline(filekey, blob_key)
      pipeline.start()

Showing the MapReduce Status Monitor

If you wish, you can display a status monitor for your MapReduce jobs, as follows:

  def post(self):
    filekey = self.request.get("filekey")
    blob_key = self.request.get("blobkey")

    if self.request.get("word_count"):
      pipeline = WordCountPipeline(filekey, blob_key)
      pipeline.start()

      self.redirect(pipeline.base_path + "/status?root=" + pipeline.pipeline_id)

Determining When a MapreducePipeline Job is Complete

To find out whether your mapreduce job is complete, you need to save the pipeline ID when you start the mapreduce job, as shown in the following mapreduce pipeline code:

class StartMapreduce(webapp.RequestHandler):
  def get(self):
    pipeline = mapreduce_pipeline.MapreducePipeline(...)
    pipeline.start()
    self.redirect('/wait?pipeline=' + pipeline.pipeline_id)

Notice the redirect above; this is where the pipeline ID is saved, by passing it along as a query parameter.

Then, in the handler where you want to do some work once the MapReduce job is complete, retrieve the pipeline using the saved pipeline ID and check whether it has finished:

class WaitHandler(webapp.RequestHandler):
  def get(self):
    pipeline_id = self.request.get('pipeline')
    pipeline = mapreduce_pipeline.MapreducePipeline.from_id(pipeline_id)
    if pipeline.has_finalized:
      # The MapreducePipeline job has completed; do your work here.
      pass
    else:
      # The MapreducePipeline job is still running.
      pass

As shown above, the MapreducePipeline has_finalized property is used to check for a completed job.