Mapreduce is an experimental, innovative, and rapidly changing new feature for App Engine. Unfortunately, being on the bleeding edge means that we may make backwards-incompatible changes to Mapreduce. We will inform the community when this feature is no longer experimental.
This document presents an overview of the MapReduce Python API.
Note: App Engine Mapreduce is Google's mapreduce with performance limits that allow developers to control their app's runtime costs. The programming design considerations, however, are the same, which means you can take advantage of ideas already available when you are designing your app. For more information on programming with mapreduce, see the Google Code University Mapreduce course. You can also view Mike Aizatsky's 2011 Google I/O conference presentation.
MapReduce is a computing model developed by Google to do efficient distributed computing over large data sets. Input data values in the MapReduce model are processed in three stages: a map stage, a shuffle stage, and a reduce stage.
In the MapReduce API, all three of these logical steps are tied together in a single class, MapreducePipeline.
A MapReduce job proceeds as follows (a plain-Python sketch of the three stages appears after this list):
- MapreducePipeline runs the input reader to feed the data into the mapper.
- MapreducePipeline runs its mapper function over the data, processing the data into discrete values and assigning keys.
- MapreducePipeline runs the built-in shuffler over the mapped data, grouping together all of the values emitted for each key.
- MapreducePipeline runs your reducer function over the shuffled data. (MapreducePipeline knows where to get that shuffled data; your reducer code needn't worry about that.)
- MapreducePipeline writes the reducer's results using the output writer.
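To make the three stages concrete, here is a minimal plain-Python sketch of word counting flowing through map, shuffle, and reduce. It uses no App Engine APIs, and the function names are illustrative only:

from collections import defaultdict

def word_mapper(line):
  # Map stage: emit a (key, value) pair for every word.
  for word in line.split():
    yield (word.lower(), 1)

def shuffle(pairs):
  # Shuffle stage: group together all values emitted for the same key.
  grouped = defaultdict(list)
  for key, value in pairs:
    grouped[key].append(value)
  return grouped

def word_reducer(key, values):
  # Reduce stage: collapse each key's values into a single result.
  return (key, sum(values))

lines = ["the quick brown fox", "the lazy dog"]
pairs = [pair for line in lines for pair in word_mapper(line)]
for key, values in sorted(shuffle(pairs).items()):
  print(word_reducer(key, values))
# ('brown', 1), ('dog', 1), ('fox', 1), ('lazy', 1), ('quick', 1), ('the', 2)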
Currently, to use the MapReduce API, you must download the MapReduce library and unzip it into your application directory. The library is available as the MapReduce Bundle zip file on the App Engine Downloads page.
Note: Alternatively, you can check out the mapreduce folder directly into your application directory:
svn checkout http://appengine-mapreduce.googlecode.com/svn/trunk/python/src/mapreduce
To enable the MapReduce framework in your app, add the following include to your app.yaml configuration file:

includes:
- mapreduce/include.yaml
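For context, a complete minimal app.yaml with this include might look like the following sketch; the application ID and handler script are placeholders, not values from this document:

application: your-app-id
version: 1
runtime: python
api_version: 1

includes:
- mapreduce/include.yaml

handlers:
- url: /.*
  script: main.py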
The App Engine adaptation of Google's MapReduce model is optimized for the needs of the App Engine environment, where resource quota management is a key consideration. This release of the MapReduce API provides features such as automatic sharding across parallel workers, standard data input readers and output writers, and a built-in status page for monitoring your jobs.
In your code, you instantiate a MapreducePipeline object inside the run method of a PipelineBase object, as follows:
import logging

from mapreduce import base_handler
from mapreduce import mapreduce_pipeline

class WordCountPipeline(base_handler.PipelineBase):
  def run(self, filekey, blobkey):
    logging.debug("filename is %s" % filekey)
    output = yield mapreduce_pipeline.MapreducePipeline(
        "word_count",
        "main.word_count_map",
        "main.word_count_reduce",
        "mapreduce.input_readers.BlobstoreZipInputReader",
        "mapreduce.output_writers.BlobstoreOutputWriter",
        mapper_params={
            "blob_key": blobkey,
        },
        reducer_params={
            "mime_type": "text/plain",
        },
        shards=16)
    yield StoreOutput("WordCount", filekey, output)
The following arguments are supplied to the MapreducePipeline object's run method:
- The job name (word_count).
- The mapper function specification (main.word_count_map).
- The reducer function specification (main.word_count_reduce).
- The input reader specification (mapreduce.input_readers.BlobstoreZipInputReader).
- The output writer specification (mapreduce.output_writers.BlobstoreOutputWriter).
- mapper_params, a dictionary of parameters made available to the input reader (here, the blob key of the zip archive to process).
- reducer_params, a dictionary of parameters made available to the output writer (here, the MIME type of the output file).
- shards, the number of shards, which controls how many parallel workers process the data (16 in the example).
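The WordCountPipeline example also yields to a StoreOutput pipeline. StoreOutput is not part of the MapReduce API; it is application code, another PipelineBase whose run method receives the output writer's result (with BlobstoreOutputWriter, a list of blobstore file paths). A minimal sketch, with the persistence step left as a comment, might look like this:

import logging

from mapreduce import base_handler

class StoreOutput(base_handler.PipelineBase):
  """Child pipeline that records where the MapReduce output landed.

  A minimal sketch; a real app would persist 'output' (for example,
  in a datastore entity keyed by filekey) so it can be served later.
  """
  def run(self, mr_type, filekey, output):
    # 'output' is the BlobstoreOutputWriter result: a list of
    # blobstore file paths containing the reducer's output.
    logging.debug("%s output for %s is %s", mr_type, filekey, output)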
You must write your own mapper and reducer functions. (The shuffler feature is built in and you don't invoke it explicitly.) You can use the standard data input readers and output writers (BlobstoreZipInputReader and BlobstoreOutputWriter in the example).
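For reference, the mapper and reducer named in the example (main.word_count_map and main.word_count_reduce) could be written along the following lines. This is a sketch: it assumes BlobstoreZipInputReader hands the mapper one zip entry at a time as a (zipfile.ZipInfo, contents-callable) tuple, and it splits text on whitespace rather than doing real tokenization:

def word_count_map(data):
  """Mapper: emits a (word, "") pair for each word in one zip entry."""
  (entry, text_fn) = data  # (zipfile.ZipInfo, callable returning the contents)
  text = text_fn()
  for word in text.split():
    yield (word.lower(), "")

def word_count_reduce(key, values):
  """Reducer: 'values' holds every value emitted for 'key', so the
  word count is simply the number of values."""
  yield "%s: %d\n" % (key, len(values))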
To start a MapReduce job using the MapreducePipeline object, you invoke the Pipeline base class's start method on it, as shown below:
def post(self):
  filekey = self.request.get("filekey")
  blob_key = self.request.get("blobkey")
  if self.request.get("word_count"):
    pipeline = WordCountPipeline(filekey, blob_key)
    pipeline.start()
If you wish, you can display a status monitor for your MapReduce jobs, as follows:
def post(self):
  filekey = self.request.get("filekey")
  blob_key = self.request.get("blobkey")
  if self.request.get("word_count"):
    pipeline = WordCountPipeline(filekey, blob_key)
    pipeline.start()
    self.redirect(pipeline.base_path + "/status?root=" + pipeline.pipeline_id)
To find out whether your MapReduce job is complete, you need to save the pipeline ID when you start the job, as shown in the following pipeline code:
class StartMapreduce(webapp.RequestHandler):
  def get(self):
    pipeline = mapreduce_pipeline.MapreducePipeline(...)
    pipeline.start()
    self.redirect('/wait?pipeline=' + pipeline.pipeline_id)
Notice the redirect above where the pipeline ID is saved.
Then, in the handler where you want to do some work when the MapReduce job is complete, use the saved pipeline ID to retrieve the pipeline and check whether it has finished:
class WaitHandler(webapp.RequestHandler):
  def get(self):
    pipeline_id = self.request.get('pipeline')
    pipeline = mapreduce_pipeline.MapreducePipeline.from_id(pipeline_id)
    if pipeline.has_finalized:
      # MapreducePipeline has completed
      pass
    else:
      # MapreducePipeline is still running
      pass
As shown above, the MapreducePipeline has_finalized method is used to check for a completed job.
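To tie the pieces together, you might register the two handlers in a webapp WSGI application as follows; the URL paths are illustrative:

from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import run_wsgi_app

application = webapp.WSGIApplication([
    ('/start', StartMapreduce),
    ('/wait', WaitHandler),
], debug=True)

def main():
  run_wsgi_app(application)

if __name__ == '__main__':
  main()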