Aerospike Data Modeling Part 5: Bucketing

Aerospike has a maximum record size, which is not an uncommon restriction for a database. When a record is persisted to storage, it is written contiguously, which gives both predictability and performance. Through embedding, we can model and store large, complex objects, but as those objects grow they eventually run up against the maximum record size, as well as practical performance limits. How do we deal with that?

Activity Streams

Let’s look at a simple, well-known example: an activity stream. This could be a stream of posts from people you follow on Twitter or posts in a Slack channel you subscribe to. This is an example of a time series. Let’s start with a basic JSON schema of how we could model this:

msg: {
  from: "Joe",
  to: ["Bob", "Jane"],
  sent_ts: 1470074748,
  message: "Something silly..."
}

How can we build a consolidated view of a single user’s stream? There are many potential technical solutions, but let’s limit the discussion to three basic patterns:

  1. Fan out on read
  2. Fan out on write
  3. Fan out on write with bucketing

Fan Out on Read

In this pattern, we write once to the sender record with the message and a list of recipients. If we then wanted to show all the messages a user has sent and received, how would we do that? Showing sent messages is simple, but showing messages received is complicated. Even if we create a secondary index on the recipients (the to field), we have to perform a scatter-gather query; this data design causes inefficiencies and will not scale well.

To summarize this pattern:

  • One record per message sent
  • Multiple recipients stored in an array in that record
  • Recipient list has a secondary index
  • Reading a stream for a user requires finding all messages with a matching recipient by executing a scatter-gather query across the cluster (sketched below)
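
To make the read path concrete, here is a minimal sketch of what such a query could look like with the Aerospike Python client. The posts set, the to_idx secondary index, and the get_inbox_fanout_on_read helper are assumptions made for this illustration; they are not part of the samples later in this post.

import os
import aerospike
from aerospike import predicates as p

config = {'hosts': [(os.environ.get("AEROSPIKE_HOST", "127.0.0.1"), 3000)]}
client = aerospike.client(config).connect()

# One record per message, with a list bin "to" holding the recipients
# (matching the JSON schema above). A secondary index over the values of
# the "to" list lets us query by recipient; it only needs to be created once.
client.index_list_create("test", "posts", "to", aerospike.INDEX_STRING, "to_idx")

def get_inbox_fanout_on_read(user):
    # Scatter-gather: the query is sent to every node in the cluster
    query = client.query("test", "posts")
    query.where(p.contains("to", aerospike.INDEX_TYPE_LIST, user))
    # Each result is a (key, meta, bins) tuple; return the newest first
    return sorted((bins for _, _, bins in query.results()),
                  key=lambda m: m['sent_ts'], reverse=True)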

Fan Out on Write

So how do we simplify and speed up the read path so we can quickly gather all messages destined for a single user or group? Instead of writing once and reading many times through an index, we write the message to each recipient’s record. This involves substantially more writes, but it makes the read path fast. We are simply denormalizing the message to each recipient.

This technique can be used in addition to the fan out on read pattern explained above. By writing to both the sender’s and each receiver’s user record, we gain read performance at a small incremental cost.

user: {
  name: "Bob",
  stream: [
    { from: "Joe",
      to: ["Bob", "Jane"],
      sent_ts: 1470074748,
      msg: "Something silly..."
    }
  ]
}

To make this example simpler to follow, we will simply write the full message to each recipient. In reality, you may want to store the message text just once and add the message’s primary key to a list on each user’s record, but that’s just an optimization of this basic pattern (see the sketch at the end of this section).

Here’s some Python code to show how we could cross-post the message to each of the recipients (and the sender) and then print the stream for a given user:

import aerospike
import os
import time
import math
import hashlib

config = {
    'hosts': [(os.environ.get("AEROSPIKE_HOST", "127.0.0.1"), 3000)],
    'policies': {'key': aerospike.POLICY_KEY_SEND}
}
wpolicy = {'gen': aerospike.POLICY_GEN_EQ}

client = aerospike.client(config).connect()

# Part Two - Fan out on write
def post_msg_fanout(sent_by, to, msg):
    # The sender keeps a copy in their own stream as well
    recipients = to + [sent_by]
    post = {'msg': msg, 'from': sent_by, 'sent_ts': int(time.time())}
    for recipient in recipients:
        # Prepend the post to each recipient's stream list
        client.list_insert(("test", "msgs", recipient), "stream", 0, post)

def get_inbox_fanout(user):
    (key, meta, record) = client.get(("test", "msgs", user))
    return record['stream']

# Send message
post_msg_fanout("Joe", ["Bob", "Jane"], "Silly message...")
post_msg_fanout("Jane", ["Bob"], "My 1st message...")
# Print the messages for "Jane"
messages = get_inbox_fanout("Jane")
for msg in messages:
    print('{0}>> {1}'.format(msg['from'], msg['msg']))

When you run the code, you will see the following output:

>>> # Send message
... post_msg_fanout("Joe", ["Bob", "Jane"], "Silly message...")
>>> post_msg_fanout("Jane", ["Bob"], "My 1st message...")
>>> # Print the messages for "Jane"
... messages = get_inbox_fanout("Jane")
>>> for msg in messages:
...     print('{0}>> {1}'.format(msg['from'], msg['msg']))
... 
Jane>> My 1st message...
Joe>> Silly message...

The post_msg_fanout function adds the message to the sender’s stream and to each recipient’s stream. The get_inbox_fanout function then only has to read the user’s record to get the stream of activity.

So, to summarize this pattern:

  • Add the record to the sender’s sent list
  • Add the record to each receiver’s received list
  • Reading an inbox requires one primary key lookup
  • Total messages (sent and received) are limited by the maximum record size
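
As noted earlier, an alternative to copying the full message into every stream is to store the message body once and push only its primary key onto each user’s stream. Here is a minimal sketch of that variation, reusing the client and imports from the example above; the posts and streams set names and the helper names are illustrative, not part of the original samples.

import uuid

def post_msg_by_reference(sent_by, to, msg):
    # Store the message body once, under its own key
    msg_id = str(uuid.uuid4())
    client.put(("test", "posts", msg_id),
               {'msg': msg, 'from': sent_by, 'sent_ts': int(time.time())})
    # Prepend only the message id to each recipient's (and the sender's) stream
    for recipient in to + [sent_by]:
        client.list_insert(("test", "streams", recipient), "stream", 0, msg_id)

def get_inbox_by_reference(user):
    (key, meta, record) = client.get(("test", "streams", user))
    # Dereference each id with a primary key read to rebuild the stream
    return [client.get(("test", "posts", msg_id))[2] for msg_id in record['stream']]

The trade-off is that reading an inbox now costs one lookup per message rather than one per user, which is why the basic pattern above simply copies the message.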

Fan Out on Write with Bucketing

To stay within the record size limit, we slice (bucket) the stream so that each record encapsulates only a subset of the messages. The bucketing criterion could be a message count, a time period (e.g., day or week), or anything else that makes sense for the use case. To summarize:

  • Each msg record contains an array of messages for the user
  • Posting a new message appends it to the message array for each recipient
  • Bucket the msg records so that no single record holds too many messages

Let’s see how we do that with this Python example:

def create_user(user):
    client.put(("test", "users", user), {'total': 0})

def calc_bucket(count):
    # Bucket size is 3, just to make testing easier
    return int(math.floor(count / 3)) + 1

def post_msg_bucketing(sent_by, to, msg):
    # The sender keeps a copy in their own stream as well
    recipients = to + [sent_by]
    post = {'msg': msg, 'from': sent_by, 'sent_ts': int(time.time())}
    for recipient in recipients:
        (key, meta, record) = client.get(("test", "users", recipient))
        count = record['total'] + 1
        # Bump the running total, guarded by the generation check in wpolicy
        client.increment(key, "total", 1, meta, wpolicy)
        # Bucket size is 3 messages
        bucket_key = {'user': recipient, 'seq': calc_bucket(count)}
        h = hashlib.new("ripemd160")
        h.update(str(bucket_key).encode())
        client.list_insert(("test", "msgs", h.hexdigest()), "stream", 0, post)

def get_inbox_bucketing(user):
    messages = []
    (key, meta, record) = client.get(("test", "users", user))
    # Find all the buckets based on the total messages received
    for i in range(calc_bucket(record['total']), 0, -1):
        bucket_key = {'user': user, 'seq': i}
        h = hashlib.new("ripemd160")
        h.update(str(bucket_key).encode())
        (key, meta, record) = client.get(("test", "msgs", h.hexdigest()))
        messages.extend(record['stream'])
    return messages


# Post some messages
create_user("Joe")
create_user("Bob")
create_user("Jane")
post_msg_bucketing("Joe", ["Bob", "Jane"], "Silly message...")
post_msg_bucketing("Jane", ["Joe"], "My 1st message...")
post_msg_bucketing("Jane", ["Joe"], "My 2nd message...")
post_msg_bucketing("Jane", ["Joe"], "My 3rd message...")
# Get a user's inbox
messages = get_inbox_bucketing("Jane")
for msg in messages:
    print('{0}>> {1}'.format(msg['from'], msg['msg']))

Looking at this code, the calc_bucket function determines the bucket (slice) that a message will be placed in. This function could use any criterion that suits the use case; here we are slicing by message count, and to make testing easier the bucket size is just three messages.

When we reconstruct the complete stream for the user, we want to directly access all the associated buckets and avoid doing a secondary index scan. We do this by creating a compound key of the user and bucket number (see Data Modeling Part 2). Using a hash function (RIPEMD-160 in this case), we get a consistent key, which we use as the primary key for the bucket. New messages are added to the front of the stream list by using list_insert and an index position of zero, i.e., the head of the list.
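
The same compound-key approach works with other bucketing criteria. For example, to slice by calendar week instead of message count, the seq component of the key could be derived from the message timestamp. A sketch of such a function (the name is illustrative, not part of the original samples):

import datetime

def calc_bucket_by_week(sent_ts):
    # Derive the bucket label from the ISO year and week of the timestamp,
    # e.g. "2016-W31", so all messages sent in the same week share one record
    year, week, _ = datetime.datetime.utcfromtimestamp(sent_ts).isocalendar()
    return '{0}-W{1:02d}'.format(year, week)

Reassembling the inbox would then walk backwards over week labels rather than sequence numbers.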

When the inbox is reassembled in the get_inbox_bucketing function, we iterate from the most recent bucket backwards so that the messages list is constructed in reverse chronological order. This is how most users would want to see the information—most recent first. On a typical web page, the full history is not presented at first; the user is typically asked to paginate through the stream, so the buckets could be retrieved one by one as needed.
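
A page-at-a-time read is then a single primary key lookup per bucket. Here is a sketch that reuses the hashing scheme above; the get_inbox_page name and its page argument (page 1 being the most recent bucket) are illustrative only.

def get_inbox_page(user, page):
    # Page 1 is the newest bucket, page 2 the one before it, and so on
    (key, meta, record) = client.get(("test", "users", user))
    seq = calc_bucket(record['total']) - (page - 1)
    if seq < 1:
        return []  # no more history
    h = hashlib.new("ripemd160")
    h.update(str({'user': user, 'seq': seq}).encode())
    (key, meta, record) = client.get(("test", "msgs", h.hexdigest()))
    return record['stream']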

Running the bucketing example above, you will see the following output:

>>> # Post some messages
... create_user("Joe")
>>> create_user("Bob")
>>> create_user("Jane")
>>> post_msg_bucketing("Joe", ["Bob", "Jane"], "Silly message...")
>>> post_msg_bucketing("Jane", ["Joe"], "My 1st message...")
>>> post_msg_bucketing("Jane", ["Joe"], "My 2nd message...")
>>> post_msg_bucketing("Jane", ["Joe"], "My 3rd message...")
>>> # Get a user's inbox
... messages = get_inbox_bucketing("Jane")
>>> for msg in messages:
...     print('{0}>> {1}'.format(msg['from'], msg['msg']))
... 
Jane>> My 3rd message...
Jane>> My 2nd message...
Jane>> My 1st message...
Joe>> Silly message...

To summarize this pattern:

  • One copy of the message per recipient
  • The message stream is segmented (bucketed) by some criterion
  • The bucketing criterion is used as part of a compound key
  • Reading an inbox requires several primary key lookups (which can be batched, as sketched below)
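
Those lookups do not have to be issued one at a time. If round trips matter, the bucket keys can be assembled up front and fetched with a single batch read; here is a sketch, assuming the Python client’s get_many batch call and reusing the hashing scheme from the example above.

def get_inbox_batched(user):
    (key, meta, record) = client.get(("test", "users", user))
    # Build the bucket keys newest-first, then fetch them in one batch call
    keys = []
    for i in range(calc_bucket(record['total']), 0, -1):
        h = hashlib.new("ripemd160")
        h.update(str({'user': user, 'seq': i}).encode())
        keys.append(("test", "msgs", h.hexdigest()))
    messages = []
    for _, _, bins in client.get_many(keys):
        if bins:  # skip buckets that do not exist yet
            messages.extend(bins['stream'])
    return messages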

Summary

As we can see, the bucketing principle can be applied to many domains and use cases where you need to deal with a long history that will not fit into a single record. Whether you slice by data size, volume, date, or some other criterion, this pattern can help with time series, activity streams, and many other use cases.

In the next section, we will examine the classic RDBMS problem of a debit/credit transaction.

References

Code samples can be found on GitHub.