Google App Engine

Using Push Queues with Python

In App Engine push queues, a task is a unit of work to be performed by the application. Each task is an object of the Task class. Each Task object contains an application-specific URL with a request handler for the task, and an optional data payload that parameterizes the task.

For example, consider a calendaring application that needs to notify an invitee, via email, that an event has been updated. The data payload for this task consists of the email address and name of the invitee, along with a description of the event. The webhook might live at /app_worker/send_email and contain a function that adds the relevant strings to an email template and sends the email. The app can create a separate task for each email it needs to send.
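As a rough illustration of the worker's job described above, the following sketch fills an email template from a task's data payload. The template wording, the parameter names (email, name, description), and the function render_invitation_email are assumptions made for this example; actually sending the message is left out.

```python
from string import Template

# Hypothetical template; the real wording is up to the application.
UPDATE_TEMPLATE = Template(
    "To: $email\n"
    "Hello $name,\n"
    "The following event has been updated: $description\n"
)

def render_invitation_email(params):
    """Build the email text from a task's payload (a dict of strings)."""
    # A missing field raises KeyError, so a malformed payload fails loudly
    # instead of producing a half-filled email.
    return UPDATE_TEMPLATE.substitute(
        email=params['email'],
        name=params['name'],
        description=params['description'],
    )
```

Because each task carries its own payload, the same function can serve every invitee without sharing any state between tasks.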

You can only use push queues within the App Engine environment; if you need to access App Engine tasks from outside of App Engine, use pull queues.

Using Push Queues in Python

A Python app sets up queues using a configuration file named queue.yaml (see Python Task Queue Configuration). If an app does not have a queue.yaml file, it has a queue named default with some default settings.

To enqueue a task, you call the taskqueue.add() function. (You can also create a Task object and call its add() method.) The task consists of data for a request, including a URL path, parameters, HTTP headers, and an HTTP payload. It can also include the earliest time to execute the task (the default is as soon as possible) and a name for the task. The task is added to a queue, then performed by the Task Queue service as the queue is processed.

The following example defines a task handler (CounterWorker) that increments a counter in the datastore, mapped to the URL /worker. It also defines a user-accessible request handler that displays the current value of the counter for a GET request, and for a POST request enqueues a task and returns. It's difficult to visualize the execution of tasks with the user-accessible handler if the queue processes them too quickly. Therefore, the task in this example should run at a rate no greater than once per second.

from google.appengine.api import taskqueue
from google.appengine.ext import db
from google.appengine.ext import webapp
from google.appengine.ext.webapp import template
from google.appengine.ext.webapp.util import run_wsgi_app

class Counter(db.Model):
    count = db.IntegerProperty(indexed=False)

class CounterHandler(webapp.RequestHandler):
    def get(self):
        self.response.out.write(template.render('counters.html',
                                                {'counters': Counter.all()}))

    def post(self):
        key = self.request.get('key')

        # Add the task to the default queue.
        taskqueue.add(url='/worker', params={'key': key})

        self.redirect('/')

class CounterWorker(webapp.RequestHandler):
    def post(self): # should run at most 1/s
        key = self.request.get('key')
        def txn():
            counter = Counter.get_by_key_name(key)
            if counter is None:
                counter = Counter(key_name=key, count=1)
            else:
                counter.count += 1
            counter.put()
        db.run_in_transaction(txn)

def main():
    run_wsgi_app(webapp.WSGIApplication([
        ('/', CounterHandler),
        ('/worker', CounterWorker),
    ]))

if __name__ == '__main__':
    main()

(In this example, 'counters.html' refers to a Django template that contains the HTML for a page that displays the counter value, and a button to trigger a POST request to the / URL.)

Note that this example is not idempotent. It is possible for the task queue to execute a task more than once. In this case, the counter is incremented each time the task is run, possibly skewing the results.
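One possible way to make such a worker tolerate repeated execution is to remember which tasks have already been applied. The sketch below is framework-free: a plain dict and set stand in for the datastore, and CounterStore, apply_increment, and the task_name argument (which could be taken from the task's name) are illustrative names, not part of the App Engine API. In a real app, the check and the update would need to happen in the same transaction.

```python
class CounterStore(object):
    """In-memory stand-in for the datastore (an assumption of this sketch)."""

    def __init__(self):
        self.counts = {}      # counter key -> current count
        self.applied = set()  # names of tasks that were already applied

    def apply_increment(self, key, task_name):
        """Increment the counter once per task_name; return True if applied."""
        if task_name in self.applied:
            # The task ran before; repeating it must not change the count.
            return False
        self.counts[key] = self.counts.get(key, 0) + 1
        self.applied.add(task_name)
        return True
```

With this approach, a second delivery of the same task is recognized and skipped, so the counter is not skewed.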

Task Execution

App Engine executes push tasks by calling application-specific URLs with request handlers for those tasks. These URLs must be local to your application root directory and specified as relative URLs. You attach a URL to the task definition using the url parameter of the Task class.

Programmatically referring to a bundled HTTP request in this fashion is sometimes called a "web hook." You can specify web hooks ahead of time, without waiting for their actual execution. Thus, an application can create many web hooks at once and then hand them off to App Engine. The system then processes them asynchronously (by invoking the HTTP request). This web hook model enables efficient parallel processing.

Note: If a task performs sensitive operations (such as modifying important data), you can secure the worker URL to prevent a malicious external user from calling it directly.

A task must finish executing and send an HTTP response code in the range 200–299 within 10 minutes of the original request. This deadline is separate from the deadline for user requests, which is 60 seconds. If your task's execution nears the limit, App Engine raises a DeadlineExceededError (from the module google.appengine.runtime) that you can catch to save your work or log progress before the deadline passes. If the task fails to execute, App Engine retries it based on criteria that you can configure.

Tip: Use App Engine Backends if you need to process tasks requiring more than 10 minutes.

Task Request Headers

Requests from the Task Queue service contain the following HTTP headers:

  • X-AppEngine-QueueName, the name of the queue (possibly default)
  • X-AppEngine-TaskName, the name of the task, or a system-generated unique ID if no name was specified
  • X-AppEngine-TaskRetryCount, the number of times this task has been retried; for the first attempt, this value is 0
  • X-AppEngine-FailFast, which specifies that a task running on a backend fails immediately instead of waiting in a pending queue
  • X-AppEngine-TaskETA, the target execution time of the task, specified in microseconds since January 1st 1970.
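A handler could read these headers along the following lines. This is only a sketch: it works on a plain dict of headers rather than a real request object, parse_task_headers is a hypothetical helper, and it takes the unit stated above for X-AppEngine-TaskETA at face value and assumes the value arrives as an integer string.

```python
import datetime

EPOCH = datetime.datetime(1970, 1, 1)

def parse_task_headers(headers):
    """Extract task metadata from a dict of request headers."""
    info = {
        'queue': headers.get('X-AppEngine-QueueName'),
        'task': headers.get('X-AppEngine-TaskName'),
        # The retry count is 0 on the first attempt.
        'retries': int(headers.get('X-AppEngine-TaskRetryCount', 0)),
        'eta': None,
    }
    raw_eta = headers.get('X-AppEngine-TaskETA')
    if raw_eta is not None:
        # Assumption: an integer count of microseconds since the epoch.
        info['eta'] = EPOCH + datetime.timedelta(microseconds=int(raw_eta))
    return info
```

A worker might use the retry count, for example, to log a warning once a task has been retried several times.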

The Rate of Task Execution

You set the maximum processing rate for the entire queue when you configure the queue. App Engine uses a token bucket algorithm to execute tasks once they've been delivered to the queue. Each queue has a token bucket, and each bucket holds a certain number of tokens. Your app consumes a token each time it executes a task. If the bucket runs out of tokens, the system pauses until the bucket has more tokens. The rate at which the bucket is refilled is the limiting factor that determines the rate of the queue. See Defining Push Queues and Processing Rates for more details.
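To make the token bucket idea concrete, here is a minimal, self-contained simulation. It is not App Engine's implementation: the class name, the explicit now argument (seconds on an arbitrary clock), and the choice to start with a full bucket are all assumptions of this sketch.

```python
class TokenBucket(object):
    """Minimal token bucket driven by an explicit clock (in seconds)."""

    def __init__(self, rate, capacity, now=0.0):
        self.rate = float(rate)          # tokens added per second
        self.capacity = float(capacity)  # most tokens the bucket can hold
        self.tokens = float(capacity)    # assumption: the bucket starts full
        self.last = float(now)           # time of the last refill

    def _refill(self, now):
        # Add tokens for the time elapsed, never exceeding the capacity.
        elapsed = max(0.0, now - self.last)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last = max(self.last, now)

    def try_consume(self, now):
        """Take one token if available; return True if a task may run."""
        self._refill(now)
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False
```

With rate=1 and capacity=2, two tasks can run back to back, after which execution settles at one task per second: the refill rate, not the bucket size, bounds the sustained throughput.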

To ensure that the taskqueue system does not overwhelm your application, it may throttle the rate at which requests are sent. This throttled rate is known as the enforced rate. The enforced rate may be decreased when your application returns a 503 HTTP response code, or if there are no instances able to execute a request for an extended period of time. You can view the enforced rate on the Task Queue tab of the Administration Console.

The Order of Task Execution

The order in which tasks are executed depends on several factors:

  • The position of the task in the queue. App Engine attempts to process tasks based on FIFO (first in, first out) order. In general, tasks are inserted into the end of a queue, and executed from the head of the queue.
  • The backlog of tasks in the queue. The system attempts to deliver the lowest latency possible for any given task via specially optimized notifications to the scheduler. Thus, in the case that a queue has a large backlog of tasks, the system's scheduling may "jump" new tasks to the head of the queue.
  • The value of the task's eta property. This property specifies the earliest time that a task can execute. App Engine always waits until after the specified ETA to process push tasks.
  • The value of the task's countdown property. This property specifies the minimum number of seconds to wait before executing a task. Countdown and eta are mutually exclusive; if you specify one, do not specify the other.
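The relationship between eta and countdown can be sketched as a small helper. The function earliest_run_time is hypothetical and only mirrors the rules in the list above; it does not call the Task Queue API.

```python
import datetime

def earliest_run_time(now, eta=None, countdown=None):
    """Return the earliest datetime at which a task may execute."""
    if eta is not None and countdown is not None:
        # The two options are mutually exclusive.
        raise ValueError('Specify either eta or countdown, not both.')
    if eta is not None:
        return eta
    if countdown is not None:
        # countdown is a number of seconds to wait from now.
        return now + datetime.timedelta(seconds=countdown)
    # Neither was given: the task may run as soon as possible.
    return now
```

In other words, a countdown is just a relative way of expressing an ETA, which is why specifying both would be ambiguous.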

Task Retries

If a push task request handler returns an HTTP status code within the range 200–299, App Engine considers the task to have completed successfully. If the task returns a status code outside of this range, App Engine retries the task until it succeeds. The system backs off gradually to avoid flooding your application with too many requests, but schedules retry attempts for failed tasks to recur at a maximum of once per hour.
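The success test and a gradual back-off can be sketched as follows. The exact schedule App Engine uses is not described here, so the doubling delay and the one-second starting value are assumptions of this example, as is reading "a maximum of once per hour" as a cap of one hour between attempts. Both function names are hypothetical.

```python
def is_success(status_code):
    """A task succeeds only if its handler returns a 2xx status code."""
    return 200 <= status_code <= 299

def retry_delays(attempts, first_delay=1.0, max_delay=3600.0):
    """Return the delay in seconds before each of `attempts` retries.

    Doubling is an assumption of this sketch, not the documented schedule.
    """
    delays = []
    delay = first_delay
    for _ in range(attempts):
        # Never wait longer than max_delay between two attempts.
        delays.append(min(delay, max_delay))
        delay *= 2
    return delays
```

Under these assumptions the delays grow quickly at first and then level off at one hour, so a persistently failing task keeps being retried without flooding the application.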

You can also configure your own scheme for task retries using the retry_parameters directive in queue.yaml.

When implementing the code for tasks (as worker URLs within your app), it is important to consider whether the task is idempotent. App Engine's Task Queue API is designed to only invoke a given task once; however, it is possible in exceptional circumstances that a task may execute multiple times (such as in the unlikely case of major system failure). Thus, your code must ensure that there are no harmful side-effects of repeated execution.

Deferred Tasks

Setting up a handler for each distinct task (as described in the previous sections) can be cumbersome, as can serializing and deserializing complex arguments for the task—particularly if you have many diverse but small tasks that you want to run on the queue. The Python SDK includes a library (google.appengine.ext.deferred) exposing a simple function that allows you to bypass all the work of setting up dedicated task handlers and serializing and deserializing your parameters.

To use this library, you need to add the deferred builtin to app.yaml. For more information, please see the Built-in Handlers section of the Python Application Configuration page.

To use the deferred library, simply pass the function and its arguments to deferred.defer():

import logging

from google.appengine.ext import deferred

def do_something_expensive(a, b, c=None):
    logging.info("Doing something expensive!")
    # Do your work here

# Somewhere else
deferred.defer(do_something_expensive, "Hello, world!", 42, c=True)

The deferred library packages your function call and its arguments, then adds it to the task queue. When the task is executed, the deferred library executes do_something_expensive("Hello, world!", 42, c=True).
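The idea of packaging a call now and executing it later can be illustrated without App Engine. In the sketch below, package_call and run_packaged are hypothetical helpers; the real library also serializes the call so that it can travel in a task payload, a step this in-memory version skips.

```python
def package_call(func, *args, **kwargs):
    """Bundle a function with its positional and keyword arguments."""
    return (func, args, kwargs)

def run_packaged(packaged):
    """Unpack a bundled call and execute it, as a worker would later."""
    func, args, kwargs = packaged
    return func(*args, **kwargs)

def do_something_expensive(a, b, c=None):
    # Stand-in for real work: report which arguments arrived.
    return (a, b, c)

# Package the call now...
packaged = package_call(do_something_expensive, "Hello, world!", 42, c=True)
# ...and run it later with exactly the same arguments.
result = run_packaged(packaged)
```

Note that keyword arguments such as c=True travel with the call just like positional ones.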

For more information about using the deferred library in Python, please refer to Background Work with the Deferred Library.

URL Endpoints

Push tasks reference their implementation via URL. For example, a task which fetches and parses an RSS feed might use a worker URL called /app_worker/fetch_feed. You can specify this worker URL or use the default. In general, you can use any URL as the worker for a task, so long as it is within your application; all task worker URLs must be specified as relative URLs:

from google.appengine.api import taskqueue

taskqueue.add(url='/path/to/my/worker')
taskqueue.add(url='/path?a=b&c=d', method='GET')

If you do not specify a worker URL, the task uses a default worker URL named after the queue:

/_ah/queue/queue_name

A queue's default URL is used if, and only if, a task does not have a worker URL of its own. If a task does have its own worker URL, then it is only invoked at that worker URL, never at another. Once a task is inserted into a queue, its URL endpoint cannot be changed.
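The rules above can be summarized in a short helper. This is a sketch only: resolve_worker_url is a hypothetical function that mimics the fallback to /_ah/queue/queue_name and the relative-URL requirement; it is not how the Task Queue service is implemented.

```python
try:
    from urllib.parse import urlparse  # Python 3
except ImportError:
    from urlparse import urlparse      # Python 2

def resolve_worker_url(queue_name, url=None):
    """Return the URL at which a task would be invoked."""
    if url is None:
        # No worker URL of its own: fall back to the queue's default URL.
        return '/_ah/queue/' + queue_name
    parts = urlparse(url)
    if parts.scheme or parts.netloc or not url.startswith('/'):
        # Absolute URLs would point outside the application.
        raise ValueError('Worker URLs must be relative: %r' % url)
    return url
```

For example, resolve_worker_url('default') yields /_ah/queue/default, while a task with its own URL keeps that URL regardless of the queue it is placed in.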

Warning! If a task does not have a worker URL, then the task is invoked against the queue's default URL, even if there is currently no handler defined for the queue's default URL. This results in a 404 error, which is recorded, along with the exact URL that was tried, in your application's logs. Because a 404 counts as a failure, the system keeps the task and retries it until it succeeds. You can clear (or 'purge') tasks that can't complete successfully using the Administration Console.

You can also target tasks to App Engine Backends. Backends allow you to process tasks beyond the 10-minute deadline for task execution. See Push Queues and Backends for more information.

Securing URLs for Tasks

You can prevent users from accessing URLs of tasks by restricting access to administrator accounts. Task queues can access admin-only URLs. You can restrict a URL by adding login: admin to the handler configuration in app.yaml.

An example might look like this in app.yaml:

application: hello-tasks
version: 1
runtime: python
api_version: 1

handlers:
- url: /tasks/process
  script: process.py
  login: admin

Note: While task queues can use URL paths restricted with login: admin, they cannot use URL paths restricted with login: required.

For more information see Python Application Configuration: Requiring Login or Administrator Status.

To test a task web hook, sign in as an administrator and visit the URL of the handler in your browser.

Push Queues and the Development Server

When your app is running in the development server, tasks are automatically executed at the appropriate time, just as in production. To prevent the development server from running tasks automatically, start it with the following flag:

dev_appserver.py --disable_task_running

You can examine and manipulate tasks from the developer console at: http://localhost:8080/_ah/admin/taskqueue.

To execute tasks, select the queue by clicking on its name, select the tasks to execute, and click Run Now. To clear a queue without executing any tasks, click Purge Queue.

Push Queues and Backends

Push tasks typically must finish execution within 10 minutes. If you have push tasks that require more time or computing resources to process, you can use App Engine Backends to process these tasks outside of the normal limits of App Engine applications. Backends are addressable. When you create the push task, all you need to do is address it to the backend you want to process it. The following code sample demonstrates how to create a push task addressed to instance 1 of a backend named backend1:

from google.appengine.api import taskqueue
...
    def post(self):
        key = self.request.get('key')

        # Add the task to the default queue.
        taskqueue.add(url='/path/to/my/worker/', params={'key': key},
                      target='1.backend1')

Quotas and Limits for Push Queues

Enqueuing a task in a push queue counts toward the following quotas:

  • Task Queue Stored Task Count
  • Task Queue Stored Task Bytes
  • Task Queue API Calls

The Task Queue Stored Task Bytes quota is configurable in queue.yaml by setting total_storage_limit. This quota counts towards your Stored Data (billable) quota.

Execution of a task counts toward the following quotas:

  • Requests
  • Incoming Bandwidth
  • Outgoing Bandwidth

The act of executing a task consumes bandwidth-related quotas for the request and response data, just as if the request handler were called by a remote client. When the task queue processes a task, the response data is discarded.

Once a task has been executed or deleted, the storage used by that task is reclaimed. The reclaiming of storage quota for tasks happens at regular intervals, and this may not be reflected in the storage quota immediately after the task is deleted.

For more information on quotas, see Quotas, and the "Quota Details" section of the Admin Console.

In addition to quotas, the following limits apply to the use of push queues:

Limit                                                        Amount
task object size                                             100KB
number of active queues (not including the default queue)    10 for free apps, 100 for billed apps
queue execution rate                                         500 task invocations per second per queue
maximum countdown/ETA for a task                             30 days from the current date and time
maximum number of tasks that can be added in a batch         100 tasks
maximum number of tasks that can be added in a transaction   5 tasks
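When an app has more tasks to enqueue than the batch limit allows, it can split them into smaller groups first. The helper below is a plain-Python sketch; split_into_batches is a hypothetical name, and the items in the list can be whatever your app uses to describe a task.

```python
MAX_TASKS_PER_BATCH = 100  # batch limit from the limits listed above

def split_into_batches(tasks, batch_size=MAX_TASKS_PER_BATCH):
    """Split a list of tasks into lists of at most batch_size items."""
    if batch_size < 1:
        raise ValueError('batch_size must be at least 1')
    return [tasks[i:i + batch_size]
            for i in range(0, len(tasks), batch_size)]
```

Each resulting list can then be enqueued in its own batch call; 250 tasks, for instance, become batches of 100, 100, and 50.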