splitgraph.engine package
Module contents
Defines the interface for a Splitgraph engine (a backing database), including running basic SQL commands, tracking tables for changes and uploading/downloading tables to other remote engines.
By default, Splitgraph is backed by Postgres: see splitgraph.engine.postgres
for an example of how to implement a different engine.
class splitgraph.engine.ChangeEngine
Bases: splitgraph.engine.SQLEngine, abc.ABC
An SQL engine that can perform change tracking on a set of tables.
discard_pending_changes(schema, table=None)
Discard recorded pending changes for a tracked table or the whole schema.
get_change_key(schema: str, table: str) → List[Tuple[str, str]]
Returns the key used to identify a row in a change, as a list of (column name, column type) pairs. If the tracked table has a PK, we use that; if it doesn’t, the whole row is used.
get_changed_tables(schema)
List tracked tables that have pending changes.
- Parameters
schema – Schema to check for changes
- Returns
List of tables with changed contents
get_pending_changes(schema, table, aggregate=False)
Return pending changes for a given tracked table.
- Parameters
schema – Schema the table belongs to
table – Table to return changes for
aggregate – Whether to aggregate changes or return them in full
- Returns
If aggregate is True: a tuple with the numbers of (added_rows, removed_rows, updated_rows). If aggregate is False: a changeset. The changeset is a list of (pk, action, action_data) tuples, where action is 0 for insert, 1 for delete and 2 for update; action_data is None for deletes and {'c': [column_names], 'v': [column_values]} with the inserted/updated values otherwise.
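The two return formats relate directly: counting actions in a non-aggregated changeset yields the aggregate tuple. This plain-Python sketch (an illustration of the format described above, not part of the Splitgraph API) shows that relationship on a hypothetical changeset:

```python
# Hypothetical changeset in the (pk, action, action_data) format above:
# action 0 = insert, 1 = delete, 2 = update.
changeset = [
    ((1,), 0, {"c": ["name", "value"], "v": ["apple", 10]}),  # insert
    ((2,), 2, {"c": ["value"], "v": [20]}),                   # update
    ((3,), 1, None),                                          # delete
]

def aggregate_changes(changeset):
    """Collapse a changeset into (added_rows, removed_rows, updated_rows) counts."""
    added = sum(1 for _, action, _ in changeset if action == 0)
    removed = sum(1 for _, action, _ in changeset if action == 1)
    updated = sum(1 for _, action, _ in changeset if action == 2)
    return added, removed, updated

print(aggregate_changes(changeset))  # (1, 1, 1)
```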
get_tracked_tables()
- Returns
A list of (table_schema, table_name) tuples that the engine currently tracks for changes
has_pending_changes(schema)
Return True if the tracked schema has pending changes and False if it doesn’t.
track_tables(tables)
Start engine-specific change tracking on a list of tables.
- Parameters
tables – List of (table_schema, table_name) to start tracking
untrack_tables(tables)
Stop engine-specific change tracking on a list of tables and delete any pending changes.
- Parameters
tables – List of (table_schema, table_name) to stop tracking
class splitgraph.engine.ObjectEngine
Bases: object
Routines for storing/applying objects as well as sharing them with other engines.
apply_fragments(objects, target_schema, target_table, extra_quals=None, extra_qual_args=None, schema_spec=None, progress_every: Optional[int] = None)
Apply multiple fragments to a target table as a single-query batch operation.
- Parameters
objects – List of tuples (object_schema, object_table) that the objects are stored in.
target_schema – Schema to apply the fragment to
target_table – Table to apply the fragment to
extra_quals – Optional, extra SQL (Composable) clauses to filter new rows in the fragment on (e.g. SQL("a = %s"))
extra_qual_args – Optional, a tuple of arguments to use with extra_quals
schema_spec – Optional, list of (ordinal, column_name, column_type, is_pk). If not specified, uses the schema of target_table.
progress_every – If set, will report the materialization progress via tqdm every progress_every objects.
delete_objects(object_ids)
Delete one or more objects from the engine.
- Parameters
object_ids – IDs of objects to delete
download_objects(objects, remote_engine)
Download objects from the remote engine to the local cache.
- Parameters
objects – List of object IDs to download
remote_engine – A remote ObjectEngine to download the objects from
- Returns
List of object IDs that were downloaded
dump_object(object_id, stream, schema)
Dump an object into a series of SQL statements.
- Parameters
object_id – Object ID
stream – Text stream to dump the object into
schema – Schema the object lives in
get_object_schema(object_id)
Get the schema of a given object, returned as a list of (ordinal, column_name, column_type, is_pk) tuples.
- Parameters
object_id – ID of the object
get_object_size(object_id)
Return the on-disk footprint of this object, in bytes.
- Parameters
object_id – ID of the object
store_fragment(inserted, deleted, schema, table, source_schema, source_table, source_schema_spec)
Store a fragment of a changed table in another table.
- Parameters
inserted – List of PKs that have been updated/inserted
deleted – List of PKs that have been deleted
schema – Schema to store the change in
table – Table to store the change in
source_schema – Schema the source table is located in
source_table – Name of the source table
source_schema_spec – Schema of the source table (optional)
store_object(object_id: str, source_query: Union[bytes, psycopg2.sql.Composed, str, psycopg2.sql.SQL], schema_spec: List[splitgraph.core.types.TableColumn], source_query_args: Optional[Sequence[Any]], overwrite: bool)
Stores a Splitgraph object using a source query in the actual format implemented by this engine.
- Parameters
object_id – Name of the object
source_query – SELECT query that produces data required by the object
schema_spec – Schema of the source table
source_query_args – Arguments to mogrify into the source query.
overwrite – If True, will overwrite the object if it already exists.
upload_objects(objects, remote_engine)
Upload objects from the local cache to the remote engine.
- Parameters
objects – List of object IDs to upload
remote_engine – A remote ObjectEngine to upload the objects to.
class splitgraph.engine.ResultShape(value)
Bases: enum.Enum
Shape that the result of a query will be coerced to
NONE = 0
ONE_ONE = 1
ONE_MANY = 2
MANY_ONE = 3
MANY_MANY = 4
class splitgraph.engine.SQLEngine
Bases: abc.ABC
Abstraction for a Splitgraph SQL backend. Requires any overriding classes to implement run_sql as well as a few other functions. Together with the information_schema (part of the SQL standard), this class uses those functions to implement some basic database management methods like listing, deleting, creating, dumping and loading tables.
close()
Commit and close the engine’s backing connection.
commit()
Commit the engine’s backing connection.
copy_table(source_schema: str, source_table: str, target_schema: str, target_table: str, with_pk_constraints: bool = True) → None
Copy a table in the same engine, optionally applying primary key constraints as well.
create_schema(schema: str) → None
Create a schema if it doesn’t exist.
create_table(schema: Optional[str], table: str, schema_spec: List[splitgraph.core.types.TableColumn], unlogged: bool = False, temporary: bool = False, include_comments: bool = False) → None
Creates a table using a previously-dumped table schema spec.
- Parameters
schema – Schema to create the table in
table – Table name to create
schema_spec – TableSchema
unlogged – If True, the table won’t be reflected in the WAL or scanned by the analyzer/autovacuum.
temporary – If True, a temporary table is created (the schema parameter is ignored)
include_comments – If True, also adds COMMENT statements for columns that have them.
delete_schema(schema: str) → None
Delete a schema if it exists, including all the tables in it.
delete_table(schema: str, table: str) → None
Drop a table from a schema if it exists.
static dump_table_creation(schema: Optional[str], table: str, schema_spec: List[splitgraph.core.types.TableColumn], unlogged: bool = False, temporary: bool = False, include_comments: bool = False) → Tuple[psycopg2.sql.Composed, Tuple]
Dumps the DDL for a table using a previously-dumped table schema spec.
- Parameters
schema – Schema to create the table in
table – Table name to create
schema_spec – TableSchema
unlogged – If True, the table won’t be reflected in the WAL or scanned by the analyzer/autovacuum.
temporary – If True, a temporary table is created (the schema parameter is ignored)
include_comments – If True, also adds COMMENT statements for columns that have them.
- Returns
An SQL statement that reconstructs the table schema + args to be mogrified into it.
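The idea of reconstructing DDL from a schema spec can be sketched in plain Python. This is a simplified illustration only: the real method composes a psycopg2 SQL object plus arguments to be mogrified into it, and handles quoting, unlogged/temporary tables and comments.

```python
def build_table_ddl(schema, table, schema_spec):
    """Build a simplified CREATE TABLE statement from a list of
    (ordinal, column_name, column_type, is_pk) tuples.
    Illustration only: no identifier quoting, comments or options."""
    ordered = sorted(schema_spec)  # sort by column ordinal
    cols = ", ".join(f"{name} {ctype}" for _, name, ctype, _ in ordered)
    pks = [name for _, name, _, is_pk in ordered if is_pk]
    pk_clause = f", PRIMARY KEY ({', '.join(pks)})" if pks else ""
    return f"CREATE TABLE {schema}.{table} ({cols}{pk_clause})"

spec = [(1, "id", "integer", True), (2, "name", "text", False)]
print(build_table_ddl("public", "fruits", spec))
# CREATE TABLE public.fruits (id integer, name text, PRIMARY KEY (id))
```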
dump_table_sql(schema, table_name, stream, columns='*', where='', where_args=None, target_schema=None, target_table=None)
Dump the table contents in the SQL format.
- Parameters
schema – Schema the table is located in
table_name – Name of the table
stream – A file-like object to write the result into
columns – SQL column spec. Default '*'
where – Optional, an SQL WHERE clause
where_args – Arguments for the optional WHERE clause
target_schema – Schema to create the table in (default same as schema)
target_table – Name of the table to insert data into (default same as table_name)
get_all_tables(schema: str) → List[str]
Get all tables in a given schema.
get_full_table_schema(schema: str, table_name: str) → List[splitgraph.core.types.TableColumn]
Generates a list of (column ordinal, name, data type, is_pk, column comment), used to detect schema changes like columns being dropped/added/renamed or type changes.
Note that this doesn’t work for temporary tables (pg_temp): for those, it returns an empty schema.
get_primary_keys(schema, table)
Get a list of (column_name, column_type) denoting the primary keys of a given table.
get_table_type(schema: str, table: str) → Optional[str]
Get the type of the table (BASE or FOREIGN).
initialize()
Perform any required initialization of the engine.
lock_table(schema, table)
Acquire an exclusive lock on a given table, released when the transaction commits / rolls back.
rollback()
Roll back the engine’s backing connection.
run_sql(statement, arguments=None, return_shape=<ResultShape.MANY_MANY: 4>, named=False)
Run an arbitrary SQL statement with some arguments, return an iterator of results. If the statement doesn’t return any results, return None. If named=True, return named tuples when possible.
run_sql_batch(statement, arguments, schema=None)
Run a parameterized SQL statement against multiple sets of arguments.
- Parameters
statement – Statement to run
arguments – Query arguments
schema – Schema to run the statement in
run_sql_in(schema: str, sql: Union[psycopg2.sql.Composed, str], arguments: None = None, return_shape: splitgraph.engine.ResultShape = <ResultShape.MANY_MANY: 4>) → Any
Executes a non-schema-qualified query against a specific schema.
- Parameters
schema – Schema to run the query in
sql – Query
arguments – Query arguments
return_shape – ResultShape to coerce the result into
savepoint(name: str) → Iterator[None]
At the beginning of this context manager, a savepoint is initialized and any database error that occurs in run_sql results in a rollback to this savepoint rather than the rollback of the whole transaction. At exit, the savepoint is released.
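The savepoint semantics can be modelled with a toy in-memory "transaction": a failure inside the block rolls state back to the snapshot taken at entry instead of losing all prior work. This is only an illustration of the behaviour described above; the real engine issues SQL SAVEPOINT / ROLLBACK TO SAVEPOINT / RELEASE statements on its connection.

```python
from contextlib import contextmanager

@contextmanager
def toy_savepoint(state):
    """Toy model of savepoint semantics: snapshot the state on entry and
    restore it if an error occurs inside the block, so work done before
    the savepoint survives. (The real savepoint operates on a database
    transaction, not a Python list.)"""
    snapshot = list(state)  # SAVEPOINT
    try:
        yield
    except Exception:
        state[:] = snapshot  # ROLLBACK TO SAVEPOINT
    # on success, the savepoint is simply released

state = ["committed-row"]
with toy_savepoint(state):
    state.append("row-from-failed-statement")
    raise RuntimeError("simulated database error")

print(state)  # ['committed-row'] -- work before the savepoint survives
```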
schema_exists(schema: str) → bool
Check if a schema exists on the engine.
- Parameters
schema – Schema name
table_exists(schema: str, table_name: str) → bool
Check if a table exists on the engine.
- Parameters
schema – Schema name
table_name – Table name
class splitgraph.engine.SavepointStack
Bases: _thread._local
splitgraph.engine.get_engine(name: Optional[str] = None, use_socket: bool = False, use_fdw_params: bool = False, autocommit: bool = False) → PostgresEngine
Get the current global engine or a named remote engine.
- Parameters
name – Name of the remote engine as specified in the config. If None, the current global engine is returned.
use_socket – Use a local UNIX socket instead of PG_HOST, PG_PORT for LOCAL engine connections.
use_fdw_params – Use the _FDW connection parameters (SG_ENGINE_FDW_HOST/PORT). By default, will infer from the global splitgraph.config.IN_FDW flag.
autocommit – If True, the engine will not open SQL transactions implicitly.
splitgraph.engine.set_engine(engine: PostgresEngine) → None
Switch the global engine to a different one.
- Parameters
engine – Engine
splitgraph.engine.switch_engine(engine: PostgresEngine) → Iterator[None]
Switch the global engine to a different one. The engine will be switched back on exit from the context manager.
- Parameters
engine – Engine
splitgraph.engine.validate_type(t: str) → str