Stream Processing
DBMSs are generally not good at handling data that quickly loses its value. In a stream processing system, we instead run static (standing) queries over incoming data. In this data, time is often implicit. Memory is limited while the input is infinite, so the results must be approximate. We need to consider that:
- Scheduling is not up to the system.
- The order is important.
- Memory is bounded but the input is infinite.
1. Push-Driven Scheduling
In stream processing, data sources push data into operators:
```cpp
class PushOperator {
 public:
  virtual void process(Tuple) = 0;
};

class Source {
 public:
  Source(PushOperator& plan) : plan_(plan) {}

  void run() {
    while (true) plan_.process(readTupleFromNetwork());
  }

 private:
  PushOperator& plan_;
};
```
1.1 Select Operator
For example, a select operator can be implemented as follows:
```cpp
class Select : public PushOperator {
 public:
  Select(PushOperator& plan, function<bool(Tuple)> pred)
      : plan_(plan), pred_(pred) {}

  void process(Tuple t) override {
    if (pred_(t)) plan_.process(t);
  }

 private:
  PushOperator& plan_;
  function<bool(Tuple)> pred_;
};
```
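To see how these pieces compose, here is a minimal sketch of a full pipeline. The Print sink and the use of attribute 0 are illustrative assumptions; the notes do not define them:

```cpp
// Hypothetical sink that terminates the pipeline by printing each tuple.
class Print : public PushOperator {
 public:
  void process(Tuple t) override { cout << (float) t.at(0) << '\n'; }
};

int main() {
  Print sink;
  Select positive(sink, [](Tuple t) { return (float) t.at(0) > 0; });
  Source source(positive);
  source.run();  // Pushes tuples through Select into Print, forever
}
```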
2. Backpressure
Although push-driven scheduling makes sense here, it has two problems:
- Slow operators slow down the entire pipeline.
- Source buffers can grow indefinitely.
So, operators need to communicate "up the stream". This is exactly what backpressure does:
```cpp
class PushOperator {
 public:
  virtual float process(Tuple) = 0;  // Returns backpressure factor
};

class Select : public PushOperator {
 public:
  Select(PushOperator& plan, function<bool(Tuple)> pred)
      : plan_(plan), pred_(pred) {}

  float process(Tuple t) override {
    // steady_clock is monotonic, which is what we want for elapsed time
    auto before = std::chrono::steady_clock::now();
    if (pred_(t)) plan_.process(t);
    // Return the time it took, including downstream processing
    return std::chrono::duration<float>(std::chrono::steady_clock::now() - before).count();
  }

 private:
  PushOperator& plan_;
  function<bool(Tuple)> pred_;
};
```
In practice, backpressure usually captures resource consumption (e.g. buffer memory), not time.
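As a sketch of the resource-based variant, a buffering operator could report how full its queue is, so the source can throttle as the value approaches 1. The Buffer operator and the 0-to-1 occupancy convention are assumptions, not from the notes; a consumer on another thread is assumed to drain the queue:

```cpp
// Sketch: backpressure as buffer occupancy in [0, 1] instead of elapsed time.
class Buffer : public PushOperator {
 public:
  explicit Buffer(size_t capacity) : capacity_(capacity) {}

  float process(Tuple t) override {
    lock_guard<mutex> guard(mutex_);
    queue_.push_back(std::move(t));  // A consumer thread pops from the front
    // 0.0 = empty, 1.0 = full; the source should slow down as this grows
    return (float) queue_.size() / capacity_;
  }

 private:
  size_t capacity_;
  mutex mutex_;
  deque<Tuple> queue_;
};
```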
3. Timestamps
There are multiple ways to capture an implicit timestamp:
- Processing Time: Acquired from the system clock by every operator as it processes a tuple. Neither consistent nor predictable, but low overhead.
- Ingestion Time: Acquired by the first operator and passed as an attribute. Consistent but unpredictable, with medium overhead.
- Event Time: Provided externally and passed as an attribute. Consistent and predictable, but with high overhead. This is the only correct approach.
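For ingestion time, the first operator in the plan might stamp tuples like this. That Tuple can carry the timestamp as an appended attribute is an assumption for illustration:

```cpp
// Sketch: stamp each tuple with ingestion time as it enters the plan, so all
// downstream operators see the same timestamp.
class IngestionTimestamp : public PushOperator {
 public:
  explicit IngestionTimestamp(PushOperator& plan) : plan_(plan) {}

  void process(Tuple t) override {
    auto now = std::chrono::system_clock::now().time_since_epoch();
    // Append the timestamp (in milliseconds) as an extra attribute
    t.push_back(std::chrono::duration<float, std::milli>(now).count());
    plan_.process(t);
  }

 private:
  PushOperator& plan_;
};
```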
However, timestamps are not guaranteed to arrive in order, and many queries rely on that ordering. There are three options:
- Treat the stream as a sequence of transactions, inserting all tuples into a DBMS. However, memory is finite and streams are not.
- Make assumptions (e.g. a tuple can be at most 10s late).
- Make the user provide guarantees (e.g. watermarks).
3.1 Transactions
```sql
on IncomingBallPosition bpos
  insert into BallPosition values (bpos);

select * from PlayerPosition
where distance(bpos.x, bpos.y, PlayerPosition.x, PlayerPosition.y) < .3
and bpos.timestamp = PlayerPosition.timestamp;

on IncomingPlayerPosition ppos
  insert into PlayerPosition values (ppos);
```
3.2 Lateness Bound
```sql
on IncomingBallPosition bpos
  insert into BallPosition values (bpos);

select * from PlayerPosition
where distance(bpos.x, bpos.y, PlayerPosition.x, PlayerPosition.y) < .3
and bpos.timestamp = PlayerPosition.timestamp;

delete from BallPosition where timestamp < bpos.timestamp - lateness_bound;
```
3.3 Watermarks
```sql
on IncomingBallPosition bpos
  insert into BallPosition values (bpos);

select * from PlayerPosition
where distance(bpos.x, bpos.y, PlayerPosition.x, PlayerPosition.y) < .3
and bpos.timestamp = PlayerPosition.timestamp;

on IncomingBallPositionWatermark wm
  delete from PlayerPosition
  where timestamp < (select max(timestamp) from BallPosition);
```
Watermarks are special tuples that indicate that no future tuples with a timestamp less than the watermark will arrive.
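A sketch of how an operator might exploit watermarks: buffer out-of-order tuples and release them in timestamp order once a watermark guarantees no earlier tuple can still arrive. The separate processWatermark entry point and the position of the timestamp attribute are assumptions:

```cpp
// Sketch: reorder tuples using watermarks. Out-of-order arrivals are buffered
// and emitted sorted once a watermark proves nothing earlier can still come.
class ReorderOnWatermark : public PushOperator {
 public:
  explicit ReorderOnWatermark(PushOperator& plan) : plan_(plan) {}

  void process(Tuple t) override {
    buffer_.emplace((float) t.at(0), t);  // Assumes attribute 0 is the timestamp
  }

  void processWatermark(float watermark) {
    // Everything below the watermark is complete and safe to emit in order
    while (!buffer_.empty() && buffer_.begin()->first < watermark) {
      plan_.process(buffer_.begin()->second);
      buffer_.erase(buffer_.begin());
    }
  }

 private:
  PushOperator& plan_;
  multimap<float, Tuple> buffer_;  // Sorted by timestamp
};
```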
4. Windows
Window queries make lateness bounds a part of the query semantics. For example:
```sql
SELECT
  avg(x) OVER (ORDER BY timestamp ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) AS smoothedX,
  avg(y) OVER (ORDER BY timestamp ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) AS smoothedY
FROM BallPosition;
```
A window is defined in terms of its size (number of elements) and slide (number of elements between the starts of two window instances):
- When slide < size, we have sliding windows.
- When slide = size, we have tumbling windows.
- When slide > size, we have stream sampling.
Session windows are opened / closed through events in the stream (e.g. user logging in).
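As a sketch of such a session window, consider counting the tuples between a login and a logout. The encoding of event kinds in attribute 0 is an assumption for illustration:

```cpp
// Sketch: a session window opened by a "login" event and closed by a "logout"
// event. Assumes attribute 0 encodes the event kind: 1 = login, 2 = logout,
// anything else = an ordinary tuple inside the session.
class SessionCount : public PushOperator {
 public:
  explicit SessionCount(PushOperator& plan) : plan_(plan) {}

  void process(Tuple t) override {
    int kind = (int) t.at(0);
    if (kind == 1) {         // Login opens the window
      open_ = true;
      count_ = 0;
    } else if (kind == 2) {  // Logout closes it and emits the aggregate
      if (open_) plan_.process(Tuple{(float) count_});
      open_ = false;
    } else if (open_) {
      ++count_;
    }
  }

 private:
  PushOperator& plan_;
  bool open_ = false;
  size_t count_ = 0;
};
```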
4.1 Window Aggregation
We may want to aggregate the values in an input. Aggregation functions fall into three classes:
- Distributive: can be calculated by looking at two inputs at a time (e.g. min, max, sum, count).
- Algebraic: can be calculated by first doing a distributive aggregation on parts of the input, then combining those results (e.g. average), as sketched below.
- Holistic: need to look at all inputs at once (e.g. percentiles).
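As a sketch of the algebraic case: an average can be computed from distributive (sum, count) partials over parts of the input and then combined. The Partial type is illustrative, not from the notes:

```cpp
// Sketch: average is algebraic - combine distributive (sum, count) partials
// computed independently over parts of the input.
struct Partial {
  float sum = 0;
  size_t count = 0;
};

Partial aggregatePart(const vector<float>& part) {
  Partial p;
  for (float v : part) { p.sum += v; ++p.count; }
  return p;
}

float combine(const vector<Partial>& partials) {
  Partial total;
  for (const Partial& p : partials) {
    total.sum += p.sum;      // sum is distributive
    total.count += p.count;  // count is distributive
  }
  return total.sum / total.count;  // The average falls out of the partials
}
```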
We can also divide these functions into:
- Invertible: can be updated by removing old values (e.g. sum, count, average).
- Non-invertible: cannot be updated by removing old values (e.g. min, max, percentiles).
4.2 Distributive Invertible Aggregation
```cpp
class WindowSumAggregator : public PushOperator {
 public:
  WindowSumAggregator(PushOperator& plan, size_t windowSize)
      : plan_(plan), window_(windowSize) {}

  void process(Tuple t) override {
    float value = (float) t.at(0);
    size_t slot = i_ % window_.size();
    // Invertible: subtract the value falling out of the window before
    // overwriting its slot, then add the new value - O(1) per tuple
    if (i_ >= window_.size()) aggregate_ -= window_[slot];
    aggregate_ += value;
    window_[slot] = value;
    if (++i_ >= window_.size()) plan_.process(Tuple{aggregate_});
  }

 private:
  PushOperator& plan_;
  vector<float> window_;
  size_t i_ = 0;
  float aggregate_ = 0;
};
```
4.3 Holistic Non-Invertible Aggregation
```cpp
class WindowMedianAggregator : public PushOperator {
 public:
  WindowMedianAggregator(PushOperator& plan, size_t windowSize)
      : plan_(plan), window_(windowSize) {}

  void process(Tuple t) override {
    window_[i_++ % window_.size()] = (float) t.at(0);
    if (i_ >= window_.size()) {
      // Holistic: every output requires looking at the whole window
      auto sorted = window_;
      sort(sorted.begin(), sorted.end());
      plan_.process(Tuple{sorted[sorted.size() / 2]});
    }
  }

 private:
  PushOperator& plan_;
  vector<float> window_;
  size_t i_ = 0;
};
```
4.4 Two Stacks Algorithm
What if we want to evict old values efficiently for a non-invertible aggregate such as min or max?
Here, we use the two-stacks algorithm. New tuples are pushed onto the back stack, so the newest tuples are at the top of the back stack. Old tuples are evicted by popping from the front stack; when it is empty, we move all elements from the back stack to the front stack, reversing their order so that the oldest tuples end up on top. Essentially:
- Maintain two stacks: a front stack for outgoing elements and a back stack for incoming elements.
- Each stack stores elements along with pre-aggregated partial results, enabling O(1) access to prefix aggregates.
- When the window slides, pop from the front stack; when empty, move all items from back to front, rebuilding aggregated prefixes.
- The current window’s aggregate is computed by combining the top aggregates of both stacks, ensuring amortized O(1) updates.
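A sketch for a sliding-window minimum, which is distributive but non-invertible; the window here slides by one tuple:

```cpp
// Sketch: two-stacks sliding-window minimum. Each stack entry stores a value
// together with the minimum of everything at or below it, so the aggregate of
// a whole stack is just the agg at its top.
class WindowMinAggregator : public PushOperator {
 public:
  WindowMinAggregator(PushOperator& plan, size_t windowSize)
      : plan_(plan), windowSize_(windowSize) {}

  void process(Tuple t) override {
    float v = (float) t.at(0);
    if (front_.size() + back_.size() == windowSize_) evict();
    // Push onto the back stack with a running prefix minimum
    back_.push_back({v, back_.empty() ? v : min(back_.back().agg, v)});
    if (front_.size() + back_.size() == windowSize_) {
      // Combine the top aggregates of both stacks - O(1)
      float result = front_.empty() ? back_.back().agg
                   : back_.empty()  ? front_.back().agg
                   : min(front_.back().agg, back_.back().agg);
      plan_.process(Tuple{result});
    }
  }

 private:
  struct Entry { float value; float agg; };

  void evict() {
    if (front_.empty()) {
      // Flip the back stack onto the front stack, rebuilding the partial
      // minima as we go - this makes eviction amortized O(1) per tuple
      while (!back_.empty()) {
        float v = back_.back().value;
        back_.pop_back();
        front_.push_back({v, front_.empty() ? v : min(front_.back().agg, v)});
      }
    }
    front_.pop_back();  // The oldest tuple in the window is on top
  }

  PushOperator& plan_;
  size_t windowSize_;
  vector<Entry> front_, back_;  // Stacks; back() is the top
};
```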
5. Joins
We could do a handshake join: two sliding windows, one for each input. When a tuple arrives on one side, we probe the other window for matches. Although naive, this has good locality and is easy to parallelize.
A symmetric hash join maintains a hash table for each input. When a tuple arrives on one side, we insert it into the corresponding hash table and probe the other hash table for matches. This produces results as early as possible, but requires unbounded memory, since both tables grow with the stream.
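A sketch of a symmetric hash join on attribute 0. The two-entry-point interface and the vector-like Tuple are assumptions; the notes define only unary push operators:

```cpp
// Sketch: symmetric hash join on attribute 0. Each arriving tuple is inserted
// into its own side's table and immediately probed against the other side's.
class SymmetricHashJoin {
 public:
  explicit SymmetricHashJoin(PushOperator& plan) : plan_(plan) {}

  void processLeft(Tuple t) { process(t, left_, right_); }
  void processRight(Tuple t) { process(t, right_, left_); }

 private:
  using Table = unordered_multimap<float, Tuple>;

  void process(const Tuple& t, Table& own, Table& other) {
    float key = (float) t.at(0);
    own.emplace(key, t);  // Both tables grow without bound
    auto [begin, end] = other.equal_range(key);
    for (auto it = begin; it != end; ++it) {
      // Emit a result as soon as a match exists on the other side
      Tuple joined = t;
      joined.insert(joined.end(), it->second.begin(), it->second.end());
      plan_.process(joined);
    }
  }

  PushOperator& plan_;
  Table left_, right_;
};
```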
5.1 Bloom Filters
Instead, we can use filters, which provide approximate results. Take this query, for example:
```sql
select * from BallContacts, RedCards
where BallContacts.Player = RedCards.Player
and BallContacts.timestamp > RedCards.timestamp;
```
This cannot be implemented with window joins, since the join is not bounded in time. Instead we use a Bloom filter: similar to a hash table, but instead of storing the values, we store only a boolean array recording whether a tuple hashing to each position has been seen.
However, hash collisions can produce false positives. To reduce them, we can use multiple hash functions. With
- m: number of bits for the filter,
- n: number of expected distinct elements,
- k: number of hash functions,
- p: false positive rate,

the false positive rate is approximately p ≈ (1 - e^(-kn/m))^k, minimized by choosing k = (m/n) ln 2 and m = -n ln p / (ln 2)^2.
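A minimal sketch of such a filter, sized with the formulas above; simulating the k hash functions by double hashing is an implementation choice, not from the notes:

```cpp
// Sketch: Bloom filter sized for n expected elements and false positive rate p.
class BloomFilter {
 public:
  BloomFilter(size_t n, double p)
      : bits_((size_t) ceil(-(double) n * log(p) / (log(2) * log(2)))),  // m
        k_((size_t) round((double) bits_.size() / n * log(2))) {}

  void insert(size_t key) {
    for (size_t i = 0; i < k_; ++i) bits_[index(key, i)] = true;
  }

  // May return a false positive, but never a false negative
  bool mayContain(size_t key) const {
    for (size_t i = 0; i < k_; ++i)
      if (!bits_[index(key, i)]) return false;
    return true;
  }

 private:
  // Double hashing: derive the i-th probe from two base hashes
  size_t index(size_t key, size_t i) const {
    size_t h1 = hash<size_t>{}(key);
    size_t h2 = hash<size_t>{}(key ^ 0x9e3779b97f4a7c15ULL);
    return (h1 + i * h2) % bits_.size();
  }

  vector<bool> bits_;  // The boolean array
  size_t k_;           // Number of hash functions
};
```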