Apr 11, 2023 · By Marko Grujić

A Lakehouse by the sea: Migrating Seafowl storage layer to delta-rs

Announcing the replacement of our custom storage layer with Delta Lake, facilitated by its open-source Rust implementation.

Introduction

From the outset, one of the key tenets of Seafowl's architecture has been the separation of compute and storage, true to its aim of providing fast analytical queries at the edge. On the one hand, this enables running Seafowl on thin, distributed and scalable nodes, or on serverless platforms with scale-to-zero capabilities.

On the other hand, the data itself is kept as Parquet files in dedicated object stores, which provide cheap and voluminous storage capacity in the cloud. Recently, the physical layout of table data was changed to follow Delta Lake semantics, marking a transition away from the custom storage layer we previously used.[1] In this blog post we'll take a look at what this means in practice and, along the way, cover some basic facts and features of the Delta protocol.

A brief recap of table formats and data lakes

In general, a table format refers to the underlying storage arrangement that represents a given table in a given database. In the case of Seafowl, it made the most sense to go with flat storage of Parquet files for a variety of performance and storage benefits, a setup commonly referred to as a data lake.

Initially, we implemented a custom Seafowl data lake table format which (among other things) involved keeping track of which Parquet file corresponds to which table (version) via a metadata catalog (i.e. a Postgres or SQLite DB). This also meant a lot of custom logic for scanning and partition pruning of tables. However, we soon realized that this niche is being commoditized quickly, and that it would be beneficial to migrate to one of the emerging standards.

In particular, there are three open and competing table formats that bring an ACID layer to data lakes, one of which will likely become the de facto industry standard eventually. Besides Delta Lake, there are also Apache Iceberg and Apache Hudi, and while they all offer a variety of advanced features and open-source implementations, we've found that the most mature Rust implementation is that of Delta, via the delta-rs project. It also has an active and welcoming community of maintainers, so we ultimately decided to replace our custom storage layer with delta-rs.

Lastly, note that in conjunction with other components, namely the internal metadata catalog and a SQL-native query engine powered by DataFusion, the combined capabilities of Seafowl make up what is dubbed a Lakehouse.[2] This is a term popularized by Databricks, but gaining industry-wide acceptance, which denotes a hybrid of a data lake and a data warehouse.

A quick tour

To demonstrate the new storage layer in action, let's try out a toy example, where we use Seafowl to perform analytical queries on a table that is periodically synced with an external source. First, we'll spin up a Seafowl process:

$ SEAFOWL__FRONTEND__HTTP__WRITE_ACCESS=any ./target/debug/seafowl
 2023-04-11T06:58:04.654Z INFO  seafowl > Starting Seafowl 0.3.2
 2023-04-11T06:58:04.659Z INFO  seafowl > Loading the configuration from seafowl.toml
 2023-04-11T06:58:04.680Z INFO  seafowl > Starting the PostgreSQL frontend on 127.0.0.1:6432
 2023-04-11T06:58:04.680Z WARN  seafowl > The PostgreSQL frontend doesn't have authentication or encryption and should only be used in development!
 2023-04-11T06:58:04.680Z INFO  seafowl > Starting the HTTP frontend on 0.0.0.0:8080
 2023-04-11T06:58:04.680Z INFO  seafowl > HTTP access settings: read any, write any

... and run some queries:

$ curl -H "Content-Type: application/json" http://localhost:8080/q -d@-<<EOF
{"query": "CREATE EXTERNAL TABLE seafowl_issue_reactions
STORED AS TABLE
OPTIONS ('name' '\"splitgraph-demo/seafowl:latest\".issue_reactions')
LOCATION 'postgresql://$SPLITGRAPH_API_KEY:$SPLITGRAPH_API_SECRET@data.splitgraph.com:5432/ddn';

CREATE TABLE reactions AS
SELECT content, issue_number, 2022 as year
FROM staging.seafowl_issue_reactions
WHERE date_part('year', created_at) = 2022;

SELECT content, count(*) as count
FROM reactions
GROUP BY content
ORDER BY count DESC"}
EOF
{"content":"+1","count":5}
{"content":"heart","count":3}
{"content":"hooray","count":1}

The above uses Seafowl's POST endpoint, which isn't cache-friendly but can run multiple write statements sequentially, followed by an optional read statement at the end. The queries we run:

  • Create a remote table (analogous to foreign tables in Postgres parlance), which proxies queries to a repository on Splitgraph DDN in which we periodically ingest statistics from Seafowl's GitHub repo.
  • Create the actual table by ingesting all Seafowl GitHub issue reactions from 2022; note that it is only at this point that the data is fetched from the remote (benefiting from any potential filter clause pushdown) and persisted.
  • Perform a simple aggregation query to showcase the results.

Delta protocol 101

Turning our attention to storage now, which in the default case is the local file system, we can see the layout of files that make up this simple table:

$ ls -l seafowl-data/
total 480
drwxr-xr-x  4 markogrujic  staff     128 Apr 11 08:58 00c853f9-48b6-4531-881a-f5e1f546ed0e
-rw-r--r--  1 markogrujic  staff    4096 Apr 11 08:58 seafowl.sqlite
$ cd seafowl-data/00c853f9-48b6-4531-881a-f5e1f546ed0e/
$ tree .
.
├── _delta_log
│   ├── 00000000000000000000.json
│   └── 00000000000000000001.json
└── part-00000-8f6d21a2-f53a-483f-8f0b-85ff237ef5f6-c000.snappy.parquet

1 directory, 3 files

In Seafowl, each table gets a corresponding UUID, which also serves as the name of the table's root directory, where data files and logs are stored. The root Seafowl data folder (seafowl-data) contains all such table directories (besides seafowl.sqlite, which is the default Seafowl catalog), regardless of the schema/database they belong to. This ensures that any schema or table drops are catalog metadata operations only, and that the actual garbage collection of dropped table folders/files can be done lazily later on.[3]
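
To make the lazy cleanup concrete, here is a minimal sketch using the same HTTP endpoint as before. Note that the table name is hypothetical and the bare VACUUM DATABASE form is an assumption based on footnote [3], so treat this as an illustration rather than exact syntax:

# dropping a table only touches the catalog; its UUID-named folder stays behind
$ curl -H "Content-Type: application/json" http://localhost:8080/q -d'{"query": "DROP TABLE reactions_old"}'
# ... later on, garbage-collect the folders/files of all dropped tables
$ curl -H "Content-Type: application/json" http://localhost:8080/q -d'{"query": "VACUUM DATABASE"}'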

Since the remote table that we created first (seafowl_issue_reactions) is an in-memory entity, we only have one physical table (reactions) in storage (the 00c853f9-48b6-4531-881a-f5e1f546ed0e directory). This table has only one data file for now (the sole .parquet file), as we only inserted a few rows. In the general case it can consist of an arbitrary number of files, dictated by the total input size and the partition chunking configuration.[4]

Inspecting the log entries

As per the Delta protocol, the log entries represent the transactional history of changes to a table. The elementary units are JSON files, each corresponding to a particular table version and named by monotonically increasing numbers starting from 0. These are used to replay all Delta actions taken on a table in order to reproduce the desired table version. Effectively, this means knowing which files belong to the table state for a given version, also known as a table snapshot, which is very useful during query planning.
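
To get a feel for what this replay boils down to, here's a minimal shell sketch (run with jq from the table's root directory) that recovers the snapshot's file set by collecting every add-ed path and discarding any that were subsequently remove-d. This is purely illustrative; delta-rs handles the replay (including version ordering and checkpoints) internally:

$ cat _delta_log/*.json | jq -r 'select(.add != null) | .add.path' | sort > /tmp/added
$ cat _delta_log/*.json | jq -r 'select(.remove != null) | .remove.path' | sort > /tmp/removed
$ comm -23 /tmp/added /tmp/removed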

In the case of our reactions table, there are two initial log files stemming from the CREATE TABLE reactions AS ... statement. The first one of these represents the zeroth version of the table, and corresponds to the creation of a blank table along with some metadata (time, name, schema, etc.):

$ jq --sort-keys . _delta_log/00000000000000000000.json
{
  "protocol": {
    "minReaderVersion": 1,
    "minWriterVersion": 1
  }
}
{
  "metaData": {
    "configuration": {},
    "createdTime": 1681196324144,
    "description": "Created by Seafowl version 0.3.2",
    "format": {
      "options": {},
      "provider": "parquet"
    },
    "id": "48659c21-0ecf-4e3c-9168-5e056c6d813d",
    "name": "reactions",
    "partitionColumns": [],
    "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"content\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"issue_number\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"year\",\"type\":\"long\",\"nullable\":false,\"metadata\":{}}]}"
  }
}
{
  "commitInfo": {
    "clientVersion": "delta-rs.0.8.0",
    "operation": "CREATE TABLE",
    "operationParameters": {
      "location": "file:///Users/markogrujic/Splitgraph/seafowl-data/00c853f9-48b6-4531-881a-f5e1f546ed0e/",
      "metadata": "{\"configuration\":{},\"created_time\":1681196324144,\"description\":\"Created by Seafowl version 0.3.2\",\"format\":{\"options\":{},\"provider\":\"parquet\"},\"id\":\"48659c21-0ecf-4e3c-9168-5e056c6d813d\",\"name\":\"reactions\",\"partition_columns\":[],\"schema\":{\"fields\":[{\"metadata\":{},\"name\":\"content\",\"nullable\":true,\"type\":\"string\"},{\"metadata\":{},\"name\":\"issue_number\",\"nullable\":true,\"type\":\"long\"},{\"metadata\":{},\"name\":\"year\",\"nullable\":false,\"type\":\"long\"}],\"type\":\"struct\"}}",
      "mode": "ErrorIfExists",
      "protocol": "{\"minReaderVersion\":1,\"minWriterVersion\":1}"
    },
    "timestamp": 1681196324145
  }
}

The second log entry (version 1) represents the writing of data to the table, and contains a single add Delta action. This action modifies the table state by logically attaching the data file to the new version.

$ jq --sort-keys . _delta_log/00000000000000000001.json
{
  "add": {
    "dataChange": true,
    "modificationTime": 1681196324172,
    "partitionValues": {},
    "path": "part-00000-8f6d21a2-f53a-483f-8f0b-85ff237ef5f6-c000.snappy.parquet",
    "size": 1336,
    "stats": "{\"numRecords\":9,\"minValues\":{\"content\":\"+1\",\"issue_number\":57,\"year\":2022},\"maxValues\":{\"year\":2022,\"issue_number\":188,\"content\":\"hooray\"},\"nullCount\":{\"issue_number\":0,\"content\":0,\"year\":0}}",
    "tags": null
  }
}
{
  "commitInfo": {
    "clientVersion": "delta-rs.0.8.0",
    "operation": "WRITE",
    "operationParameters": {
      "mode": "Append"
    },
    "timestamp": 1681196324174
  }
}

Note that while we break our CREATE TABLE reactions AS ... statement into two distinct commits here (~ CREATE + INSERT), this isn't strictly required by the Delta protocol (and may change in later Seafowl versions). In fact, many actions can be mixed together when committing a Delta transaction, including creating a new table and attaching files to it simultaneously.

What the protocol does enforce, however, is that the physical files must first be written to storage, and only then can a commit referencing those files be attempted. Since the object store abstraction doesn't have locking or transaction primitives, the delta-rs crate provides the ability to use implementation-specific utilities for achieving the required atomicity. For instance, in the case of the AWS S3 object store, there is the option of using DynamoDB as a lock to mediate concurrent writes.
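
As a rough illustration, the DynamoDB lock is enabled through environment variables read by the delta-rs S3 backend; whether and how a given Seafowl deployment should set these is an assumption here, so consult the delta-rs and Seafowl docs for the exact configuration:

$ export AWS_S3_LOCKING_PROVIDER=dynamodb
$ export DYNAMO_LOCK_TABLE_NAME=seafowl_delta_lock    # hypothetical DynamoDB table
$ export DYNAMO_LOCK_PARTITION_KEY_VALUE=seafowl      # hypothetical partition key value
$ SEAFOWL__FRONTEND__HTTP__WRITE_ACCESS=any ./target/debug/seafowl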

DML statements and partition pruning

Each individual Seafowl write statement (INSERT, UPDATE or DELETE) gets executed as a single Delta transaction, resulting in a new table version. The INSERT statement is append-only, meaning that all new data is add-ed to the new table version. For instance, if we want to sync our Seafowl GitHub reactions table with fresh 2023 data, we can run:

$ curl -H "Content-Type: application/json" http://localhost:8080/q -d@-<<EOF
{"query": "INSERT INTO reactions
SELECT content, issue_number, 2023 as year
FROM staging.seafowl_issue_reactions
WHERE date_part('year', created_at) = 2023;

SELECT content, count(*) as count
FROM reactions
GROUP BY content
ORDER BY count DESC"}
EOF
{"content":"+1","count":9}
{"content":"heart","count":5}
{"content":"hooray","count":2}

This results in the creation of an additional data file and a new table version (2):

$ tree .
.
├── _delta_log
│   ├── 00000000000000000000.json
│   ├── 00000000000000000001.json
│   └── 00000000000000000002.json
├── part-00000-3e46aea4-3d7d-408e-8fe6-d09fb13f9d11-c000.snappy.parquet
└── part-00000-8f6d21a2-f53a-483f-8f0b-85ff237ef5f6-c000.snappy.parquet

1 directory, 5 files
$ jq --sort-keys . _delta_log/00000000000000000002.json
{
  "add": {
    "path": "part-00000-3e46aea4-3d7d-408e-8fe6-d09fb13f9d11-c000.snappy.parquet",
    "stats": "{\"numRecords\":7,\"minValues\":{\"issue_number\":137,\"year\":2023,\"content\":\"+1\"},\"maxValues\":{\"content\":\"hooray\",\"issue_number\":349,\"year\":2023},\"nullCount\":{\"issue_number\":0,\"content\":0,\"year\":0}}",
    ...
  }
}
{
  "commitInfo": {
    "operation": "WRITE",
    ...
  }
}

Note that, as per the Delta log stats, the two data files are nicely partitioned by year, reflecting the order of data insertion. This is very handy, as delta-rs will optimize the query planning procedure by scoping the necessary table scans to only the relevant data files, via a process known as partition pruning. We can see this clearly when we EXPLAIN a query that has a filter clause on the year column:

$ curl -H "Content-Type: application/json" http://localhost:8080/q -d@-<<EOF
{"query": "EXPLAIN SELECT content
FROM reactions
WHERE year = 2023"}
EOF
{"plan":"Projection: reactions.content\n  Filter: reactions.year = Int64(2023)\n    TableScan: reactions projection=[content, year], partial_filters=[reactions.year = Int64(2023)]","plan_type":"logical_plan"}
{"plan":"ProjectionExec: ... partitions={1 group: [[part-00000-3e46aea4-3d7d-408e-8fe6-d09fb13f9d11-c000.snappy.parquet]]}, predicate=year = Int64(2023), pruning_predicate=year_min@0 <= 2023 AND 2023 <= year_max@1, projection=[content, year]\n","plan_type":"physical_plan"}

Note that only a single partition (the one from version 2) is referenced in the table scan (the partitions field in the physical plan), since the other data file (from version 1) does not contain any records where the year is 2023.

Partition pruning on writes

In contrast to INSERT, DELETE involves another type of Delta action, named remove, which severs the logical connection between a data file and the new version, though it does not remove the data file physically. The UPDATE statement can contain a mixture of both, since it replaces some old values with new ones.

To take a slightly contrived example, assume we want to replace the content values in our reactions table with the actual emojis they represent (note that we also record the timestamp beforehand, for later use):

$ VERSION_2_TIMESTAMP=$(date -uIseconds)
$ curl -H "Content-Type: application/json" http://localhost:8080/q -d@-<<EOF
{"query": "UPDATE reactions
SET content = CASE
    WHEN content = 'hooray' THEN '🎉'
    WHEN content = 'heart' THEN '❤️'
    WHEN content = '+1' THEN '👍'
END
WHERE year = 2023;

SELECT year, content, count(*) as count
FROM reactions
GROUP BY year, content
ORDER BY count DESC"}
EOF
{"content":"+1","count":5,"year":2022}
{"content":"👍","count":4,"year":2023}
{"content":"heart","count":3,"year":2022}
{"content":"❤️","count":2,"year":2023}
{"content":"hooray","count":1,"year":2022}
{"content":"🎉","count":1,"year":2023}

This leads to table version 3 with a new data file:

$ tree .
.
├── _delta_log
│   ├── 00000000000000000000.json
│   ├── 00000000000000000001.json
│   ├── 00000000000000000002.json
│   └── 00000000000000000003.json
├── part-00000-3d16e87d-04c4-4f6c-9c6a-8769891acbdb-c000.snappy.parquet
├── part-00000-3e46aea4-3d7d-408e-8fe6-d09fb13f9d11-c000.snappy.parquet
└── part-00000-8f6d21a2-f53a-483f-8f0b-85ff237ef5f6-c000.snappy.parquet

1 directory, 7 files

The corresponding log records the inclusion of the new file in the present table state, as well as the removal of the file it supersedes:

$ jq --sort-keys . _delta_log/00000000000000000003.json
{
  "add": {
    "dataChange": true,
    "modificationTime": 1681196616085,
    "partitionValues": {},
    "path": "part-00000-3d16e87d-04c4-4f6c-9c6a-8769891acbdb-c000.snappy.parquet",
    "size": 1347,
    "stats": "{\"numRecords\":7,\"minValues\":{\"content\":\"❤️\",\"issue_number\":137,\"year\":2023},\"maxValues\":{\"content\":\"👍\",\"issue_number\":349,\"year\":2023},\"nullCount\":{\"content\":0,\"issue_number\":0,\"year\":0}}",
    "tags": null
  }
}
{
  "remove": {
    "dataChange": true,
    "deletionTimestamp": 1681196616085,
    "extendedFileMetadata": true,
    "partitionValues": {},
    "path": "part-00000-3e46aea4-3d7d-408e-8fe6-d09fb13f9d11-c000.snappy.parquet",
    "size": 1338,
    "tags": null
  }
}
{
  "commitInfo": {
    "clientVersion": "delta-rs.0.8.0",
    "timestamp": 1681196616085
  }
}

This again has to do with partition pruning, but this time in the context of writes. Namely, since the new version overwrites the rows with year 2023, the prior partition that contained those rows is now obsolete, which is why it is removed from the state. Such remove actions are referred to as tombstones in Delta protocol lingo.

Note that the version 3 log doesn't mention anything related to the initial partition, stemming from version 1, meaning that the original add action is still valid. This is because that partition contains the data where year is 2022, so nothing was changed there by our UPDATE statement.
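
The replay sketch from earlier now pays off: listing the remove actions across the whole log shows exactly one tombstone, the version 2 file that held the pre-emoji 2023 rows:

$ cat _delta_log/*.json | jq -r 'select(.remove != null) | .remove.path'
part-00000-3e46aea4-3d7d-408e-8fe6-d09fb13f9d11-c000.snappy.parquet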

One more thing worth mentioning: besides the regular log entries in the form of versioned JSON files, the Delta protocol prescribes checkpoints as a way to short-circuit the replay of log actions. These are created after every 10 versions by default, and take the form of an actual Parquet file that subsequent log entries build upon.
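
Our toy table is still far from its first checkpoint, but once it reached version 10 the log directory would look roughly like this (an illustration of the protocol's naming convention, not actual output from this demo): a Parquet file holding the consolidated state, plus a _last_checkpoint file pointing readers at it.

$ ls _delta_log/
00000000000000000000.json
...
00000000000000000010.json
00000000000000000010.checkpoint.parquet
_last_checkpoint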

Time travel and cleanup

Finally, note that we can now recreate snapshots of earlier table versions, given that both the logs and data files are still there, which facilitates time travel querying. For instance, we can use this to calculate a SQL diff between the current and the previous table version:

$ curl -H "Content-Type: application/json" http://localhost:8080/q -d@-<<EOF
{"query": "WITH reactions_diff AS (
    SELECT *, 'added' as action FROM (
        SELECT * FROM reactions
        EXCEPT ALL
        SELECT * FROM reactions('$VERSION_2_TIMESTAMP')
    )
    UNION ALL
    SELECT *, 'removed' as action FROM (
        SELECT * FROM reactions('$VERSION_2_TIMESTAMP')
        EXCEPT ALL
        SELECT * FROM reactions
    )
)
SELECT action, content, count(*) as count
FROM reactions_diff
GROUP BY action, content
ORDER BY count DESC"}
EOF
{"action":"removed","content":"+1","count":4}
{"action":"added","content":"👍","count":4}
{"action":"added","content":"❤️","count":2}
{"action":"removed","content":"heart","count":2}
{"action":"added","content":"🎉","count":1}
{"action":"removed","content":"hooray","count":1}

However, once we run the VACUUM TABLE reactions command, all tombstones will be deleted, so time travel to the previous version will no longer work.
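
The vacuum itself goes through the same HTTP endpoint as any other statement (output omitted here):

$ curl -H "Content-Type: application/json" http://localhost:8080/q -d'{"query": "VACUUM TABLE reactions"}'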

Future outlook

We've only scratched the surface of the Delta protocol's capabilities, and are looking forward to further developments and integration with the delta-rs project. One such example would be adding support for partition columns in Seafowl, which would be a natural fit for our example reactions table.

In particular, this would mean writing nested data files in separate sub-folders named after the value of each partition column, i.e. year=2022/part-00000-8f6... and year=2023/part-00000-3d1... (sketched below). This would enable a more optimal partition pruning strategy. Namely, using the log stats for scoping the relevant partitions, as done above, results in inexact partition pruning, meaning the source doesn't guarantee that all returned rows pass the provided filter. In turn, DataFusion has to keep the filtering node and do a pass over all the rows to ensure they satisfy the clause. With partition columns, pruning is exact, relieving DataFusion of that additional work.
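
Here is a hypothetical sketch of what the table directory could look like with year as a partition column, reusing the file names from above purely for illustration:

$ tree .
.
├── _delta_log
│   ├── 00000000000000000000.json
│   └── ...
├── year=2022
│   └── part-00000-8f6d21a2-f53a-483f-8f0b-85ff237ef5f6-c000.snappy.parquet
└── year=2023
    └── part-00000-3d16e87d-04c4-4f6c-9c6a-8769891acbdb-c000.snappy.parquet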

In the long term, what gets us most excited is the prospect of integration with projects like Nessie and lakeFS, which provide a data version control layer for table formats such as Delta Lake. This would endow Seafowl with Git-for-data semantics, and thus open a whole new world of possible applications.


  1. In particular, as of Seafowl 0.3.x versions. Read-only capability is still supported for older tables created with previous Seafowl versions (aka legacy tables), along with DROP and VACUUM support, to facilitate migration.
  2. Zaharia, M.A., Ghodsi, A., Xin, R., & Armbrust, M. (2021). Lakehouse: A New Generation of Open Platforms that Unify Data Warehousing and Advanced Analytics. Conference on Innovative Data Systems Research.
  3. This house cleaning can be performed via the VACUUM DATABASE command.
  4. The threshold size for partition chunking is currently controlled via the max_partition_size config parameter.