Query Processing Models
A processing model is a mechanism to connect the different operators of a query. Different models are optimised for different bottlenecks. Previously, we had:
```cpp
using Tuple = vector<variant<int, float, string>>;
using Table = vector<Tuple>;

class StorageManager {
  map<string, Table> catalog;

public:
  Table& getTable(const string& name) {
    return catalog[name];
  }
};
```
1. Volcano Processing
Volcano focuses on design, not performance. It comprises a Cascades query optimiser, a non-relational physical algebra, and a query processing model. A Volcano operator exposes three virtual functions:
```cpp
struct Operator {
  virtual void open() = 0;
  virtual optional<Tuple> next() = 0;
  virtual void close() = 0;
};
```
1.1 Operator: Scan
Scan will read a table and return tuples one by one.
```cpp
struct Scan : Operator {
  Table input;
  size_t nextIndex = 0;
  Scan(Table table) : input(move(table)) {}

  void open() {} // Assumes everything is in memory
  optional<Tuple> next() {
    if (nextIndex < input.size()) {
      return input[nextIndex++];
    }
    return {}; // end of stream
  }
  void close() {}
};
```
As we can see, operators are stateful!
1.2 Operator: Project
This transforms a tuple into another tuple using a projection function:
```cpp
struct Project : Operator {
  using Projection = function<Tuple(Tuple)>;

  Projection projection;
  unique_ptr<Operator> child;
  Project(Projection proj, unique_ptr<Operator> child)
      : projection(move(proj)), child(move(child)) {}

  void open() {
    child->open();
  }
  optional<Tuple> next() {
    auto input = child->next();
    if (!input.has_value()) {
      return {}; // propagate end of stream
    }
    return projection(*input);
  }
  void close() {
    child->close();
  }
};
```
1.3 Operator: Select
A selection returns all tuples that satisfy a boolean predicate:
```cpp
struct Select : Operator {
  using Predicate = function<bool(Tuple)>;

  Predicate predicate;
  unique_ptr<Operator> child;
  Select(Predicate pred, unique_ptr<Operator> child)
      : predicate(move(pred)), child(move(child)) {}

  void open() {
    child->open();
  }
  optional<Tuple> next() {
    // Keep pulling from the child until a tuple satisfies the predicate
    for (auto candidate = child->next(); candidate.has_value(); candidate = child->next()) {
      if (predicate(*candidate)) {
        return candidate;
      }
    }
    return {};
  }
  void close() {
    child->close();
  }
};
```
1.4 Operator: Union
Return all tuples from one child, followed by all tuples from another child:
```cpp
struct Union : Operator {
  unique_ptr<Operator> leftChild;
  unique_ptr<Operator> rightChild;
  Union(unique_ptr<Operator> left, unique_ptr<Operator> right)
      : leftChild(move(left)), rightChild(move(right)) {}

  void open() {
    leftChild->open();
    rightChild->open();
  }
  optional<Tuple> next() {
    auto leftResult = leftChild->next();
    return leftResult.has_value() ? leftResult : rightChild->next();
  }
  void close() {
    leftChild->close();
    rightChild->close();
  }
};
```
1.5 Operator: Difference
```cpp
struct Difference : Operator {
  unique_ptr<Operator> leftChild;
  unique_ptr<Operator> rightChild;
  vector<Tuple> bufferedRight;
  Difference(unique_ptr<Operator> left, unique_ptr<Operator> right)
      : leftChild(move(left)), rightChild(move(right)) {}

  void open() {
    leftChild->open();
    rightChild->open();
    // Buffer the entire right side up front
    for (auto rightTuple = rightChild->next(); rightTuple.has_value(); rightTuple = rightChild->next()) {
      bufferedRight.push_back(*rightTuple);
    }
  }
  optional<Tuple> next() {
    for (auto candidate = leftChild->next(); candidate.has_value(); candidate = leftChild->next()) {
      if (find(bufferedRight.begin(), bufferedRight.end(), *candidate) == bufferedRight.end()) {
        return candidate;
      }
    }
    return {};
  }
  void close() {
    leftChild->close();
    rightChild->close();
  }
};
```
This operator must read all tuples from one side before it can produce any output. Such an operator is called a pipeline breaker.
1.6 Operator: Cross Product
```cpp
struct CrossProduct : Operator {
  unique_ptr<Operator> leftChild;
  unique_ptr<Operator> rightChild;
  optional<Tuple> leftTuple;
  vector<Tuple> bufferedRight;
  size_t bufferedRightOffset = 0;

  CrossProduct(unique_ptr<Operator> left, unique_ptr<Operator> right)
      : leftChild(move(left)), rightChild(move(right)) {}

  void open() {
    leftChild->open();
    rightChild->open();
    // Buffer the entire right side, then prime the first left tuple
    for (auto rightTuple = rightChild->next(); rightTuple.has_value(); rightTuple = rightChild->next()) {
      bufferedRight.push_back(*rightTuple);
    }
    leftTuple = leftChild->next();
  }
  optional<Tuple> next() {
    if (bufferedRight.empty()) {
      return {};
    }
    if (bufferedRightOffset == bufferedRight.size()) {
      bufferedRightOffset = 0;
      leftTuple = leftChild->next();
    }
    if (!leftTuple.has_value()) {
      return {};
    }
    // Concatenate the current left tuple with the next buffered right tuple
    Tuple result = *leftTuple;
    auto const& rightTuple = bufferedRight[bufferedRightOffset++];
    result.insert(result.end(), rightTuple.begin(), rightTuple.end());
    return result;
  }
  void close() {
    leftChild->close();
    rightChild->close();
  }
};
```
However, this implementation is a pipeline breaker: it buffers the entire right side before producing any output. We can do better by interleaving the buffering with output production:
```cpp
struct CrossProduct : Operator {
  unique_ptr<Operator> leftChild;
  unique_ptr<Operator> rightChild;
  optional<Tuple> leftTuple;
  vector<Tuple> bufferedRight;
  size_t bufferedRightOffset = 0;

  CrossProduct(unique_ptr<Operator> left, unique_ptr<Operator> right)
      : leftChild(move(left)), rightChild(move(right)) {}

  void open() {
    leftChild->open();
    rightChild->open();
    leftTuple = leftChild->next();
  }
  optional<Tuple> next() {
    // Buffer one more right tuple per call, so output starts immediately
    auto rightTuple = rightChild->next();
    if (rightTuple.has_value()) {
      bufferedRight.push_back(*rightTuple);
    }
    if (bufferedRightOffset == bufferedRight.size()) {
      bufferedRightOffset = 0;
      leftTuple = leftChild->next();
    }
    if (!leftTuple.has_value() || bufferedRight.empty()) {
      return {};
    }
    Tuple result = *leftTuple;
    auto const& right = bufferedRight[bufferedRightOffset++];
    result.insert(result.end(), right.begin(), right.end());
    return result;
  }
  void close() {
    leftChild->close();
    rightChild->close();
  }
};
```
Some operators are pipelinable, some are not.
1.7 Operator: Grouped Aggregate
This groups all equal tuples (given a projection function) and calculates one or more aggregates per group. It is non-pipelinable.
```cpp
struct GroupBy : Operator {
  using SupportedDatatype = variant<int, float>;
  using AggregateFunc = function<SupportedDatatype(SupportedDatatype, Tuple)>;
  using Projection = function<Tuple(Tuple)>;

  unique_ptr<Operator> child;
  vector<optional<Tuple>> hashTable;
  Projection getGroupKeys;
  vector<AggregateFunc> aggregateFuncs;
  size_t outputCursor = 0;

  GroupBy(Projection groupKeys, vector<AggregateFunc> aggs, unique_ptr<Operator> child)
      : getGroupKeys(move(groupKeys)), aggregateFuncs(move(aggs)), child(move(child)) {}

  size_t hash(Tuple const&);
  size_t nextSlot(size_t);

  void open() {
    child->open();

    for (auto inputTuple = child->next(); inputTuple.has_value(); inputTuple = child->next()) {
      auto groupKey = getGroupKeys(*inputTuple);

      // Probe the hash table, resolving conflicts by open addressing:
      auto slot = hash(groupKey);
      while (hashTable[slot].has_value() && (*hashTable[slot])[0] != groupKey[0]) {
        slot = nextSlot(slot);
      }

      // If the slot is empty, initialize it: key first, then one cell per aggregate
      if (!hashTable[slot].has_value()) {
        hashTable[slot] = Tuple{groupKey[0]};
        hashTable[slot]->resize(1 + aggregateFuncs.size());
      }

      // Update the aggregates (conversion between the tuple's variant and
      // SupportedDatatype is elided for brevity):
      for (size_t i = 0; i < aggregateFuncs.size(); i++) {
        (*hashTable[slot])[i + 1] = aggregateFuncs[i]((*hashTable[slot])[i + 1], *inputTuple);
      }
    }
  }
  optional<Tuple> next() {
    while (outputCursor < hashTable.size()) {
      auto slot = hashTable[outputCursor++];
      if (slot.has_value()) {
        return slot;
      }
    }
    return {};
  }
  void close() {
    child->close();
  }
};
```
1.8 Using Volcano
We can build an example query as such:
```cpp
void myQuery() {
  Table input{{1, "Tom", 20}, {2, "Jerry", 22}, {3, "Spike", 21}};

  auto plan = make_unique<GroupBy>(
      [](auto t) { return Tuple{t[1]}; },                 // group by name
      vector<GroupBy::AggregateFunc>{
          [](auto v, auto t) { return get<int>(v) + 1; }  // count
      },
      make_unique<Select>(
          [](auto t) { return get<int>(t[2]) == 21; },    // keep age == 21
          make_unique<Scan>(input)));

  plan->open();
  for (auto t = plan->next(); t.has_value(); t = plan->next()) {
    cout << get<string>((*t)[0]) << ": " << get<int>((*t)[1]) << endl;
  }
  plan->close();
}
```
The above query counts the number of people aged 21, grouped by name. The Volcano model offers:
- Elegant and short implementation
- Good object oriented design
- Extensible
- Good IO behaviour
We can also estimate buffer IO operations in Volcano:
- Scan operator uses the number of pages in the table
- For pipeline breakers, if they fit in memory, no IO.
- Otherwise, if it is accessed sequentially, we say it uses the number of pages in the input table.
- Otherwise, if it is not accessed sequentially, we say it uses one page per tuple.
To check whether a buffer fits in memory, we need to know its size:
- For nested loop buffers and sorted relations, their size is the size of the input.
- Hashtables are overallocated by a factor (2 if not specified).
We assume perfect knowledge about input and output cardinalities (an oracle).
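Under these rules, a back-of-the-envelope IO estimate for a plan can be computed directly. A minimal sketch (the function name, cardinalities, and page size are all hypothetical, not from the original):

```cpp
#include <cstddef>
using namespace std;

// Estimated page IOs for a plan that sequentially scans two inputs and then
// probes a buffered (pipeline-breaking) right side that does NOT fit in
// memory, charging one page per random access as described above.
size_t estimatePlanIOs(size_t leftTuples, size_t rightTuples, size_t tuplesPerPage) {
  size_t leftScanIOs = leftTuples / tuplesPerPage;   // sequential: pages of left input
  size_t rightScanIOs = rightTuples / tuplesPerPage; // sequential: pages of right input
  size_t probeIOs = leftTuples;                      // random: one page per left tuple
  return leftScanIOs + rightScanIOs + probeIOs;
}
```

For example, with 1,000,000 left tuples, 500,000 right tuples, and 100 tuples per page, the estimate is 10,000 + 5,000 + 1,000,000 = 1,015,000 page IOs, dominated by the random probes.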
1.9 Limitations of Volcano
Virtual function calls are very hard to branch-predict, causing many pipeline stalls. We can approximate the number of function calls per tuple for each operator:
- Scan: none, direct access to tuples.
- Select, Project: one to read input, one to apply predicate/projection.
- Cross Product: one to read left, one to read right.
- Join: one to read input.
- Group By: one to read input, one to apply aggregate value.
- Output: one to extract for output.
This is a huge CPU bottleneck!
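To make the bottleneck concrete, the per-tuple counts above can be tallied for a hypothetical plan. A sketch (the plan shape, selectivity, and cardinality are assumptions for illustration):

```cpp
#include <cstddef>
using namespace std;

// Approximate virtual calls for a Scan -> Select -> Project -> output plan,
// using the per-operator counts listed above. Selectivity is the fraction
// of tuples that survive the selection.
size_t estimateCalls(size_t inputTuples, double selectivity) {
  size_t selected = static_cast<size_t>(inputTuples * selectivity);
  size_t selectCalls = 2 * inputTuples; // read input + apply predicate
  size_t projectCalls = 2 * selected;   // read input + apply projection
  size_t outputCalls = selected;        // extract each result tuple
  return selectCalls + projectCalls + outputCalls;
}
```

With one million input tuples and 50% selectivity, this already comes to 3.5 million hard-to-predict virtual calls.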
2. Bulk Processing
Bulk processing attempts to reduce the number of function calls on the CPU and is optimised for in-memory processing, turning control dependencies into data dependencies. It does this by buffering tuples between operators and processing them in batches. Bulk processing means tight loops:
- No function calls
- Very CPU efficient
- Every operator is a pipeline breaker, but very fast
In bulk processing, all IO is read and written sequentially, giving high spatial locality. If a buffer fits in memory, no slow IO is needed. If not, sequential access still gives high IO throughput (costs are paid per page), whereas random access is very slow (costs are paid per access).
2.1 Bulk Select
For example, a bulk select operator:
```cpp
int select(Table& output, Table const& input, int predicate, int attribute) {
  for (size_t i = 0; i < input.size(); i++) {
    if (get<int>(input[i][attribute]) == predicate) {
      output.push_back(input[i]);
    }
  }
  return output.size();
}

// Example usage:
Table order, buffer1, buffer2;
int pred1 = 4, pred2 = 7;
select(buffer1, order, pred1, 2);
select(buffer2, buffer1, pred2, 3);
```
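The inner branch of the bulk select above can itself be removed, which is the "control dependency to data dependency" conversion mentioned earlier: always write the candidate, and let the comparison result advance the output cursor. A sketch over a plain integer column (the function name is illustrative):

```cpp
#include <cstddef>
#include <vector>
using namespace std;

// Branch-free selection: every candidate is written unconditionally, and the
// 0/1 comparison result advances the cursor instead of steering a branch.
size_t selectBranchFree(vector<int>& output, vector<int> const& input, int predicate) {
  output.resize(input.size()); // worst case: every tuple qualifies
  size_t cursor = 0;
  for (size_t i = 0; i < input.size(); i++) {
    output[cursor] = input[i];         // always write
    cursor += (input[i] == predicate); // advance only on a match, no branch
  }
  output.resize(cursor); // shrink to the qualifying tuples
  return cursor;
}
```

Non-matching values are simply overwritten by the next iteration, so the loop body has no data-dependent branch for the CPU to mispredict.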
2.2 Decomposed Storage
By-reference bulk processing optimises for bandwidth by producing tuple IDs instead of copies of the tuples: each selection outputs the positions of the qualifying tuples in the underlying relation.
```cpp
int select(vector<int>& output, optional<vector<int>> const& candidatePositions,
           int predicate, int attribute, vector<Tuple> const& underlyingRelation) {
  if (!candidatePositions.has_value()) {
    // First selection in the plan: scan the whole relation
    for (size_t i = 0; i < underlyingRelation.size(); i++) {
      if (get<int>(underlyingRelation[i][attribute]) == predicate) {
        output.push_back(int(i));
      }
    }
  } else {
    // Later selection in the plan: only check surviving positions
    for (size_t i = 0; i < candidatePositions->size(); i++) {
      int pos = (*candidatePositions)[i];
      if (get<int>(underlyingRelation[pos][attribute]) == predicate) {
        output.push_back(pos);
      }
    }
  }
  return output.size();
}

// Example usage:
vector<Tuple> order;
vector<int> buffer1, buffer2;
int pred1 = 4, pred2 = 7;
select(buffer1, {}, pred1, 2, order);
select(buffer2, buffer1, pred2, 3, order);
```
To calculate IO costs, we need to add operators that resolve candidate references by looking the referenced tuples up in the base relation.
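Such a resolving step can be sketched as a gather over the base relation (the function name is illustrative, not from the original design):

```cpp
#include <cstddef>
#include <vector>
using namespace std;

// Resolve candidate references: look each surviving position up in the base
// relation and materialize one attribute's values. Each lookup is a random
// access into the base relation, which is what the IO model must charge for.
template <typename T>
vector<T> resolve(vector<size_t> const& positions,
                  vector<vector<T>> const& underlyingRelation, size_t attribute) {
  vector<T> output;
  output.reserve(positions.size());
  for (size_t pos : positions) {
    output.push_back(underlyingRelation[pos][attribute]);
  }
  return output;
}
```

If the candidate positions are sorted, these lookups become (mostly) sequential, which is why by-reference plans try to keep positions in order.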