Joe Gregorio
October 2008, updated October 2011
This is part four of a five-part series on effectively scaling your App Engine-based apps. To see the other articles in the series, see Related links.
When developing an efficient application on Google App Engine, you need to pay attention to how often an entity is updated. While App Engine's datastore scales to support a huge number of entities, it is important to note that you can only expect to update any single entity or entity group about five times a second. That is an estimate and the actual update rate for an entity is dependent on several attributes of the entity, including how many properties it has, how large it is, and how many indexes need updating. While a single entity or entity group has a limit on how quickly it can be updated, App Engine excels at handling many parallel requests distributed across distinct entities, and we can take advantage of this by using sharding.
The question is, what if you had an entity that you wanted to update faster than five times a second? For example, you might count the number of votes in a poll, the number of comments, or even the number of visitors to your site. Take this simple example:
# Python
class Counter(db.Model):
    count = db.IntegerProperty()

// Java (JDO)
@PersistenceCapable(identityType = IdentityType.APPLICATION)
public class Counter {
  @PrimaryKey
  @Persistent(valueStrategy = IdGeneratorStrategy.IDENTITY)
  private Long id;

  @Persistent
  private Integer count;

  public Long getId() {
    return id;
  }

  public Integer getCount() {
    return count;
  }

  // ...
}

// Go
type Counter struct {
	Count int
}
If you had a single entity that was the counter and the update rate was too fast, then you would have contention as the serialized writes would stack up and start to time out. The way to solve this problem is a little counter-intuitive if you are coming from a relational database; the solution relies on the fact that reads from the App Engine datastore are extremely fast and cheap since entities that have been recently read or updated are cached in memory. The way to reduce the contention is to build a sharded counter: break the counter up into N different counters. When you want to increment the counter, you pick one of the shards at random and increment it. When you want to know the total count, you read all of the counter shards and sum up their individual counts. The more shards you have, the higher the throughput you will have for increments on your counter. This technique works for a lot more than just counters, and an important skill to learn is spotting the entities in your application with a lot of writes and then finding good ways to shard them.
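As a rough sizing aid, the relationship between shard count and throughput can be sketched with a little arithmetic. The helper below is hypothetical (neither `shards_needed` nor its `headroom` parameter is part of App Engine); it assumes the estimate of about five updates per second per entity quoted above, and adds a safety factor because random selection does not spread writes perfectly evenly.

```python
import math

# Assumption: roughly five writes per second per entity, the estimate
# quoted earlier in this article. The real limit varies per entity.
WRITES_PER_SECOND_PER_ENTITY = 5.0


def shards_needed(peak_increments_per_second, headroom=2.0):
    """Estimate a shard count for a counter (hypothetical helper).

    headroom is an assumed safety factor, not an App Engine setting.
    """
    if peak_increments_per_second <= 0:
        return 1
    required = (peak_increments_per_second * headroom
                / WRITES_PER_SECOND_PER_ENTITY)
    return max(1, int(math.ceil(required)))


print(shards_needed(40))       # 40 * 2 / 5 = 16
print(shards_needed(3, 1.0))   # below the single-entity estimate: 1
```

Treat the result as a starting point only; the actual sustainable rate depends on entity size, property count, and indexes, as noted above.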
Here is a very simple implementation of a sharded counter:
# Python
from google.appengine.ext import db
import random


class SimpleCounterShard(db.Model):
    """Shards for the counter"""
    count = db.IntegerProperty(required=True, default=0)


NUM_SHARDS = 20


def get_count():
    """Retrieve the value for a given sharded counter."""
    total = 0
    for counter in SimpleCounterShard.all():
        total += counter.count
    return total


def increment():
    """Increment the value for a given sharded counter."""
    def txn():
        index = random.randint(0, NUM_SHARDS - 1)
        shard_name = "shard" + str(index)
        counter = SimpleCounterShard.get_by_key_name(shard_name)
        if counter is None:
            counter = SimpleCounterShard(key_name=shard_name)
        counter.count += 1
        counter.put()
    db.run_in_transaction(txn)
// Java (low-level datastore API)
import com.google.appengine.api.datastore.DatastoreService;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.EntityNotFoundException;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.api.datastore.KeyFactory;
import com.google.appengine.api.datastore.Query;
import com.google.appengine.api.datastore.Transaction;

import java.util.Random;

/**
 * This initial implementation simply counts all instances of the
 * SimpleCounterShard kind in the datastore. The only way to increment the
 * number of shards is to add another shard by creating another entity in the
 * datastore.
 */
public class ShardedCounterV1 {

  private static final DatastoreService ds = DatastoreServiceFactory
      .getDatastoreService();

  /**
   * Default number of shards.
   */
  private static final int NUM_SHARDS = 20;

  /**
   * A random number generator, for distributing writes across shards.
   */
  private final Random generator = new Random();

  /**
   * Retrieve the value of this sharded counter.
   *
   * @return Summed total of all shards' counts
   */
  public long getCount() {
    long sum = 0;

    Query query = new Query("SimpleCounterShard");
    for (Entity e : ds.prepare(query).asIterable()) {
      sum += (Long) e.getProperty("count");
    }

    return sum;
  }

  /**
   * Increment the value of this sharded counter.
   */
  public void increment() {
    int shardNum = generator.nextInt(NUM_SHARDS);
    Key shardKey = KeyFactory.createKey("SimpleCounterShard",
        Integer.toString(shardNum));

    Transaction tx = ds.beginTransaction();
    Entity shard;
    try {
      shard = ds.get(tx, shardKey);
      long count = (Long) shard.getProperty("count");
      shard.setUnindexedProperty("count", count + 1L);
    } catch (EntityNotFoundException e) {
      shard = new Entity(shardKey);
      shard.setUnindexedProperty("count", 1L);
    }
    ds.put(tx, shard);
    tx.commit();
  }
}
// Java (JDO)
import PMF;

import java.util.List;
import java.util.Random;

import javax.jdo.PersistenceManager;
import javax.jdo.Query;

/**
 * This initial implementation simply counts all instances of the
 * SimpleCounterShard class in the datastore. The only way to increment the
 * number of shards is to add another shard by creating another entity in the
 * datastore.
 */
public class ShardedCounter {
  private static final int NUM_SHARDS = 20;

  /**
   * Retrieve the value of this sharded counter.
   *
   * @return Summed total of all shards' counts
   */
  public int getCount() {
    int sum = 0;
    PersistenceManager pm = PMF.get().getPersistenceManager();

    try {
      String query = "select from " + SimpleCounterShard.class.getName();
      List<SimpleCounterShard> shards =
          (List<SimpleCounterShard>) pm.newQuery(query).execute();
      if (shards != null && !shards.isEmpty()) {
        for (SimpleCounterShard shard : shards) {
          sum += shard.getCount();
        }
      }
    } finally {
      pm.close();
    }

    return sum;
  }

  /**
   * Increment the value of this sharded counter.
   */
  public void increment() {
    PersistenceManager pm = PMF.get().getPersistenceManager();

    Random generator = new Random();
    int shardNum = generator.nextInt(NUM_SHARDS);

    try {
      Query shardQuery = pm.newQuery(SimpleCounterShard.class);
      shardQuery.setFilter("shardNumber == numParam");
      shardQuery.declareParameters("int numParam");

      List<SimpleCounterShard> shards =
          (List<SimpleCounterShard>) shardQuery.execute(shardNum);
      SimpleCounterShard shard;

      // If the shard with the passed shard number exists, increment its count
      // by 1. Otherwise, create a new shard object, set its count to 1, and
      // persist it.
      if (shards != null && !shards.isEmpty()) {
        shard = shards.get(0);
        shard.setCount(shard.getCount() + 1);
      } else {
        shard = new SimpleCounterShard();
        shard.setShardNumber(shardNum);
        shard.setCount(1);
      }

      pm.makePersistent(shard);
    } finally {
      pm.close();
    }
  }
}
// Go
package sharded_counter

import (
	"appengine"
	"appengine/datastore"
	"fmt"
	"os"
	"rand"
)

type simpleCounterShard struct {
	Count int
}

const (
	numShards = 20
	shardKind = "SimpleCounterShard"
)

// Count retrieves the value of the counter.
func Count(c appengine.Context) (int, os.Error) {
	total := 0
	q := datastore.NewQuery(shardKind)
	for t := q.Run(c); ; {
		var s simpleCounterShard
		_, err := t.Next(&s)
		if err == datastore.Done {
			break
		}
		if err != nil {
			return total, err
		}
		total += s.Count
	}
	return total, nil
}

// Increment increments the counter.
func Increment(c appengine.Context) os.Error {
	return datastore.RunInTransaction(c, func(c appengine.Context) os.Error {
		shardName := fmt.Sprintf("shard%d", rand.Intn(numShards))
		key := datastore.NewKey(c, shardKind, shardName, 0, nil)
		var s simpleCounterShard
		err := datastore.Get(c, key, &s)
		// A missing entity and a present entity will both work.
		if err != nil && err != datastore.ErrNoSuchEntity {
			return err
		}
		s.Count++
		_, err = datastore.Put(c, key, &s)
		return err
	}, nil)
}
In get_count() (Python), getCount() (Java) and Count() (Go), we simply loop over all the shards and add up the individual shard counts. In increment() (Python and Java) and Increment() (Go), we choose a shard at random and then read, increment, and write it back to the datastore.
Note that we create the shards lazily, only creating them when they are first incremented. The lazy creation of the shards allows the number of shards to be increased (but never decreased) in the future if more are needed. The value of NUM_SHARDS/numShards could be doubled and the results from get_count()/Count() would not change since the query only selects the shards that have been added to the datastore, and increment()/Increment() will lazily create shards that aren't there.
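The effect of lazy shard creation can be seen in a small in-memory sketch. This is an illustration only: a plain dict stands in for the datastore, there are no transactions, and the `num_shards` argument is an assumption added here so the shard count can be changed between calls.

```python
import random

# A plain dict stands in for the datastore: key name -> shard count.
# Illustration only; nothing here touches App Engine.
shards = {}


def increment(num_shards):
    """Pick a random shard, creating it lazily on first use."""
    shard_name = "shard" + str(random.randint(0, num_shards - 1))
    shards[shard_name] = shards.get(shard_name, 0) + 1


def get_count():
    """Sum only the shards that actually exist."""
    return sum(shards.values())


for _ in range(100):
    increment(20)
before = get_count()   # 100, spread over at most 20 shards

for _ in range(100):
    increment(40)      # doubled shard count; new shards appear lazily
after = get_count()    # 200: the earlier increments are still counted

print(before)
print(after)
```

Because the total is computed from whichever shards exist, raising the shard count never loses earlier increments.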
That is useful as an example to learn from, but a more general-purpose counter would allow you to create named counters on the fly, increase the number of shards dynamically, and use memcache to speed up reads to shards. The example code that Brett Slatkin gave in his Google I/O talk does just that, and I've included that code here, along with a function to increase the number of shards for a particular counter:
# Python
from google.appengine.api import memcache
from google.appengine.ext import db
import random


class GeneralCounterShardConfig(db.Model):
    """Tracks the number of shards for each named counter."""
    name = db.StringProperty(required=True)
    num_shards = db.IntegerProperty(required=True, default=20)


class GeneralCounterShard(db.Model):
    """Shards for each named counter"""
    name = db.StringProperty(required=True)
    count = db.IntegerProperty(required=True, default=0)


def get_count(name):
    """Retrieve the value for a given sharded counter.

    Parameters:
      name - The name of the counter
    """
    total = memcache.get(name)
    if total is None:
        total = 0
        for counter in GeneralCounterShard.all().filter('name = ', name):
            total += counter.count
        memcache.add(name, total, 60)
    return total


def increment(name):
    """Increment the value for a given sharded counter.

    Parameters:
      name - The name of the counter
    """
    config = GeneralCounterShardConfig.get_or_insert(name, name=name)
    def txn():
        index = random.randint(0, config.num_shards - 1)
        shard_name = name + str(index)
        counter = GeneralCounterShard.get_by_key_name(shard_name)
        if counter is None:
            counter = GeneralCounterShard(key_name=shard_name, name=name)
        counter.count += 1
        counter.put()
    db.run_in_transaction(txn)
    # does nothing if the key does not exist
    memcache.incr(name)


def increase_shards(name, num):
    """Increase the number of shards for a given sharded counter.
    Will never decrease the number of shards.

    Parameters:
      name - The name of the counter
      num - How many shards to use
    """
    config = GeneralCounterShardConfig.get_or_insert(name, name=name)
    def txn():
        if config.num_shards < num:
            config.num_shards = num
            config.put()
    db.run_in_transaction(txn)
// Java (low-level datastore API)
import com.google.appengine.api.datastore.DatastoreService;
import com.google.appengine.api.datastore.DatastoreServiceFactory;
import com.google.appengine.api.datastore.Entity;
import com.google.appengine.api.datastore.EntityNotFoundException;
import com.google.appengine.api.datastore.Key;
import com.google.appengine.api.datastore.KeyFactory;
import com.google.appengine.api.datastore.Query;
import com.google.appengine.api.datastore.Transaction;
import com.google.appengine.api.memcache.Expiration;
import com.google.appengine.api.memcache.MemcacheService;
import com.google.appengine.api.memcache.MemcacheService.SetPolicy;
import com.google.appengine.api.memcache.MemcacheServiceFactory;

import java.util.Random;

/**
 * A counter which can be incremented rapidly.
 *
 * Capable of incrementing the counter and increasing the number of shards. When
 * incrementing, a random shard is selected to prevent a single shard from being
 * written too frequently. If increments are being made too quickly, increase
 * the number of shards to divide the load. Performs datastore operations using
 * the low level datastore API.
 */
public class ShardedCounter {

  /**
   * Convenience class which contains constants related to a named sharded
   * counter. The counter name provided in the constructor is used as
   * the entity key.
   */
  private static final class Counter {
    /**
     * Entity kind representing a named sharded counter.
     */
    private static final String KIND = "Counter";

    /**
     * Property to store the number of shards in a given {@value #KIND} named
     * sharded counter.
     */
    private static final String SHARD_COUNT = "shard_count";
  }

  /**
   * Convenience class which contains constants related to the counter shards.
   * The shard number (as a String) is used as the entity key.
   */
  private static final class CounterShard {
    /**
     * Entity kind prefix, which is concatenated with the counter name to form
     * the final entity kind, which represents counter shards.
     */
    private static final String KIND_PREFIX = "CounterShard_";

    /**
     * Property to store the current count within a counter shard.
     */
    private static final String COUNT = "count";
  }

  private static final DatastoreService ds = DatastoreServiceFactory
      .getDatastoreService();

  /**
   * Default number of shards.
   */
  private static final int INITIAL_SHARDS = 5;

  /**
   * The name of this counter.
   */
  private final String counterName;

  /**
   * A random number generator, for distributing writes across shards.
   */
  private final Random generator = new Random();

  /**
   * The counter shard kind for this counter.
   */
  private String kind;

  private final MemcacheService mc = MemcacheServiceFactory
      .getMemcacheService();

  /**
   * Constructor which creates a sharded counter using the provided counter
   * name.
   *
   * @param counterName name of the sharded counter
   */
  public ShardedCounter(String counterName) {
    this.counterName = counterName;
    kind = CounterShard.KIND_PREFIX + counterName;
  }

  /**
   * Increase the number of shards for a given sharded counter. Will never
   * decrease the number of shards.
   *
   * @param count Number of new shards to build and store
   */
  public void addShards(int count) {
    Key counterKey = KeyFactory.createKey(Counter.KIND, counterName);
    incrementPropertyTx(counterKey, Counter.SHARD_COUNT, count,
        INITIAL_SHARDS + count);
  }

  /**
   * Retrieve the value of this sharded counter.
   *
   * @return Summed total of all shards' counts
   */
  public long getCount() {
    Long value = (Long) mc.get(kind);
    if (value != null) {
      return value;
    }

    long sum = 0;
    Query query = new Query(kind);
    for (Entity shard : ds.prepare(query).asIterable()) {
      sum += (Long) shard.getProperty(CounterShard.COUNT);
    }
    mc.put(kind, sum, Expiration.byDeltaSeconds(60),
        SetPolicy.ADD_ONLY_IF_NOT_PRESENT);

    return sum;
  }

  /**
   * Increment the value of this sharded counter.
   */
  public void increment() {
    // Find how many shards are in this counter.
    int numShards = getShardCount();

    // Choose the shard randomly from the available shards.
    long shardNum = generator.nextInt(numShards);

    Key shardKey = KeyFactory.createKey(kind, Long.toString(shardNum));
    incrementPropertyTx(shardKey, CounterShard.COUNT, 1, 1);
    mc.increment(kind, 1);
  }

  /**
   * Get the number of shards in this counter.
   *
   * @return shard count
   */
  private int getShardCount() {
    try {
      Key counterKey = KeyFactory.createKey(Counter.KIND, counterName);
      Entity counter = ds.get(counterKey);
      Long shardCount = (Long) counter.getProperty(Counter.SHARD_COUNT);
      return shardCount.intValue();
    } catch (EntityNotFoundException ignore) {
      return INITIAL_SHARDS;
    }
  }

  /**
   * Increment datastore property value inside a transaction. If the entity with
   * the provided key does not exist, instead create an entity with the supplied
   * initial property value.
   *
   * @param key the entity key to update or create
   * @param prop the property name to be incremented
   * @param increment the amount by which to increment
   * @param initialValue the value to use if the entity does not exist
   */
  private void incrementPropertyTx(Key key, String prop, long increment,
      long initialValue) {
    Transaction tx = ds.beginTransaction();
    Entity thing;
    long value;
    try {
      thing = ds.get(tx, key);
      value = (Long) thing.getProperty(prop) + increment;
    } catch (EntityNotFoundException e) {
      thing = new Entity(key);
      value = initialValue;
    }
    thing.setUnindexedProperty(prop, value);
    ds.put(tx, thing);
    tx.commit();
  }
}
// Java (JDO)
import PMF;

import java.util.List;
import java.util.Random;

import javax.jdo.PersistenceManager;
import javax.jdo.Query;

/**
 * A counter which can be incremented rapidly.
 *
 * Capable of incrementing the counter and increasing the number of shards.
 * When incrementing, a random shard is selected to prevent a single shard
 * from being written to too frequently. If increments are being made too
 * quickly, increase the number of shards to divide the load. Performs
 * datastore operations using JDO.
 */
public class ShardedCounter {
  private String counterName;

  public ShardedCounter(String counterName) {
    this.counterName = counterName;
  }

  public String getCounterName() {
    return counterName;
  }

  /**
   * Retrieve the value of this sharded counter.
   *
   * @return Summed total of all shards' counts
   */
  public int getCount() {
    int sum = 0;
    PersistenceManager pm = PMF.get().getPersistenceManager();

    try {
      Query shardsQuery = pm.newQuery(GeneralCounterShard.class,
          "counterName == nameParam");
      shardsQuery.declareParameters("String nameParam");

      List<GeneralCounterShard> shards =
          (List<GeneralCounterShard>) shardsQuery.execute(counterName);
      if (shards != null && !shards.isEmpty()) {
        for (GeneralCounterShard current : shards) {
          sum += current.getCount();
        }
      }
    } finally {
      pm.close();
    }

    return sum;
  }

  /**
   * Increment the value of this sharded counter.
   */
  public void increment() {
    PersistenceManager pm = PMF.get().getPersistenceManager();

    // Find how many shards are in this counter.
    int shardCount = 0;
    try {
      Counter current = getThisCounter(pm);
      shardCount = current.getShardCount();
    } finally {
      pm.close();
    }

    // Choose the shard randomly from the available shards.
    Random generator = new Random();
    int shardNum = generator.nextInt(shardCount);

    pm = PMF.get().getPersistenceManager();
    try {
      Query randomShardQuery = pm.newQuery(GeneralCounterShard.class);
      randomShardQuery.setFilter(
          "counterName == nameParam && shardNumber == numParam");
      randomShardQuery.declareParameters("String nameParam, int numParam");

      List<GeneralCounterShard> shards = (List<GeneralCounterShard>)
          randomShardQuery.execute(counterName, shardNum);
      if (shards != null && !shards.isEmpty()) {
        GeneralCounterShard shard = shards.get(0);
        shard.increment(1);
        pm.makePersistent(shard);
      }
    } finally {
      pm.close();
    }
  }

  /**
   * Increase the number of shards for a given sharded counter.
   * Will never decrease the number of shards.
   *
   * @param count Number of new shards to build and store
   * @return Total number of shards
   */
  public int addShards(int count) {
    PersistenceManager pm = PMF.get().getPersistenceManager();

    // Find the initial shard count for this counter.
    int numShards = 0;
    try {
      Counter current = getThisCounter(pm);
      if (current != null) {
        numShards = current.getShardCount().intValue();
        current.setShardCount(numShards + count);
        // Save the increased shard count for this counter.
        pm.makePersistent(current);
      }
    } finally {
      pm.close();
    }

    // Create new shard objects for this counter.
    pm = PMF.get().getPersistenceManager();
    try {
      for (int i = 0; i < count; i++) {
        GeneralCounterShard newShard =
            new GeneralCounterShard(getCounterName(), numShards);
        pm.makePersistent(newShard);
        numShards++;
      }
    } finally {
      pm.close();
    }

    return numShards;
  }

  /**
   * @return Counter datastore object matching this object's counterName value
   */
  private Counter getThisCounter(PersistenceManager pm) {
    Counter current = null;
    Query thisCounterQuery = pm.newQuery(Counter.class,
        "counterName == nameParam");
    thisCounterQuery.declareParameters("String nameParam");
    List<Counter> counter =
        (List<Counter>) thisCounterQuery.execute(counterName);
    if (counter != null && !counter.isEmpty()) {
      current = counter.get(0);
    }
    return current;
  }
}
// Go
package sharded_counter

import (
	"appengine"
	"appengine/datastore"
	"appengine/memcache"
	"fmt"
	"os"
	"rand"
)

type counterConfig struct {
	Shards int
}

type shard struct {
	Name  string
	Count int
}

const (
	defaultShards = 20
	configKind    = "GeneralCounterShardConfig"
	shardKind     = "GeneralCounterShard"
)

func memcacheKey(name string) string {
	return shardKind + ":" + name
}

// Count retrieves the value of the named counter.
func Count(c appengine.Context, name string) (int, os.Error) {
	total := 0
	mkey := memcacheKey(name)
	if _, err := memcache.JSON.Get(c, mkey, &total); err == nil {
		return total, nil
	}
	q := datastore.NewQuery(shardKind).Filter("Name =", name)
	for t := q.Run(c); ; {
		var s shard
		_, err := t.Next(&s)
		if err == datastore.Done {
			break
		}
		if err != nil {
			return total, err
		}
		total += s.Count
	}
	memcache.JSON.Set(c, &memcache.Item{
		Key:        mkey,
		Object:     &total,
		Expiration: 60,
	})
	return total, nil
}

// Increment increments the named counter.
func Increment(c appengine.Context, name string) os.Error {
	// Get counter config.
	var cfg counterConfig
	ckey := datastore.NewKey(c, configKind, name, 0, nil)
	err := datastore.RunInTransaction(c, func(c appengine.Context) os.Error {
		err := datastore.Get(c, ckey, &cfg)
		if err == datastore.ErrNoSuchEntity {
			cfg.Shards = defaultShards
			_, err = datastore.Put(c, ckey, &cfg)
		}
		return err
	}, nil)
	if err != nil {
		return err
	}
	err = datastore.RunInTransaction(c, func(c appengine.Context) os.Error {
		// Include the counter name in the key so that different named
		// counters do not share shard entities.
		shardName := fmt.Sprintf("%s-shard%d", name, rand.Intn(cfg.Shards))
		key := datastore.NewKey(c, shardKind, shardName, 0, nil)
		var s shard
		err := datastore.Get(c, key, &s)
		// A missing entity and a present entity will both work.
		if err != nil && err != datastore.ErrNoSuchEntity {
			return err
		}
		// Record the counter name so the "Name =" filter in Count matches.
		s.Name = name
		s.Count++
		_, err = datastore.Put(c, key, &s)
		return err
	}, nil)
	if err != nil {
		return err
	}
	memcache.Increment(c, memcacheKey(name), 1, 0)
	return nil
}

// IncreaseShards increases the number of shards for the named counter to n.
// It will never decrease the number of shards.
func IncreaseShards(c appengine.Context, name string, n int) os.Error {
	ckey := datastore.NewKey(c, configKind, name, 0, nil)
	return datastore.RunInTransaction(c, func(c appengine.Context) os.Error {
		var cfg counterConfig
		mod := false
		err := datastore.Get(c, ckey, &cfg)
		if err == datastore.ErrNoSuchEntity {
			cfg.Shards = defaultShards
			mod = true
		} else if err != nil {
			return err
		}
		if cfg.Shards < n {
			cfg.Shards = n
			mod = true
		}
		if mod {
			_, err = datastore.Put(c, ckey, &cfg)
		}
		return err
	}, nil)
}
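To see how the cached total stays in step with the shards, here is a minimal in-memory sketch of the read-through pattern used by the general counter. `FakeCache` is a hypothetical stand-in that only mimics the behaviors relied on above (`add` stores a value only if the key is absent, `incr` does nothing if the key is missing); it is not the memcache API, it ignores the 60-second expiration, and a dict again stands in for the datastore.

```python
import random


class FakeCache(object):
    """Hypothetical in-memory stand-in for the cache calls used above."""

    def __init__(self):
        self.data = {}

    def get(self, key):
        return self.data.get(key)

    def add(self, key, value):
        # Stores the value only if the key is absent.
        if key in self.data:
            return False
        self.data[key] = value
        return True

    def incr(self, key):
        # Does nothing if the key does not exist.
        if key not in self.data:
            return None
        self.data[key] += 1
        return self.data[key]


cache = FakeCache()
shards = {}  # (counter name, shard index) -> count; stands in for the datastore


def increment(name, num_shards=20):
    index = random.randint(0, num_shards - 1)
    shards[(name, index)] = shards.get((name, index), 0) + 1
    cache.incr(name)


def get_count(name):
    total = cache.get(name)
    if total is None:
        total = sum(v for (n, _), v in shards.items() if n == name)
        cache.add(name, total)
    return total


increment("votes")
increment("votes")            # cache is empty, so incr() is a no-op
first = get_count("votes")    # summed from the shards, then cached
increment("votes")            # shard and cached total both go up
second = get_count("votes")   # served from the cache
print(first)                  # 2
print(second)                 # 3
```

In the real implementations the cached value also expires after 60 seconds, which bounds how long a total that has drifted (for example after a failed cache increment) can be served.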
The Python source for both counters described above is available in the Google App Engine Samples project as sharded-counters. The Java source is available in the demos directory of the Google App Engine SDK project as shardedcounter, or in the Google App Engine Samples project as sharded-counters-java. While the web interface to the examples isn't much to look at, it's instructive to use the admin interface and inspect the data models after you have incremented both counters a few times.
Sharding is one of many important techniques in building a scalable application, and hopefully these examples will give you ideas of where you can apply the technique in your application. The code in these articles is available under the Apache 2 license, so feel free to start with it as you build your solutions.
Watch Brett Slatkin's Google I/O talk "Building Scalable Web Applications with Google AppEngine".