
PostgreSQL FDW aggregation pushdown part I: modifying Multicorn

We recently implemented support for aggregation and grouping pushdown in the Multicorn FDW. In this post, we'll demonstrate it on a simple toy example and discuss how PostgreSQL aggregation pushdown works in general.

Introduction

Besides being remarkably stable, performant and reliable, PostgreSQL offers a plethora of attractive features, both well known and more obscure, along with meticulous source code and documentation and a thriving ecosystem. That is why Splitgraph is built on PostgreSQL at its core. One feature that stands out, and that in our opinion is underutilized, is foreign data wrappers (FDWs).

As we previously elaborated, FDWs enable SQL to serve as a "lingua franca" of sorts, and make any remote data source queryable by the user locally, whether it's "SQL-native" (PostgreSQL, MySQL, SQLite, Snowflake, etc.) or not (MongoDB, Redis, Elasticsearch, CSVs in S3, etc.). In other words, if it has an API you can probably make an FDW for it.

The gold standard for all FDW implementations is postgres_fdw, the native PostgreSQL FDW, which showcases the full scope of what the FDW API can do. Besides basic read and write operations, this includes handling non-trivial SQL constructs in a clever way, namely pushing down joins and aggregations.

In this article, the first in a series of posts (check out parts II and III), we describe how we implemented aggregation pushdown in a number of FDWs that we use, all of them based on a common framework named Multicorn.

Multicorn basics

Writing an FDW implementation from scratch is not exactly trivial, and neither is adding aggregation pushdown capability. This canonically involves writing PostgreSQL-flavored C extensions, which, despite being a surprisingly smooth experience, can still be quite laborious and error-prone.

Enter Multicorn. On the one hand, it provides a high-level interface for the last-mile data wrangling, performed in Python and customized for each specific FDW implementation. On the other hand, its shared core internally hooks into the FDW mechanism through the low-level API, so that users need not worry about it.

To give a concrete example, let's focus on a working toy FDW written with Multicorn, one in which we query a Pandas data frame. For the sake of drama, let's pretend this data source lives on a remote server, somewhere across the network.

import pandas as pd
from multicorn import ForeignDataWrapper

df = pd.DataFrame({
    "number": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
    "parity": ["even", "odd", "even", "odd", "even", "odd", "even", "odd", "even", "odd"]
})

def fake_remote_pandas_endpoint(columns):
    return df[columns].to_dict("records")


class PandasFdw(ForeignDataWrapper):

    def execute(self, quals, columns):
        for row in fake_remote_pandas_endpoint(columns):
            yield row
        return

Now, after we create a foreign server that uses our wrapper and a foreign table on top of it:

CREATE SERVER pandas_server FOREIGN DATA WRAPPER multicorn OPTIONS ( wrapper 'multicorn.pandasfdw.PandasFdw');
CREATE FOREIGN TABLE pandas_table(number int, parity character varying(4)) SERVER pandas_server;

we can go ahead and actually query it:

sgr@localhost:splitgraph> SELECT number, parity FROM pandas_table
+--------+--------+
| number | parity |
|--------+--------|
| 0      | even   |
| 1      | odd    |
| 2      | even   |
| 3      | odd    |
| 4      | even   |
| 5      | odd    |
| 6      | even   |
| 7      | odd    |
| 8      | even   |
| 9      | odd    |
+--------+--------+

In fact, we can do aggregations too:

sgr@localhost:splitgraph> SELECT count(number) FROM pandas_table
+-------+
| count |
|-------|
| 10    |
+-------+

There is only one issue, however, as evidenced by the execute API: Multicorn doesn't support the aggregation pushdown mechanism. Without aggregation pushdown, the query sent to the remote amounts to a full table scan, and PostgreSQL performs the aggregation locally, only after fetching all the rows. Needless to say, this is suboptimal: in real scenarios, aggregation outputs are usually orders of magnitude smaller than the underlying data, and in the extreme case reduce to a single row.

Moreover, this affects not only speed but also cost, given the substantial egress fees charged by many cloud providers, while also neglecting the built-in (and likely optimized) aggregation capabilities of the remote data source. Since we had previously customized Multicorn to suit Splitgraph's column-oriented nature, it made sense to try to implement this functionality ourselves.

FDW mechanism 101

Generally speaking, an FDW implementation hooks into the planning and execution phases of the query processing flow. In particular, during the planning phase PostgreSQL will consult the FDW implementation for an estimate of the remote data size to be fetched (to aid in optimal plan selection), then ask it to create corresponding foreign access path(s), with each path representing one competing approach that will result in the same output for the given query. Lastly, the FDW is required to generate a plan out of the selected path.

Next, in the execution phase PostgreSQL delegates to the FDW the responsibility of preparing for the scan, iterating through the scan itself, and finally cleaning up any leftover resources. Note that each of the aforementioned steps corresponds to one of the pre-determined callback functions that must be implemented by the FDW.
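As a rough illustration of this sequence, here is a toy, pure-Python model of the order in which PostgreSQL drives the scan-related callbacks. The method names are snake_case stand-ins for the C callbacks noted in the comments; the class, the run_query driver and all return values are hypothetical simplifications (real callbacks exchange planner and executor structs, and ReScanForeignScan is left out).

```python
class ToyFdw:
    """Hypothetical model of an FDW that records which callbacks ran, in order."""

    def __init__(self, rows):
        self.rows = rows
        self.calls = []
        self._cursor = None

    # Planning phase
    def get_foreign_rel_size(self):  # GetForeignRelSize
        self.calls.append("GetForeignRelSize")
        return len(self.rows)  # size estimate that feeds into costing

    def get_foreign_paths(self):  # GetForeignPaths
        self.calls.append("GetForeignPaths")
        return ["plain foreign scan"]  # competing access paths, here just one

    def get_foreign_plan(self, path):  # GetForeignPlan
        self.calls.append("GetForeignPlan")
        return {"path": path}

    # Execution phase
    def begin_foreign_scan(self, plan):  # BeginForeignScan
        self.calls.append("BeginForeignScan")
        self._cursor = iter(self.rows)

    def iterate_foreign_scan(self):  # IterateForeignScan
        self.calls.append("IterateForeignScan")
        return next(self._cursor, None)  # None plays the role of an empty tuple slot

    def end_foreign_scan(self):  # EndForeignScan
        self.calls.append("EndForeignScan")
        self._cursor = None


def run_query(fdw):
    """Invoke the callbacks in planner-then-executor order and collect the rows."""
    fdw.get_foreign_rel_size()
    paths = fdw.get_foreign_paths()
    plan = fdw.get_foreign_plan(paths[0])  # the real planner keeps the cheapest path
    fdw.begin_foreign_scan(plan)
    rows = []
    while (row := fdw.iterate_foreign_scan()) is not None:
        rows.append(row)
    fdw.end_foreign_scan()
    return rows


fdw = ToyFdw([{"number": 0}, {"number": 1}])
print(run_query(fdw))
print(fdw.calls)
```

Note that IterateForeignScan is called once more than there are rows: the final call signals the end of the scan.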

To move beyond the most basic foreign scan, one needs to implement some of the optional advanced callback functions; for aggregation pushdown this is GetForeignUpperPaths (together with the helper functions it calls). In PostgreSQL terminology, the processing steps that come after scans and joins, such as aggregation among other things, are represented as upper relations, hence the name.

Deep dive into aggregation pushdown

Our own version of GetForeignUpperPaths closely follows that of postgres_fdw, albeit in a considerably reduced form and with a long tail of tweaks to meet our needs. Going through the entire implementation is beyond the scope of this blog post (relevant PRs: #1, #2, #3), so instead we give a higher-level overview of how it works.

The main outcome of GetForeignUpperPaths is an additional foreign access path that pushes the aggregation down to the remote data source, and which the planner will most likely pick as the cheapest one. Before producing it, however, we must address a problem: can the remote execute the desired query in the first place?

We should not push down the aggregation if it cannot be performed on the other side. Think of specific aggregate functions (e.g. jsonb_agg, which aggregates values, including nulls, into a JSON array) or operators (e.g. ~~*, i.e. ILIKE) that don't necessarily have an equivalent in the remote data source. We must detect any such cases and skip adding the corresponding upper access paths; the aggregation then has to be performed the hard way, after fetching all the relevant rows first.
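The resulting decision can be pictured with the hypothetical, heavily simplified sketch below. It treats the number of rows shipped from the remote as the path cost (PostgreSQL's actual cost model is much richer), and only offers the pushdown path when the shippability check has passed.

```python
from dataclasses import dataclass


@dataclass
class Path:
    description: str
    rows_fetched: int  # rows shipped over the network; a crude proxy for cost


def upper_paths(table_rows, result_rows, shippable):
    """Local aggregation is always possible; the pushdown path is only
    added if every construct in the target can run remotely."""
    paths = [Path("fetch all rows, aggregate locally", table_rows)]
    if shippable:
        paths.append(Path("push aggregation down to remote", result_rows))
    return paths


def cheapest(paths):
    return min(paths, key=lambda path: path.rows_fetched)


# count(number) is shippable: 1 result row instead of 10 fetched rows.
print(cheapest(upper_paths(10, 1, shippable=True)).description)
# jsonb_agg(...) is not: the full fetch is the only remaining option.
print(cheapest(upper_paths(10, 1, shippable=False)).description)
```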

Target structure

We perform this verification by recursively walking the expression tree of each expression in the path target, making sure every construct involved is shippable. To get a better sense of the task at hand, let's take a closer look at the relevant upper path target for our simple example. One way to see it is to inspect the targetList field of the Query struct, which PostgreSQL logs when e.g. the debug_print_rewritten option is on (for brevity, we purposefully omit fields irrelevant to our discussion):

(
   {QUERY
   :commandType 1
   :hasAggs true
   ⋮
   :targetList (
      {TARGETENTRY
      :expr
         {AGGREF
         :aggfnoid 2147
         :aggtype 20
         :args (
            {TARGETENTRY
            :expr
               {VAR
               :varno 1
               :varattno 1
               :vartype 23}}
         )}
      :resname count
      ⋮
      }
   )}
)

A few things are worth noting. First, commandType equal to 1 corresponds to a SELECT statement, while hasAggs is true due to the count invocation. More importantly, the target list has a single element consisting of two nested nodes, an Aggref and a Var, each wrapped in a TargetEntry struct. These in turn contain further useful information.

On the one hand, the Aggref's aggfnoid holds the Oid of the aggregate function, from which we can look up the function name that we want to pass to PandasFdw. Note that we cannot rely on the target entry's resname, since it only refers to the output column name, and defaults to count here simply because we haven't used a column alias. Also important is the aggtype field, which denotes the type of the output (in this case INT8OID, aka bigint); in some cases we must apply type coercion if the remote data source does not comply (e.g. Elasticsearch returning a float for aggregations over an integer field).

On the other hand, the Var's varno lets us determine whether the variable belongs to the foreign table at all, while vartype represents the type of the column itself (INT4OID, aka integer). Finally, varattno corresponds to the position of the column in the table, and can be used to fetch the actual column name, another data point to be passed to PandasFdw.
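To tie these fields together, here is a simplified, pure-Python sketch of the check for a single target expression, operating on nested dicts that mirror the dump above. FUNCTION_NAMES and COLUMN_NAMES are hypothetical stand-ins for the catalog lookups the C code performs (function names live in pg_proc, column names in pg_attribute), and the walk only descends as far as the Aggref-over-Var shape needed here.

```python
# Hypothetical stand-ins for catalog lookups.
FUNCTION_NAMES = {2147: "count"}           # aggfnoid -> aggregate function name
COLUMN_NAMES = {1: "number", 2: "parity"}  # varattno -> column name
FOREIGN_VARNO = 1                          # range-table index of pandas_table


def extract_aggregate(expr):
    """Return (function, column) for an Aggref over a single foreign-table Var,
    or None if this sketch would refuse to push the expression down."""
    if expr.get("node") != "AGGREF":
        return None
    function = FUNCTION_NAMES.get(expr["aggfnoid"])
    args = expr.get("args", [])
    if function is None or len(args) != 1:
        return None  # unknown function, or not exactly one argument
    var = args[0]["expr"]  # each argument is wrapped in a TargetEntry
    if var.get("node") != "VAR" or var["varno"] != FOREIGN_VARNO:
        return None  # argument is an arbitrary expression or another relation
    column = COLUMN_NAMES.get(var["varattno"])
    return None if column is None else (function, column)


# The targetList entry from the dump above, reduced to the relevant fields.
target_entry = {
    "expr": {
        "node": "AGGREF",
        "aggfnoid": 2147,
        "aggtype": 20,  # INT8OID (bigint)
        "args": [{"expr": {"node": "VAR", "varno": 1, "varattno": 1, "vartype": 23}}],
    },
    "resname": "count",
}
print(extract_aggregate(target_entry["expr"]))  # ('count', 'number')
```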

Multicorn aggregation API

Knowing that we only need to handle Aggref and Var nodes simplifies the expression walking algorithm significantly. We've also identified where to obtain the data that needs to be passed to PandasFdw, namely the function name and the column name. We pass those through a keyword argument aggs, in the following form:

{
    "count.number":
    {
        "column": "number",
        "function": "count"
    },
    ...
}

This is also where Multicorn's peculiarities show, compared to other FDW implementations. Since Multicorn is designed as a generic FDW framework, it cannot know ahead of time which functions and operators a given remote data source supports.

To help determine that, we add a method named can_pushdown_upperrel to our PandasFdw class, which Multicorn invokes during the GetForeignUpperPaths call. For now, this method only needs to provide Multicorn with the list of supported aggregation functions that can safely be pushed down.
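Put together, building the aggs argument and checking it against the declared capabilities could look roughly like the pure-Python sketch below. build_aggs_arg and is_pushdown_allowed are hypothetical helpers that mimic the logic rather than reproduce Multicorn's C core; the key format follows the "count.number" example above.

```python
def build_aggs_arg(pairs):
    """Build the `aggs` keyword argument from (function, column) pairs."""
    return {
        f"{function}.{column}": {"column": column, "function": function}
        for function, column in pairs
    }


def is_pushdown_allowed(capabilities, aggs):
    """Push down only if the FDW declared every requested aggregate function.

    `capabilities` is whatever can_pushdown_upperrel() returned; None models
    an FDW that does not implement the method at all.
    """
    if not capabilities:
        return False
    supported = set(capabilities.get("agg_functions", []))
    return all(props["function"] in supported for props in aggs.values())


aggs = build_aggs_arg([("count", "number"), ("max", "number")])
print(aggs)
print(is_pushdown_allowed({"agg_functions": ["min", "max", "count"]}, aggs))  # True
print(is_pushdown_allowed({"agg_functions": ["count"]}, aggs))                # False
```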

Adding support for aggregation functions

Let's look at PandasFdw with support for aggregations added:

import json
import pandas as pd
from multicorn import ForeignDataWrapper

df = pd.DataFrame({
    "number": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9],
    "parity": ["even", "odd", "even", "odd", "even", "odd", "even", "odd", "even", "odd"]
})

def fake_remote_pandas_endpoint(columns, aggs=None):
    if aggs is not None:
        # Returns {"column_1": {"avg": x, "sum": y}, "column_2": {"min": z}, ..}
        return df.agg(aggs).to_dict()

    return df[columns].to_dict("records")


def _convert_aggs_arg(aggs):
    # Convert aggs in accordance with Pandas API:
    # {"column_1": ["avg", "sum"], "column_2": ["min"], ..}
    pandas_aggs = {}
    for agg_props in aggs.values():
        if agg_props["column"] not in pandas_aggs:
            pandas_aggs[agg_props["column"]] = [agg_props["function"]]
        else:
            pandas_aggs[agg_props["column"]].append(agg_props["function"])
    return pandas_aggs


class PandasFdw(ForeignDataWrapper):

    def can_pushdown_upperrel(self):
        return {
            "agg_functions": ["min", "max", "sum", "avg", "count"]
        }

    def explain(self, quals, columns, aggs=None, verbose=False):
        return [
            f"columns: {columns}",
            f"aggs: {json.dumps(aggs, indent=4)}"
        ]

    def execute(self, quals, columns, aggs=None):
        if aggs is not None:
            pandas_aggs = _convert_aggs_arg(aggs)
            row = fake_remote_pandas_endpoint(columns, pandas_aggs)

            # Convert result back to Multicorn API:
            # {"avg.column_1": x, "sum.column_1": y, "min.column_2": z, ...}
            result = {}
            for agg_name, agg_props in aggs.items():
                result[agg_name] = row[agg_props["column"]][agg_props["function"]]

            yield result
        else:
            for row in fake_remote_pandas_endpoint(columns):
                yield row
        return

A few important observations here:

  • we extend our Pandas endpoint to execute the new aggregation queries
  • the new function _convert_aggs_arg simply adapts the aggs argument to the format our Pandas endpoint accepts
  • can_pushdown_upperrel returns a dict whose sole element is the list of supported functions, under the key agg_functions
  • we add an explain method, which sheds light on the plan chosen by PostgreSQL; if the aggs argument is not None, PostgreSQL opted for the pushdown
  • finally, when returning the resulting row to Multicorn, we must convert it to the expected output: a value for each aggregation key in aggs

Now we can confirm that aggregation pushdown is chosen as the optimal path:

sgr@localhost:splitgraph> EXPLAIN SELECT count(number) FROM pandas_table
+------------------------------------------------+
| QUERY PLAN                                     |
|------------------------------------------------|
| Foreign Scan  (cost=1.00..1.00 rows=1 width=1) |
|   Multicorn: columns: ['number']               |
|   Multicorn: aggs: {                           |
|     "count.number": {                          |
|         "function": "count",                   |
|         "column": "number"                     |
|     }                                          |
| }                                              |
+------------------------------------------------+
sgr@localhost:splitgraph> SELECT count(number) FROM pandas_table
+-------+
| count |
|-------|
| 10    |
+-------+

A proper aggs object was passed down, which means that PostgreSQL did in fact choose aggregation pushdown. However, if we try a combination of other aggregation functions:

sgr@localhost:splitgraph> SELECT min(number), max(number), sum(number), avg(number)
    FROM pandas_table
Error in python: AttributeError
DETAIL:  'avg' is not a valid function for 'Series' object

we get an error. The problem arises because we propagate the PostgreSQL average function, avg, verbatim instead of translating it to the corresponding Pandas function. Here's a straightforward fix:

@@ -13,16 +13,25 @@ def fake_remote_pandas_endpoint(columns, aggs=None):

     return df[columns].to_dict("records")

+_PG_TO_PANDAS_FUNC_MAP = {
+    "min": "min",
+    "max": "max",
+    "sum": "sum",
+    "avg": "mean",
+    "count": "count",
+}

 def _convert_aggs_arg(aggs):
     # Convert aggs in accordance with Pandas API:
     # {"column_1": ["avg", "sum"], "column_2": ["min"], ..}
     pandas_aggs = {}
     for agg_props in aggs.values():
+        function_name = _PG_TO_PANDAS_FUNC_MAP[agg_props["function"]]
+
         if agg_props["column"] not in pandas_aggs:
-            pandas_aggs[agg_props["column"]] = [agg_props["function"]]
+            pandas_aggs[agg_props["column"]] = [function_name]
         else:
-            pandas_aggs[agg_props["column"]].append(agg_props["function"])
+            pandas_aggs[agg_props["column"]].append(function_name)
     return pandas_aggs


@@ -31,7 +40,7 @@ class PandasFdw(ForeignDataWrapper):
     def can_pushdown_upperrel(self):
         return {
-            "agg_functions": ["min", "max", "sum", "avg", "count"]
+            "agg_functions": list(_PG_TO_PANDAS_FUNC_MAP)
         }

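     def explain(self, quals, columns, aggs=None, verbose=False):
@@ -49,3 +58,4 @@ class PandasFdw(ForeignDataWrapper):
             result = {}
             for agg_name, agg_props in aggs.items():
-                result[agg_name] = row[agg_props["column"]][agg_props["function"]]
+                function_name = _PG_TO_PANDAS_FUNC_MAP[agg_props["function"]]
+                result[agg_name] = row[agg_props["column"]][function_name]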

Trying out the same query with the above changes gives us the correct result:

sgr@localhost:splitgraph> SELECT min(number), max(number), sum(number), avg(number)
    FROM pandas_table
+-----+-----+-----+-----+
| min | max | sum | avg |
|-----+-----+-----+-----|
| 0   | 9   | 45  | 4.5 |
+-----+-----+-----+-----+

Pushing down GROUP BY clauses

The next logical step is enabling pushdown of aggregation queries in the broader sense, i.e. ones with a grouping clause:

SELECT parity, count(number) FROM pandas_table GROUP BY parity

Let's take a look at what changed in the query tree:

(
   {QUERY
   :commandType 1
   :hasAggs true
   ⋮
   :targetList (
      {TARGETENTRY
      :expr
         {VAR
         :varno 1
         :varattno 2
         :vartype 1043}
      :resname parity
      :ressortgroupref 1}
      {TARGETENTRY
      :expr
         {AGGREF
         :aggfnoid 2147
         :aggtype 20
         :args (
            {TARGETENTRY
            :expr
               {VAR
               :varno 1
               :varattno 1
               :vartype 23}}
         )}
      :resname count
      ⋮
      }
   )
   :groupClause (
      {SORTGROUPCLAUSE
      :tleSortGroupRef 1}
   )}
)

You can see that the expression now contains a target with the corresponding Var, which references a SortGroupClause. Again, for any such variables we can extract the corresponding column names and pass them into PandasFdw inside a list named group_clauses:

@@ -7,7 +7,9 @@ df = pd.DataFrame({
     "parity": ["even", "odd", "even", "odd", "even", "odd", "even", "odd", "even", "odd"]
 })

-def fake_remote_pandas_endpoint(columns, aggs=None):
+def fake_remote_pandas_endpoint(columns, aggs=None, group_clauses=None):
+    if group_clauses is not None:
+        return df.groupby(group_clauses, as_index=False).agg(aggs).to_dict('records')
     if aggs is not None:
         # Returns {"column_1": {"avg": x, "sum": y}, "column_2": {"min": z}, ..}
         return df.agg(aggs).to_dict()
@@ -40,19 +42,33 @@ class PandasFdw(ForeignDataWrapper):

     def can_pushdown_upperrel(self):
         return {
+            "groupby_supported": True,
             "agg_functions": list(_PG_TO_PANDAS_FUNC_MAP)
         }

-    def explain(self, quals, columns, aggs=None, verbose=False):
+    def explain(self, quals, columns, aggs=None, group_clauses=None, verbose=False):
         return [
             f"columns: {columns}",
-            f"aggs: {json.dumps(aggs, indent=4)}"
+            f"aggs: {json.dumps(aggs, indent=4)}",
+            f"group_clauses: {group_clauses}"
         ]

-    def execute(self, quals, columns, aggs=None):
-        if aggs is not None:
+    def execute(self, quals, columns, aggs=None, group_clauses=None):
+        if group_clauses is not None:
+            pandas_aggs = _convert_aggs_arg(aggs)
+
+            for row in fake_remote_pandas_endpoint(columns, pandas_aggs, group_clauses):
+                # Convert result back to Multicorn API:
+                result = {}
+                for agg_name, agg_props in aggs.items():
+                    result[agg_name] = row[(agg_props["column"], _PG_TO_PANDAS_FUNC_MAP[agg_props["function"]])]
+
+                for group_clause in group_clauses:
+                    result[group_clause] = row[(group_clause, "")]
+
+                yield result
+        elif aggs is not None:
             pandas_aggs = _convert_aggs_arg(aggs)
             row = fake_remote_pandas_endpoint(columns, pandas_aggs)

Again, a few things are worth pointing out:

  • we extend our Pandas endpoint to support grouping statements
  • we declare to Multicorn that grouping constructs are now supported by setting groupby_supported to True inside can_pushdown_upperrel
  • also, the parsing of returned results is somewhat more involved now, since we must also return a value for each group clause element in every row
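On the query-tree side, picking out the grouping columns amounts to matching each SortGroupClause's tleSortGroupRef against the ressortgroupref of the target entries. Below is a hypothetical pure-Python sketch over dicts mirroring the second dump; like the earlier simplification, it only accepts plain column references as grouping expressions.

```python
def extract_group_clauses(target_list, group_clause, column_names):
    """Return the GROUP BY column names, or None if any grouping expression
    is not a plain column (e.g. GROUP BY lower(parity))."""
    refs = {clause["tleSortGroupRef"] for clause in group_clause}
    columns = []
    for entry in target_list:
        if entry.get("ressortgroupref", 0) not in refs:
            continue  # 0 (or a missing field) means not referenced by GROUP BY
        expr = entry["expr"]
        if expr.get("node") != "VAR":
            return None
        columns.append(column_names[expr["varattno"]])
    return columns


target_list = [
    {"expr": {"node": "VAR", "varno": 1, "varattno": 2, "vartype": 1043},
     "resname": "parity", "ressortgroupref": 1},
    {"expr": {"node": "AGGREF", "aggfnoid": 2147, "aggtype": 20},
     "resname": "count"},
]
group_clause = [{"tleSortGroupRef": 1}]
print(extract_group_clauses(target_list, group_clause, {1: "number", 2: "parity"}))
# ['parity']
```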

Finally, the query gets pushed down:

sgr@localhost:splitgraph> EXPLAIN SELECT parity, count(number) FROM pandas_table
    GROUP BY parity
+------------------------------------------------+
| QUERY PLAN                                     |
|------------------------------------------------|
| Foreign Scan  (cost=1.00..1.00 rows=1 width=1) |
|   Multicorn: columns: ['parity', 'number']     |
|   Multicorn: aggs: {                           |
|     "count.number": {                          |
|         "function": "count",                   |
|         "column": "number"                     |
|     }                                          |
| }                                              |
|   Multicorn: group_clauses: ['parity']         |
+------------------------------------------------+
sgr@localhost:splitgraph> SELECT parity, count(number) FROM pandas_table
    GROUP BY parity
+--------+-------+
| parity | count |
|--------+-------|
| even   | 5     |
| odd    | 5     |
+--------+-------+

The final code for PandasFdw can be found in our Multicorn fork.

The missing pieces in PandasFdw

Note that we don't correctly handle grouping statements without aggregate functions, i.e. ones with only a GROUP BY clause, such as SELECT parity FROM pandas_table GROUP BY parity. Although somewhat unusual, these are valid SQL statements, analogous to SELECT DISTINCT ... constructs. While our current implementation doesn't support pushdown of any advanced statements with the DISTINCT keyword, grouping-only query execution is straightforward to fix, simply by handling the case where aggs is None but group_clauses is not.
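A minimal sketch of that missing branch, assuming rows arrive as a list of dicts: a GROUP BY without aggregates simply yields each distinct combination of grouping values once. group_only_rows is a hypothetical helper; with the Pandas endpoint, something like df[group_clauses].drop_duplicates().to_dict("records") should produce the same rows.

```python
def group_only_rows(rows, group_clauses):
    """Yield one row per distinct combination of the grouping columns,
    in order of first appearance."""
    seen = set()
    for row in rows:
        key = tuple(row[column] for column in group_clauses)
        if key not in seen:
            seen.add(key)
            yield dict(zip(group_clauses, key))


rows = [{"number": n, "parity": "even" if n % 2 == 0 else "odd"} for n in range(10)]
print(list(group_only_rows(rows, ["parity"])))  # [{'parity': 'even'}, {'parity': 'odd'}]
```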

More importantly, note that we don't do anything with the quals argument in our toy FDW. This argument corresponds to record filtering via the WHERE clause, and for non-aggregation queries pushing it down is a nice-to-have. In that case, not pushing down quals still results in correct output, albeit wastefully, since PostgreSQL double-checks and re-applies any WHERE clauses the FDW did not apply.

For aggregations, however, quals are crucial, since they determine which records take part in the aggregation, and therefore affect not only performance but also the correctness of the result. Our implementation takes care of this by requiring each FDW implementation to provide a list of supported operators that its remote data source understands. Multicorn looks those up under the key supported_operators in the output of can_pushdown_upperrel; in their absence, queries involving WHERE clauses are not eligible for aggregation pushdown.
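The difference is easy to see with the toy data. The hypothetical sketch below uses a local Qual namedtuple that mimics the field_name/operator/value attributes of Multicorn's quals, and assumes supported_operators is a plain list of operator strings; the exact format expected by the fork is an assumption here.

```python
import operator
from collections import namedtuple

# Local stand-in mimicking Multicorn's qual objects (field_name, operator, value).
Qual = namedtuple("Qual", ["field_name", "operator", "value"])

# Operators the hypothetical remote understands, keyed by their SQL spelling.
REMOTE_OPERATORS = {
    "=": operator.eq, "<>": operator.ne, "<": operator.lt,
    "<=": operator.le, ">": operator.gt, ">=": operator.ge,
}

# What can_pushdown_upperrel() might return; the supported_operators format is assumed.
CAPABILITIES = {
    "groupby_supported": True,
    "agg_functions": ["min", "max", "sum", "avg", "count"],
    "supported_operators": list(REMOTE_OPERATORS),
}

rows = [{"number": n, "parity": "even" if n % 2 == 0 else "odd"} for n in range(10)]
quals = [Qual("number", ">", 6)]  # WHERE number > 6


def quals_pushable(quals, capabilities):
    supported = set(capabilities.get("supported_operators", []))
    return all(q.operator in supported for q in quals)


def apply_quals(rows, quals):
    """Filter rows on the remote side, as a pushed-down aggregate requires."""
    return [
        row for row in rows
        if all(REMOTE_OPERATORS[q.operator](row[q.field_name], q.value) for q in quals)
    ]


# Plain scan ignoring quals: PostgreSQL re-filters locally, so the answer is right.
local_filtered = [row for row in rows if row["number"] > 6]
# Pushed-down count ignoring quals: one pre-aggregated row, nothing left to re-filter.
wrong_count = len(rows)
# Pushed-down count honoring quals (only if every operator is supported).
right_count = len(apply_quals(rows, quals)) if quals_pushable(quals, CAPABILITIES) else None

print(len(local_filtered), wrong_count, right_count)  # 3 10 3
```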

Conclusion

We've demonstrated our work on extending the Multicorn FDW framework with new capabilities, namely aggregation and grouping pushdown. In particular, this was illustrated using a toy FDW model for querying Pandas dataframes via SQL.

In addition, we've provided the general background as well as some deeper insights concerning aggregations in the context of PostgreSQL FDWs. This includes the inner workings of the aggregation pushdown mechanism itself, as well as a discussion of limitations (unsupported functions and operators) and benefits (speed, lower egress costs).

In subsequent installments in this series, we will explore some real-life FDWs utilizing these features, which we have developed and currently use at Splitgraph.