December 14, 2016

[edit] I’ve linked to the HN thread (at the bottom) and also mentioned that we’re hiring (also at the bottom).

How Bazaarvoice solved data denormalization

I recently read Liron Shapira’s really insightful blog article called “Data denormalization is broken” (https://hackernoon.com/data-denormalization-is-broken-7b697352f405#.24urwx4e2).

He starts:

Backend engineers can’t write elegant and performant application-layer code for many types of everyday business logic. That’s because no one has yet invented a “denormalization engine”, a database with a more general kind of indexer.

I couldn’t agree more, and his article is extremely well reasoned and clearly presented. I highly recommend reading it.

At Bazaarvoice, we have a decentralized team structure, in which separate product lines, services, etc. are maintained by small, focused teams.

Many of these teams are operating at a scale that requires them to:

  1. use at least one search index to answer production queries
  2. include some up-front denormalizations to optimize the queries

We started a replatforming effort 5 or 6 years ago, and that gave us the opportunity to revisit many data-related problems, among them the denormalization one. When I say that the system we made “solves” denormalization, I mean that, yes, it correctly and efficiently computes denormalizations, but more importantly that you configure it with a simple spec.

A little background

polloi

Just for context, the denormalization system I’m describing is called Polloi. This is derived from the phrase “hoi polloi”, or “the populace”.

The idea is to democratize the construction of denormalized indices.

I.e., we wanted to make denormalization easy enough that teams within Bazaarvoice would be empowered to customize and optimize their own indices. Otherwise, if it’s too much effort, this responsibility will naturally trend back toward a central team, which will hurt the business’s agility in the long run.

streaming data

Bazaarvoice has rebuilt itself on a streaming data backend. We use a distributed database that provides a “data bus”, which lets users create subscriptions to data changes. (FYI: the Cassandra Summit talk)

But Polloi will work with any data stream.
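
To make “data bus” a bit more concrete, here’s a minimal sketch of what consuming a change stream might look like. The event shape and subscription interface here are hypothetical (this is not the actual Bazaarvoice data bus API); the point is only that a denormalizer’s input is a stream of document changes, wherever they come from.

// Hypothetical shape of a change event coming off a data bus subscription.
interface ChangeEvent {
  documentId: string;                  // e.g. "r1" or "p3"
  document: Record<string, unknown>;   // the new version of the document
}

// Hypothetical subscription interface: any source of change events will do.
interface ChangeStream {
  poll(): Promise<ChangeEvent[]>;
}

// A denormalizer only needs to see each change; the source is irrelevant to it.
async function consume(stream: ChangeStream, handle: (e: ChangeEvent) => void): Promise<void> {
  while (true) {
    for (const event of await stream.poll()) {
      handle(event);
    }
  }
}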

Motivating example

Bazaarvoice’s primary domain is user reviews and Q&A. We collect this content on products, and the products are organized into categories.

Here’s an amusingly simple version of our data model:

a crude data model diagram

Review

[{
  "type": "review",
  "id":   "r1",
  "product": {"id":"p3"},
  "status": "APPROVED",
  "rating": 1,
  "text":  "I hate this product. >:("
},
{
  "type": "review",
  "id":   "r9",
  "product": {"id":"p3"},
  "status": "APPROVED",
  "rating": 5,
  "text":  "This thing is the best thing ever. You changed my life!!!"
},
{
  "type": "review",
  "id":   "r13",
  "product": {"id":"p3"},
  "status": "REJECTED",
  "rating": 5,
  "text":  "Here, follow this spam link: http://****.  Also, check out this profanity: **** ** *** *****"
}]

Product

[{
  "type": "product",
  "id":   "p3",
  "name": "Some Kind of Thing (tm)",
  "description": "This is a product of some kind"
}]

For the sake of discussion, let’s say we want to support these use cases:

  • (uc1) query for reviews by product name
  • (uc2) show the average rating of a product over its approved reviews (alongside its other details)

In reality, our application has many more use cases than these to support. We also have an API, which supports some very different access patterns, and additionally, we have other products with totally different data and use cases, such as curated content from social media streams.

Also, this is all running at a pretty decent scale, so we are way beyond running these use cases against our main database.

Just to make the example concrete, we are generally indexing into Elasticsearch to support our queries:

simple indexing

(uc1) requires this (pseudo)code:

products = search({ "query": { "match" : { "name" : "thing" } } })
PRODUCT_IDS = products.hits.map(product => product.id)
reviews = search({ "query": { "constant_score" : { "filter" : { "terms" : { "product.id" : PRODUCT_IDS } } } } })
return reviews

(uc2) requires this:

product = get("p3")
result = search({ "query": { "constant_score" : { "filter" : { "bool" : { "must" : [ { "term" : { "product.id" : "p3" } },
                                                                                      { "term" : { "status" : "APPROVED" } } ] } } } },
                  "aggregations": { "avgRating" : { "avg" : { "field" : "rating" } } }
                })
return (product, result.avgRating)

Both require two queries, and Elasticsearch makes them reasonably efficient. But at Bazaarvoice, it is common to be shooting for over 10K requests/sec and under 100ms/request, which would be hard to satisfy with this approach.

However, if we pull the product’s name onto the review and compute the average rating ahead of time, (uc1) can become a single query, and (uc2) can actually become a single super-fast get.

The performance requirements, in combination with the scale of the data and the volume of requests, dictate that we need to consider denormalization as a technique for speeding up queries.

Denormalize below the level of the application

This leads us to the question of how we can compute the denormalizations.

Let’s start with the application logic that writes and modifies the data. Hypothetically, write-time is the perfect opportunity to also update denormalized values. But as Liron illustrates, this can become a nightmare for even a moderately complex application.

Beyond that, for Bazaarvoice, and for many other large companies, this is actually intractable, as data enters and leaves the company through a number of independent applications. It would be crazy for all data-writing applications to have to maintain denormalizations on behalf of data-querying apps.
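
To make the pain concrete, here’s a hedged sketch (the storage interface and function names are made up for illustration) of what our two use cases would demand of a single write path if the application owned its denormalizations. Now multiply this by every application that writes reviews.

// Hypothetical types and storage interface, just for illustration.
interface Review { id: string; product: { id: string; name?: string }; status: string; rating: number; }
interface Product { id: string; name: string; averageRating?: number; }
interface Db {
  getReview(id: string): Promise<Review>;
  getProduct(id: string): Promise<Product>;
  findApprovedReviews(productId: string): Promise<Review[]>;
  put(doc: Review | Product): Promise<void>;
}

// If the application owned denormalization, every code path that touches a
// review would also have to refresh the copied product name (uc1) and the
// product's averageRating (uc2).
async function approveReview(db: Db, reviewId: string): Promise<void> {
  const review = await db.getReview(reviewId);
  review.status = "APPROVED";

  const product = await db.getProduct(review.product.id);
  review.product.name = product.name;                  // denormalization for (uc1)
  await db.put(review);

  const approved = await db.findApprovedReviews(product.id);
  product.averageRating = approved.length === 0 ? undefined :
    approved.reduce((sum, r) => sum + r.rating, 0) / approved.length;   // (uc2)
  await db.put(product);
}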

materialized views

So we’re not going to denormalize in the application layer. What about in the database?

Many of the comments on Liron’s post point to materialized views in Oracle, DB2, SQL Server, and now, apparently, Postgres. Or, equivalently, they point out that you can implement materialized views / denormalization using triggers in MySQL. To this, I would add that a number of other databases provide equivalent functionality, for example, Cassandra 3+.

If you’re already using one of these, and you’re looking for denormalizations, you may as well try out the materialized view. However, I don’t think I agree that these are “the solution” to denormalization. The primary reason is that many, many factors play into which database you choose for your use case. You may be constrained by other factors and won’t be able to choose one that offers materialized views.

Even if your database offers a hook to compute denormalizations on the fly, you may find that it is too costly to use. The Bazaarvoice applications I’m describing query a search index rather than a database, and we figured that, to compute denormalized values, we could just hook into the change stream: every time a document changed, a background process would re-compute the relevant denormalizations. But we quickly discovered that this caused our application’s queries to suffer (a lot). How bad this gets depends on your read/write patterns, but denormalization magnifies your write workload. In our case, we found that each write turned into ~60 denormalization queries. That’s not a nice thing to do either to your database or to your search index.

There is clearly a lot you can do to batch and throttle materialization workloads, but fundamentally we found that there was just too much competition between the materialization queries and the application ones.
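
For the curious, the approach that hurt us looked roughly like the sketch below (the client interface and event shape are hypothetical). The crucial property is that the recomputation runs against the same search index that serves production traffic, so every write fans out into extra queries competing with the application’s own.

// Hypothetical change event and search client, for illustration only.
interface ChangeEvent { type: string; id: string; productId?: string; }
interface SearchIndex {
  search(body: object): Promise<{ aggregations: { avgRating: { value: number } } }>;
  update(id: string, fields: object): Promise<void>;
}

// Background worker: on every review change, re-run the aggregation against the
// live index and write the result back. Multiply by every denormalized field and
// every write, and the index spends its time on this instead of on queries.
async function onDocumentChanged(index: SearchIndex, event: ChangeEvent): Promise<void> {
  if (event.type !== "review" || !event.productId) return;

  const result = await index.search({
    query: { bool: { filter: [
      { term: { "product.id": event.productId } },
      { term: { status: "APPROVED" } },
    ] } },
    aggregations: { avgRating: { avg: { field: "rating" } } },
  });

  await index.update(event.productId, { averageRating: result.aggregations.avgRating.value });
}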

Finally, returning to the separation of concerns, just as the application is most maintainable when it only has to worry about proper application concerns, the database or search index likewise will be most maintainable when it only needs to worry about its own concerns. And there are plenty: durability, availability, query performance, etc.

Our database team is focused on keeping a massive globally-distributed, multi-master, streaming data platform up and running and reasonably consistent, not to mention continuing development work on the database itself. They shouldn’t be distracted by ad-hoc requests from dozens of different teams constantly wanting to materialize different queries.

Conversely, those teams want to move fast to implement features and solve problems. They don’t want to coordinate with the database team to define denormalizations.

In other words, I’m convinced that at scale, you really want a separate system to run your denormalizations. And it needs to be under the control of the team who does the searches.

Therefore, creating and operating denormalizers should be easy enough that you can have many of them, one for each use case.

Denormalize between your database and your application

Polloi basically consumes a stream of normalized data and produces a stream of denormalized data. As such, it will work with any source database and any destination whatsoever.
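
In other words, you can picture Polloi as a transformation over document streams. The interface below is my own simplification, not Polloi’s actual API, but it captures the shape of the thing: normalized documents in, denormalized documents out, and the destination index never computes anything itself.

// A hedged sketch of the shape of a denormalizing stream processor.
type Doc = Record<string, unknown>;

interface Denormalizer {
  ingest(doc: Doc): void;   // feed in a changed (normalized) document...
  emit(): Doc[];            // ...read out documents whose denormalized form changed
}

// The destination just indexes whatever comes out of the denormalizer.
async function pump(source: AsyncIterable<Doc>, denormalizer: Denormalizer,
                    writeToIndex: (doc: Doc) => Promise<void>): Promise<void> {
  for await (const doc of source) {
    denormalizer.ingest(doc);
    for (const out of denormalizer.emit()) {
      await writeToIndex(out);
    }
  }
}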

Sounds great! But what is Polloi like to use? What Liron wants (and what we all deserve) is a denormalizer that not only works, but is easy to use.

For the record, I have some ideas for a revision of the Polloi rules language to make it even easier and more intuitive, but here’s a look at an example Polloi configuration from the Bazaarvoice domain.

Denormalizing with Polloi

Polloi is configured with two text files: a schema, and a rules file.

Schema

type        string
title       string
text        string
status      string
rating      integer
product     ref
name        string

The schema tells Polloi what attributes to include, either in the computations, or just in the output documents.
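
As a rough mental model (my own illustration, not Polloi internals), the schema corresponds to documents shaped like this; reviews and products each use only a subset of the attributes.

// Hedged sketch: the schema above rendered as a TypeScript type.
interface PolloiDocument {
  type?: string;              // "review" | "product"
  title?: string;
  text?: string;
  status?: string;            // e.g. "APPROVED", "REJECTED"
  rating?: number;            // schema type: integer
  product?: { id: string };   // schema type: ref -- a reference to another document
  name?: string;
}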

Rules

[type="review"]product = expand(product, name)
[type="product"]averageRating = average(_product[type="review",status="APPROVED"].rating)

The rules define the denormalizations: in this case, we add the product name to reviews and compute average ratings for products.

Without getting into too much detail,

  • expand lets you traverse a reference (in this case, product)
  • average … computes an average.
  • _product is an inverse reference. I.e., it matches all documents that have a product reference back to the document in question.
  • The expressions in square brackets are predicates. On the left side, they are filtering which rules apply to which types of documents, and on the right of _product, they filter which documents get included in the average computation.
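
To see the second rule in action on the sample data: r1 and r9 are APPROVED reviews of p3 with ratings 1 and 5, while r13 is REJECTED and excluded, so p3’s averageRating comes out to (1 + 5) / 2 = 3.0. Here’s a hedged sketch of that computation as plain batch code (however Polloi actually evaluates it internally):

interface Review { type: string; product: { id: string }; status: string; rating: number; }

// averageRating = average(_product[type="review",status="APPROVED"].rating)
function averageRating(productId: string, docs: Review[]): number {
  const ratings = docs
    .filter(d => d.type === "review")           // [type="review"]
    .filter(d => d.product.id === productId)    // _product: inverse reference back to this product
    .filter(d => d.status === "APPROVED")       // [status="APPROVED"]
    .map(d => d.rating);
  return ratings.reduce((sum, r) => sum + r, 0) / ratings.length;
}

// For p3: r1 (rating 1) and r9 (rating 5) are APPROVED, r13 is REJECTED,
// so averageRating("p3", reviews) === (1 + 5) / 2 === 3.0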

At this point, we compile our code into a package and deploy it using an orchestration service we provide, which gives us a new denormalizing stream processor:

polloi as a denormalizing stream processor

That produces:

Review

[{
  "type": "review",
  "id":   "r1",
  "product": {"id":"p3", "name": "Some Kind of Thing (tm)"},
  "status": "APPROVED",
  "rating": 1,
  "text":  "I hate this product. >:("
},
{
  "type": "review",
  "id":   "r9",
  "product": {"id":"p3", "name": "Some Kind of Thing (tm)"},
  "status": "APPROVED",
  "rating": 5,
  "text":  "This thing is the best thing ever. You changed my life!!!"
},
{
  "type": "review",
  "id":   "r13",
  "product": {"id":"p3", "name": "Some Kind of Thing (tm)"},
  "status": "REJECTED",
  "rating": 5,
  "text":  "Here, follow this spam link: http://****.  Also, check out this profanity: **** ** *** *****"
}]

Product

[{
  "type": "product",
  "id":   "p3",
  "name": "Some Kind of Thing (tm)",
  "description": "This is a product of some kind",
  "averageRating": 3.0
}]

With this data in our index, here is the new code for our use cases:

(uc1)

return search({ "query": { "match" : { "product.name" : "thing" } } })

(uc2)

return get("p3")

To compare the performance more concretely, I ran some sample queries against a testing cluster that has all our data loaded. Just to simplify things, I’ll assume all use cases just fetch the first page of results. Here’s what I got:

use case    normalized                                               denormalized
(uc1)       1.3s + 0.5s for each returned product = 1.8s             1s
(uc2)       60ms for the get + 1s for the aggregation = 1.06s        60ms = 0.06s

It’s not scientific by any means, but it serves to illustrate the point.

Keep in mind that these results are against a cold, un-loaded test cluster. At this moment (10:30pm on a Friday), our production application is doing 11.2K queries per second with a 99th percentile response time of 120ms. There’s no way we could sustain that with just normalized data in our indices.

Denormalization democracy, realized

Bazaarvoice has been running its heaviest display use cases with Polloi for several years now. To sum up our experience, I would say that thinking of denormalization as an architectural problem is a small paradigm shift that really decouples projects and opens up a lot of scale and innovation possibilities.

Just to drive the point home, I’ll leave you with this diagram showing how multiple teams can define their own denormalizations, plug them into the company’s data stream, index their own optimized data, and power their applications:

streaming data architecture

This does not come without trade-offs, however. I’m planning to write more about the broader challenges and opportunities of operating a company on a streaming data backend.

Let me know what you think! I’ve enabled comments on the blog; you can also discuss on Hacker News (https://news.ycombinator.com/item?id=13176411), or shoot me an email (see $whoami).

Bazaarvoice is Hiring!

[edit] I initially forgot to mention this, but the Polloi team is looking for someone passionate about distributed systems, streaming data, and just generally solving hard problems with good engineering. Hit me up if you’re interested.


bazaarvoice data-stuff streaming polloi cqrs

