Pull queues allow you to design your own system to consume App Engine tasks. The task consumer can be part of your App Engine app (such as a backend) or a system outside of App Engine (using the Task Queue REST API). The task consumer leases a specific number of tasks for a specific duration, then processes and deletes them before the lease ends.
Using pull queues requires your application to handle some functions that are automated in push queues:
Pull queues require a specific configuration in queue.yaml. For more information, please see Defining Pull Queues on the Task Queue configuration page.
The following sections describe the process of enqueuing, leasing, and deleting tasks using pull queues.
Pull queues allow a task consumer to process tasks outside of App Engine's default task processing system. If the task consumer is a part of your App Engine app, you can manipulate tasks using simple API calls from the google.appengine.api.taskqueue module. Task consumers outside of App Engine can pull tasks using the Task Queue REST API.
The process works like this:
You can use pull queues within the App Engine environment using simple API calls to add tasks to a pull queue, lease them, and delete them after processing.
Before you begin, make sure to configure the pull queue in queue.yaml.
To add tasks to a pull queue, get the queue using the queue name defined in queue.yaml, and set the Task method to PULL. The following example enqueues tasks in a pull queue named pull-queue:
    from google.appengine.api import taskqueue

    q = taskqueue.Queue('pull-queue')
    tasks = []
    payload_str = 'hello world'
    tasks.append(taskqueue.Task(payload=payload_str, method='PULL'))
    q.add(tasks)
Once you have added tasks to a pull queue, you can lease one or more tasks using lease_tasks(). There may be a short delay before tasks recently added using add() become available via lease_tasks(). When you request a lease, you specify the number of tasks to lease (up to a maximum of 1,000 tasks) and the duration of the lease in seconds (up to a maximum of one week). The lease duration needs to be long enough for the slowest task to finish before the lease expires. You can extend a task lease using modify_task_lease().
Leasing a task makes it unavailable for processing by another worker, and it remains unavailable until the lease expires. If you lease an individual task, the API selects the task from the front of the queue. If no such task is available, an empty list is returned.
This method returns a list of Task objects leased from the queue.
Note: lease_tasks() operates only on pull queues. If you attempt to lease tasks added in a push queue, App Engine throws an exception. You can change a push queue to a pull queue by changing its definition in queue.yaml. Please see Defining Pull Queues for more information.
The following code sample leases 100 tasks from the queue pull-queue for one hour:
    from google.appengine.api import taskqueue

    q = taskqueue.Queue('pull-queue')
    q.lease_tasks(3600, 100)
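The leasing rules described above (a leased task is hidden from other workers until its lease expires, tasks are handed out from the front of the queue, and an empty list comes back when nothing is available) can be illustrated with a toy in-memory model. This is only a sketch for reasoning about the behavior: the class `InMemoryPullQueue` and its injectable `clock` argument are hypothetical names, and none of this is the App Engine implementation.

```python
import time


class InMemoryPullQueue(object):
    """Toy model of pull-queue leasing (hypothetical; not the App Engine API)."""

    def __init__(self, clock=time.time):
        self._clock = clock   # injectable so lease expiry can be simulated
        self._tasks = []      # tasks in insertion order (front of queue first)

    def add(self, payload):
        # A task with leased_until in the past is available for leasing.
        self._tasks.append({'payload': payload, 'leased_until': 0.0})

    def lease_tasks(self, lease_seconds, max_tasks):
        """Lease up to max_tasks available tasks, oldest first."""
        now = self._clock()
        leased = []
        for task in self._tasks:
            if len(leased) >= max_tasks:
                break
            if task['leased_until'] <= now:
                task['leased_until'] = now + lease_seconds
                leased.append(task)
        return leased         # empty list when nothing is available

    def delete_tasks(self, tasks):
        """Remove finished tasks so they are never leased again."""
        finished = set(id(task) for task in tasks)
        self._tasks = [t for t in self._tasks if id(t) not in finished]
```

In this model, a task that is leased but never deleted becomes available again once the clock passes its `leased_until` time, which is what lets another worker pick up work from a crashed consumer.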
In general, once a worker completes a task, it needs to delete the task from the queue. If you see tasks remaining in a queue after a worker finishes processing them, it is likely that the worker failed; in this case, the tasks need to be processed by another worker.
You can delete a list of tasks, such as that returned by lease_tasks(), by passing it to delete_tasks():
    from google.appengine.api import taskqueue

    q = taskqueue.Queue('pull-queue')
    tasks = q.lease_tasks(3600, 100)
    # Perform some work with the tasks here
    q.delete_tasks(tasks)
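The sample above deletes every leased task. A worker that can fail on individual tasks may prefer to delete only the tasks it actually finished, so the rest become available again when the lease expires. The following is a minimal sketch of that pattern; the function name `process_leased_tasks` and the `handler` callback are hypothetical, and `queue` is assumed to be any object offering `lease_tasks()` and `delete_tasks()` as used in the sample above.

```python
def process_leased_tasks(queue, handler, lease_seconds=3600, max_tasks=100):
    """Lease a batch, run handler on each task, delete only the successes.

    Returns the number of tasks that were processed and deleted.
    """
    tasks = queue.lease_tasks(lease_seconds, max_tasks)
    finished = []
    for task in tasks:
        try:
            handler(task)
        except Exception:
            # Leave the task in the queue; once the lease expires it can be
            # leased again by this or another worker.
            continue
        finished.append(task)
    if finished:
        queue.delete_tasks(finished)
    return len(finished)
```

Because a failed task is simply left alone, no extra retry bookkeeping is needed in the worker; the lease expiry acts as the retry timer.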
You can use App Engine Backends as workers to lease and process pull queue tasks. Backends allow you to process more work without having to worry about request deadlines and other restrictions normally imposed by App Engine. Using backends with pull queues gives you processing efficiencies by allowing you to batch task processing using leases.
For more information about using backends, check out the Backends documentation.
If you need to use pull queues from outside App Engine, you must use the Task Queue REST API. The REST API is a Google web service accessible at a globally-unique URI of the form:
https://www.googleapis.com/taskqueue/v1beta1/projects/taskqueues
Google provides the following client libraries that you can use to call the Task Queue methods remotely:
In the table below, the first column shows each library's stage of development; note that some are still in early stages. The second column links to the main page for each library.
For libraries that have samples for the Google Task Queue, the third column in the table below links to them directly. If a library's samples page does not yet include a sample for this API, you can still use that library -- simply adapt one of the existing samples as needed.
Client library | Public repository | All client library samples |
---|---|---|
Google APIs Client Library for .NET (beta) | google-api-dotnet-client/ | .NET samples |
Google APIs Client Library for Go (alpha) | google-api-go-client/ | Go samples |
Google API Libraries for Google Web Toolkit (alpha) | gwt-google-apis/ | GWT samples |
Google APIs Client Library for Java (beta) | google-api-java-client/ | Java samples |
Google APIs Client Library for JavaScript (alpha) | google-api-javascript-client/ | JavaScript samples |
Google APIs Client Library for Objective C (alpha) | google-api-objectivec-client/ | Objective-C samples |
Google APIs Client Library for PHP (beta) | google-api-php-client/ | PHP samples |
Google APIs Client Library for Python (beta) | google-api-python-client/ | Python samples |
Google APIs Client Library for Ruby (alpha) | google-api-ruby-client/ | Ruby samples |
The REST API uses OAuth as the authorization mechanism. When you configure your pull queue, make sure that your queue.yaml file supplies the email addresses of the users that can access the queue using the REST API. The OAuth scope for all methods is https://www.googleapis.com/auth/taskqueue.
This section demonstrates the use of the REST API in an application called gtaskqueue, which is shipped with the samples (see above). Installing this application creates two Python binaries in /usr/local/bin: gtaskqueue and gtaskqueue_puller. The gtaskqueue binary allows you to interact with the REST API via the command line. The gtaskqueue_puller binary is a command-line tool that can continually grab tasks from a pull queue and execute an arbitrary binary for each task that is pulled. It also supports sending the output of the binary to an arbitrary URL.
The gtaskqueue tool uses the Google APIs Client Library for Python to interact with the REST API. The command-line functions are based on the gflags Python library. The sections below show the Python code used to import the library and use it to lease and delete tasks. The final section describes how to implement scaling in your application.
To begin using the library, you need to install it in your local environment. After installation, you can import the appropriate client libraries and build the taskqueue service:
    from apiclient.discovery import build

    task_api = build('taskqueue', 'v1beta1')
Once you've built the task queue service, your application can access methods from the library allowing you to interact with the REST API. The following sections describe the two most common functions used with the Task Queue API, allowing you to lease and delete tasks.
The Google APIs Client Library provides methods that invoke the Tasks.lease method in the REST API. When you create a lease, you need to specify the number of tasks to lease (up to a maximum of 1,000 tasks); the API returns the specified number of tasks in order of the oldest task ETA.
You also need to specify the duration of the lease in seconds (up to a maximum of one week). The lease must be long enough to enable you to finish all the leased tasks, yet short enough that if your consumer crashes, the tasks will be available for lease by other clients relatively soon. Similarly, if you lease too many tasks at once and your client crashes, a large number of tasks will become unavailable until the lease expires.
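One way to reason about this trade-off is to start from how long you can tolerate tasks being locked after a crash, and derive the batch size from that. The sketch below is only an illustration under stated assumptions: the function `plan_lease`, its parameters, and the default `safety_factor` of 1.5 are hypothetical choices, not values recommended by the API. The two caps come from the limits stated above (1,000 tasks, one week).

```python
import math

MAX_TASKS_PER_LEASE = 1000             # per-lease limit stated in the text
MAX_LEASE_SECONDS = 7 * 24 * 60 * 60   # one week, as stated in the text


def plan_lease(seconds_per_task, max_lockout_seconds, safety_factor=1.5):
    """Pick (num_tasks, lease_seconds) for one lease request.

    seconds_per_task: estimated processing time of the slowest task.
    max_lockout_seconds: longest time tasks may stay locked after a crash.
    safety_factor: hypothetical padding applied to the per-task estimate.
    """
    budget_per_task = seconds_per_task * safety_factor
    # How many padded tasks fit into the tolerated lockout window?
    num_tasks = int(max_lockout_seconds // budget_per_task)
    num_tasks = max(1, min(num_tasks, MAX_TASKS_PER_LEASE))
    # The lease must cover all leased tasks, even if that exceeds the window
    # (which happens when a single task is slower than the window).
    lease_seconds = int(math.ceil(num_tasks * budget_per_task))
    return num_tasks, min(lease_seconds, MAX_LEASE_SECONDS)
```

For example, with tasks estimated at 2 seconds each and a tolerated lockout of 300 seconds, this sketch yields a batch of 100 tasks leased for 300 seconds.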
The following code from the gtaskqueue sample shows how to lease tasks using the library.
    def _get_tasks_from_queue(self):
        """Gets the available tasks from the taskqueue.

        Returns:
          Lease response object.
        """
        try:
            tasks_to_fetch = self._num_tasks_to_lease()
            lease_req = self.task_api.tasks().lease(
                project=FLAGS.project_name,
                taskqueue=FLAGS.taskqueue_name,
                leaseSecs=FLAGS.lease_secs,
                numTasks=tasks_to_fetch,
                body={})
            result = lease_req.execute()
            return result
        except HttpError as http_error:
            logger.error('Error during lease request: %s' % str(http_error))
            return None
This code enables a command-line tool for leasing a specified number of tasks for a set duration:
    gtaskqueue leasetask --project="gpullqueue1" \
        --taskqueue_name=appengtaskpuller \
        --lease_secs=30 \
        --num_tasks=100
When run, this command-line tool constructs the following URI call to the REST API:
POST https://www.googleapis.com/taskqueue/v1beta1/projects/gpullqueue1/taskqueues/appengtaskpuller/tasks/lease?alt=json&leaseSecs=30&numTasks=100
This request returns an array of 100 tasks with the following JSON structure:
    {
      "kind": "taskqueues#tasks",
      "items": [
        {
          "kind": "taskqueues#task",
          "id": string,
          "queueName": string,
          "payloadBase64": string,
          "enqueueTimestamp": number,
          "leaseTimestamp": number
        }
        ...
      ]
    }
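Since each task's payload arrives in the payloadBase64 field, a consumer has to decode it before doing any work. The following sketch shows one way to do that for a parsed lease response. The function name `decode_payloads` and the sample task ID are made up for illustration, and the code assumes the payload uses the standard Base64 alphabet.

```python
import base64


def decode_payloads(lease_response):
    """Return {task id: decoded payload bytes} for a parsed lease response."""
    payloads = {}
    # 'items' may be absent when no tasks were available to lease.
    for item in lease_response.get('items', []):
        payloads[item['id']] = base64.b64decode(item['payloadBase64'])
    return payloads


# Hypothetical response with a single task whose payload is 'hello world'.
sample_response = {
    'kind': 'taskqueues#tasks',
    'items': [{
        'kind': 'taskqueues#task',
        'id': 'task-1',
        'queueName': 'appengtaskpuller',
        'payloadBase64': 'aGVsbG8gd29ybGQ=',
    }],
}
decoded = decode_payloads(sample_response)
```

Keeping the task ID alongside the decoded payload is convenient because the ID is what you pass when deleting the task afterwards.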
After processing each task, you need to delete it, as described in the following section.
In general, once a worker completes a task, it needs to delete the task from the queue. If you see tasks remaining in a queue after a worker finishes processing, it is likely that the worker failed; in this case, the tasks need to be processed by another worker.
You can delete an individual task or a list of tasks using the REST method Tasks.delete. You must know the name of a task in order to delete it. You can get the task name from the id field of the Task object returned by Tasks.lease.
Call delete if you have finished a task, even if you have exceeded the lease time. Tasks should be idempotent, so even if a task lease expires and another client leases the task, performing the same task twice should not cause an error.
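A common way to make task handling idempotent is to record which task IDs have already been completed and skip repeats. The sketch below illustrates the idea with an in-memory set; the class `IdempotentWorker` and its `action` callback are hypothetical, and a real consumer would need a durable, shared store in place of the set so that the record survives crashes and is visible to other workers.

```python
class IdempotentWorker(object):
    """Runs an action at most once per task ID (in-memory sketch only)."""

    def __init__(self, action):
        self._action = action
        self._completed = set()   # assumption: stands in for a durable store

    def handle(self, task_id, payload):
        """Return True if the action ran, False if the task was a repeat."""
        if task_id in self._completed:
            return False          # already done, e.g. by an earlier lease
        self._action(payload)
        # Record completion only after the action succeeded, so a failure
        # leaves the task eligible for another attempt.
        self._completed.add(task_id)
        return True
```

With this structure, a task that is leased twice because its first lease expired is processed once and ignored the second time.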
Note: When you delete a Task, it immediately becomes invisible to queries, but its name remains in the system for up to seven days. During this time, attempting to create another task with the same name will result in an "item exists" error. The system offers no method to determine if deleted task names are still in the system. To avoid these issues, we recommend that you let App Engine generate the task name automatically.
Returning to our gtaskqueue example, the following code snippet uses the Google APIs Client Library for Python to delete tasks from a queue:
    def _delete_task_from_queue(self, task_api):
        try:
            delete_request = task_api.tasks().delete(
                project=FLAGS.project_name,
                taskqueue=FLAGS.taskqueue_name,
                task=self.task_id)
            delete_request.execute()
        except HttpError as http_error:
            logger.error('Error deleting task %s from taskqueue. '
                         'Error details %s' % (self.task_id, str(http_error)))
This code enables a command for naming a task to delete:
    gtaskqueue deletetask --project_name="gpullqueue1" \
        --taskqueue_name=appengtaskpuller \
        --task_name=taskID
When run, this command constructs the following URI call to the REST API:
DELETE https://www.googleapis.com/taskqueue/v1beta1/projects/gpullqueue1/taskqueues/appengtaskpuller/tasks/taskID
If the delete command is successful, the API returns an HTTP 200 response. If deletion fails, the API returns an HTTP failure code.
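If you are not using a client library, the task URI shown above can be assembled from its three parts. This is a small sketch that follows the URI pattern in the example; the helper name `task_url` is hypothetical, the code targets Python 3, and percent-encoding each path segment is a defensive assumption rather than something the API documentation above requires.

```python
from urllib.parse import quote

BASE_URL = 'https://www.googleapis.com/taskqueue/v1beta1'


def task_url(project, taskqueue_name, task_id):
    """Build the URI of a single task, as used by the DELETE call above."""
    # Encode each segment so characters such as '/' cannot alter the path.
    segments = [quote(part, safe='')
                for part in (project, taskqueue_name, task_id)]
    return '%s/projects/%s/taskqueues/%s/tasks/%s' % (
        BASE_URL, segments[0], segments[1], segments[2])


url = task_url('gpullqueue1', 'appengtaskpuller', 'taskID')
```

The request itself still needs an OAuth-authorized HTTP client; only the URI construction is shown here.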
Enqueuing a task counts toward the following quotas:
Leasing a task counts toward the following quotas:
The Task Queue Stored Task Bytes quota is configurable in queue.yaml by setting total_storage_limit. This quota counts towards your Stored Data (billable) quota.
In addition to quotas, the following limits apply to the use of pull queues:
Limit | Amount |
---|---|
Task Object Size | 1MB |
Number of Active Queues (Not Including the Default Queue) | 10 for free apps; 100 for billed apps |
Maximum Number of Tasks That Can Be Added in a Batch | 100 tasks |
Maximum Number of Tasks That Can Be Added in a Transaction | 5 tasks |
Maximum Number of Tasks That You Can Lease in a Single lease_tasks() Operation | 1000 tasks |
Maximum Payload Size when Leasing a Batch of Tasks | 32MB; 1MB when using the REST API |
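Because at most 100 tasks can be added in a single batch, a producer with a longer list has to split it before enqueuing. The helper below is a minimal sketch of that chunking; the name `batches` is hypothetical, and each yielded chunk is meant to be passed to one add call on the queue.

```python
MAX_TASKS_PER_ADD = 100   # batch limit from the table above


def batches(items, size=MAX_TASKS_PER_ADD):
    """Yield consecutive slices of items, each at most `size` long."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


# 250 placeholder payloads split into chunks that respect the limit.
chunk_sizes = [len(chunk) for chunk in batches(list(range(250)))]
```

Here `chunk_sizes` comes out as `[100, 100, 50]`, so three add calls would be needed for 250 tasks.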