Table partitioning and time travel queries: Seafowl case study

Nov 18, 2022 · By Marko Grujić

We discuss how Seafowl performs table partitioning to enable efficient versioning and time travel queries

Note (April 2023): In the meantime we've migrated Seafowl's storage layer to use the Delta Lake protocol. While the high-level functionality and feature-set remain the same, some implementation details in this blog post are no longer up to date.

The primary specialty of Seafowl is data crunching over large, slowly-changing datasets at the edge. It does so by leveraging standard HTTP cache semantics natively in its query execution endpoint(s).

The query result then becomes a regular static HTTP asset, eligible for caching at various locations (from the browser to the CDN), thus saving on compute time. Seafowl also utilizes the usual HTTP cache revalidation mechanism, so that when the underlying data does change, the query is re-run in a performant manner (thanks to Rust and Apache DataFusion). In a sense, Seafowl is a fast, lightweight and self-contained version of the Splitgraph DDN.
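To make this concrete, here's roughly what a cacheable query looks like over HTTP, using the GET flavor of Seafowl's query endpoint (the URL-encoded query travels in a header, and its SHA-256 hash in the URL path; the values below are illustrative):

$ # Identical queries map to identical URLs, so any CDN or browser can
$ # cache the response and revalidate it via standard HTTP mechanisms.
$ curl -i \
  -H "X-Seafowl-Query: SELECT%201" \
  http://localhost:8080/q/<sha-256-hash-of-the-query-text>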

Despite being an analytical query engine at its core, Seafowl (like the DDN) also supports writing data using DDL (CREATE, ALTER and DROP) as well as DML (INSERT, UPDATE and DELETE) SQL statements. In this blog post we'll take a look at the interplay between such statements and the way Seafowl chunks and stores the data in partition objects. We'll also describe how table versioning and time travel query concepts naturally arise in this context.

Table version and partition dynamics

To start off, let's spin up a Seafowl instance and create a new table with some dummy data. There are a number of options for getting data into Seafowl, such as pointing a table to a source table in a remote database, or pointing it to a local or remote CSV/Parquet file.
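For instance, registering a remote Parquet file could look something like this (a sketch based on the DataFusion-style external table syntax; the URL is hypothetical, and the exact syntax is covered in the Seafowl docs):

-- Point a table at a remote Parquet file (URL is hypothetical)
CREATE EXTERNAL TABLE remote_numbers
STORED AS PARQUET
LOCATION 'https://example.com/some_numbers.parquet';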

For this example though, let's use the upload endpoint. It is intended for when you can't or don't want to place your data file on the same machine where Seafowl is running, nor spin up a one-off server to serve it over HTTP:

$ cat > some_numbers.csv << EOF
> name,number
> one,1
> two,2
> EOF
$ curl \
  -H "Authorization: Bearer write_password" \
  -F "data=@some_numbers.csv" \
  http://localhost:8080/upload/some/numbers
done

Even though the primary specialty of Seafowl is querying over HTTP, for the sake of conciseness we'll run the rest of the queries in this blog post via a client. Specifically, we'll exploit the fact that Seafowl can also expose a PostgreSQL-compatible endpoint and use psql:
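Connecting is then just a matter of pointing psql at that endpoint (a sketch; this assumes the PostgreSQL-compatible frontend is enabled in Seafowl's config, and the port below is purely illustrative):

$ psql -h localhost -p 6432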

seafowl=> SELECT * FROM some.numbers;
 name | number
------+--------
 one  |      1
 two  |      2
(2 rows)

seafowl=> SELECT column_name, data_type FROM information_schema.columns
seafowl-> WHERE table_name = 'numbers';
 column_name | data_type
-------------+-----------
 name        | Utf8
 number      | Int64
(2 rows)

No surprises there; the data was uploaded into a table and the schema was introspected, since we haven't specified one [1]. Since we've just created it, our table so far has only a single version and a single partition, which in Seafowl are stored as Parquet files. To verify this we can utilize a couple of system tables that Seafowl exposes, providing insights into the table metadata:

seafowl=> SELECT * FROM system.table_versions;
 table_schema | table_name | table_version_id |    creation_time
--------------+------------+------------------+---------------------
 some         | numbers    |                1 | 2022-11-18 13:11:24
(1 row)

seafowl=> SELECT table_version_id, table_partition_id, object_storage_id, row_count FROM system.table_partitions;
 table_version_id | table_partition_id |                            object_storage_id                             | row_count
------------------+--------------------+--------------------------------------------------------------------------+-----------
                1 |                  1 | 073cfac570ff05e6b0c23670dff787b2d8e15399af4654715ce078f1d0ad53ef.parquet |         2
(1 row)

Given that we haven't specified a custom object store, Seafowl defaults to using the local file system to store the objects in the seafowl-data folder:

$ ls -l seafowl-data/ | grep .parquet
-rw------- 1 ubuntu ubuntu    854 Nov 18 13:11 073cfac570ff05e6b0c23670dff787b2d8e15399af4654715ce078f1d0ad53ef.parquet

Our table consists of only one partition file since our example data is tiny; had the uploaded file been sufficiently large [2], Seafowl would have partitioned it into multiple Parquet files.
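For reference, both the object store and the chunking threshold are driven by Seafowl's TOML config; a minimal sketch (section and key names as per the Seafowl docs at the time of writing, values illustrative):

# seafowl.toml (sketch; values are illustrative)
[object_store]
type = "local"               # a custom store, e.g. "s3", would point at a bucket
data_dir = "./seafowl-data"

[misc]
max_partition_size = 1048576 # maximum number of rows per partition object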

Modifying the data with the INSERT command

When it comes to adding new data via an INSERT statement, Seafowl will create new partition(s) from the input and link them up with the existing partitions of the referenced table to make a new version of it. Hence, each separate invocation generates at least one partition [3]:

seafowl=> INSERT INTO some.numbers VALUES ('three', 3), ('four', 4), ('five', 5);
...
seafowl=> INSERT INTO some.numbers VALUES ('one', 1), ('two', 2);
seafowl=> SELECT * FROM some.numbers;
 name  | number
-------+--------
 one   |      1
 two   |      2
 one   |      1
 two   |      2
 three |      3
 four  |      4
 five  |      5
(7 rows)

The combined effect of the preceding two INSERT statements is the creation of two new table versions (2 and 3). Each one inherits all the partitions from the previous version:

seafowl=> SELECT * FROM system.table_versions;
 table_schema | table_name | table_version_id |    creation_time
--------------+------------+------------------+---------------------
 some         | numbers    |                1 | 2022-11-18 13:11:24
 some         | numbers    |                2 | 2022-11-18 13:12:58
 some         | numbers    |                3 | 2022-11-18 13:13:35
(3 rows)

seafowl=> SELECT table_version_id, table_partition_id, object_storage_id, row_count  FROM system.table_partitions;
 table_version_id | table_partition_id |                            object_storage_id                             | row_count
------------------+--------------------+--------------------------------------------------------------------------+-----------
                1 |                  1 | 073cfac570ff05e6b0c23670dff787b2d8e15399af4654715ce078f1d0ad53ef.parquet |         2
                2 |                  1 | 073cfac570ff05e6b0c23670dff787b2d8e15399af4654715ce078f1d0ad53ef.parquet |         2
                2 |                  2 | b78bb0a0336ed215c3c0cbb0451f676ced19f3987f3c5519e776507ef832dfd8.parquet |         3
                3 |                  1 | 073cfac570ff05e6b0c23670dff787b2d8e15399af4654715ce078f1d0ad53ef.parquet |         2
                3 |                  2 | b78bb0a0336ed215c3c0cbb0451f676ced19f3987f3c5519e776507ef832dfd8.parquet |         3
                3 |                  3 | 073cfac570ff05e6b0c23670dff787b2d8e15399af4654715ce078f1d0ad53ef.parquet |         2
(6 rows)

However, note that the last INSERT added the same data that was used to create the table initially via the upload endpoint. Therefore, even though our table now has three logical partitions, two of them (table_partition_id 1 and 3) are physically identical, pointing to the same file:

$ ls -l seafowl-data/ | grep .parquet
-rw------- 1 ubuntu ubuntu    854 Nov 18 13:13 073cfac570ff05e6b0c23670dff787b2d8e15399af4654715ce078f1d0ad53ef.parquet
-rw------- 1 ubuntu ubuntu    883 Nov 18 13:12 b78bb0a0336ed215c3c0cbb0451f676ced19f3987f3c5519e776507ef832dfd8.parquet

This example illustrates how this partitioning scheme naturally lends itself to supporting the time travel querying that we'll discuss shortly. In particular, what makes this proposition very appealing is that by storing the objects for the latest/current table version, we're also able to reconstruct prior table versions with minimal additional storage overhead, thanks to partition re-use. In this particular example, three different table versions can be obtained from just two objects.
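In fact, we can quantify the de-duplication directly from the system tables shown above (standard aggregate support is assumed here; the output reflects the current state of our example table):

seafowl=> SELECT COUNT(*) AS logical_partitions,
seafowl->        COUNT(DISTINCT object_storage_id) AS physical_objects
seafowl-> FROM system.table_partitions;
 logical_partitions | physical_objects
--------------------+------------------
                  6 |                2
(1 row)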

Modifying the data with the UPDATE and DELETE commands

Unlike INSERT, the two remaining DML statements will in general replace some of the existing partitions with new ones. The specifics depend on the exact qualifier used in the WHERE clause, as well as the statement type:

seafowl=> DELETE FROM some.numbers WHERE number = 3 OR number = 5;
...
seafowl=> UPDATE some.numbers SET name = 'two', number = 2 WHERE number = 1;
seafowl=> SELECT * FROM some.numbers;
 name | number
------+--------
 four |      4
 two  |      2
 two  |      2
 two  |      2
 two  |      2
(5 rows)

seafowl=> SELECT * FROM system.table_versions where table_version_id > 3;
 table_schema | table_name | table_version_id |    creation_time
--------------+------------+------------------+---------------------
 some         | numbers    |                4 | 2022-11-18 13:15:18
 some         | numbers    |                5 | 2022-11-18 13:15:53
(2 rows)

Here, the DELETE statement specifically targets the second partition (created by the first INSERT statement), while leaving the other two partitions (which are actually identical) unchanged and eligible for re-use. The targeted partition is processed, and the rows matching the qualifier are filtered out, leading to a new, in this case smaller, partition (or to no partition at all, if all rows are filtered out). When multiple partitions match the selection, for the sake of optimal partition size we bundle them together, then perform the required row filtering, followed by potential re-chunking of the result.

The UPDATE on the other hand leaves the total number of rows unchanged, but is also subject to the merging of partitions that need to be processed. In the above case, UPDATE targets the two "twin" partitions, changes two out of their four rows, and merges the result into one final partition:

seafowl=> SELECT table_version_id, table_partition_id, object_storage_id, row_count
seafowl-> FROM system.table_partitions WHERE table_version_id > 3;
 table_version_id | table_partition_id |                            object_storage_id                             | row_count
------------------+--------------------+--------------------------------------------------------------------------+-----------
                4 |                  1 | 073cfac570ff05e6b0c23670dff787b2d8e15399af4654715ce078f1d0ad53ef.parquet |         2
                4 |                  3 | 073cfac570ff05e6b0c23670dff787b2d8e15399af4654715ce078f1d0ad53ef.parquet |         2
                4 |                  4 | 78ae77c39b4eac1a80d7238af473af82a8038b41aebb908c890dd78323a48d65.parquet |         1
                5 |                  4 | 78ae77c39b4eac1a80d7238af473af82a8038b41aebb908c890dd78323a48d65.parquet |         1
                5 |                  5 | da56d46a18b13f3d67c2816f107a837d41c760c8d67afcc015c334375f5a5e87.parquet |         4
(5 rows)

As a final result we now have a total of four partition objects:

$ ls -l seafowl-data/ | grep .parquet
-rw------- 1 ubuntu ubuntu    854 Nov 18 13:13 073cfac570ff05e6b0c23670dff787b2d8e15399af4654715ce078f1d0ad53ef.parquet
-rw------- 1 ubuntu ubuntu    844 Nov 18 13:15 78ae77c39b4eac1a80d7238af473af82a8038b41aebb908c890dd78323a48d65.parquet
-rw------- 1 ubuntu ubuntu    883 Nov 18 13:12 b78bb0a0336ed215c3c0cbb0451f676ced19f3987f3c5519e776507ef832dfd8.parquet
-rw------- 1 ubuntu ubuntu    835 Nov 18 13:15 da56d46a18b13f3d67c2816f107a837d41c760c8d67afcc015c334375f5a5e87.parquet

This partition-scoping mechanism for UPDATE/DELETE ultimately builds on top of the partition pruning that Seafowl uses to reduce the number of objects that need to be scanned in response to regular SELECT queries with filters. In turn, partition pruning relies on simple statistics gathered during partition generation, such as the min/max value for each column, as well as the presence/count of null values. Consequently, if such statistics can't be or simply aren't collected for a particular column, partition pruning doesn't work, and all table partitions need to be processed.
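For illustration, given such statistics, a filtered query like the one below lets Seafowl skip every partition whose min/max range for number rules out the predicate, using metadata alone (the output reflects the current state of our example table):

seafowl=> SELECT * FROM some.numbers WHERE number > 3;
 name | number
------+--------
 four |      4
(1 row)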

Table time travel

Having demonstrated how older table versions can be reconstructed from the constituent objects, let's discuss how that looks in practice. First, a bit of background on the way table resolution works in general.

Each query invocation is accompanied by a fresh Seafowl context, wrapping the inner DataFusion context itself. The wrapped context in turn contains the nested maps of all databases (also referred to as catalogs) -> schemas -> tables, so that it can always determine which particular table implementations are needed for executing the given query. By default, the latest version of each table at the moment of query invocation is what gets populated in the nested maps.

Temporal querying syntax

To support time travel queries, we opted for (ab)using the table function syntax [4], whereby the target table reference can have a timestamp value specified inside parentheses. For instance, to go back and inspect the initial table contents, or the contents after the two INSERTs (versions 1 and 3, respectively), we can use:

seafowl=> SELECT * FROM some.numbers('2022-11-18 13:13:35');
 name  | number
-------+--------
 one   |      1
 two   |      2
 three |      3
 four  |      4
 five  |      5
 one   |      1
 two   |      2
(7 rows)

seafowl=> SELECT * FROM some.numbers('2022-11-18 13:11:24');
 name | number
------+--------
 one  |      1
 two  |      2
(2 rows)

This works by Seafowl doing an initial walk over the query AST during the logical planning phase, and recording the presence (and value) of temporal versions used in the query. If none are found, then the query proceeds as usual, with the default context table map unchanged.

On the other hand, if Seafowl detects that time travel querying is used for some table(s), it performs triage to determine the exact table_version_id required. Once it has that information, it rewrites each affected table name in the AST to a unique one using the specific table_version_id fetched.

Finally, Seafowl updates the context table mapping, registering each specified table version under the unique name that was used in the AST rewrite. Note that this only involves fetching the table metadata, such as which particular partitions correspond to a given table_version_id. Loading the partitions themselves is only done during the scan phase, once all the optimizations have been performed and we have the table filters handy for partition pruning.
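Conceptually, the triage step boils down to a lookup like the following (an illustrative equivalent expressed against the system tables, not Seafowl's actual implementation, which consults its metadata catalog directly):

-- Find the latest version of some.numbers as of a given timestamp
SELECT MAX(table_version_id)
FROM system.table_versions
WHERE table_schema = 'some'
  AND table_name = 'numbers'
  AND creation_time <= '2022-11-18 13:13:35';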

Future work

There are a couple of limitations in Seafowl's time travel implementation that we aim to improve down the line. One example is the lack of support for using time travel in write queries, as this mechanism is only supported for SELECT queries at present.

Another would be adding more flexibility to our existing time travel syntax, for instance by supporting relative times (e.g. the table version from exactly 2 days ago) or version offsets (e.g. the table version 2 versions prior to the latest one). Alternatively, we may eventually align it with the AS OF syntax that is gaining traction in the industry.

Nonetheless, the present mechanism is quite robust and versatile, capable of supporting advanced queries, such as CTEs and table JOINs that use multiple different versions of the same table at once.
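As a hedged illustration of the latter (re-using the timestamps from our example, and assuming aliases can be attached to the table function references), one can diff two versions of the same table in a single query:

-- Which rows exist in the latest version but not in the initial one?
SELECT n.name, n.number
FROM some.numbers AS n
LEFT JOIN some.numbers('2022-11-18 13:11:24') AS orig
    ON n.name = orig.name AND n.number = orig.number
WHERE orig.name IS NULL;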


  1. For CSV file uploads you can also explicitly specify a desired schema. Parquet files have an embedded schema so this is not required.
  2. The threshold size for partition chunking is currently controlled via the max_partition_size config parameter.
  3. Since Seafowl usually assumes relatively infrequent and large writes, this shouldn't lead to suboptimal partition sizes. However, if that happens to be the case, one can always force re-partitioning of an entire table using a CREATE TABLE AS statement.
  4. Temporal table version selection syntax has been part of the ANSI SQL standard since 2011, but in our case it was technically simpler to go with the table function syntax.