splitgraph.ingestion.singer package

Subpackages

Submodules

splitgraph.ingestion.singer.common module

splitgraph.ingestion.singer.common.log_exception(f)

Emit exceptions with full traceback instead of just the error text
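
The description above suggests a decorator that logs the whole traceback rather than only the exception message. The following is a minimal sketch of that pattern, not the library's implementation: the name log_exception_sketch, the logger, and the choice to re-raise after logging are all assumptions made for illustration.

```python
import functools
import logging

logger = logging.getLogger(__name__)


def log_exception_sketch(f):
    """Hypothetical stand-in for log_exception: log the full traceback, then re-raise."""

    @functools.wraps(f)
    def wrapped(*args, **kwargs):
        try:
            return f(*args, **kwargs)
        except Exception:
            # logger.exception() logs at ERROR level and appends the active traceback.
            logger.exception("Unhandled exception in %s", f.__name__)
            # Assumption: the caller still needs to see the failure, so re-raise it.
            raise

    return wrapped


@log_exception_sketch
def failing_step():
    # Illustrative function that always fails.
    raise ValueError("bad record")
```

Calling failing_step() in this sketch still raises ValueError, but the log record also carries the traceback, which is the behaviour the one-line description implies.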

splitgraph.ingestion.singer.common.rollback_at_end(func: collections.abc.Callable) -> collections.abc.Callable
splitgraph.ingestion.singer.common.store_ingestion_state(repository: splitgraph.core.repository.Repository, image_hash: str, current_state: Optional[Dict[str, Any]], new_state: str)

splitgraph.ingestion.singer.data_source module

class splitgraph.ingestion.singer.data_source.GenericSingerDataSource(*args, **kwargs)

Bases: splitgraph.ingestion.singer.data_source.SingerDataSource

credentials_schema: Dict[str, Any] = {'type': 'object'}
classmethod get_description() -> str
classmethod get_name() -> str
get_singer_executable()
params_schema: Dict[str, Any] = {'properties': {'tap_path': {'type': 'string'}}, 'required': ['tap_path'], 'type': 'object'}
class splitgraph.ingestion.singer.data_source.MySQLSingerDataSource(engine: PostgresEngine, credentials: Credentials, params: Params, tables: Optional[Union[List[str], Dict[str, Tuple[List[splitgraph.core.types.TableColumn], TableParams]]]] = None)

Bases: splitgraph.ingestion.singer.data_source.SingerDataSource

build_singer_catalog(catalog: Dict[str, Any], tables: Optional[Union[List[str], Dict[str, Tuple[List[splitgraph.core.types.TableColumn], TableParams]]]] = None)
credentials_schema: Dict[str, Any] = {'properties': {'password': {'type': 'string'}, 'user': {'type': 'string'}}, 'required': ['user', 'password'], 'type': 'object'}
classmethod get_description() -> str
classmethod get_name() -> str
get_singer_executable()
params_schema: Dict[str, Any] = {'properties': {'host': {'type': 'string'}, 'port': {'type': 'integer'}, 'replication_method': {'enum': ['INCREMENTAL', 'LOG_BASED', 'FULL TABLE'], 'type': 'string'}}, 'required': ['host', 'port', 'replication_method'], 'type': 'object'}
use_legacy_stream_selection = False
use_properties = True
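
As an illustration of what the credentials_schema and params_schema listed for MySQLSingerDataSource accept, the sketch below copies both schemas and checks example dictionaries against them. The checker is a deliberately tiny, hypothetical helper (required keys, primitive types and enums only), not a JSON Schema implementation and not part of Splitgraph. The host name and credential values are made up.

```python
from typing import Any, Dict, List

# Schemas copied from the MySQLSingerDataSource listing.
PARAMS_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": {
        "host": {"type": "string"},
        "port": {"type": "integer"},
        "replication_method": {
            "type": "string",
            "enum": ["INCREMENTAL", "LOG_BASED", "FULL TABLE"],
        },
    },
    "required": ["host", "port", "replication_method"],
}
CREDENTIALS_SCHEMA: Dict[str, Any] = {
    "type": "object",
    "properties": {"user": {"type": "string"}, "password": {"type": "string"}},
    "required": ["user", "password"],
}

_JSON_TYPES = {"string": str, "integer": int}


def check_against_schema(value: Dict[str, Any], schema: Dict[str, Any]) -> List[str]:
    """Hypothetical helper: return a list of human-readable problems (empty if none)."""
    errors = []
    for key in schema.get("required", []):
        if key not in value:
            errors.append(f"missing required key: {key}")
    for key, spec in schema.get("properties", {}).items():
        if key not in value:
            continue
        expected = _JSON_TYPES.get(spec.get("type"))
        # bool is a subclass of int in Python, so reject it explicitly.
        if expected is not None and (
            not isinstance(value[key], expected) or isinstance(value[key], bool)
        ):
            errors.append(f"{key}: expected {spec['type']}")
        if "enum" in spec and value[key] not in spec["enum"]:
            errors.append(f"{key}: must be one of {spec['enum']}")
    return errors


# Example values (made up) that satisfy both schemas.
params = {"host": "mysql.example.com", "port": 3306, "replication_method": "INCREMENTAL"}
credentials = {"user": "reader", "password": "example-password"}
```

Note that port must be an integer rather than a string, and that replication_method must match one of the listed enum values exactly.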
class splitgraph.ingestion.singer.data_source.SingerDataSource(engine: PostgresEngine, credentials: Credentials, params: Params, tables: Optional[Union[List[str], Dict[str, Tuple[List[splitgraph.core.types.TableColumn], TableParams]]]] = None)

Bases: splitgraph.hooks.data_source.base.SyncableDataSource, abc.ABC

build_singer_catalog(catalog: Dict[str, Any], tables: Optional[Union[List[str], Dict[str, Tuple[List[splitgraph.core.types.TableColumn], TableParams]]]] = None) -> Dict[str, Any]
get_singer_config()
abstract get_singer_executable()
introspect() -> IntrospectionResult
load(repository: splitgraph.core.repository.Repository, tables: Optional[Union[List[str], Dict[str, Tuple[List[splitgraph.core.types.TableColumn], TableParams]]]] = None) -> str
sync(repository: splitgraph.core.repository.Repository, image_hash: Optional[str] = None, tables: Optional[Union[List[str], Dict[str, Tuple[List[splitgraph.core.types.TableColumn], TableParams]]]] = None, use_state: bool = True) -> str
use_legacy_stream_selection = False
use_properties = False
splitgraph.ingestion.singer.data_source.select_streams(catalog: Dict[str, Any], tables: Optional[Union[List[str], Dict[str, Tuple[List[splitgraph.core.types.TableColumn], TableParams]]]] = None, use_legacy_stream_selection=False) -> Dict[str, Any]

splitgraph.ingestion.singer.db_sync module

class splitgraph.ingestion.singer.db_sync.DbSyncProxy(*args, **kwargs)

Bases: target_postgres.db_sync.DbSync

create_indices(stream)
create_schema_if_not_exists(table_columns_cache=None)
delete_rows(stream)
load_csv(file, count, size_bytes)
sync_table()
splitgraph.ingestion.singer.db_sync.db_sync_wrapper(image: splitgraph.core.image.Image, staging_schema: str)
splitgraph.ingestion.singer.db_sync.get_key_properties(stream_message)

Extract the PK from a stream message. Supports both legacy (“key_properties”) and new (“metadata”) Singer taps.
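
The two message shapes mentioned above can be handled roughly as follows. This is a sketch under assumptions rather than the module's code: the function names carry a _sketch suffix to mark them as hypothetical, and the lookup relies on the Singer convention that stream-level metadata sits under the empty breadcrumb with the primary key stored as "table-key-properties".

```python
from typing import Any, Dict, List


def select_breadcrumb_sketch(stream_message: Dict[str, Any], breadcrumb: List[str]) -> Dict[str, Any]:
    """Return the metadata dict stored under the given breadcrumb, or {} if absent."""
    for entry in stream_message.get("metadata", []):
        if list(entry.get("breadcrumb", [])) == breadcrumb:
            return entry.get("metadata", {})
    return {}


def get_key_properties_sketch(stream_message: Dict[str, Any]) -> List[str]:
    """Prefer the legacy top-level field, then fall back to stream-level metadata."""
    if "key_properties" in stream_message:
        return list(stream_message["key_properties"])
    stream_metadata = select_breadcrumb_sketch(stream_message, [])
    return list(stream_metadata.get("table-key-properties", []))


# Legacy tap: the primary key is a top-level list.
legacy_message = {"stream": "users", "key_properties": ["id"]}

# Newer tap: the primary key lives in the metadata entry with an empty breadcrumb.
metadata_message = {
    "stream": "users",
    "metadata": [
        {"breadcrumb": [], "metadata": {"table-key-properties": ["id", "tenant"]}},
        {"breadcrumb": ["properties", "id"], "metadata": {"inclusion": "automatic"}},
    ],
}
```

In this sketch a message with neither field yields an empty list. Whether the real function does the same or raises an error is not stated in this reference.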

splitgraph.ingestion.singer.db_sync.get_sg_schema(stream_schema_message, flattening_max_level=0)
splitgraph.ingestion.singer.db_sync.get_table_name(stream_schema_message)
splitgraph.ingestion.singer.db_sync.run_patched_sync(repository: splitgraph.core.repository.Repository, base_image: Optional[splitgraph.core.image.Image], new_image_hash: str, delete_old: bool, failure: str, input_stream: Optional[BinaryIO] = None, output_stream: Optional[TextIO] = None)
splitgraph.ingestion.singer.db_sync.select_breadcrumb(stream_message, breadcrumb)

Module contents