
splitgraph.engine package

Subpackages

Module contents

Defines the interface for a Splitgraph engine (a backing database), including running basic SQL commands, tracking tables for changes and uploading/downloading tables to other remote engines.

By default, Splitgraph is backed by Postgres: see splitgraph.engine.postgres for an example of how to implement a different engine.

class splitgraph.engine.ChangeEngine

Bases: splitgraph.engine.SQLEngine, abc.ABC

An SQL engine that can perform change tracking on a set of tables.

discard_pending_changes(schema, table=None)

Discard recorded pending changes for a tracked table or the whole schema

get_change_key(schema: str, table: str) List[Tuple[str, str]]

Returns the key used to identify a row in a change (list of column name, column type). If the tracked table has a PK, we use that; if it doesn’t, the whole row is used.

get_changed_tables(schema)

List tracked tables that have pending changes

Parameters

schema – Schema to check for changes

Returns

List of tables with changed contents

get_pending_changes(schema, table, aggregate=False)

Return pending changes for a given tracked table

Parameters
  • schema – Schema the table belongs to

  • table – Table to return changes for

  • aggregate – Whether to aggregate changes or return them completely

Returns

If aggregate is True: tuple with numbers of (added_rows, removed_rows, updated_rows). If aggregate is False: a changeset. The changeset is a list of (pk, action (0 for Insert, 1 for Delete, 2 for Update), action_data), where action_data is None for Delete and a dictionary {‘c’: [column_names], ‘v’: [column_values]} of the inserted/updated values otherwise.
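As a minimal illustrative sketch (pure Python, not the engine's actual code), a changeset of the shape described above can be aggregated into (added_rows, removed_rows, updated_rows) counts like so; the rows here are hypothetical:

```python
# Illustrative sketch: aggregating a changeset of the shape described above.
# Actions: 0 = Insert, 1 = Delete, 2 = Update; action_data is None for Delete.
from collections import Counter

changeset = [
    ((1,), 0, {"c": ["name"], "v": ["alice"]}),  # insert
    ((2,), 1, None),                             # delete
    ((3,), 2, {"c": ["name"], "v": ["bob"]}),    # update
]

counts = Counter(action for _pk, action, _data in changeset)
aggregate = (counts[0], counts[1], counts[2])
print(aggregate)  # (1, 1, 1)
```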

get_tracked_tables()

Returns

A list of (table_schema, table_name) that the engine currently tracks for changes

has_pending_changes(schema)

Return True if the tracked schema has pending changes and False if it doesn’t.

track_tables(tables)

Start engine-specific change tracking on a list of tables.

Parameters

tables – List of (table_schema, table_name) to start tracking

untrack_tables(tables)

Stop engine-specific change tracking on a list of tables and delete any pending changes.

Parameters

tables – List of (table_schema, table_name) to stop tracking

class splitgraph.engine.ObjectEngine

Bases: object

Routines for storing/applying objects as well as sharing them with other engines.

apply_fragments(objects, target_schema, target_table, extra_quals=None, extra_qual_args=None, schema_spec=None, progress_every: Optional[int] = None)

Apply multiple fragments to a target table as a single-query batch operation.

Parameters
  • objects – List of tuples (object_schema, object_table) that the objects are stored in.

  • target_schema – Schema to apply the fragment to

  • target_table – Table to apply the fragment to

  • extra_quals – Optional, extra SQL (Composable) clauses to filter new rows in the fragment on (e.g. SQL(“a = %s”))

  • extra_qual_args – Optional, a tuple of arguments to use with extra_quals

  • schema_spec – Optional, list of (ordinal, column_name, column_type, is_pk). If not specified, uses the schema of target_table.

  • progress_every – If set, will report the materialization progress via tqdm every progress_every objects.

delete_objects(object_ids)

Delete one or more objects from the engine.

Parameters

object_ids – IDs of objects to delete

download_objects(objects, remote_engine)

Download objects from the remote engine to the local cache

Parameters
  • objects – List of object IDs to download

  • remote_engine – A remote ObjectEngine to download the objects from.

Returns

List of object IDs that were downloaded.

dump_object(object_id, stream, schema)

Dump an object into a series of SQL statements

Parameters
  • object_id – Object ID

  • stream – Text stream to dump the object into

  • schema – Schema the object lives in

get_object_schema(object_id)

Get the schema of a given object, returned as a list of (ordinal, column_name, column_type, is_pk).

Parameters

object_id – ID of the object

get_object_size(object_id)

Return the on-disk footprint of this object, in bytes.

Parameters

object_id – ID of the object

store_fragment(inserted, deleted, schema, table, source_schema, source_table, source_schema_spec)

Store a fragment of a changed table in another table

Parameters
  • inserted – List of PKs that have been updated/inserted

  • deleted – List of PKs that have been deleted

  • schema – Schema to store the change in

  • table – Table to store the change in

  • source_schema – Schema the source table is located in

  • source_table – Name of the source table

  • source_schema_spec – Schema of the source table (optional)

store_object(object_id: str, source_query: Union[bytes, psycopg2.sql.Composed, str, psycopg2.sql.SQL], schema_spec: List[splitgraph.core.types.TableColumn], source_query_args: Optional[Sequence[Any]], overwrite: bool)

Stores a Splitgraph object using a source query in the actual format implemented by this engine.

Parameters
  • object_id – Name of the object

  • source_query – SELECT query that produces data required by the object

  • schema_spec – Schema of the source table

  • source_query_args – Arguments to mogrify into the source query.

  • overwrite – If True, will overwrite the object if it already exists.

upload_objects(objects, remote_engine)

Upload objects from the local cache to the remote engine

Parameters
  • objects – List of object IDs to upload

  • remote_engine – A remote ObjectEngine to upload the objects to.

class splitgraph.engine.ResultShape(value)

Bases: enum.Enum

Shape that the result of a query will be coerced to

MANY_MANY = 4
MANY_ONE = 3
NONE = 0
ONE_MANY = 2
ONE_ONE = 1
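For illustration only, here is a rough sketch (not Splitgraph's actual implementation) of how a list-of-tuples query result could be coerced to each of these shapes:

```python
# Illustrative sketch of coercing a list-of-tuples query result to each
# ResultShape. ONE_* shapes take the first row; *_ONE shapes take the
# first column.
from enum import Enum

class ResultShape(Enum):
    NONE = 0
    ONE_ONE = 1
    ONE_MANY = 2
    MANY_ONE = 3
    MANY_MANY = 4

def coerce(rows, shape):
    if shape == ResultShape.NONE:
        return None
    if shape == ResultShape.ONE_ONE:
        return rows[0][0] if rows else None   # single scalar
    if shape == ResultShape.ONE_MANY:
        return rows[0] if rows else None      # single row
    if shape == ResultShape.MANY_ONE:
        return [r[0] for r in rows]           # single column
    return rows                               # full result set

rows = [(1, "a"), (2, "b")]
```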
class splitgraph.engine.SQLEngine

Bases: abc.ABC

Abstraction for a Splitgraph SQL backend. Requires any overriding classes to implement run_sql as well as a few other functions. Together with the information_schema (part of the SQL standard), this class uses those functions to implement some basic database management methods like listing, deleting, creating, dumping and loading tables.

close()

Commit and close the engine’s backing connection

commit()

Commit the engine’s backing connection

copy_table(source_schema: str, source_table: str, target_schema: str, target_table: str, with_pk_constraints: bool = True) None

Copy a table in the same engine, optionally applying primary key constraints as well.

create_schema(schema: str) None

Create a schema if it doesn’t exist

create_table(schema: Optional[str], table: str, schema_spec: List[splitgraph.core.types.TableColumn], unlogged: bool = False, temporary: bool = False, include_comments: bool = False) None

Creates a table using a previously-dumped table schema spec

Parameters
  • schema – Schema to create the table in

  • table – Table name to create

  • schema_spec – TableSchema

  • unlogged – If True, the table won’t be reflected in the WAL or scanned by the analyzer/autovacuum.

  • temporary – If True, a temporary table is created (the schema parameter is ignored)

  • include_comments – If True, also adds COMMENT statements for columns that have them.

delete_schema(schema: str) None

Delete a schema if it exists, including all the tables in it.

delete_table(schema: str, table: str) None

Drop a table from a schema if it exists

static dump_table_creation(schema: Optional[str], table: str, schema_spec: List[splitgraph.core.types.TableColumn], unlogged: bool = False, temporary: bool = False, include_comments: bool = False) Tuple[psycopg2.sql.Composed, Tuple]

Dumps the DDL for a table using a previously-dumped table schema spec

Parameters
  • schema – Schema to create the table in

  • table – Table name to create

  • schema_spec – TableSchema

  • unlogged – If True, the table won’t be reflected in the WAL or scanned by the analyzer/autovacuum.

  • temporary – If True, a temporary table is created (the schema parameter is ignored)

  • include_comments – If True, also adds COMMENT statements for columns that have them.

Returns

An SQL statement that reconstructs the table schema + args to be mogrified into it.
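As a rough sketch of the idea (the real implementation returns a psycopg2 Composed with properly quoted identifiers and mogrified arguments), a schema spec of (ordinal, column_name, column_type, is_pk) tuples can be turned into DDL along these lines:

```python
# Illustrative sketch: building CREATE TABLE DDL from a schema spec of
# (ordinal, column_name, column_type, is_pk) tuples. Not the engine's
# actual SQL generation, which uses psycopg2.sql for safe quoting.
def dump_table_creation(schema, table, schema_spec, unlogged=False):
    cols = ", ".join('"%s" %s' % (name, ctype)
                     for _ordinal, name, ctype, _is_pk in sorted(schema_spec))
    pks = [name for _o, name, _t, is_pk in schema_spec if is_pk]
    if pks:
        cols += ", PRIMARY KEY (%s)" % ", ".join('"%s"' % p for p in pks)
    prefix = "CREATE UNLOGGED TABLE" if unlogged else "CREATE TABLE"
    return '%s "%s"."%s" (%s)' % (prefix, schema, table, cols)

spec = [(1, "id", "integer", True), (2, "name", "text", False)]
print(dump_table_creation("public", "users", spec))
# CREATE TABLE "public"."users" ("id" integer, "name" text, PRIMARY KEY ("id"))
```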

dump_table_sql(schema, table_name, stream, columns='*', where='', where_args=None, target_schema=None, target_table=None)

Dump the table contents in the SQL format.

Parameters
  • schema – Schema the table is located in

  • table_name – Name of the table

  • stream – A file-like object to write the result into.

  • columns – SQL column spec. Default ‘*’.

  • where – Optional, an SQL WHERE clause

  • where_args – Arguments for the optional WHERE clause.

  • target_schema – Schema to create the table in (default same as schema)

  • target_table – Name of the table to insert data into (default same as table_name)
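To illustrate the stream-dumping pattern (hypothetical data and a simplified helper, not the actual implementation), rows can be written into a text stream as INSERT statements:

```python
# Illustrative sketch: writing table rows into a text stream as INSERT
# statements, mirroring the dump_table_sql calling convention.
import io

def dump_rows(stream, schema, table, columns, rows):
    for row in rows:
        values = ", ".join(repr(v) for v in row)
        stream.write('INSERT INTO "%s"."%s" (%s) VALUES (%s);\n'
                     % (schema, table, ", ".join(columns), values))

out = io.StringIO()
dump_rows(out, "public", "users", ["id", "name"], [(1, "alice")])
print(out.getvalue())
# INSERT INTO "public"."users" (id, name) VALUES (1, 'alice');
```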

get_all_tables(schema: str) List[str]

Get all tables in a given schema.

get_full_table_schema(schema: str, table_name: str) List[splitgraph.core.types.TableColumn]

Generates a list of (column ordinal, name, data type, is_pk, column comment), used to detect schema changes like columns being dropped/added/renamed or type changes.

NB this doesn’t work for temporary tables (pg_temp) and returns an empty schema.

get_primary_keys(schema, table)

Get a list of (column_name, column_type) denoting the primary keys of a given table.

get_table_type(schema: str, table: str) Optional[str]

Get the type of the table (BASE or FOREIGN)

initialize()

Does any required initialization of the engine

lock_table(schema, table)

Acquire an exclusive lock on a given table, released when the transaction commits / rolls back.

rollback()

Roll back the engine’s backing connection

run_sql(statement, arguments=None, return_shape=ResultShape.MANY_MANY, named=False)

Run an arbitrary SQL statement with some arguments, return an iterator of results. If the statement doesn’t return any results, return None. If named=True, return named tuples when possible.

run_sql_batch(statement, arguments, schema=None)

Run a parameterized SQL statement against multiple sets of arguments.

Parameters
  • statement – Statement to run

  • arguments – Query arguments

  • schema – Schema to run the statement in

run_sql_in(schema: str, sql: Union[psycopg2.sql.Composed, str], arguments: None = None, return_shape: splitgraph.engine.ResultShape = ResultShape.MANY_MANY) Any

Executes a non-schema-qualified query against a specific schema.

Parameters
  • schema – Schema to run the query in

  • sql – Query

  • arguments – Query arguments

  • return_shape – ResultShape to coerce the result into.

savepoint(name: str) Iterator[None]

At the beginning of this context manager, a savepoint is initialized and any database error that occurs in run_sql results in a rollback to this savepoint rather than the rollback of the whole transaction. At exit, the savepoint is released.
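The pattern can be sketched as follows (a minimal illustration over a hypothetical engine with a run_sql method, not Splitgraph's actual code):

```python
# Illustrative sketch of the savepoint pattern: an error inside the block
# rolls back to the savepoint rather than aborting the whole transaction.
from contextlib import contextmanager

class FakeEngine:
    """Hypothetical engine that just records the SQL it is asked to run."""
    def __init__(self):
        self.statements = []
    def run_sql(self, sql):
        self.statements.append(sql)

@contextmanager
def savepoint(engine, name):
    engine.run_sql("SAVEPOINT %s" % name)
    try:
        yield
        engine.run_sql("RELEASE SAVEPOINT %s" % name)
    except Exception:
        engine.run_sql("ROLLBACK TO SAVEPOINT %s" % name)
        raise
```

On success the savepoint is released; on error the transaction is rolled back only to the savepoint and the exception is re-raised.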

schema_exists(schema: str) bool

Check if a schema exists on the engine.

Parameters

schema – Schema name

table_exists(schema: str, table_name: str) bool

Check if a table exists on the engine.

Parameters
  • schema – Schema name

  • table_name – Table name

class splitgraph.engine.SavepointStack

Bases: _thread._local

splitgraph.engine.get_engine(name: Optional[str] = None, use_socket: bool = False, use_fdw_params: bool = False, autocommit: bool = False) PostgresEngine

Get the current global engine or a named remote engine

Parameters
  • name – Name of the remote engine as specified in the config. If None, the current global engine is returned.

  • use_socket – Use a local UNIX socket instead of PG_HOST, PG_PORT for LOCAL engine connections.

  • use_fdw_params – Use the _FDW connection parameters (SG_ENGINE_FDW_HOST/PORT). By default, will infer from the global splitgraph.config.IN_FDW flag.

  • autocommit – If True, the engine will not open SQL transactions implicitly.

splitgraph.engine.set_engine(engine: PostgresEngine) None

Switch the global engine to a different one.

Parameters

engine – Engine

splitgraph.engine.switch_engine(engine: PostgresEngine) Iterator[None]

Switch the global engine to a different one. The engine will get switched back on exit from the context manager.
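The switch-back-on-exit behaviour can be sketched like this (a self-contained illustration using strings in place of real engine objects):

```python
# Illustrative sketch of the switch_engine context manager: the previous
# global engine is restored on exit, even if the block raises.
from contextlib import contextmanager

_ENGINE = "default"

def get_engine():
    return _ENGINE

def set_engine(engine):
    global _ENGINE
    _ENGINE = engine

@contextmanager
def switch_engine(engine):
    previous = get_engine()
    set_engine(engine)
    try:
        yield
    finally:
        set_engine(previous)

with switch_engine("remote"):
    assert get_engine() == "remote"
assert get_engine() == "default"
```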

Parameters

engine – Engine

splitgraph.engine.validate_type(t: str) str