Add Sales Rank solution

This commit is contained in:
Donne Martin 2017-03-04 21:06:17 -08:00
parent d484f80b18
commit edc404ec5c
5 changed files with 415 additions and 0 deletions

View File

@ -0,0 +1,338 @@
# Design Amazon's sales rank by category feature
*Note: This document links directly to relevant areas found in the [system design topics](https://github.com/donnemartin/system-design-primer-interview#index-of-system-design-topics-1) to avoid duplication. Refer to the linked content for general talking points, tradeoffs, and alternatives.*
## Step 1: Outline use cases and constraints
> Gather requirements and scope the problem.
> Ask questions to clarify use cases and constraints.
> Discuss assumptions.
Without an interviewer to address clarifying questions, we'll define some use cases and constraints.
### Use cases
#### We'll scope the problem to handle only the following use case
* **Service** calculates the past week's most popular products by category
* **User** views the past week's most popular products by category
* **Service** has high availability
#### Out of scope
* The general e-commerce site
* Design components only for calculating sales rank
### Constraints and assumptions
#### State assumptions
* Traffic is not evenly distributed
* Items can be in multiple categories
* Items cannot change categories
* There are no subcategories ie `foo/bar/baz`
* Results must be updated hourly
* More popular products might need to be updated more frequently
* 10 million products
* 1000 categories
* 1 billion transactions per month
* 100 billion read requests per month
* 100:1 read to write ratio
#### Calculate usage
**Clarify with your interviewer if you should run back-of-the-envelope usage calculations.**
* Size per transaction:
* `created_at` - 5 bytes
* `product_id` - 8 bytes
* `category_id` - 4 bytes
* `seller_id` - 8 bytes
* `buyer_id` - 8 bytes
* `quantity` - 4 bytes
* `total_price` - 5 bytes
* Total: ~40 bytes
* 40 GB of new transaction content per month
* 40 bytes per transaction * 1 billion transactions per month
* 1.44 TB of new transaction content in 3 years
* Assume most are new transactions instead of updates to existing ones
* 400 transactions per second on average
* 40,000 read requests per second on average
Handy conversion guide:
* 2.5 million seconds per month
* 1 request per second = 2.5 million requests per month
* 40 requests per second = 100 million requests per month
* 400 requests per second = 1 billion requests per month
## Step 2: Create a high level design
> Outline a high level design with all important components.
![Imgur](http://i.imgur.com/vwMa1Qu.png)
## Step 3: Design core components
> Dive into details for each core component.
### Use case: Service calculates the past week's most popular products by category
We could store the raw **Sales API** server log files on a managed **Object Store** such as Amazon S3, rather than managing our own distributed file system.
**Clarify with your interviewer how much code you are expected to write**.
We'll assume this is a sample log entry, tab delimited:
```
timestamp product_id category_id qty total_price seller_id buyer_id
t1 product1 category1 2 20.00 1 1
t2 product1 category2 2 20.00 2 2
t2 product1 category2 1 10.00 2 3
t3 product2 category1 3 7.00 3 4
t4 product3 category2 7 2.00 4 5
t5 product4 category1 1 5.00 5 6
...
```
The **Sales Rank Service** could use **MapReduce**, using the **Sales API** server log files as input and writing the results to an aggregate table `sales_rank` in a **SQL Database**. We should discuss the [use cases and tradeoffs between choosing SQL or NoSQL](https://github.com/donnemartin/system-design-primer-interview#sql-or-nosql).
We'll use a multi-step **MapReduce**:
* **Step 1** - Transform the data to `(category, product_id), sum(quantity)`
* **Step 2** - Perform a distributed sort
```
class SalesRanker(MRJob):
def within_past_week(self, timestamp):
"""Return True if timestamp is within past week, False otherwise."""
...
def mapper(self, _ line):
"""Parse each log line, extract and transform relevant lines.
Emit key value pairs of the form:
(category1, product1), 2
(category2, product1), 2
(category2, product1), 1
(category1, product2), 3
(category2, product3), 7
(category1, product4), 1
"""
timestamp, product_id, category_id, quantity, total_price, seller_id, \
buyer_id = line.split('\t')
if self.within_past_week(timestamp):
yield (category_id, product_id), quantity
def reducer(self, key, value):
"""Sum values for each key.
(category1, product1), 2
(category2, product1), 3
(category1, product2), 3
(category2, product3), 7
(category1, product4), 1
"""
yield key, sum(values)
def mapper_sort(self, key, value):
"""Construct key to ensure proper sorting.
Transform key and value to the form:
(category1, 2), product1
(category2, 3), product1
(category1, 3), product2
(category2, 7), product3
(category1, 1), product4
The shuffle/sort step of MapReduce will then do a
distributed sort on the keys, resulting in:
(category1, 1), product4
(category1, 2), product1
(category1, 3), product2
(category2, 3), product1
(category2, 7), product3
"""
category_id, product_id = key
quantity = value
yield (category_id, quantity), product_id
def reducer_identity(self, key, value):
yield key, value
def steps(self):
"""Run the map and reduce steps."""
return [
self.mr(mapper=self.mapper,
reducer=self.reducer),
self.mr(mapper=self.mapper_sort,
reducer=self.reducer_identity),
]
```
The result would be the following sorted list, which we could insert into the `sales_rank` table:
```
(category1, 1), product4
(category1, 2), product1
(category1, 3), product2
(category2, 3), product1
(category2, 7), product3
```
The `sales_rank` table could have the following structure:
```
id int NOT NULL AUTO_INCREMENT
category_id int NOT NULL
total_sold int NOT NULL
product_id int NOT NULL
PRIMARY KEY(id)
FOREIGN KEY(category_id) REFERENCES Categories(id)
FOREIGN KEY(product_id) REFERENCES Products(id)
```
We'll create an [index](https://github.com/donnemartin/system-design-primer-interview#use-good-indices) on `id `, `category_id`, and `product_id` to speed up lookups (log-time instead of scanning the entire table) and to keep the data in memory. Reading 1 MB sequentially from memory takes about 250 microseconds, while reading from SSD takes 4x and from disk takes 80x longer.<sup><a href=https://github.com/donnemartin/system-design-primer-interview#latency-numbers-every-programmer-should-know>1</a></sup>
### Use case: User views the past week's most popular products by category
* The **Client** sends a request to the **Web Server**, running as a [reverse proxy](https://github.com/donnemartin/system-design-primer-interview#reverse-proxy-web-server)
* The **Web Server** forwards the request to the **Read API** server
* The **Read API** server reads from the **SQL Database** `sales_rank` table
We'll use a public [**REST API**](https://github.com/donnemartin/system-design-primer-interview##representational-state-transfer-rest):
```
$ curl https://amazon.com/api/v1/popular?category_id=1234
```
Response:
```
{
"id": "100",
"category_id": "1234",
"total_sold": "100000",
"product_id": "50",
},
{
"id": "53",
"category_id": "1234",
"total_sold": "90000",
"product_id": "200",
},
{
"id": "75",
"category_id": "1234",
"total_sold": "80000",
"product_id": "3",
},
```
For internal communications, we could use [Remote Procedure Calls](https://github.com/donnemartin/system-design-primer-interview#remote-procedure-call-rpc).
## Step 4: Scale the design
> Identify and address bottlenecks, given the constraints.
![Imgur](http://i.imgur.com/MzExP06.png)
**Important: Do not simply jump right into the final design from the initial design!**
State you would 1) **Benchmark/Load Test**, 2) **Profile** for bottlenecks 3) address bottlenecks while evaluating alternatives and trade-offs, and 4) repeat. See [Design a system that scales to millions of users on AWS]() as a sample on how to iteratively scale the initial design.
It's important to discuss what bottlenecks you might encounter with the initial design and how you might address each of them. For example, what issues are addressed by adding a **Load Balancer** with multiple **Web Servers**? **CDN**? **Master-Slave Replicas**? What are the alternatives and **Trade-Offs** for each?
We'll introduce some components to complete the design and to address scalability issues. Internal load balancers are not shown to reduce clutter.
*To avoid repeating discussions*, refer to the following [system design topics](https://github.com/donnemartin/system-design-primer-interview#) for main talking points, tradeoffs, and alternatives:
* [DNS](https://github.com/donnemartin/system-design-primer-interview#domain-name-system)
* [CDN](https://github.com/donnemartin/system-design-primer-interview#content-delivery-network)
* [Load balancer](https://github.com/donnemartin/system-design-primer-interview#load-balancer)
* [Horizontal scaling](https://github.com/donnemartin/system-design-primer-interview#horizontal-scaling)
* [Web server (reverse proxy)](https://github.com/donnemartin/system-design-primer-interview#reverse-proxy-web-server)
* [API server (application layer)](https://github.com/donnemartin/system-design-primer-interview#application-layer)
* [Cache](https://github.com/donnemartin/system-design-primer-interview#cache)
* [Relational database management system (RDBMS)](https://github.com/donnemartin/system-design-primer-interview#relational-database-management-system-rdbms)
* [SQL write master-slave failover](https://github.com/donnemartin/system-design-primer-interview#fail-over)
* [Master-slave replication](https://github.com/donnemartin/system-design-primer-interview#master-slave-replication)
* [Consistency patterns](https://github.com/donnemartin/system-design-primer-interview#consistency-patterns)
* [Availability patterns](https://github.com/donnemartin/system-design-primer-interview#availability-patterns)
The **Analytics Database** could use a data warehousing solution such as Amazon Redshift or Google BigQuery.
We might only want to store a limited time period of data in the database, while storing the rest in a data warehouse or in an **Object Store**. An **Object Store** such as Amazon S3 can comfortably handle the constraint of 40 GB of new content per month.
To address the 40,000 *average* read requests per second (higher at peak), traffic for popular content (and their sales rank) should be handled by the **Memory Cache** instead of the database. The **Memory Cache** is also useful for handling the unevenly distributed traffic and traffic spikes. With the large volume of reads, the **SQL Read Replicas** might not be able to handle the cache misses. We'll probably need to employ additional SQL scaling patterns.
400 *average* writes per second (higher at peak) might be tough for a single **SQL Write Master-Slave**, also pointing to a need for additional scaling techniques.
SQL scaling patterns include:
* [Federation](https://github.com/donnemartin/system-design-primer-interview#federation)
* [Sharding](https://github.com/donnemartin/system-design-primer-interview#sharding)
* [Denormalization](https://github.com/donnemartin/system-design-primer-interview#denormalization)
* [SQL Tuning](https://github.com/donnemartin/system-design-primer-interview#sql-tuning)
We should also consider moving some data to a **NoSQL Database**.
## Additional talking points
> Additional topics to dive into, depending on the problem scope and time remaining.
#### NoSQL
* [Key-value store](https://github.com/donnemartin/system-design-primer-interview#)
* [Document store](https://github.com/donnemartin/system-design-primer-interview#)
* [Wide column store](https://github.com/donnemartin/system-design-primer-interview#)
* [Graph database](https://github.com/donnemartin/system-design-primer-interview#)
* [SQL vs NoSQL](https://github.com/donnemartin/system-design-primer-interview#)
### Caching
* Where to cache
* [Client caching](https://github.com/donnemartin/system-design-primer-interview#client-caching)
* [CDN caching](https://github.com/donnemartin/system-design-primer-interview#cdn-caching)
* [Web server caching](https://github.com/donnemartin/system-design-primer-interview#web-server-caching)
* [Database caching](https://github.com/donnemartin/system-design-primer-interview#database-caching)
* [Application caching](https://github.com/donnemartin/system-design-primer-interview#application-caching)
* What to cache
* [Caching at the database query level](https://github.com/donnemartin/system-design-primer-interview#caching-at-the-database-query-level)
* [Caching at the object level](https://github.com/donnemartin/system-design-primer-interview#caching-at-the-object-level)
* When to update the cache
* [Cache-aside](https://github.com/donnemartin/system-design-primer-interview#cache-aside)
* [Write-through](https://github.com/donnemartin/system-design-primer-interview#write-through)
* [Write-behind (write-back)](https://github.com/donnemartin/system-design-primer-interview#write-behind-write-back)
* [Refresh ahead](https://github.com/donnemartin/system-design-primer-interview#refresh-ahead)
### Asynchronism and microservices
* [Message queues](https://github.com/donnemartin/system-design-primer-interview#)
* [Task queues](https://github.com/donnemartin/system-design-primer-interview#)
* [Back pressure](https://github.com/donnemartin/system-design-primer-interview#)
* [Microservices](https://github.com/donnemartin/system-design-primer-interview#)
### Communications
* Discuss tradeoffs:
* External communication with clients - [HTTP APIs following REST](https://github.com/donnemartin/system-design-primer-interview#representational-state-transfer-rest)
* Internal communications - [RPC](https://github.com/donnemartin/system-design-primer-interview#remote-procedure-call-rpc)
* [Service discovery](https://github.com/donnemartin/system-design-primer-interview#service-discovery)
### Security
Refer to the [security section](https://github.com/donnemartin/system-design-primer-interview#security).
### Latency numbers
See [Latency numbers every programmer should know](https://github.com/donnemartin/system-design-primer-interview#latency-numbers-every-programmer-should-know).
### Ongoing
* Continue benchmarking and monitoring your system to address bottlenecks as they come up
* Scaling is an iterative process

Binary file not shown.

After

Width:  |  Height:  |  Size: 213 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 78 KiB

View File

@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-
from mrjob.job import MRJob
class SalesRanker(MRJob):
def within_past_week(self, timestamp):
"""Return True if timestamp is within past week, False otherwise."""
...
def mapper(self, _ line):
"""Parse each log line, extract and transform relevant lines.
Emit key value pairs of the form:
(foo, p1), 2
(bar, p1), 2
(bar, p1), 1
(foo, p2), 3
(bar, p3), 10
(foo, p4), 1
"""
timestamp, product_id, category, quantity = line.split('\t')
if self.within_past_week(timestamp):
yield (category, product_id), quantity
def reducer(self, key, value):
"""Sum values for each key.
(foo, p1), 2
(bar, p1), 3
(foo, p2), 3
(bar, p3), 10
(foo, p4), 1
"""
yield key, sum(values)
def mapper_sort(self, key, value):
"""Construct key to ensure proper sorting.
Transform key and value to the form:
(foo, 2), p1
(bar, 3), p1
(foo, 3), p2
(bar, 10), p3
(foo, 1), p4
The shuffle/sort step of MapReduce will then do a
distributed sort on the keys, resulting in:
(category1, 1), product4
(category1, 2), product1
(category1, 3), product2
(category2, 3), product1
(category2, 7), product3
"""
category, product_id = key
quantity = value
yield (category, quantity), product_id
def reducer_identity(self, key, value):
yield key, value
def steps(self):
"""Run the map and reduce steps."""
return [
self.mr(mapper=self.mapper,
reducer=self.reducer),
self.mr(mapper=self.mapper_sort,
reducer=self.reducer_identity),
]
if __name__ == '__main__':
HitCounts.run()