splitgraph.engine package
Module contents
Defines the interface for a Splitgraph engine (a backing database), including running basic SQL commands, tracking tables for changes and uploading/downloading tables to other remote engines.
By default, Splitgraph is backed by Postgres: see splitgraph.engine.postgres for an example of how to implement a different engine.
- class splitgraph.engine.ChangeEngine
Bases: splitgraph.engine.SQLEngine, abc.ABC
An SQL engine that can perform change tracking on a set of tables.
- discard_pending_changes(schema, table=None)
Discard recorded pending changes for a tracked table or the whole schema
- get_change_key(schema: str, table: str) → List[Tuple[str, str]]
Returns the key used to identify a row in a change, as a list of (column_name, column_type). If the tracked table has a PK, that is used; if it doesn’t, the whole row is used.
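The key-selection rule above can be sketched in a few lines. This is an illustration only, not Splitgraph's implementation: `pick_change_key` is a hypothetical helper, and the table's columns and primary key are passed in as plain lists rather than read from an engine.

```python
from typing import List, Tuple

Column = Tuple[str, str]  # (column_name, column_type)


def pick_change_key(columns: List[Column], primary_keys: List[Column]) -> List[Column]:
    """Hypothetical helper: use the PK if the table has one, else the whole row."""
    return list(primary_keys) if primary_keys else list(columns)


# Made-up table layout for illustration.
columns = [("id", "integer"), ("name", "text")]
with_pk = pick_change_key(columns, [("id", "integer")])
without_pk = pick_change_key(columns, [])
```

With a primary key the change key is just the PK columns; without one it falls back to every column of the row.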
- get_changed_tables(schema)
List tracked tables that have pending changes
- Parameters
schema – Schema to check for changes
- Returns
List of tables with changed contents
- get_pending_changes(schema, table, aggregate=False)
Return pending changes for a given tracked table
- Parameters
schema – Schema the table belongs to
table – Table to return changes for
aggregate – Whether to aggregate changes or return them completely
- Returns
If aggregate is True: a tuple with numbers of (added_rows, removed_rows, updated_rows). If aggregate is False: a changeset. The changeset is a list of (pk, action, action_data), where action is 0 for Insert, 1 for Delete or 2 for Update, and action_data is None for Delete and otherwise {'c': [column_names], 'v': [column_values]} listing the columns that have been inserted/updated.
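To make the two return formats concrete, here is a minimal sketch that reduces a changeset to the aggregated counts. It assumes each pk is a tuple of key values; `summarize` and the sample rows are hypothetical and only mirror the format described above, not the engine's own aggregation.

```python
from typing import Any, Dict, List, Optional, Tuple

# Action codes as listed in the description above.
INSERT, DELETE, UPDATE = 0, 1, 2

# (pk, action, action_data); the pk-as-tuple shape is an assumption of this sketch.
Change = Tuple[Tuple[Any, ...], int, Optional[Dict[str, list]]]


def summarize(changeset: List[Change]) -> Tuple[int, int, int]:
    """Hypothetical helper: count (added_rows, removed_rows, updated_rows)."""
    added = sum(1 for _, action, _ in changeset if action == INSERT)
    removed = sum(1 for _, action, _ in changeset if action == DELETE)
    updated = sum(1 for _, action, _ in changeset if action == UPDATE)
    return added, removed, updated


# Sample data made up for illustration.
changeset: List[Change] = [
    ((1,), INSERT, {"c": ["name"], "v": ["apple"]}),
    ((2,), DELETE, None),
    ((3,), UPDATE, {"c": ["name"], "v": ["cherry"]}),
    ((4,), INSERT, {"c": ["name"], "v": ["damson"]}),
]
counts = summarize(changeset)
```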
- get_tracked_tables()
- Returns
A list of (table_schema, table_name) that the engine currently tracks for changes
- has_pending_changes(schema)
Return True if the tracked schema has pending changes and False if it doesn’t.
- track_tables(tables)
Start engine-specific change tracking on a list of tables.
- Parameters
tables – List of (table_schema, table_name) to start tracking
- untrack_tables(tables)
Stop engine-specific change tracking on a list of tables and delete any pending changes.
- Parameters
tables – List of (table_schema, table_name) to stop tracking
- class splitgraph.engine.ObjectEngine
Bases: object
Routines for storing/applying objects as well as sharing them with other engines.
- apply_fragments(objects, target_schema, target_table, extra_quals=None, extra_qual_args=None, schema_spec=None, progress_every: Optional[int] = None)
Apply multiple fragments to a target table as a single-query batch operation.
- Parameters
objects – List of tuples (object_schema, object_table) that the objects are stored in.
target_schema – Schema to apply the fragment to
target_table – Table to apply the fragment to
extra_quals – Optional, extra SQL (Composable) clauses to filter new rows in the fragment on (e.g. SQL("a = %s"))
extra_qual_args – Optional, a tuple of arguments to use with extra_quals
schema_spec – Optional, list of (ordinal, column_name, column_type, is_pk). If not specified, uses the schema of target_table.
progress_every – If set, will report the materialization progress via tqdm every progress_every objects.
- delete_objects(object_ids)
Delete one or more objects from the engine.
- Parameters
object_ids – IDs of objects to delete
- download_objects(objects, remote_engine)
Download objects from the remote engine to the local cache
- Parameters
objects – List of object IDs to download
remote_engine – A remote ObjectEngine to download the objects from.
- Returns
List of object IDs that were downloaded.
- dump_object(object_id, stream, schema)
Dump an object into a series of SQL statements
- Parameters
object_id – Object ID
stream – Text stream to dump the object into
schema – Schema the object lives in
- get_object_schema(object_id)
Get the schema of a given object, returned as a list of (ordinal, column_name, column_type, is_pk).
- Parameters
object_id – ID of the object
- get_object_size(object_id)
Return the on-disk footprint of this object, in bytes.
- Parameters
object_id – ID of the object
- store_fragment(inserted, deleted, schema, table, source_schema, source_table, source_schema_spec)
Store a fragment of a changed table in another table
- Parameters
inserted – List of PKs that have been updated/inserted
deleted – List of PKs that have been deleted
schema – Schema to store the change in
table – Table to store the change in
source_schema – Schema the source table is located in
source_table – Name of the source table
source_schema_spec – Schema of the source table (optional)
- store_object(object_id: str, source_query: Union[bytes, psycopg2.sql.Composed, str, psycopg2.sql.SQL], schema_spec: List[splitgraph.core.types.TableColumn], source_query_args: Optional[Sequence[Any]], overwrite: bool)
Stores a Splitgraph object using a source query in the actual format implemented by this engine.
- Parameters
object_id – Name of the object
source_query – SELECT query that produces data required by the object
schema_spec – Schema of the source table
source_query_args – Arguments to mogrify into the source query.
overwrite – If True, will overwrite the object if it already exists.
- upload_objects(objects, remote_engine)
Upload objects from the local cache to the remote engine
- Parameters
objects – List of object IDs to upload
remote_engine – A remote ObjectEngine to upload the objects to.
- class splitgraph.engine.ResultShape(value)
Bases: enum.Enum
Shape that the result of a query will be coerced to
- MANY_MANY = 4
- MANY_ONE = 3
- NONE = 0
- ONE_MANY = 2
- ONE_ONE = 1
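The member names can be read as rows × columns: ONE_ONE for a single value, MANY_MANY for a list of rows. The sketch below works under that assumed reading. The enum is a local copy of the members listed above so that the example runs standalone, and `coerce` is a hypothetical function, not the engine's own coercion code.

```python
from enum import Enum
from typing import Any, List, Optional, Tuple


class ResultShape(Enum):
    """Local copy of the members listed above, so the sketch runs standalone."""

    NONE = 0
    ONE_ONE = 1
    ONE_MANY = 2
    MANY_ONE = 3
    MANY_MANY = 4


def coerce(rows: List[Tuple[Any, ...]], shape: ResultShape) -> Optional[Any]:
    """Hypothetical coercion, reading each member name as <rows>_<columns>."""
    if shape is ResultShape.NONE:
        return None
    if shape is ResultShape.ONE_ONE:
        # First column of the first row.
        return rows[0][0] if rows else None
    if shape is ResultShape.ONE_MANY:
        # The whole first row.
        return rows[0] if rows else None
    if shape is ResultShape.MANY_ONE:
        # First column of every row.
        return [row[0] for row in rows]
    # MANY_MANY: every row, untouched.
    return list(rows)


rows = [(1, "a"), (2, "b")]
```

Under this reading, a caller that only needs a single count would ask for ONE_ONE, while a caller listing table names would ask for MANY_ONE.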
- class splitgraph.engine.SQLEngine
Bases: abc.ABC
Abstraction for a Splitgraph SQL backend. Requires any overriding classes to implement run_sql as well as a few other functions. Together with the information_schema (part of the SQL standard), this class uses those functions to implement some basic database management methods like listing, deleting, creating, dumping and loading tables.
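The pattern described here (one abstract primitive, with generic management methods built on top of it) can be illustrated with a toy. Everything in this sketch is hypothetical: `MiniEngine` and `SQLiteEngine` are not Splitgraph classes, and because SQLite has no information_schema, the toy queries the sqlite_master catalog where a Postgres-backed engine would query information_schema.

```python
import sqlite3
from abc import ABC, abstractmethod
from typing import Any, List, Sequence, Tuple


class MiniEngine(ABC):
    """Hypothetical stand-in: subclasses supply run_sql, helpers are built on it."""

    @abstractmethod
    def run_sql(self, statement: str, arguments: Sequence[Any] = ()) -> List[Tuple]:
        """Run a statement and return all result rows."""

    def get_all_tables(self) -> List[str]:
        # Written purely in terms of run_sql, so any subclass gets it for free.
        rows = self.run_sql(
            "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name"
        )
        return [row[0] for row in rows]

    def table_exists(self, table_name: str) -> bool:
        rows = self.run_sql(
            "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
            (table_name,),
        )
        return bool(rows)


class SQLiteEngine(MiniEngine):
    """Toy backend that only implements the abstract primitive."""

    def __init__(self) -> None:
        self.connection = sqlite3.connect(":memory:")

    def run_sql(self, statement: str, arguments: Sequence[Any] = ()) -> List[Tuple]:
        return self.connection.execute(statement, arguments).fetchall()


engine = SQLiteEngine()
engine.run_sql("CREATE TABLE fruits (id INTEGER PRIMARY KEY, name TEXT)")
tables = engine.get_all_tables()
```

The design choice this shows is that a new backend only has to provide the low-level query method; listing and existence checks come from the base class.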
- close()
Commit and close the engine’s backing connection
- commit()
Commit the engine’s backing connection
- copy_table(source_schema: str, source_table: str, target_schema: str, target_table: str, with_pk_constraints: bool = True) → None
Copy a table in the same engine, optionally applying primary key constraints as well.
- create_schema(schema: str) → None
Create a schema if it doesn’t exist
- create_table(schema: Optional[str], table: str, schema_spec: List[splitgraph.core.types.TableColumn], unlogged: bool = False, temporary: bool = False, include_comments: bool = False) → None
Creates a table using a previously-dumped table schema spec
- Parameters
schema – Schema to create the table in
table – Table name to create
schema_spec – Table schema, as a list of TableColumn
unlogged – If True, the table won’t be reflected in the WAL or scanned by the analyzer/autovacuum.
temporary – If True, a temporary table is created (the schema parameter is ignored)
include_comments – If True, also adds COMMENT statements for columns that have them.
- delete_schema(schema: str) → None
Delete a schema if it exists, including all the tables in it.
- delete_table(schema: str, table: str) → None
Drop a table from a schema if it exists
- static dump_table_creation(schema: Optional[str], table: str, schema_spec: List[splitgraph.core.types.TableColumn], unlogged: bool = False, temporary: bool = False, include_comments: bool = False) → Tuple[psycopg2.sql.Composed, Tuple]
Dumps the DDL for a table using a previously-dumped table schema spec
- Parameters
schema – Schema to create the table in
table – Table name to create
schema_spec – Table schema, as a list of TableColumn
unlogged – If True, the table won’t be reflected in the WAL or scanned by the analyzer/autovacuum.
temporary – If True, a temporary table is created (the schema parameter is ignored)
include_comments – If True, also adds COMMENT statements for columns that have them.
- Returns
An SQL statement that reconstructs the table schema + args to be mogrified into it.
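As a rough illustration of turning a schema spec into DDL, the sketch below builds a CREATE TABLE statement from (ordinal, column_name, column_type, is_pk) entries. It is not the library's code: `Column`, `quote_ident` and `table_ddl` are hypothetical, the statement is built as a plain string rather than a psycopg2 Composed object, comments are left out, and column types are inserted unescaped, which real code should not do with untrusted input.

```python
from typing import List, NamedTuple, Optional


class Column(NamedTuple):
    """Hypothetical stand-in for one schema spec entry."""

    ordinal: int
    name: str
    pg_type: str
    is_pk: bool


def quote_ident(name: str) -> str:
    # Minimal identifier quoting for the sketch; prefer the driver's own facility.
    return '"' + name.replace('"', '""') + '"'


def table_ddl(
    schema: Optional[str],
    table: str,
    spec: List[Column],
    unlogged: bool = False,
    temporary: bool = False,
) -> str:
    if temporary:
        # Mirrors the documented behaviour: the schema is ignored for temporary tables.
        prefix, target = "CREATE TEMPORARY TABLE", quote_ident(table)
    else:
        prefix = "CREATE UNLOGGED TABLE" if unlogged else "CREATE TABLE"
        target = f"{quote_ident(schema)}.{quote_ident(table)}" if schema else quote_ident(table)
    ordered = sorted(spec, key=lambda column: column.ordinal)
    parts = [f"{quote_ident(column.name)} {column.pg_type}" for column in ordered]
    pks = [quote_ident(column.name) for column in ordered if column.is_pk]
    if pks:
        parts.append(f"PRIMARY KEY ({', '.join(pks)})")
    return f"{prefix} {target} ({', '.join(parts)})"


spec = [Column(1, "id", "integer", True), Column(2, "name", "text", False)]
ddl = table_ddl("public", "fruits", spec)
temp_ddl = table_ddl("public", "fruits", spec, temporary=True)
```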
- dump_table_sql(schema, table_name, stream, columns='*', where='', where_args=None, target_schema=None, target_table=None)
Dump the table contents in the SQL format.
- Parameters
schema – Schema the table is located in
table_name – Name of the table
stream – A file-like object to write the result into.
columns – SQL column spec. Default '*'.
where – Optional, an SQL WHERE clause
where_args – Arguments for the optional WHERE clause.
target_schema – Schema to create the table in (default same as schema)
target_table – Name of the table to insert data into (default same as table_name)
- get_all_tables(schema: str) → List[str]
Get all tables in a given schema.
- get_full_table_schema(schema: str, table_name: str) → List[splitgraph.core.types.TableColumn]
Generates a list of (column ordinal, name, data type, is_pk, column comment), used to detect schema changes like columns being dropped/added/renamed or type changes.
NB this doesn’t work for temporary tables (pg_temp) and returns an empty schema.
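One way such a schema listing could be used to detect changes is to compare two snapshots by column name. The sketch below is an assumption about usage, not Splitgraph's own comparison logic: `Column` and `diff_schemas` are hypothetical, and a renamed column simply shows up as one dropped plus one added column.

```python
from typing import Dict, List, NamedTuple


class Column(NamedTuple):
    """Hypothetical stand-in mirroring (ordinal, name, data type, is_pk, comment)."""

    ordinal: int
    name: str
    pg_type: str
    is_pk: bool
    comment: str = ""


def diff_schemas(old: List[Column], new: List[Column]) -> Dict[str, List[str]]:
    """Report dropped, added and retyped column names between two snapshots."""
    old_by_name = {column.name: column for column in old}
    new_by_name = {column.name: column for column in new}
    return {
        "dropped": sorted(old_by_name.keys() - new_by_name.keys()),
        "added": sorted(new_by_name.keys() - old_by_name.keys()),
        "retyped": sorted(
            name
            for name in old_by_name.keys() & new_by_name.keys()
            if old_by_name[name].pg_type != new_by_name[name].pg_type
        ),
    }


# Made-up snapshots for illustration.
old = [
    Column(1, "id", "integer", True),
    Column(2, "name", "text", False),
    Column(3, "price", "integer", False),
]
new = [
    Column(1, "id", "integer", True),
    Column(2, "price", "numeric", False),
    Column(3, "origin", "text", False),
]
changes = diff_schemas(old, new)
```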
- get_primary_keys(schema, table)
Get a list of (column_name, column_type) denoting the primary keys of a given table.
- get_table_type(schema: str, table: str) → Optional[str]
Get the type of the table (BASE or FOREIGN)
- initialize()
Does any required initialization of the engine
- lock_table(schema, table)
Acquire an exclusive lock on a given table, released when the transaction commits / rolls back.
- rollback()
Roll back the engine’s backing connection
- run_sql(statement, arguments=None, return_shape=ResultShape.MANY_MANY, named=False)
Run an arbitrary SQL statement with some arguments, return an iterator of results. If the statement doesn’t return any results, return None. If named=True, return named tuples when possible.
- run_sql_batch(statement, arguments, schema=None)
Run a parameterized SQL statement against multiple sets of arguments.
- Parameters
statement – Statement to run
arguments – Query arguments
schema – Schema to run the statement in
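The general idea of running one parameterized statement against many argument sets can be shown with the standard library's sqlite3 module. This is only an analogy: how run_sql_batch executes the batch internally is not stated here, and SQLite uses `?` placeholders where psycopg2 uses `%s`. The table and rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fruits (id INTEGER PRIMARY KEY, name TEXT)")

# One statement, several sets of arguments.
statement = "INSERT INTO fruits (id, name) VALUES (?, ?)"
arguments = [(1, "apple"), (2, "banana"), (3, "cherry")]
conn.executemany(statement, arguments)
conn.commit()

row_count = conn.execute("SELECT COUNT(*) FROM fruits").fetchone()[0]
```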
- run_sql_in(schema: str, sql: Union[psycopg2.sql.Composed, str], arguments: None = None, return_shape: splitgraph.engine.ResultShape = ResultShape.MANY_MANY) → Any
Executes a non-schema-qualified query against a specific schema.
- Parameters
schema – Schema to run the query in
sql – Query
arguments – Query arguments
return_shape – ResultShape to coerce the result into.
- savepoint(name: str) → Iterator[None]
On entering this context manager, a savepoint is created; any database error that occurs in run_sql then results in a rollback to this savepoint rather than a rollback of the whole transaction. On exit, the savepoint is released.
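The savepoint behaviour can be approximated with sqlite3 from the standard library. This is a simplified analogue under stated assumptions: the `savepoint` function below is hypothetical and handles the rollback inside the context manager itself, whereas the description above places the rollback in run_sql; the table names are made up.

```python
import sqlite3
from contextlib import contextmanager
from typing import Iterator


@contextmanager
def savepoint(conn: sqlite3.Connection, name: str) -> Iterator[None]:
    """Hypothetical analogue: roll back to the savepoint on a database error."""
    conn.execute(f'SAVEPOINT "{name}"')
    try:
        yield
    except sqlite3.Error:
        # Undo only the work done since the savepoint; earlier work survives.
        conn.execute(f'ROLLBACK TO SAVEPOINT "{name}"')
        raise
    finally:
        # The savepoint is released on exit, whether or not an error occurred.
        conn.execute(f'RELEASE SAVEPOINT "{name}"')


# isolation_level=None stops the sqlite3 module from managing transactions itself.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("BEGIN")
conn.execute("CREATE TABLE fruits (name TEXT)")
conn.execute("INSERT INTO fruits VALUES ('apple')")
try:
    with savepoint(conn, "sp1"):
        conn.execute("INSERT INTO fruits VALUES ('banana')")
        conn.execute("INSERT INTO missing_table VALUES (1)")  # fails: no such table
except sqlite3.OperationalError:
    pass
conn.execute("COMMIT")
names = [row[0] for row in conn.execute("SELECT name FROM fruits")]
```

The row inserted before the savepoint is committed, while the row inserted inside the failed block is discarded along with the error.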
- schema_exists(schema: str) → bool
Check if a schema exists on the engine.
- Parameters
schema – Schema name
- table_exists(schema: str, table_name: str) → bool
Check if a table exists on the engine.
- Parameters
schema – Schema name
table_name – Table name
- class splitgraph.engine.SavepointStack
Bases: _thread._local
- splitgraph.engine.get_engine(name: Optional[str] = None, use_socket: bool = False, use_fdw_params: bool = False, autocommit: bool = False) → PostgresEngine
Get the current global engine or a named remote engine
- Parameters
name – Name of the remote engine as specified in the config. If None, the current global engine is returned.
use_socket – Use a local UNIX socket instead of PG_HOST, PG_PORT for LOCAL engine connections.
use_fdw_params – Use the _FDW connection parameters (SG_ENGINE_FDW_HOST/PORT). By default, will infer from the global splitgraph.config.IN_FDW flag.
autocommit – If True, the engine will not open SQL transactions implicitly.
- splitgraph.engine.set_engine(engine: PostgresEngine) → None
Switch the global engine to a different one.
- Parameters
engine – Engine
- splitgraph.engine.switch_engine(engine: PostgresEngine) → Iterator[None]
Switch the global engine to a different one. The engine will get switched back on exit from the context manager.
- Parameters
engine – Engine
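The switch-and-restore behaviour of this context manager follows a common pattern, sketched below with toy stand-ins. None of this is the library's code: the three functions reuse the documented names only for readability, `_state` is a hypothetical holder for the current engine, and plain strings stand in for engine objects.

```python
from contextlib import contextmanager
from typing import Any, Dict, Iterator

# Hypothetical holder for the "current global engine".
_state: Dict[str, Any] = {"engine": None}


def get_engine() -> Any:
    return _state["engine"]


def set_engine(engine: Any) -> None:
    _state["engine"] = engine


@contextmanager
def switch_engine(engine: Any) -> Iterator[None]:
    """Toy version: swap the global engine, then restore the previous one."""
    previous = get_engine()
    set_engine(engine)
    try:
        yield
    finally:
        # Restore even if the body raised.
        set_engine(previous)


set_engine("LOCAL")
with switch_engine("remote"):
    inside = get_engine()
after = get_engine()
```

Restoring in a `finally` clause means the previous engine comes back even when the body of the `with` block raises.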
- splitgraph.engine.validate_type(t: str) → str