Splitgraph has been acquired by EDB! Read the blog post.

splitgraph.ingestion.airbyte package

Submodules

splitgraph.ingestion.airbyte.data_source module

class splitgraph.ingestion.airbyte.data_source.AirbyteDataSource(engine: PostgresEngine, credentials: Credentials, params: Params, tables: Optional[Union[List[str], Dict[str, Tuple[List[splitgraph.core.types.TableColumn], TableParams]]]] = None)

Bases: splitgraph.hooks.data_source.base.SyncableDataSource, abc.ABC

Generic data source for Airbyte-compliant sources. We run ingestion by combining an Airbyte source and the Airbyte Postgres destination.

airbyte_name: Optional[str] = None
credentials_schema: Dict[str, Any] = {'properties': {'normalization_git_url': {'description': 'For `custom` normalization, a URL to the Git repo with the dbt project, for example,`https://uname:pass_or_token@github.com/organisation/repository.git`.', 'title': 'dbt model Git URL', 'type': 'string'}}, 'type': 'object'}
cursor_overrides: Optional[Dict[str, List[str]]] = None
docker_environment: Optional[Dict[str, str]] = None
docker_image: Optional[str] = None
get_airbyte_config() Dict[str, Any]
introspect() IntrospectionResult
load(repository: splitgraph.core.repository.Repository, tables: Optional[Union[List[str], Dict[str, Tuple[List[splitgraph.core.types.TableColumn], TableParams]]]] = None) str
normalization_image = 'airbyte/normalization:0.1.59'
params_schema: Dict[str, Any] = {'properties': {'normalization_git_branch': {'default': 'master', 'description': 'Branch or commit hash to use for the normalization dbt project.', 'title': 'dbt model Git branch', 'type': 'string'}, 'normalization_mode': {'default': 'basic', 'description': "Whether to normalize raw Airbyte tables. `none` is no normalization, `basic` is Airbyte's basic normalization, `custom` is a custom dbt transformation on the data.", 'enum': ['none', 'basic', 'custom'], 'title': 'Post-ingestion normalization', 'type': 'string'}}, 'type': 'object'}
primary_key_overrides: Optional[Dict[str, List[str]]] = None
receiver_image = 'airbyte/destination-postgres:0.3.12'
sync(repository: splitgraph.core.repository.Repository, image_hash: Optional[str] = None, tables: Optional[Union[List[str], Dict[str, Tuple[List[splitgraph.core.types.TableColumn], TableParams]]]] = None, use_state: bool = True) str
table_params_schema: Dict[str, Any] = {'properties': {'airbyte_cursor_fields': {'description': "Fields in this stream to be used as a cursor for incremental replication (overrides Airbyte configuration's cursor_field)", 'items': {'type': 'string'}, 'title': 'Cursor field(s)', 'type': 'array'}, 'airbyte_primary_key_fields': {'description': "Fields in this stream to be used as a primary key for deduplication (overrides Airbyte configuration's primary_key)", 'items': {'type': 'string'}, 'title': 'Primary key field(s)', 'type': 'array'}}, 'type': 'object'}
splitgraph.ingestion.airbyte.data_source.delete_schema_at_end(engine: splitgraph.engine.postgres.engine.PostgresEngine, schema: str) Generator
splitgraph.ingestion.airbyte.data_source.getrandbits(k) int

Generates an int with k random bits.

splitgraph.ingestion.airbyte.docker_utils module

exception splitgraph.ingestion.airbyte.docker_utils.SubprocessError

Bases: splitgraph.exceptions.SplitGraphError

splitgraph.ingestion.airbyte.docker_utils.add_files(container: docker.models.containers.Container, files: List[Tuple[str, str]]) None
splitgraph.ingestion.airbyte.docker_utils.build_command(files: List[Tuple[str, Any]]) List[str]
splitgraph.ingestion.airbyte.docker_utils.detect_network_mode() str
splitgraph.ingestion.airbyte.docker_utils.remove_at_end(container: docker.models.containers.Container) docker.models.containers.Container
splitgraph.ingestion.airbyte.docker_utils.wait_not_failed(container: docker.models.containers.Container, mirror_logs: bool = False) None

Block until a Docker container exits.

Raises: SubprocessError – if the container exited with a non-zero code.

splitgraph.ingestion.airbyte.models module

class splitgraph.ingestion.airbyte.models.AirbyteCatalog(*, streams: List[splitgraph.ingestion.airbyte.models.AirbyteStream], **extra_data: Any)

Bases: pydantic.main.BaseModel

class Config

Bases: object

extra = 'allow'
streams: List[splitgraph.ingestion.airbyte.models.AirbyteStream]
class splitgraph.ingestion.airbyte.models.AirbyteConnectionStatus(*, status: splitgraph.ingestion.airbyte.models.Status, message: str = None, **extra_data: Any)

Bases: pydantic.main.BaseModel

class Config

Bases: object

extra = 'allow'
message: Optional[str]
status: splitgraph.ingestion.airbyte.models.Status
class splitgraph.ingestion.airbyte.models.AirbyteLogMessage(*, level: splitgraph.ingestion.airbyte.models.Level, message: str, **extra_data: Any)

Bases: pydantic.main.BaseModel

class Config

Bases: object

extra = 'allow'
level: splitgraph.ingestion.airbyte.models.Level
message: str
class splitgraph.ingestion.airbyte.models.AirbyteMessage(*, type: splitgraph.ingestion.airbyte.models.Type, log: splitgraph.ingestion.airbyte.models.AirbyteLogMessage = None, spec: splitgraph.ingestion.airbyte.models.ConnectorSpecification = None, connectionStatus: splitgraph.ingestion.airbyte.models.AirbyteConnectionStatus = None, catalog: splitgraph.ingestion.airbyte.models.AirbyteCatalog = None, record: splitgraph.ingestion.airbyte.models.AirbyteRecordMessage = None, state: splitgraph.ingestion.airbyte.models.AirbyteStateMessage = None, **extra_data: Any)

Bases: pydantic.main.BaseModel

class Config

Bases: object

extra = 'allow'
catalog: Optional[splitgraph.ingestion.airbyte.models.AirbyteCatalog]
connectionStatus: Optional[splitgraph.ingestion.airbyte.models.AirbyteConnectionStatus]
log: Optional[splitgraph.ingestion.airbyte.models.AirbyteLogMessage]
record: Optional[splitgraph.ingestion.airbyte.models.AirbyteRecordMessage]
spec: Optional[splitgraph.ingestion.airbyte.models.ConnectorSpecification]
state: Optional[splitgraph.ingestion.airbyte.models.AirbyteStateMessage]
type: splitgraph.ingestion.airbyte.models.Type
class splitgraph.ingestion.airbyte.models.AirbyteProtocol(*, airbyte_message: splitgraph.ingestion.airbyte.models.AirbyteMessage = None, configured_airbyte_catalog: splitgraph.ingestion.airbyte.models.ConfiguredAirbyteCatalog = None)

Bases: pydantic.main.BaseModel

airbyte_message: Optional[splitgraph.ingestion.airbyte.models.AirbyteMessage]
configured_airbyte_catalog: Optional[splitgraph.ingestion.airbyte.models.ConfiguredAirbyteCatalog]
class splitgraph.ingestion.airbyte.models.AirbyteRecordMessage(*, stream: str, data: Dict[str, Any], emitted_at: int, namespace: str = None, **extra_data: Any)

Bases: pydantic.main.BaseModel

class Config

Bases: object

extra = 'allow'
data: Dict[str, Any]
emitted_at: int
namespace: Optional[str]
stream: str
class splitgraph.ingestion.airbyte.models.AirbyteStateMessage(*, data: Dict[str, Any], **extra_data: Any)

Bases: pydantic.main.BaseModel

class Config

Bases: object

extra = 'allow'
data: Dict[str, Any]
class splitgraph.ingestion.airbyte.models.AirbyteStream(*, name: str, json_schema: Dict[str, Any], supported_sync_modes: List[splitgraph.ingestion.airbyte.models.SyncMode] = None, source_defined_cursor: bool = None, default_cursor_field: List[str] = None, source_defined_primary_key: List[List[str]] = None, namespace: str = None, **extra_data: Any)

Bases: pydantic.main.BaseModel

class Config

Bases: object

extra = 'allow'
default_cursor_field: Optional[List[str]]
json_schema: Dict[str, Any]
name: str
namespace: Optional[str]
source_defined_cursor: Optional[bool]
source_defined_primary_key: Optional[List[List[str]]]
supported_sync_modes: Optional[List[splitgraph.ingestion.airbyte.models.SyncMode]]
class splitgraph.ingestion.airbyte.models.ConfiguredAirbyteCatalog(*, streams: List[splitgraph.ingestion.airbyte.models.ConfiguredAirbyteStream], **extra_data: Any)

Bases: pydantic.main.BaseModel

class Config

Bases: object

extra = 'allow'
streams: List[splitgraph.ingestion.airbyte.models.ConfiguredAirbyteStream]
class splitgraph.ingestion.airbyte.models.ConfiguredAirbyteStream(*, stream: splitgraph.ingestion.airbyte.models.AirbyteStream, sync_mode: splitgraph.ingestion.airbyte.models.SyncMode, cursor_field: List[str] = None, destination_sync_mode: splitgraph.ingestion.airbyte.models.DestinationSyncMode, primary_key: List[List[str]] = None, **extra_data: Any)

Bases: pydantic.main.BaseModel

class Config

Bases: object

extra = 'allow'
cursor_field: Optional[List[str]]
destination_sync_mode: splitgraph.ingestion.airbyte.models.DestinationSyncMode
primary_key: Optional[List[List[str]]]
stream: splitgraph.ingestion.airbyte.models.AirbyteStream
sync_mode: splitgraph.ingestion.airbyte.models.SyncMode
class splitgraph.ingestion.airbyte.models.ConnectorSpecification(*, documentationUrl: pydantic.networks.AnyUrl = None, changelogUrl: pydantic.networks.AnyUrl = None, connectionSpecification: Dict[str, Any], supportsIncremental: bool = None, supportsNormalization: bool = False, supportsDBT: bool = False, supported_destination_sync_modes: List[splitgraph.ingestion.airbyte.models.DestinationSyncMode] = None, **extra_data: Any)

Bases: pydantic.main.BaseModel

class Config

Bases: object

extra = 'allow'
changelogUrl: Optional[pydantic.networks.AnyUrl]
connectionSpecification: Dict[str, Any]
documentationUrl: Optional[pydantic.networks.AnyUrl]
supported_destination_sync_modes: Optional[List[splitgraph.ingestion.airbyte.models.DestinationSyncMode]]
supportsDBT: Optional[bool]
supportsIncremental: Optional[bool]
supportsNormalization: Optional[bool]
class splitgraph.ingestion.airbyte.models.DestinationSyncMode(value)

Bases: enum.Enum

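Enumeration of destination sync modes (`append`, `append_dedup`, `overwrite`), used by ConfiguredAirbyteStream.destination_sync_mode and ConnectorSpecification.supported_destination_sync_modes.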

append = 'append'
append_dedup = 'append_dedup'
overwrite = 'overwrite'
class splitgraph.ingestion.airbyte.models.Level(value)

Bases: enum.Enum

Enumeration of log levels, used as the `level` field of AirbyteLogMessage.

DEBUG = 'DEBUG'
ERROR = 'ERROR'
FATAL = 'FATAL'
INFO = 'INFO'
TRACE = 'TRACE'
WARN = 'WARN'
class splitgraph.ingestion.airbyte.models.Status(value)

Bases: enum.Enum

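Enumeration of connection check outcomes (`SUCCEEDED` or `FAILED`), used as the `status` field of AirbyteConnectionStatus.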

FAILED = 'FAILED'
SUCCEEDED = 'SUCCEEDED'
class splitgraph.ingestion.airbyte.models.SyncMode(value)

Bases: enum.Enum

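Enumeration of source sync modes (`full_refresh`, `incremental`), used by AirbyteStream.supported_sync_modes and ConfiguredAirbyteStream.sync_mode.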

full_refresh = 'full_refresh'
incremental = 'incremental'
class splitgraph.ingestion.airbyte.models.Type(value)

Bases: enum.Enum

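Enumeration of Airbyte message types, used as the `type` field of AirbyteMessage; each value corresponds to one of that model's optional payload fields (log, spec, connectionStatus, catalog, record, state).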

CATALOG = 'CATALOG'
CONNECTION_STATUS = 'CONNECTION_STATUS'
LOG = 'LOG'
RECORD = 'RECORD'
SPEC = 'SPEC'
STATE = 'STATE'

splitgraph.ingestion.airbyte.utils module

splitgraph.ingestion.airbyte.utils.get_pk_cursor_fields(stream: splitgraph.ingestion.airbyte.models.AirbyteStream, table_params: TableParams, cursor_overrides: Optional[Dict[str, List[str]]] = None, primary_key_overrides: Optional[Dict[str, List[str]]] = None) Tuple[Optional[List[str]], Optional[List[List[str]]]]
splitgraph.ingestion.airbyte.utils.get_sg_schema(stream: splitgraph.ingestion.airbyte.models.AirbyteStream) List[splitgraph.core.types.TableColumn]
splitgraph.ingestion.airbyte.utils.select_streams(catalog: splitgraph.ingestion.airbyte.models.AirbyteCatalog, tables: Optional[Union[List[str], Dict[str, Tuple[List[splitgraph.core.types.TableColumn], TableParams]]]], sync: bool = False, cursor_overrides: Optional[Dict[str, List[str]]] = None, primary_key_overrides: Optional[Dict[str, List[str]]] = None) splitgraph.ingestion.airbyte.models.ConfiguredAirbyteCatalog

Module contents