Query Processing Models

A processing model is a mechanism to connect the different operators of a query. Different models are optimised for different bottlenecks. Previously, we assumed a simple in-memory storage manager:

```cpp
using Tuple = vector<variant<int, float, string>>;
using Table = vector<Tuple>;

class StorageManager {
  map<string, Table> catalog;

public:
  Table& getTable(const string& name) {
    return catalog[name];
  }
};
```

1. Volcano Processing

Volcano focuses on extensible design, not raw performance. It comprises a query optimiser, a physical algebra that is not tied to the relational model, and a query processing model. A Volcano operator exposes three virtual functions:

```cpp
struct Operator {
  virtual void open() = 0;
  virtual optional<Tuple> next() = 0;
  virtual void close() = 0;
  virtual ~Operator() = default; // children are held through Operator pointers
};
```

1.1 Operator: Scan

Scan will read a table and return tuples one by one.

```cpp
struct Scan : Operator {
  Table input;
  size_t nextIndex = 0;
  Scan(Table table) : input(move(table)) {}

  void open() {} // Assumes everything is in memory
  optional<Tuple> next() {
    if (nextIndex < input.size()) {
      return input[nextIndex++];
    }
    return {};
  }
  void close() {}
};
```

As we can see, operators are stateful!
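To see that state in action, here is a minimal driver; `drain` is an illustrative helper, not part of the model, and `Operator`/`Scan` are restated so the snippet stands alone. Every `next()` call advances the operator's internal cursor, which is exactly the state being kept:

```cpp
#include <cassert>
#include <optional>
#include <string>
#include <variant>
#include <vector>
using namespace std;

using Tuple = vector<variant<int, float, string>>;
using Table = vector<Tuple>;

// Restated interfaces so this snippet compiles on its own:
struct Operator {
  virtual void open() = 0;
  virtual optional<Tuple> next() = 0;
  virtual void close() = 0;
  virtual ~Operator() = default;
};

struct Scan : Operator {
  Table input;
  size_t nextIndex = 0;
  Scan(Table table) : input(move(table)) {}
  void open() {}
  optional<Tuple> next() {
    if (nextIndex < input.size()) return input[nextIndex++];
    return {};
  }
  void close() {}
};

// The caller drives the pipeline: repeated next() calls return
// successive tuples because the operator remembers where it was.
vector<Tuple> drain(Operator& op) {
  vector<Tuple> result;
  op.open();
  for (auto t = op.next(); t.has_value(); t = op.next()) {
    result.push_back(*t);
  }
  op.close();
  return result;
}
```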

1.2 Operator: Project

This transforms a tuple into another tuple using a projection function:

```cpp
struct Project : Operator {
  using Projection = function<Tuple(Tuple)>;

  Projection projection;
  unique_ptr<Operator> child;
  Project(Projection proj, unique_ptr<Operator> child)
      : projection(proj), child(move(child)) {}

  void open() {
    child->open();
  }
  optional<Tuple> next() {
    // Only apply the projection if the child actually produced a tuple:
    auto input = child->next();
    if (input.has_value()) {
      return projection(*input);
    }
    return {};
  }
  void close() {
    child->close();
  }
};
```

1.3 Operator: Select

A selection returns all tuples that satisfy a boolean predicate:

```cpp
struct Select : Operator {
  using Predicate = function<bool(Tuple)>;

  Predicate predicate;
  unique_ptr<Operator> child;
  Select(Predicate pred, unique_ptr<Operator> child)
      : predicate(pred), child(move(child)) {}

  void open() {
    child->open();
  }
  optional<Tuple> next() {
    // Keep pulling from the child until a tuple passes the predicate:
    for (auto candidate = child->next(); candidate.has_value(); candidate = child->next()) {
      if (predicate(*candidate)) {
        return candidate;
      }
    }
    return {};
  }
  void close() {
    child->close();
  }
};
```

1.4 Operator: Union

Return all tuples from one child, followed by all tuples from another child:

```cpp
struct Union : Operator {
  unique_ptr<Operator> leftChild;
  unique_ptr<Operator> rightChild;
  Union(unique_ptr<Operator> left, unique_ptr<Operator> right)
      : leftChild(move(left)), rightChild(move(right)) {}

  void open() {
    leftChild->open();
    rightChild->open();
  }
  optional<Tuple> next() {
    // Exhaust the left child first, then fall through to the right:
    auto leftResult = leftChild->next();
    return leftResult.has_value() ? leftResult : rightChild->next();
  }
  void close() {
    leftChild->close();
    rightChild->close();
  }
};
```

1.5 Operator: Difference

```cpp
struct Difference : Operator {
  unique_ptr<Operator> leftChild;
  unique_ptr<Operator> rightChild;
  vector<Tuple> bufferedRight;
  Difference(unique_ptr<Operator> left, unique_ptr<Operator> right)
      : leftChild(move(left)), rightChild(move(right)) {}

  void open() {
    leftChild->open();
    rightChild->open();
    // Materialise the entire right input up front:
    for (auto rightTuple = rightChild->next(); rightTuple.has_value(); rightTuple = rightChild->next()) {
      bufferedRight.push_back(*rightTuple);
    }
  }
  optional<Tuple> next() {
    // Emit left tuples that do not appear on the right:
    for (auto candidate = leftChild->next(); candidate.has_value(); candidate = leftChild->next()) {
      if (find(bufferedRight.begin(), bufferedRight.end(), *candidate) == bufferedRight.end()) {
        return candidate;
      }
    }
    return {};
  }
  void close() {
    leftChild->close();
    rightChild->close();
  }
};
```

This operator must consume all tuples from one input before it can produce any output. Such an operator is called a pipeline breaker.

1.6 Operator: Cross Product

```cpp
struct CrossProduct : Operator {
  unique_ptr<Operator> leftChild;
  unique_ptr<Operator> rightChild;
  optional<Tuple> leftTuple;
  vector<Tuple> bufferedRight;
  size_t bufferedRightOffset = 0;

  CrossProduct(unique_ptr<Operator> left, unique_ptr<Operator> right)
      : leftChild(move(left)), rightChild(move(right)) {}

  // Tuple is a plain vector, so concatenation is an explicit helper:
  static Tuple concat(Tuple left, Tuple const& right) {
    left.insert(left.end(), right.begin(), right.end());
    return left;
  }

  void open() {
    leftChild->open();
    rightChild->open();
    // Materialise the entire right input up front (pipeline breaker):
    for (auto rightTuple = rightChild->next(); rightTuple.has_value(); rightTuple = rightChild->next()) {
      bufferedRight.push_back(*rightTuple);
    }
    leftTuple = leftChild->next();
  }
  optional<Tuple> next() {
    // Advance the left tuple once the buffered right side is exhausted:
    if (bufferedRightOffset == bufferedRight.size()) {
      bufferedRightOffset = 0;
      leftTuple = leftChild->next();
    }
    if (!leftTuple.has_value() || bufferedRight.empty()) {
      return {};
    }
    return concat(*leftTuple, bufferedRight[bufferedRightOffset++]);
  }
  void close() {
    leftChild->close();
    rightChild->close();
  }
};
```

However, this implementation buffers the whole right input before producing anything, making it a pipeline breaker. We can implement it better by interleaving the buffering with output production:

```cpp
struct CrossProduct : Operator {
  unique_ptr<Operator> leftChild;
  unique_ptr<Operator> rightChild;
  optional<Tuple> leftTuple;
  vector<Tuple> bufferedRight;
  size_t bufferedRightOffset = 0;

  CrossProduct(unique_ptr<Operator> left, unique_ptr<Operator> right)
      : leftChild(move(left)), rightChild(move(right)) {}

  // Tuple is a plain vector, so concatenation is an explicit helper:
  static Tuple concat(Tuple left, Tuple const& right) {
    left.insert(left.end(), right.begin(), right.end());
    return left;
  }

  void open() {
    leftChild->open();
    rightChild->open();
    leftTuple = leftChild->next();
  }
  optional<Tuple> next() {
    // Grow the right-hand buffer lazily while consuming it:
    auto rightTuple = rightChild->next();
    if (rightTuple.has_value()) {
      bufferedRight.push_back(*rightTuple);
    }
    if (bufferedRightOffset == bufferedRight.size()) {
      bufferedRightOffset = 0;
      leftTuple = leftChild->next();
    }
    if (!leftTuple.has_value() || bufferedRight.empty()) {
      return {};
    }
    return concat(*leftTuple, bufferedRight[bufferedRightOffset++]);
  }
  void close() {
    leftChild->close();
    rightChild->close();
  }
};
```

Some operators are pipelinable; others are not.

1.7 Operator: Grouped Aggregate

Groups all tuples that are equal under a projection function, and calculates one or more aggregates per group. This operator is not pipelinable.

```cpp
struct GroupBy : Operator {
  // Aggregate values are stored inline in the output tuple, so they share
  // the tuple's variant type (only ints and floats make sense as aggregates):
  using SupportedDatatype = Tuple::value_type;
  using AggregateFunc = function<SupportedDatatype(SupportedDatatype, Tuple)>;
  using Projection = function<Tuple(Tuple)>;

  unique_ptr<Operator> child;
  vector<optional<Tuple>> hashTable;
  Projection getGroupKeys;
  vector<AggregateFunc> aggregateFuncs;
  size_t outputCursor = 0;

  GroupBy(Projection groupKeys, vector<AggregateFunc> aggs, unique_ptr<Operator> child)
      : child(move(child)), hashTable(1024), // fixed table size for simplicity
        getGroupKeys(groupKeys), aggregateFuncs(aggs) {}

  size_t hash(Tuple const& key);
  size_t nextSlot(size_t slot);

  void open() {
    child->open();

    for (auto inputTuple = child->next(); inputTuple.has_value(); inputTuple = child->next()) {
      // (Assumes a single grouping attribute for brevity.)
      auto key = getGroupKeys(*inputTuple);

      // Probe until we find this key's slot or an empty one:
      auto slot = hash(key);
      while (hashTable[slot].has_value() && (*hashTable[slot])[0] != key[0]) {
        slot = nextSlot(slot);
      }

      // If the slot is empty, initialise it with the key and empty aggregates:
      if (!hashTable[slot].has_value()) {
        hashTable[slot] = Tuple{key[0]};
        hashTable[slot]->resize(1 + aggregateFuncs.size());
      }

      // Update the running aggregates with the new tuple:
      for (size_t i = 0; i < aggregateFuncs.size(); i++) {
        (*hashTable[slot])[i + 1] = aggregateFuncs[i]((*hashTable[slot])[i + 1], *inputTuple);
      }
    }
  }
  optional<Tuple> next() {
    // Emit the occupied hash table slots one by one:
    while (outputCursor < hashTable.size()) {
      auto slot = hashTable[outputCursor++];
      if (slot.has_value()) {
        return slot;
      }
    }
    return {};
  }
  void close() {
    child->close();
  }
};
```

1.8 Using Volcano

We can build an example query as such:

```cpp
void myQuery() {
  Table input{{1, "Tom", 20}, {2, "Jerry", 22}, {3, "Spike", 21}};

  auto plan = make_unique<GroupBy>(
      [](auto t) { return Tuple{t[1]}; },                // group key: name
      vector<GroupBy::AggregateFunc>{
          [](auto v, auto) { return get<int>(v) + 1; }   // count per group
      },
      make_unique<Select>(
          [](auto t) { return get<int>(t[2]) == 21; },   // age == 21
          make_unique<Scan>(input)));

  plan->open();
  for (auto t = plan->next(); t.has_value(); t = plan->next()) {
    for (auto& value : *t) {
      visit([](auto& v) { cout << v << ' '; }, value);
    }
    cout << endl;
  }
  plan->close();
}
```

The above query counts the number of people aged 21, grouped by name.

We can also estimate buffer IO operations in Volcano. To check whether a buffer fits in memory, we need to know its size; for this we assume perfect knowledge of all input and output cardinalities (an oracle).
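A toy version of that size check, with illustrative names and a 4 KiB page size (both assumptions, not from the notes):

```cpp
#include <cassert>
#include <cstddef>
using namespace std;

struct BufferEstimate {
  size_t cardinality;  // number of tuples, supplied by the oracle
  size_t tupleBytes;   // size of one tuple in bytes
};

constexpr size_t pageBytes = 4096;  // illustrative page size

// Pages the buffer occupies on disk, rounded up:
size_t pagesNeeded(BufferEstimate e) {
  return (e.cardinality * e.tupleBytes + pageBytes - 1) / pageBytes;
}

// Whether the whole buffer fits into the given memory budget:
bool fitsInMemory(BufferEstimate e, size_t memoryBytes) {
  return e.cardinality * e.tupleBytes <= memoryBytes;
}
```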

1.9 Limitations of Volcano

Virtual function calls are very difficult to branch-predict, causing many pipeline stalls. The number of such calls is roughly the number of tuples flowing through the plan multiplied by the number of operators, since every tuple triggers at least one virtual next() call in each operator it passes through. This is a huge CPU bottleneck!
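As a back-of-the-envelope sketch (the exact constant depends on the plan shape, so treat this as an approximation only):

```cpp
#include <cassert>
#include <cstddef>
using namespace std;

// Every tuple passing through every operator costs at least one virtual
// next() call; the true constant factor depends on the plan.
size_t approxVirtualCalls(size_t tuples, size_t numOperators) {
  return tuples * numOperators;
}
```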

2. Bulk Processing

Bulk processing attempts to reduce the number of function calls on the CPU and is optimised for in-memory processing, turning control dependencies into data dependencies. It does this by buffering tuples between operators and processing them in batches, which means tight loops.

In bulk processing, all IO is read and written sequentially, giving high spatial locality. If the buffer is in memory, no slow IO is needed; if not, sequential access still delivers high IO throughput (cost is per page), whereas random access is very slow (cost is per access).
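The control-to-data-dependency idea can be sketched on a plain integer column; `selectPredicated` is an illustrative name, and the single-column layout is a simplification:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>
using namespace std;

// Branch-free selection: instead of an if on the data (a control
// dependency the branch predictor can miss), the match is computed as a
// 0/1 value that advances the output cursor (a data dependency).
size_t selectPredicated(vector<int>& out, vector<int> const& in, int value) {
  out.resize(in.size());
  size_t cursor = 0;
  for (size_t i = 0; i < in.size(); i++) {
    out[cursor] = static_cast<int>(i);  // write the position unconditionally
    cursor += (in[i] == value);         // 0 or 1: no branch on the data
  }
  out.resize(cursor);  // keep only the positions that matched
  return cursor;
}
```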

2.1 Bulk Select

For example, a bulk select operator:

```cpp
int select(Table& output, Table const& input, int predicate, int attribute) {
  for (size_t i = 0; i < input.size(); i++) {
    if (get<int>(input[i][attribute]) == predicate) {
      output.push_back(input[i]);
    }
  }
  return output.size();
}

// Example usage: two selections chained through buffers
Table order, buffer1, buffer2;
int attr1 = 4, attr2 = 7;
select(buffer1, order, attr1, 2);
select(buffer2, buffer1, attr2, 3);
```

2.2 Decomposed Storage

By-reference bulk processing optimises for bandwidth by producing tuple IDs instead of full tuples. These IDs are positions in the underlying buffer. When processing, we dereference the tuple IDs to get the actual tuples; a lookup costs the same as a hash table probe without conflicts. Now our select operator looks like:

```cpp
int select(vector<int>& output, optional<vector<int>> const& candidatePositions,
           int predicate, int attribute, vector<Tuple> const& underlyingRelation) {
  if (!candidatePositions.has_value()) {
    // First selection in the plan: scan the whole base relation
    for (size_t i = 0; i < underlyingRelation.size(); i++) {
      if (get<int>(underlyingRelation[i][attribute]) == predicate) {
        output.push_back(i);
      }
    }
  } else {
    // Later selection in the plan: only revisit candidate positions
    for (size_t i = 0; i < candidatePositions->size(); i++) {
      int pos = (*candidatePositions)[i];
      if (get<int>(underlyingRelation[pos][attribute]) == predicate) {
        output.push_back(pos);
      }
    }
  }
  return output.size();
}

// Example usage:
vector<Tuple> order;
vector<int> buffer1, buffer2;
int attr1 = 4, attr2 = 7;
select(buffer1, {}, attr1, 2, order);
select(buffer2, buffer1, attr2, 3, order);
```

To calculate IO costs, we need to add operators that resolve candidate references by looking the tuples up in the base relation.
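A sketch of such a resolution step (the name `resolve` is illustrative): it materialises tuples from candidate positions, and each lookup is one random access into the base relation, which is exactly what the IO cost model must account for:

```cpp
#include <cassert>
#include <string>
#include <variant>
#include <vector>
using namespace std;

using Tuple = vector<variant<int, float, string>>;

// Turn a vector of candidate positions back into materialised tuples,
// one lookup into the base relation per position:
vector<Tuple> resolve(vector<int> const& positions, vector<Tuple> const& base) {
  vector<Tuple> result;
  result.reserve(positions.size());
  for (int pos : positions) {
    result.push_back(base[pos]);
  }
  return result;
}
```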
