splitgraph.engine.postgres package
Module contents¶
Submodules¶
splitgraph.engine.postgres.engine module¶
Default Splitgraph engine: uses PostgreSQL to store metadata and the actual objects, an audit stored procedure to track changes, and the Postgres FDW interface to upload/download objects to/from other Postgres engines.
- class splitgraph.engine.postgres.engine.AuditTriggerChangeEngine(name: Optional[str], conn_params: Optional[Dict[str, Optional[str]]] = None, pool: Optional[psycopg2.pool.AbstractConnectionPool] = None, autocommit: bool = False, registry: bool = False, in_fdw: bool = False, check_version: bool = True)¶
Bases: splitgraph.engine.postgres.engine.PsycopgEngine, splitgraph.engine.ChangeEngine
Change tracking based on an audit trigger stored procedure
discard_pending_changes
(schema: str, table: Optional[str] = None) → None¶Discard recorded pending changes for a tracked schema / table
get_changed_tables
(schema: str) → List[str]¶Get list of tables that have changed content
get_pending_changes
(schema: str, table: str, aggregate: bool = False) → Union[List[Tuple[int, int]], List[Tuple[Tuple[str, …], bool, Dict[str, Any], Dict[str, Any]]]]¶Return pending changes for a given tracked table
- Parameters
schema – Schema the table belongs to
table – Table to return changes for
aggregate – Whether to aggregate changes or return them completely
- Returns
If aggregate is True: List of tuples of (change_type, number of rows). If aggregate is False: List of (primary_key, change_type, change_data)
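The two return shapes differ only in granularity. As a minimal sketch (not splitgraph code), here is how the per-row form could be collapsed into the aggregated (change_kind, number_of_rows) form; the per-row tuple shape (primary_key, upsert_flag, old_row, new_row) is assumed from the annotated return type above:

```python
from collections import Counter
from typing import Any, Dict, List, Tuple

# Assumed per-row change shape, following the annotated return type:
# (primary_key, upsert_flag, old_row_data, new_row_data).
Change = Tuple[Tuple[str, ...], bool, Dict[str, Any], Dict[str, Any]]

def aggregate_changes(changes: List[Change]) -> List[Tuple[int, int]]:
    """Collapse per-row changes into (change_kind, number_of_rows)
    pairs, mirroring the aggregate=True result shape."""
    counts = Counter(int(upserted) for _, upserted, _, _ in changes)
    return sorted(counts.items())

changes = [
    (("1",), True, {}, {"value": "a"}),            # insert/update
    (("2",), True, {"value": "b"}, {"value": "c"}),
    (("3",), False, {"value": "d"}, {}),           # delete
]
print(aggregate_changes(changes))  # [(0, 1), (1, 2)]
```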
get_tracked_tables
() → List[Tuple[str, str]]¶Return a list of tables that the audit trigger is working on.
has_pending_changes
(schema: str) → bool¶Return True if the tracked schema has pending changes and False if it doesn’t.
track_tables
(tables: List[Tuple[str, str]]) → None¶Install the audit trigger on the required tables
untrack_tables
(tables: List[Tuple[str, str]]) → None¶Remove triggers from tables and delete their pending changes
- class splitgraph.engine.postgres.engine.PostgresEngine(name: Optional[str], conn_params: Optional[Dict[str, Optional[str]]] = None, pool: Optional[psycopg2.pool.AbstractConnectionPool] = None, autocommit: bool = False, registry: bool = False, in_fdw: bool = False, check_version: bool = True)¶
Bases: splitgraph.engine.postgres.engine.AuditTriggerChangeEngine, splitgraph.engine.ObjectEngine
An implementation of the Postgres engine for Splitgraph
apply_fragments
(objects: List[Tuple[str, str]], target_schema: str, target_table: str, extra_quals: Optional[psycopg2.sql.Composed] = None, extra_qual_args: Optional[Tuple[str]] = None, schema_spec: Optional[TableSchema] = None, progress_every: Optional[int] = None) → None¶Apply multiple fragments to a target table as a single-query batch operation.
- Parameters
objects – List of tuples (object_schema, object_table) that the objects are stored in.
target_schema – Schema to apply the fragment to
target_table – Table to apply the fragment to
extra_quals – Optional, extra SQL (Composable) clauses to filter new rows in the fragment on (e.g. SQL(“a = %s”))
extra_qual_args – Optional, a tuple of arguments to use with extra_quals
schema_spec – Optional, list of (ordinal, column_name, column_type, is_pk). If not specified, uses the schema of target_table.
progress_every – If set, will report the materialization progress via tqdm every progress_every objects.
delete_objects
(object_ids: List[str]) → None¶Delete one or more objects from the engine.
- Parameters
object_ids – IDs of objects to delete
download_objects
(objects: List[str], remote_engine: splitgraph.engine.postgres.engine.PostgresEngine) → List[str]¶Download objects from the remote engine to the local cache
- Parameters
objects – List of object IDs to download
remote_engine – A remote ObjectEngine to download the objects from.
- Returns
List of object IDs that were downloaded.
dump_object
(object_id: str, stream: _io.TextIOWrapper, schema: str) → None¶Dump an object into a series of SQL statements
- Parameters
object_id – Object ID
stream – Text stream to dump the object into
schema – Schema the object lives in
dump_object_creation
(object_id: str, schema: str, table: Optional[str] = None, schema_spec: Optional[TableSchema] = None, if_not_exists: bool = False) → bytes¶Generate the SQL that remounts a foreign table pointing to a Splitgraph object.
- Parameters
object_id – Name of the object
schema – Schema to create the table in
table – Name of the table to mount
schema_spec – Schema of the table
if_not_exists – Add IF NOT EXISTS to the DDL
- Returns
SQL in bytes format.
get_change_key
(schema: str, table: str) → List[Tuple[str, str]]¶Returns the key used to identify a row in a change (list of column name, column type). If the tracked table has a PK, we use that; if it doesn’t, the whole row is used.
get_object_schema
(object_id: str) → List[splitgraph.core.types.TableColumn]¶Get the schema of a given object, returned as a list of (ordinal, column_name, column_type, is_pk).
- Parameters
object_id – ID of the object
get_object_size
(object_id: str) → int¶Return the on-disk footprint of this object, in bytes
- Parameters
object_id – ID of the object
mount_object
(object_id: str, table: None = None, schema: str = 'splitgraph_meta', schema_spec: Optional[TableSchema] = None) → None¶Mount an object from local storage as a foreign table.
- Parameters
object_id – ID of the object
table – Table to mount the object into
schema – Schema to mount the object into
schema_spec – Schema of the object.
store_fragment
(inserted: Any, deleted: Any, schema: str, table: str, source_schema: str, source_table: str) → None¶Store a fragment of a changed table in another table
- Parameters
inserted – List of PKs that have been updated/inserted
deleted – List of PKs that have been deleted
schema – Schema to store the change in
table – Table to store the change in
source_schema – Schema the source table is located in
source_table – Name of the source table
store_object
(object_id: str, source_query: Union[bytes, psycopg2.sql.Composed, str, psycopg2.sql.SQL], schema_spec: List[splitgraph.core.types.TableColumn], source_query_args=None, overwrite=False) → None¶Stores a Splitgraph object using a source query in the actual format implemented by this engine.
- Parameters
object_id – Name of the object
source_query – SELECT query that produces data required by the object
schema_spec – Schema of the source table
source_query_args – Arguments to mogrify into the source query.
overwrite – If True, will overwrite the object if it already exists.
sync_object_mounts
() → None¶Scan through local object storage and synchronize it with the foreign tables in splitgraph_meta (unmounting non-existing objects and mounting existing ones).
unmount_objects
(object_ids: List[str]) → None¶Unmount objects from splitgraph_meta (this doesn’t delete the physical files).
upload_objects
(objects: List[str], remote_engine: splitgraph.engine.postgres.engine.PostgresEngine) → None¶Upload objects from the local cache to the remote engine
- Parameters
objects – List of object IDs to upload
remote_engine – A remote ObjectEngine to upload the objects to.
- class splitgraph.engine.postgres.engine.PsycopgEngine(name: Optional[str], conn_params: Optional[Dict[str, Optional[str]]] = None, pool: Optional[psycopg2.pool.AbstractConnectionPool] = None, autocommit: bool = False, registry: bool = False, in_fdw: bool = False, check_version: bool = True)¶
Bases: splitgraph.engine.SQLEngine
Postgres SQL engine backed by a Psycopg connection.
close
() → None¶Commit and close the engine’s backing connection
close_others
() → None¶Close and release all other connections to the connection pool.
commit
() → None¶Commit the engine’s backing connection
- property connection¶ Engine-internal Psycopg connection.
delete_database
(database: str) → None¶Helper function to drop a database using the admin connection
- Parameters
database – Database name to drop
dump_table_sql
(schema: str, table_name: str, stream: _io.TextIOWrapper, columns: str = '*', where: str = '', where_args: Optional[Union[List[str], Tuple[str, str]]] = None, target_schema: Optional[str] = None, target_table: Optional[str] = None) → None¶Dump the table contents in the SQL format
- Parameters
schema – Schema the table is located in
table_name – Name of the table
stream – A file-like object to write the result into
columns – SQL column spec. Default ‘*’
where – Optional, an SQL WHERE clause
where_args – Arguments for the optional WHERE clause
target_schema – Schema to create the table in (default same as schema)
target_table – Name of the table to insert data into (default same as table_name)
get_primary_keys
(schema: str, table: str) → List[Tuple[str, str]]¶Inspects the Postgres information_schema to get the primary keys for a given table.
initialize
(skip_object_handling: bool = False, skip_create_database: bool = False) → None¶Create the Splitgraph Postgres database and install the audit trigger
- Parameters
skip_object_handling – If True, skips installation of audit triggers and other object management routines for engines that don’t need change tracking or checkouts.
skip_create_database – Don’t create the Splitgraph database
lock_table
(schema: str, table: str) → None¶Acquire an exclusive lock on a given table, released when the transaction commits / rolls back.
rollback
() → None¶Rollback the engine’s backing connection
run_api_call
(call: str, *args, schema: str = 'splitgraph_api') → Any¶
run_api_call_batch
(call: str, argslist, schema: str = 'splitgraph_api')¶
run_chunked_sql
(statement: Union[bytes, psycopg2.sql.Composed, str, psycopg2.sql.SQL], arguments: Sequence[Any], return_shape: Optional[splitgraph.engine.ResultShape] = <ResultShape.MANY_MANY: 4>, chunk_size: int = 1000, chunk_position: int = -1) → Any¶Because the Splitgraph API limits request sizes, certain SQL calls with variadic arguments can be too long to fit within that limit. This function runs the SQL query against the arguments broken up into chunks and returns the combined result.
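The chunk-and-recombine behaviour can be sketched in plain Python. `run_one` below is a hypothetical stand-in for the engine's own SQL execution; only the splitting and result concatenation are meant to mirror the documented semantics:

```python
from typing import Any, Callable, List, Sequence

def run_chunked(run_one: Callable[[Sequence[Any]], List[Any]],
                arguments: Sequence[Any],
                chunk_size: int = 1000) -> List[Any]:
    """Split a long argument list into chunks of at most chunk_size,
    run the query once per chunk and concatenate the per-chunk results."""
    results: List[Any] = []
    for start in range(0, len(arguments), chunk_size):
        results.extend(run_one(arguments[start:start + chunk_size]))
    return results

# Stand-in "query" that just echoes its arguments back: 2500 arguments
# get executed as three separate calls (1000 + 1000 + 500).
out = run_chunked(lambda args: list(args), list(range(2500)), chunk_size=1000)
print(len(out))  # 2500
```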
run_sql
(statement: Union[bytes, psycopg2.sql.Composed, str, psycopg2.sql.SQL], arguments: Optional[Sequence[Any]] = None, return_shape: Optional[splitgraph.engine.ResultShape] = <ResultShape.MANY_MANY: 4>, named: bool = False) → Any¶Run an arbitrary SQL statement with some arguments, return an iterator of results. If the statement doesn’t return any results, return None. If named=True, return named tuples when possible.
run_sql_batch
(statement: Union[psycopg2.sql.Composed, str], arguments: Any, schema: Optional[str] = None, max_size=261000) → None¶Run a parameterized SQL statement against multiple sets of arguments.
- Parameters
statement – Statement to run
arguments – Query arguments
schema – Schema to run the statement in
- property splitgraph_version¶ Returns the version of the Splitgraph library installed on the engine and, by extension, the version of the engine itself.
splitgraph.engine.postgres.engine.add_ud_flag_column(table_schema: List[splitgraph.core.types.TableColumn]) → List[splitgraph.core.types.TableColumn]¶
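This helper is undocumented here; judging by its name and signature, a plausible sketch appends an upsert/delete boolean flag column to a schema. Both the TableColumn tuple shape (ordinal, name, type, is_pk) and the column name `sg_ud_flag` are assumptions, not confirmed by this page:

```python
from typing import List, Tuple

# Assumed shape of TableColumn: (ordinal, column_name, column_type, is_pk),
# per the schema_spec descriptions elsewhere on this page.
TableColumn = Tuple[int, str, str, bool]

def add_ud_flag_column(table_schema: List[TableColumn]) -> List[TableColumn]:
    """Append a boolean upsert/delete flag column after the last column.
    The name 'sg_ud_flag' is a hypothetical choice for illustration."""
    next_ordinal = max(ordinal for ordinal, _, _, _ in table_schema) + 1
    return table_schema + [(next_ordinal, "sg_ud_flag", "boolean", False)]

spec = [(1, "id", "integer", True), (2, "value", "text", False)]
print(add_ud_flag_column(spec)[-1])  # (3, 'sg_ud_flag', 'boolean', False)
```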
splitgraph.engine.postgres.engine.chunk(sequence: Sequence[T], chunk_size: int = 1000) → Iterator[List[T]]¶
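A plausible pure-Python sketch of this helper, matching the documented signature (yield the sequence in lists of at most chunk_size items); this is an illustration, not the library's actual implementation:

```python
from typing import Iterator, List, Sequence, TypeVar

T = TypeVar("T")

def chunk(sequence: Sequence[T], chunk_size: int = 1000) -> Iterator[List[T]]:
    """Yield successive lists of at most chunk_size items from sequence."""
    for start in range(0, len(sequence), chunk_size):
        yield list(sequence[start:start + chunk_size])

print(list(chunk([1, 2, 3, 4, 5], chunk_size=2)))  # [[1, 2], [3, 4], [5]]
```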
splitgraph.engine.postgres.engine.get_change_key(schema_spec: List[splitgraph.core.types.TableColumn]) → List[Tuple[str, str]]¶
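Based on the method docstring earlier on this page ("If the tracked table has a PK, we use that; if it doesn't, the whole row is used"), a hedged sketch over a schema_spec of assumed (ordinal, column_name, column_type, is_pk) tuples:

```python
from typing import List, Tuple

# Assumed shape of TableColumn: (ordinal, column_name, column_type, is_pk).
TableColumn = Tuple[int, str, str, bool]

def get_change_key(schema_spec: List[TableColumn]) -> List[Tuple[str, str]]:
    """Return (column_name, column_type) pairs identifying a changed row:
    the PK columns if any exist, otherwise every column."""
    pks = [(name, ctype) for _, name, ctype, is_pk in schema_spec if is_pk]
    return pks or [(name, ctype) for _, name, ctype, _ in schema_spec]

with_pk = [(1, "id", "integer", True), (2, "value", "text", False)]
print(get_change_key(with_pk))  # [('id', 'integer')]
```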
splitgraph.engine.postgres.engine.get_conn_str(conn_params: Dict[str, str]) → str¶
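The exact output format and the parameter key names are not documented on this page. As one plausible sketch, a Postgres connection URI built from SG_ENGINE_*-style keys (both the key names and the URI shape are assumptions):

```python
from typing import Dict

def get_conn_str(conn_params: Dict[str, str]) -> str:
    """Build a Postgres connection URI from a parameter dict.
    Key names (SG_ENGINE_*) and the URI format are illustrative guesses."""
    return "postgresql://{user}:{pwd}@{host}:{port}/{db}".format(
        user=conn_params["SG_ENGINE_USER"],
        pwd=conn_params["SG_ENGINE_PWD"],
        host=conn_params["SG_ENGINE_HOST"],
        port=conn_params["SG_ENGINE_PORT"],
        db=conn_params["SG_ENGINE_DB_NAME"],
    )

params = {"SG_ENGINE_USER": "sgr", "SG_ENGINE_PWD": "pw",
          "SG_ENGINE_HOST": "localhost", "SG_ENGINE_PORT": "5432",
          "SG_ENGINE_DB_NAME": "splitgraph"}
print(get_conn_str(params))  # postgresql://sgr:pw@localhost:5432/splitgraph
```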