2015-07-15
Photon: Fault-tolerant and Scalable Joining of Continuous Data Streams - Ananthanarayanan et al. 2013
We are familiar with how a join is performed in a relational database (RDBMS). A typical inner-join SQL query could be
select * from t_primary, t_foreign
where t_primary.primary_key = t_foreign.foreign_key
A naive implementation of such a query is a nested-loop join, where the outer loop consumes table t_primary row by row and, for each outer row, the inner loop searches for matching rows in table t_foreign.
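A minimal sketch of that nested-loop join in Python (the table and column names are made up for illustration):

# Naive nested-loop inner join: for every primary row, scan all foreign rows.
def nested_loop_join(t_primary, t_foreign):
    joined = []
    for p in t_primary:                    # outer loop over the primary table
        for f in t_foreign:                # inner loop over the foreign table
            if p["primary_key"] == f["foreign_key"]:
                joined.append({**p, **f})  # emit the combined row
    return joined

# Example with two tiny in-memory "tables" represented as lists of dicts.
queries = [{"primary_key": "q1", "query": "running shoes"}]
clicks = [{"foreign_key": "q1", "ad": "ad42"}]
print(nested_loop_join(queries, clicks))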
Joining two continuous data streams is like joining two tables in an RDBMS, but with far more challenges: the streams never end, events may arrive out of order or be delayed, and the join must keep running in the face of failures.
It's interesting to see how Google approaches these challenges in its joining system, Photon, which drives Google's advertising system. Photon joins data streams such as web search queries and user clicks on advertisements, and the joined logs are used to derive key business metrics, including billing for advertisers. The system is geographically distributed, processes millions of events per minute at peak, and has an average end-to-end latency of less than 10 seconds.
Here's how Photon joins a search query with subsequent clicks on ads.
Photon formalizes the problem as
Formally, given two continuously growing log streams such that each event in the primary log stream contains a unique identifier, and each event in the foreign log stream contains the identifier referring to an event in the primary log stream, we want to join each foreign log event with the corresponding primary log event and produce the joined event.
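A toy version of that formulation (a sketch only, not Photon's implementation) keeps primary events in a map keyed by their unique identifier and joins each foreign event against the primary event it refers to. The field names below are invented for illustration:

# Primary events indexed by their unique id.
primary_by_id = {}

def on_primary_event(event):
    primary_by_id[event["id"]] = event

def on_foreign_event(event):
    primary = primary_by_id.get(event["primary_id"])
    if primary is None:
        return None                                   # primary event not seen yet; retry later
    return {"primary": primary, "foreign": event}     # the joined event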
As a system joining continuous data streams, Photon faces the challenges described above. Additionally, at Google's scale, Photon is required to handle datacenter-level outages automatically, with no manual operations and no impact on system availability. Hence, there are at least two copies of the Photon pipeline running in different datacenters, each of which continues processing independently of the other.
While datacenter-level replication ensures availability, it makes consistency much harder to guarantee: one Photon pipeline must know whether an input event has already been joined by the other pipeline, so that each input event is joined at most once. Photon relies on the IdRegistry, a Paxos-based in-memory key-value store, to coordinate between pipeline workers. Let's see how it works through an illustration of a single Photon pipeline.
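Here is a rough sketch of the at-most-once check a worker might perform against the IdRegistry. The commit_if_absent API below is invented for illustration; the real IdRegistry is a replicated, Paxos-backed key-value store in which the test-and-set happens atomically:

class IdRegistry:
    def __init__(self):
        self._store = {}   # stands in for the replicated Paxos-based store

    def commit_if_absent(self, event_id, worker_id):
        # Atomic test-and-set: only one pipeline can ever win for a given event id.
        if event_id in self._store:
            return False   # already joined, possibly by the other datacenter's pipeline
        self._store[event_id] = worker_id
        return True

registry = IdRegistry()

def try_join(event_id, worker_id):
    if not registry.commit_if_absent(event_id, worker_id):
        return False       # skip the event, preserving at-most-once semantics
    # ... write the joined event to the output log ...
    return True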
The retrying logic ensures an input event is joined at least once. Combined with the at-most-once semantics guaranteed by the IdRegistry, an input event is joined exactly once in Photon. The system now looks reasonable and functional, but it still needs to be performant, which depends heavily on the performance of the IdRegistry.
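Putting the two pieces together, a joiner worker's handling of one foreign event might look roughly like the sketch below, which reuses the hypothetical primary_by_id and try_join from the earlier sketches:

import time

joined_output = []   # stands in for the joined output log stream

def process_foreign_event(event, worker_id, max_attempts=10):
    # At-least-once: retry until the event is joined here or known to be joined elsewhere.
    for _ in range(max_attempts):
        primary = primary_by_id.get(event["primary_id"])
        if primary is None:
            time.sleep(1)            # primary event not arrived yet; back off and retry
            continue
        # At-most-once: the IdRegistry commit succeeds for exactly one pipeline.
        if try_join(event["id"], worker_id):
            joined_output.append({"primary": primary, "foreign": event})
        return True                  # joined here, or already joined by the other pipeline
    return False                     # give up for now; a later pass will retry the event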
To be fault-tolerant, the IdRegistry is itself replicated across different geographical regions.
Based on typical network statistics, the round-trip time between different geographical regions (such as east and west coasts of the United States) can be over 100 milliseconds. This would limit the throughput of Paxos to less than 10 transactions per second, which is orders of magnitude fewer than our requirements—we need to process (both read and write) tens of thousands of events (i.e., key commits) per second.
To improve the IdRegistry's throughput, Photon batches multiple event-level commits into a single Paxos transaction on the IdRegistry server side, so that each Paxos round registers many event ids instead of one.
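A simplified sketch of that server-side batching idea; the queue, the timing window, and the paxos_commit callable are all stand-ins rather than Photon's actual interfaces:

import queue
import time

pending = queue.Queue()   # (event_id, worker_id, reply_callback) requests from joiner workers

def registry_server_loop(paxos_commit, batch_window_ms=50):
    # Collect every request that arrives within a short window, then commit them
    # together in a single Paxos round instead of paying one round trip per request.
    while True:
        batch = [pending.get()]      # block until at least one request arrives
        deadline = time.monotonic() + batch_window_ms / 1000.0
        while time.monotonic() < deadline:
            try:
                remaining = max(0.0, deadline - time.monotonic())
                batch.append(pending.get(timeout=remaining))
            except queue.Empty:
                break
        # One Paxos commit covers the whole batch of event-level registrations.
        results = paxos_commit([(eid, wid) for eid, wid, _ in batch])
        for (eid, wid, reply), ok in zip(batch, results):
            reply(ok)                # tell each worker whether its event id was newly registered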
Events delayed by more than N days are discarded, where N is determined by evaluating the trade-off between the cost of storage and the cost of dropping such events.
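The cutoff itself is just an age check on the event timestamp; a small sketch (N here is a configurable number of days, not a value from the paper):

import time

SECONDS_PER_DAY = 24 * 60 * 60

def is_too_old(event_timestamp, n_days, now=None):
    # Events delayed by more than n_days are dropped instead of joined.
    now = time.time() if now is None else now
    return now - event_timestamp > n_days * SECONDS_PER_DAY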
This post is inspired by The Morning Paper.