splitgraph.engine.postgres package

Module contents


splitgraph.engine.postgres.engine module

Default Splitgraph engine: uses PostgreSQL to store metadata and actual objects and an audit stored procedure to track changes, as well as the Postgres FDW interface to upload/download objects to/from other Postgres engines.

class splitgraph.engine.postgres.engine.AuditTriggerChangeEngine(name: Optional[str], conn_params: Optional[Dict[str, Optional[str]]] = None, pool: Optional[psycopg2.pool.AbstractConnectionPool] = None, autocommit: bool = False, registry: bool = False, in_fdw: bool = False, check_version: bool = True)

Bases: splitgraph.engine.postgres.engine.PsycopgEngine, splitgraph.engine.ChangeEngine

Change tracking based on an audit trigger stored procedure

discard_pending_changes(schema: str, table: Optional[str] = None)None

Discard recorded pending changes for a tracked schema / table

get_changed_tables(schema: str)List[str]

Get list of tables that have changed content

get_pending_changes(schema: str, table: str, aggregate: bool = False)Union[List[Tuple[int, int]], List[Tuple[Tuple[str, ], bool, Dict[str, Any], Dict[str, Any]]]]

Return pending changes for a given tracked table

  • schema – Schema the table belongs to

  • table – Table to return changes for

  • aggregate – Whether to aggregate changes or return them completely


If aggregate is True: List of tuples of (change_type, number of rows). If aggregate is False: List of (primary_key, change_type, change_data)

get_tracked_tables()List[Tuple[str, str]]

Return a list of tables that the audit trigger is working on.

has_pending_changes(schema: str)bool

Return True if the tracked schema has pending changes and False if it doesn’t.

track_tables(tables: List[Tuple[str, str]])None

Install the audit trigger on the required tables

untrack_tables(tables: List[Tuple[str, str]])None

Remove triggers from tables and delete their pending changes

class splitgraph.engine.postgres.engine.PostgresEngine(name: Optional[str], conn_params: Optional[Dict[str, Optional[str]]] = None, pool: Optional[psycopg2.pool.AbstractConnectionPool] = None, autocommit: bool = False, registry: bool = False, in_fdw: bool = False, check_version: bool = True)

Bases: splitgraph.engine.postgres.engine.AuditTriggerChangeEngine, splitgraph.engine.ObjectEngine

An implementation of the Postgres engine for Splitgraph

apply_fragments(objects: List[Tuple[str, str]], target_schema: str, target_table: str, extra_quals: Optional[psycopg2.sql.Composed] = None, extra_qual_args: Optional[Tuple[str]] = None, schema_spec: Optional[List[splitgraph.core.types.TableColumn]] = None, progress_every: Optional[int] = None)None

Apply multiple fragments to a target table as a single-query batch operation.

  • objects – List of tuples (object_schema, object_table) that the objects are stored in.

  • target_schema – Schema to apply the fragment to

  • target_table – Table to apply the fragment to

  • extra_quals – Optional, extra SQL (Composable) clauses to filter new rows in the fragment on (e.g. SQL(“a = %s”))

  • extra_qual_args – Optional, a tuple of arguments to use with extra_quals

  • schema_spec – Optional, list of (ordinal, column_name, column_type, is_pk). If not specified, uses the schema of target_table.

  • progress_every – If set, will report the materialization progress via tqdm every progress_every objects.

delete_objects(object_ids: List[str])None

Delete one or more objects from the engine.


object_ids – IDs of objects to delete

download_objects(objects: List[str], remote_engine: splitgraph.engine.postgres.engine.PostgresEngine)List[str]

Download objects from the remote engine to the local cache

  • objects – List of object IDs to download

  • remote_engine – A remote ObjectEngine to download the objects from.

:return List of object IDs that were downloaded.

dump_object(object_id: str, stream: _io.TextIOWrapper, schema: str)None

Dump an object into a series of SQL statements

  • object_id – Object ID

  • stream – Text stream to dump the object into

  • schema – Schema the object lives in

dump_object_creation(object_id: str, schema: str, table: Optional[str] = None, schema_spec: Optional[List[splitgraph.core.types.TableColumn]] = None, if_not_exists: bool = False)bytes

Generate the SQL that remounts a foreign table pointing to a Splitgraph object.

  • object_id – Name of the object

  • schema – Schema to create the table in

  • table – Name of the table to mount

  • schema_spec – Schema of the table

  • if_not_exists – Add IF NOT EXISTS to the DDL


SQL in bytes format.

get_change_key(schema: str, table: str)List[Tuple[str, str]]

Returns the key used to identify a row in a change (list of column name, column type). If the tracked table has a PK, we use that; if it doesn’t, the whole row is used.

get_object_schema(object_id: str)List[splitgraph.core.types.TableColumn]

Get the schema of a given object, returned as a list of (ordinal, column_name, column_type, is_pk).


object_id – ID of the object

get_object_size(object_id: str)int

Return the on-disk footprint of this object, in bytes :param object_id: ID of the object

mount_object(object_id: str, table: None = None, schema: str = 'splitgraph_meta', schema_spec: Optional[List[splitgraph.core.types.TableColumn]] = None)None

Mount an object from local storage as a foreign table.

  • object_id – ID of the object

  • table – Table to mount the object into

  • schema – Schema to mount the object into

  • schema_spec – Schema of the object.

rename_object(old_object_id: str, new_object_id: str)
store_fragment(inserted: Any, deleted: Any, schema: str, table: str, source_schema: str, source_table: str, source_schema_spec: Optional[List[splitgraph.core.types.TableColumn]] = None)None

Store a fragment of a changed table in another table

  • inserted – List of PKs that have been updated/inserted

  • deleted – List of PKs that have been deleted

  • schema – Schema to store the change in

  • table – Table to store the change in

  • source_schema – Schema the source table is located in

  • source_table – Name of the source table

  • source_schema_spec – Schema of the source table (optional)

store_object(object_id: str, source_query: Union[bytes, psycopg2.sql.Composed, str, psycopg2.sql.SQL], schema_spec: List[splitgraph.core.types.TableColumn], source_query_args=None, overwrite=False)None

Stores a Splitgraph object using a source query in the actual format implemented by this engine.

  • object_id – Name of the object

  • source_query – SELECT query that produces data required by the object

  • schema_spec – Schema of the source table

  • source_query_args – Arguments to mogrify into the source query.

  • overwrite – If True, will overwrite the object if it already exists.


Scan through local object storage and synchronize it with the foreign tables in splitgraph_meta (unmounting non-existing objects and mounting existing ones).

unmount_objects(object_ids: List[str])None

Unmount objects from splitgraph_meta (this doesn’t delete the physical files.

upload_objects(objects: List[str], remote_engine: splitgraph.engine.postgres.engine.PostgresEngine)None

Upload objects from the local cache to the remote engine

  • objects – List of object IDs to upload

  • remote_engine – A remote ObjectEngine to upload the objects to.

class splitgraph.engine.postgres.engine.PsycopgEngine(name: Optional[str], conn_params: Optional[Dict[str, Optional[str]]] = None, pool: Optional[psycopg2.pool.AbstractConnectionPool] = None, autocommit: bool = False, registry: bool = False, in_fdw: bool = False, check_version: bool = True)

Bases: splitgraph.engine.SQLEngine

Postgres SQL engine backed by a Psycopg connection.


Commit and close the engine’s backing connection


Close and release all other connections to the connection pool.


Commit the engine’s backing connection

property connection

Engine-internal Psycopg connection.


Return a cursor that can be used for copy_expert operations

delete_database(database: str)None

Helper function to drop a database using the admin connection


database – Database name to drop

dump_table_sql(schema: str, table_name: str, stream: _io.TextIOWrapper, columns: str = '*', where: str = '', where_args: Optional[Union[List[str], Tuple[str, str]]] = None, target_schema: Optional[str] = None, target_table: Optional[str] = None)None

Dump the table contents in the SQL format :param schema: Schema the table is located in :param table_name: Name of the table :param stream: A file-like object to write the result into. :param columns: SQL column spec. Default ‘*’. :param where: Optional, an SQL WHERE clause :param where_args: Arguments for the optional WHERE clause. :param target_schema: Schema to create the table in (default same as schema) :param target_table: Name of the table to insert data into (default same as table_name)

get_primary_keys(schema: str, table: str)List[Tuple[str, str]]

Inspects the Postgres information_schema to get the primary keys for a given table.

initialize(skip_object_handling: bool = False, skip_create_database: bool = False)None

Create the Splitgraph Postgres database and install the audit trigger

  • skip_object_handling – If True, skips installation of audit triggers and other object management routines for engines that don’t need change tracking or checkouts.

  • skip_create_database – Don’t create the Splitgraph database

lock_table(schema: str, table: str)None

Acquire an exclusive lock on a given table, released when the transaction commits / rolls back.


Rollback the engine’s backing connection

run_api_call(call: str, *args, schema: str = 'splitgraph_api')Any
run_api_call_batch(call: str, argslist, schema: str = 'splitgraph_api')
run_chunked_sql(statement: Union[bytes, psycopg2.sql.Composed, str, psycopg2.sql.SQL], arguments: Sequence[Any], return_shape: Optional[splitgraph.engine.ResultShape] = <ResultShape.MANY_MANY: 4>, chunk_size: int = 1000, chunk_position: int = -1)Any

Because the Splitgraph API has a request size limitation, certain SQL calls with variadic arguments are going to be too long to fit that. This function runs an SQL query against a set of broken up arguments and returns the combined result.

run_sql(statement: Union[bytes, psycopg2.sql.Composed, str, psycopg2.sql.SQL], arguments: Optional[Sequence[Any]] = None, return_shape: Optional[splitgraph.engine.ResultShape] = <ResultShape.MANY_MANY: 4>, named: bool = False)Any

Run an arbitrary SQL statement with some arguments, return an iterator of results. If the statement doesn’t return any results, return None. If named=True, return named tuples when possible.

run_sql_batch(statement: Union[psycopg2.sql.Composed, str], arguments: Any, schema: Optional[str] = None, max_size=261000)None

Run a parameterized SQL statement against multiple sets of arguments.

  • statement – Statement to run

  • arguments – Query arguments

  • schema – Schema to run the statement in

property splitgraph_version

Returns the version of the Splitgraph library installed on the engine and by association the version of the engine itself.

splitgraph.engine.postgres.engine.add_ud_flag_column(table_schema: List[splitgraph.core.types.TableColumn])List[splitgraph.core.types.TableColumn]
splitgraph.engine.postgres.engine.chunk(sequence: Sequence[T], chunk_size: int = 1000)Iterator[List[T]]
splitgraph.engine.postgres.engine.get_change_key(schema_spec: List[splitgraph.core.types.TableColumn])List[Tuple[str, str]]
splitgraph.engine.postgres.engine.get_conn_str(conn_params: Dict[str, str])str