Stream Processing

A DBMS is generally not well suited to data that quickly loses its value. In a stream processing system, we instead run standing (static) queries over continuously incoming data, in which time is often implicit. Since memory is limited but the input is unbounded, results must often be approximate. The following sections look at how these constraints shape the design of such a system.

1. Push Driven Scheduling

In stream processing, data sources push data into operators:

class PushOperator {
 public:
  virtual void process(Tuple) = 0;
};

class Source {
 public:
  Source(PushOperator& plan) : plan_(plan) {}

  void run() {
    while (true) plan_.process(readTupleFromNetwork());
  }

 private:
  PushOperator& plan_;
};

1.1 Select Operator

For example, a select operator can be implemented as follows:

class Select : public PushOperator {
 public:
  Select(PushOperator& plan, function<bool(Tuple)> pred)
      : plan_(plan), pred_(pred) {}

  void process(Tuple t) override {
    // Only forward tuples that satisfy the predicate
    if (pred_(t)) plan_.process(t);
  }

 private:
  PushOperator& plan_;
  function<bool(Tuple)> pred_;
};
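For illustration, a plan could then be wired together as follows; the Print sink, the predicate, and the assumption that the first attribute is a float are made up for this sketch:

// Hypothetical wiring of a plan: Source pushes into Select, which pushes into Print.
class Print : public PushOperator {
 public:
  void process(Tuple t) override {
    cout << (float) t.at(0) << "\n";  // Assumes the first attribute is a float
  }
};

int main() {
  Print sink;
  Select filter(sink, [](Tuple t) { return (float) t.at(0) > 0.5f; });
  Source source(filter);
  source.run();  // Runs forever, pushing every matching tuple to the sink
}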

2. Backpressure

Although push-driven scheduling makes sense here, a fast source can overwhelm slower downstream operators: the source dictates the arrival rate, and an overloaded operator has no way to slow it down. Operators therefore need to communicate "up the stream". To achieve this, we can use backpressure:

class PushOperator {
 public:
  virtual float process(Tuple) = 0;  // Returns a backpressure factor
};

class Select : public PushOperator {
 public:
  Select(PushOperator& plan, function<bool(Tuple)> pred)
      : plan_(plan), pred_(pred) {}

  float process(Tuple t) override {
    auto before = std::chrono::steady_clock::now();
    if (pred_(t)) plan_.process(t);
    // Report the time spent (including downstream work) as the backpressure factor
    return std::chrono::duration<float>(std::chrono::steady_clock::now() - before).count();
  }

 private:
  PushOperator& plan_;
  function<bool(Tuple)> pred_;
};

In practice, backpressure usually captures resource consumption (e.g. buffer memory) rather than processing time.
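As a minimal sketch of that idea, assuming a hypothetical operator with a bounded input buffer, the backpressure factor could be the fraction of the buffer that is currently occupied:

// Sketch only: BufferedOperator and its queue are assumptions, not part of the
// operators above. A worker thread would drain queue_ asynchronously.
class BufferedOperator : public PushOperator {
 public:
  explicit BufferedOperator(size_t capacity) : capacity_(capacity) {}

  float process(Tuple t) override {
    queue_.push_back(t);  // Enqueue for later processing
    // 0 means "keep sending"; values close to 1 mean "slow down".
    return (float) queue_.size() / (float) capacity_;
  }

 private:
  deque<Tuple> queue_;
  size_t capacity_;
};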

3. Timestamps

There are multiple ways to capture an implicit timestamp: it can be assigned at the data source when the event happens (event time), when the tuple enters the system (ingestion time), or when an operator processes it (processing time).

However, tuples are not guaranteed to arrive in timestamp order, even though many queries rely on ordered timestamps. There are three options for dealing with this:

3.1 Transactions

One option is to treat each incoming tuple as a transaction that inserts it and runs the join against the tuples received so far; since both relations are kept, arrival order does not matter, but memory grows without bound:

on IncomingBallPosition bpos
  insert into BallPosition values (bpos);

  select * from PlayerPosition
  where distance(bpos.x, bpos.y, PlayerPosition.x, PlayerPosition.y) < .3
  and bpos.timestamp = PlayerPosition.timestamp;

on IncomingPlayerPosition ppos
  insert into PlayerPosition values (ppos);

3.2 Lateness Bound

If we can bound how late a tuple may arrive, we only need to keep tuples within that bound and may delete everything older:

on IncomingBallPosition bpos
  insert into BallPosition values (bpos);

  select * from PlayerPosition
  where distance(bpos.x, bpos.y, PlayerPosition.x, PlayerPosition.y) < .3
  and bpos.timestamp = PlayerPosition.timestamp;

  delete from BallPosition where timestamp < bpos.timestamp - lateness_bound;

3.3 Watermarks

on IncomingBallPosition bpos
  insert into BallPosition values (bpos);

  select * from PlayerPosition
  where distance(bpos.x, bpos.y, PlayerPosition.x, PlayerPosition.y) < .3
  and bpos.timestamp = PlayerPosition.timestamp;

on IncomingBallPositionWatermark wm
  delete from PlayerPosition
  where timestamp < wm.timestamp;

Watermarks are special tuples that indicate that no future tuples with a timestamp less than the watermark will arrive.
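As a rough sketch of how an operator might exploit watermarks, assuming hypothetical isWatermark() and timestamp() accessors on Tuple, out-of-order tuples can be buffered and released in order once a watermark covers them:

// Sketch only: reordering buffer driven by watermarks. The Tuple accessors
// used here (isWatermark(), timestamp()) are assumptions for illustration.
class Reorder : public PushOperator {
 public:
  explicit Reorder(PushOperator& plan) : plan_(plan) {}

  void process(Tuple t) override {
    if (t.isWatermark()) {
      // No tuple older than the watermark will arrive anymore:
      // emit everything below it in timestamp order.
      while (!buffer_.empty() && buffer_.begin()->first < t.timestamp()) {
        plan_.process(buffer_.begin()->second);
        buffer_.erase(buffer_.begin());
      }
    } else {
      buffer_.emplace(t.timestamp(), t);  // Hold back until covered by a watermark
    }
  }

 private:
  PushOperator& plan_;
  multimap<long, Tuple> buffer_;  // Ordered by timestamp
};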

4. Windows

Window queries make lateness bounds a part of the query semantics. For example:

SELECT
  avg(x) OVER (ORDER BY timestamp ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) AS smoothedX,
  avg(y) OVER (ORDER BY timestamp ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) AS smoothedY
FROM BallPosition;

A window is defined in terms of its size (number of elements) and its slide (number of elements between the starts of two consecutive window instances). If the slide equals the size, the windows are tumbling (non-overlapping); if the slide is smaller than the size, the windows overlap (sliding windows).

Session windows, in contrast, are opened and closed by events in the stream (e.g. a user logging in and out).
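As a minimal sketch, assuming hypothetical isLogin() and isLogout() markers on Tuple, a session window operator could collect tuples between the opening and closing events:

// Sketch only: isLogin() and isLogout() are assumed event markers on Tuple.
class SessionWindow : public PushOperator {
 public:
  explicit SessionWindow(PushOperator& plan) : plan_(plan) {}

  void process(Tuple t) override {
    if (t.isLogin()) {                    // Event that opens the session window
      open_ = true;
      window_.clear();
    } else if (t.isLogout()) {            // Event that closes it: flush the window
      for (auto& w : window_) plan_.process(w);
      window_.clear();
      open_ = false;
    } else if (open_) {
      window_.push_back(t);               // Collect tuples while the session is open
    }
  }

 private:
  PushOperator& plan_;
  vector<Tuple> window_;
  bool open_ = false;
};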

4.1 Window Aggregation

We may want to aggregate the values inside a window. Aggregation functions can be classified as distributive (e.g. sum, count, min, max: partial aggregates can be combined into the full aggregate), algebraic (e.g. avg, computable from a fixed number of distributive aggregates), or holistic (e.g. median: the whole window is needed).

We can also divide these functions into invertible ones, where an evicted value can be subtracted from the running aggregate (e.g. sum), and non-invertible ones, where it cannot (e.g. min, max, median).

4.2 Distributive Invertible Aggregation

For a distributive, invertible aggregate such as sum, each tuple can be processed in constant time: add the incoming value to the running aggregate and subtract the value that falls out of the window:

class WindowSumAggregator : public PushOperator {
 public:
  WindowSumAggregator(PushOperator& plan, size_t windowSize)
      : plan_(plan), window_(windowSize) {}

  void process(Tuple t) override {
    float v = (float) t.at(0);
    size_t slot = i_ % window_.size();
    if (i_ >= window_.size())
      aggregate_ -= window_[slot];  // Invert: remove the value that leaves the window
    aggregate_ += v;                // Add the incoming value
    window_[slot] = v;              // Remember it so it can be subtracted later
    ++i_;
    if (i_ >= window_.size())       // Emit once the window is full
      plan_.process(Tuple{aggregate_});
  }

 private:
  PushOperator& plan_;
  vector<float> window_;
  size_t i_ = 0;
  float aggregate_ = 0;
};

4.3 Holistic Non-Invertible Aggregation

For a holistic aggregate such as the median, there is no running value to maintain; a simple implementation recomputes the result from the whole window for every incoming tuple:

class WindowMedianAggregator : public PushOperator {
 public:
  WindowMedianAggregator(PushOperator& plan, size_t windowSize)
      : plan_(plan), window_(windowSize) {}

  void process(Tuple t) override {
    window_[i_++ % window_.size()] = (float) t.at(0);  // Overwrite the oldest value
    if (i_ >= window_.size()) {
      // Holistic: sort a copy of the whole window for every result
      auto sorted = window_;
      sort(sorted.begin(), sorted.end());
      plan_.process(Tuple{sorted[sorted.size() / 2]});
    }
  }

 private:
  PushOperator& plan_;
  vector<float> window_;
  size_t i_ = 0;
};

4.4 Two Stacks Algorithm

What if we want non-invertible aggregation without recomputing the whole window for every tuple?

Here, we can use the two stacks algorithm. New tuples are pushed onto the front stack, so the newest tuple sits on top of the front stack and the oldest tuple on top of the back stack. When we need to evict an old tuple, we pop from the back stack; if it is empty, we first move all elements from the front stack to the back stack, reversing their order. Each stack entry carries a partial aggregate of everything beneath it, so the window aggregate is obtained by combining the two stack tops; for the median, this combination merges two sorted lists. Since every tuple is moved from the front stack to the back stack at most once, eviction is amortized constant time.
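A minimal sketch of the two-stacks idea, using max as the non-invertible aggregate for brevity (for the median, each entry's partial aggregate would be a sorted list and the two tops would be merged):

// Sketch only: two-stacks sliding-window aggregation, shown for max.
// Each stack entry stores the tuple's value and the running aggregate
// of that entry plus everything beneath it on the same stack.
class WindowMaxAggregator : public PushOperator {
 public:
  WindowMaxAggregator(PushOperator& plan, size_t windowSize)
      : plan_(plan), windowSize_(windowSize) {}

  void process(Tuple t) override {
    float v = (float) t.at(0);
    // Push onto the front stack together with the running max of the front stack.
    front_.push_back({v, front_.empty() ? v : std::max(v, front_.back().agg)});
    if (front_.size() + back_.size() > windowSize_) evict();
    if (front_.size() + back_.size() == windowSize_) {
      // Window aggregate = combination of the two stack tops.
      float result = back_.empty()  ? front_.back().agg
                   : front_.empty() ? back_.back().agg
                   : std::max(front_.back().agg, back_.back().agg);
      plan_.process(Tuple{result});
    }
  }

 private:
  struct Entry { float value; float agg; };

  void evict() {
    if (back_.empty()) {
      // Move all elements from front to back, reversing their order and
      // recomputing the running max in the opposite direction.
      while (!front_.empty()) {
        float v = front_.back().value;
        front_.pop_back();
        back_.push_back({v, back_.empty() ? v : std::max(v, back_.back().agg)});
      }
    }
    back_.pop_back();  // The oldest tuple in the window sits on top of the back stack
  }

  PushOperator& plan_;
  vector<Entry> front_, back_;  // The back of each vector acts as the top of the stack
  size_t windowSize_;
};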

5. Joins

We could do a handshake join: two sliding windows, one for each input. When a tuple arrives on one side, it is inserted into its own window and probes the other window for matches. Although this nested-loop-style probing is naive, it has good locality and is easy to parallelize.
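A minimal sketch of such a window join over two inputs; the match predicate and the concat() helper that builds the output tuple are assumptions for illustration:

// Sketch only: symmetric window join with one sliding window per input.
// concat() (building the joined output tuple) is assumed to exist.
class WindowJoin {
 public:
  WindowJoin(PushOperator& plan, size_t windowSize,
             function<bool(Tuple, Tuple)> match)
      : plan_(plan), windowSize_(windowSize), match_(match) {}

  void processLeft(Tuple t) { probeAndInsert(t, left_, right_); }
  void processRight(Tuple t) { probeAndInsert(t, right_, left_); }

 private:
  void probeAndInsert(Tuple t, deque<Tuple>& own, deque<Tuple>& other) {
    for (auto& o : other)                           // Probe the opposite window
      if (match_(t, o)) plan_.process(concat(t, o));
    own.push_back(t);                               // Insert into the own window
    if (own.size() > windowSize_) own.pop_front();  // Evict the oldest tuple
  }

  PushOperator& plan_;
  size_t windowSize_;
  function<bool(Tuple, Tuple)> match_;
  deque<Tuple> left_, right_;
};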

A symmetric hash join maintains a hash table for each input. When a tuple arrives on one side, we insert it into the corresponding hash table and probe the other hash table for matches. This produces results as early as possible, but requires unbounded memory, since neither hash table is ever pruned.
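A minimal sketch, assuming an equi-join on the first attribute and again a hypothetical concat() helper:

// Sketch only: symmetric hash join keyed on the first attribute.
class SymmetricHashJoin {
 public:
  explicit SymmetricHashJoin(PushOperator& plan) : plan_(plan) {}

  void processLeft(Tuple t) {
    left_.emplace((int) t.at(0), t);                    // Insert into own table
    auto [begin, end] = right_.equal_range((int) t.at(0));
    for (auto it = begin; it != end; ++it)              // Probe the other table
      plan_.process(concat(t, it->second));
  }

  void processRight(Tuple t) {
    right_.emplace((int) t.at(0), t);
    auto [begin, end] = left_.equal_range((int) t.at(0));
    for (auto it = begin; it != end; ++it)
      plan_.process(concat(it->second, t));
  }

 private:
  PushOperator& plan_;
  unordered_multimap<int, Tuple> left_, right_;  // Both grow without bound
};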

5.1 Bloom Filters

Instead, we could use filters, which provide approximate results. Take the following query, for example:

select * from BallContacts, RedCards
where BallContacts.Player = RedCards.Player
and BallContacts.timestamp > RedCards.timestamp;

This cannot be implemented with a window join, because a ball contact may match a red card that is arbitrarily old, so the RedCards side cannot be bounded by a window. Instead, we use a Bloom filter: it is similar to a hash table, but instead of storing the tuples themselves it only stores a boolean array recording whether a matching tuple has been seen.

However, hash collisions can produce false positives. To reduce them, we can use multiple hash functions: every inserted key sets several slots, and a lookup only reports a match if all of its slots are set, so some collisions are detected. For m bits and n inserted keys, a Bloom filter is optimally configured with k = (m / n) * ln 2 hash functions, giving a false-positive rate of roughly 0.6185^(m/n).
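A minimal sketch of such a filter, deriving k slot indices from two hash values (double hashing); the sizes and the key type are assumptions:

// Sketch only: Bloom filter with k hash functions derived via double hashing.
class BloomFilter {
 public:
  BloomFilter(size_t bits, size_t k) : bits_(bits, false), k_(k) {}

  void insert(size_t key) {
    for (size_t i = 0; i < k_; ++i) bits_[slot(key, i)] = true;
  }

  // May return true for keys never inserted (false positive),
  // but never returns false for keys that were inserted.
  bool mayContain(size_t key) const {
    for (size_t i = 0; i < k_; ++i)
      if (!bits_[slot(key, i)]) return false;
    return true;
  }

 private:
  size_t slot(size_t key, size_t i) const {
    size_t h1 = hash<size_t>{}(key);
    size_t h2 = hash<size_t>{}(key ^ 0x9e3779b97f4a7c15ull);
    return (h1 + i * h2) % bits_.size();
  }

  vector<bool> bits_;
  size_t k_;
};

For the query above, every incoming red card would insert its player into the filter; every incoming ball contact then only needs to probe the filter to decide (approximately) whether that player has already received a red card, without keeping the RedCards stream around.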
