NDB Asynchronous Operation

Experimental!

NDB is an experimental, innovative, and rapidly changing new feature for App Engine. Unfortunately, being on the bleeding edge means that we may make backwards-incompatible changes to NDB. We will inform the community when this feature is no longer experimental.

When optimizing an application's performance, consider its NDB use. For example, if an application reads a value that isn't in cache, that read takes a while. You might be able to speed up your application by performing Datastore actions in parallel with other things, or performing a few Datastore actions in parallel with each other.

The NDB API provides many asynchronous ("async") functions. Each of these functions lets an application send a request to the Datastore and returns immediately with a Future object. The application can do other things while the Datastore handles the request; afterwards, the application can get the results from the Future object.

Introduction

Suppose that one of your application's request handlers needs to use NDB to write something, perhaps to record the request. It also needs to carry out some other NDB operations, perhaps to fetch some data.

class MyRequestHandler(webapp.RequestHandler):
  def get(self):
    acct = Account.get_by_id(users.get_current_user().user_id())
    acct.view_counter += 1
    acct.put()
    # ...read something else from Datastore...
    template.render(...)

By replacing the call to put() with a call to its async equivalent put_async(), the application can do other things right away instead of blocking on put().

class MyRequestHandler(webapp.RequestHandler):
  def get(self):
    acct = Account.get_by_id(users.get_current_user().user_id())
    acct.view_counter += 1
    future = acct.put_async()
    # ...read something else from Datastore...
    template.render(...)
    future.get_result()

This allows the other NDB operations and the template rendering to happen while the Datastore writes the data; the application doesn't block on the Datastore until it actually needs a result from it.

In this example, it's a little silly to call future.get_result(): the application never uses the result from NDB. That code is just in there to make sure that the request handler doesn't exit before the NDB put finishes; if the request handler exits too early, the put might never happen. As a convenience, you can decorate the request handler with @context.toplevel. This tells the handler not to exit until its asynchronous requests have finished. This in turn lets you send off the request and not worry about the result.

class MyRequestHandler(webapp.RequestHandler):
  @context.toplevel
  def get(self):
    acct = Account.get_by_id(users.get_current_user().user_id())
    acct.view_counter += 1
    acct.put_async() # Ignoring the Future this returns
    # ...read something else from Datastore...
    template.render(...)

Using Async APIs and Futures

Almost every synchronous NDB API has an _async counterpart. For example, put() has put_async(). The async function's arguments are always the same as those of the synchronous version. The return value of an async method is always either a Future or (for "multi" functions) a list of Futures.

A Future is an object that maintains state for an operation that has been initiated but may not yet have completed; all async APIs return one or more Futures. You can call the Future's get_result() function to ask it for the result of its operation; the Future then blocks, if necessary, until the result is available, and then gives it to you. get_result() returns the value that would be returned by the synchronous version of the API.
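
For instance (a minimal sketch; the Account kind and key names are hypothetical), a synchronous call, its async counterpart, and a "multi" call that returns a list of Futures look like this:

from google.appengine.ext import ndb

key = ndb.Key('Account', 'sample-id')  # hypothetical key

acct = key.get()                       # synchronous: blocks until done

acct_future = key.get_async()          # async: returns a Future right away
acct = acct_future.get_result()        # blocks here only if still pending

keys = [ndb.Key('Account', 'a'), ndb.Key('Account', 'b')]
futures = ndb.get_multi_async(keys)    # "multi": a list of Futures
accounts = [f.get_result() for f in futures]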

Note: If you've used Futures in certain other programming languages, you might think you can use a Future as a result directly. That doesn't work here. Those languages use implicit futures; NDB uses explicit futures. Call get_result() to get an NDB Future's result.

What if the operation raises an exception? That depends on when the exception occurs. If NDB notices a problem while making a request (perhaps an argument of the wrong type), the _async() method raises an exception itself. But if the exception is detected by, say, the Datastore server, the _async() method returns a Future, and the exception is raised when your application calls its get_result(). Don't worry too much about this; it all ends up behaving quite naturally. Perhaps the biggest difference is that if a traceback gets printed, you'll see some pieces of the low-level asynchronous machinery exposed.
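
For instance (a minimal sketch; the Note model and the failure are hypothetical), an error detected on the server side surfaces only at get_result():

import logging

from google.appengine.ext import ndb

class Note(ndb.Model):  # hypothetical model for illustration
  text = ndb.StringProperty()

future = Note(text='hello').put_async()  # the request is sent right away
# ...do other, unrelated work here...
try:
  future.get_result()  # a Datastore-side error would be raised here
except Exception:      # catch the specific exception type you expect
  logging.exception('Datastore write failed')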

For example, suppose you are writing a guestbook application. If the user is logged in, you want to present a page showing the most recent guestbook posts. This page should also show the user their nickname. The application needs two kinds of information: the logged-in user's account information and the contents of the guestbook posts. The "synchronous" version of this application might look like:

uid = users.get_current_user().user_id()
acct = Account.get_by_id(uid) # I/O action 1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries = qry.fetch(10) # I/O action 2
# ...render HTML based on this data...

There are two independent I/O actions here: getting the Account entity and fetching recent Guestbook entities. Using the synchronous API, these happen one after the other; we wait to receive the account information before fetching the guestbook entities. But the application doesn't need the account information right away. We can take advantage of this and use async APIs:

uid = users.get_current_user().user_id()
acct_future = Account.get_by_id_async(uid) # Start I/O action 1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries_future = qry.fetch_async(10) # Start I/O action 2
acct = acct_future.get_result() # Complete I/O action 1
recent_entries = recent_entries_future.get_result() # Complete I/O action 2
# ...render HTML based on this data...

This version of the code first creates two Futures (acct_future and recent_entries_future) and then waits for them. The server works on both requests in parallel. Each _async() call creates a Future object and sends a request to the Datastore server, which can start working on the request right away. The server responses may come back in any order; the Future objects link each response to its corresponding request.

Synchronous vs. async requests: synchronous requests don't overlap, but asynchronous ones can.

The total (real) time spent in the async version is roughly equal to the maximum time across the operations. The total time spent in the synchronous version exceeds the sum of the operation times. If you can run more operations in parallel, then async operations help more.
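
For example, if fetching the account takes roughly 20 ms and the guestbook query takes roughly 30 ms (hypothetical numbers), the synchronous version needs about 50 ms, while the asynchronous version finishes in about 30 ms.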

To see how long your application's queries take or how many I/O operations it does per request, consider using Appstats. This tool can show charts similar to the drawing above based on instrumentation of a live app.

Using Tasklets

An NDB tasklet is a piece of code that might run concurrently with other code. Tasklets are a way to write concurrent functions without threads; tasklets are executed by an event loop and can suspend themselves, while waiting for I/O or some other operation, using a yield statement. The notion of a blocking operation is abstracted into the Future class, but a tasklet may also yield an RPC in order to wait for that RPC to complete.
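
As a minimal sketch of the mechanics (the kind name and key are hypothetical), a tasklet yields a Future and "returns" its value by raising ndb.Return:

from google.appengine.ext import ndb

@ndb.tasklet
def get_entity(key):
  # Yielding the Future suspends this tasklet until the get completes;
  # the event loop can run other tasklets in the meantime.
  entity = yield key.get_async()
  raise ndb.Return(entity)

entity_future = get_entity(ndb.Key('Account', 'sample-id'))  # returns a Future
entity = entity_future.get_result()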

An application can use tasklets for finer control over asynchronous APIs. As an example, consider the following schema:

class Account(ndb.Model):
  email = ndb.StringProperty()
  nickname = ndb.StringProperty()

  def nick(self):
    return self.nickname or self.email # Whichever is non-empty

class Message(ndb.Model):
  text = ndb.StringProperty()
  when = ndb.DateTimeProperty()
  author = ndb.KeyProperty(kind=Account) # references Account

When displaying a message, it makes sense to show the author's nickname. The "synchronous" way to fetch the data to show a list of messages might look like this:

qry = Message.query().order(-Message.when)
for msg in qry.fetch(20):
  acct = msg.author.get()
  self.response.out.write('<p>On %s, %s wrote:' % (msg.when, acct.nick()))
  self.response.out.write('<p>%s' % msg.text)

Unfortunately, this approach is inefficient. If you looked at it in Appstats, you would see that the "Get" requests are in series. You might see the following "staircase" pattern.

Synchronous "Gets" occur in series
Synchronous "Gets" occur in series.

This part of the program would be faster if those "Gets" could overlap. You might rewrite the code to use get_async, but it is tricky to keep track of which async requests and messages belong together.

The application can define its own "async" function by making it a tasklet. This allows you to organize the code in a less-confusing way. To do this,

  • decorate the function with @ndb.tasklet, and
  • have the function "return" its return value with raise ndb.Return(retval)

Furthermore, instead of using acct = key.get() or acct = key.get_async().get_result(), the function should use acct = yield key.get_async(). This yield tells NDB that this is a good place to suspend the tasklet and let other tasklets run.

Decorating a generator function with @ndb.tasklet makes the function return a Future instead of a generator object.

For example:

@ndb.tasklet
def callback(msg):
  acct = yield msg.author.get_async()
  raise ndb.Return('On %s, %s wrote:\n%s' % (msg.when, acct.nick(), msg.text))

qry = Message.query().order(-Message.when)
outputs = qry.map(callback, limit=20)
for output in outputs:
  print output

The map() calls callback() several times. But the yield ..._async() in callback() lets NDB's scheduler send off many async requests before waiting for any of them to finish.
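
If you prefer not to use map(), a sketch of an equivalent approach (using the callback tasklet and Message query from the example above) is to call the tasklet directly, collect the Futures, and then wait for them:

qry = Message.query().order(-Message.when)
futures = [callback(msg) for msg in qry.fetch(20)]  # each call returns a Future
for future in futures:
  print future.get_result()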

Overlapping Async "Gets"
Overlapping Async "Gets"

If you look at this in Appstats, you might be surprised to see that these multiple Gets don't just overlap: they all go through in a single request. NDB implements an "autobatcher." The autobatcher bundles multiple requests into a single batch RPC to the server; as long as there is more work to do (for example, another callback may still run), it keeps collecting keys, and as soon as one of the results is needed, it sends the batch RPC.
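
As an illustration (a sketch; the keys are hypothetical), several get_async() calls issued before any result is requested are typically combined into one batch RPC by the autobatcher:

from google.appengine.ext import ndb

# Each call hands its key to the autobatcher; no RPC goes out yet.
futures = [ndb.Key('Account', name).get_async()
           for name in ('alice', 'bob', 'carol')]  # hypothetical keys

# Asking for the first result makes the autobatcher send a single
# batch RPC covering all three keys.
accounts = [f.get_result() for f in futures]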

Tasklets, Parallel Queries, Parallel Yield

You can use tasklets so that multiple queries fetch records at the same time. For example, suppose your application has a page that displays the contents of a shopping cart and a list of special offers. The schema might look like this:

class Account(ndb.Model):
  ...

class InventoryItem(ndb.Model):
  name = ndb.StringProperty()
  ...

class CartItem(ndb.Model):
  account = ndb.KeyProperty(kind=Account)
  inventory = ndb.KeyProperty(kind=InventoryItem)
  quantity = ndb.IntegerProperty()
  ...

class SpecialOffer(ndb.Model):
  inventory = ndb.KeyProperty(kind=InventoryItem)

A "synchronous" function that gets shopping cart items and special offers might look like the following:

def get_cart_plus_offers(acct):
  cart = CartItem.query(CartItem.account == acct.key).fetch()
  offers = SpecialOffer.query().fetch(10)
  ndb.get_multi(