dlt.pipeline.pipeline
Pipeline Objects
class Pipeline(SupportsPipeline)
pipelines_dir
A directory where the pipelines' working directories are created
is_active
Tells if instance is currently active and available via dlt.pipeline()
__init__
def __init__(pipeline_name: str,
pipelines_dir: str,
pipeline_salt: TSecretStrValue,
destination: AnyDestination,
staging: AnyDestination,
dataset_name: str,
import_schema_path: str,
export_schema_path: str,
dev_mode: bool,
progress: _Collector,
must_attach_to_local_pipeline: bool,
config: PipelineConfiguration,
refresh: Optional[TRefreshMode] = None) -> None
Initializes the Pipeline class, which implements a dlt pipeline. Please use the pipeline function in the dlt module (dlt.pipeline) to create a new Pipeline instance.
drop
def drop(pipeline_name: str = None) -> "Pipeline"
Deletes local pipeline state, schemas and any working files, then re-initializes
all internal fields via init. If pipeline_name is specified and is
different from the current name, a new pipeline instance is created, activated and returned.
Note that the original pipeline is still dropped.
Arguments:
pipeline_name (str) - Optional. New pipeline name. Creates and activates a new instance.
Returns:
"Pipeline" - returns self, or the newly created instance if a different pipeline_name was given
extract
@with_runtime_trace()
@with_schemas_sync
@with_state_sync(may_extract_state=True)
@with_config_section((known_sections.EXTRACT, ))
def extract(
data: Any,
*,
table_name: str = None,
parent_table_name: str = None,
write_disposition: TWriteDispositionConfig = None,
columns: TAnySchemaColumns = None,
primary_key: TColumnNames = None,
schema: Schema = None,
max_parallel_items: int = ConfigValue,
workers: int = ConfigValue,
table_format: TTableFormat = None,
schema_contract: TSchemaContract = None,
refresh: Optional[TRefreshMode] = None,
loader_file_format: Optional[TLoaderFileFormat] = None) -> ExtractInfo
Extracts the data and prepares it for normalization. Does not require destination or credentials to be configured. See the run method for a description of the arguments.
normalize
@with_runtime_trace()
@with_schemas_sync
@with_config_section((known_sections.NORMALIZE, ))
def normalize(workers: int = 1) -> NormalizeInfo
Normalizes the data prepared with the extract method, infers the schema and creates load packages for the load method. Requires the destination to be known.
load
@with_runtime_trace(send_state=True)
@with_state_sync()
@with_config_section((known_sections.LOAD, ))
def load(destination: TDestinationReferenceArg = None,
dataset_name: str = None,
credentials: Any = None,
*,
workers: int = 20,
raise_on_failed_jobs: bool = ConfigValue) -> LoadInfo
Loads the packages prepared by the normalize method into dataset_name at destination, optionally using the provided credentials.
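The three steps can also be invoked one at a time, which may help when inspecting intermediate results. The following is a minimal sketch, assuming `pipeline` is a Pipeline instance obtained from dlt.pipeline with a destination configured; `run_in_steps` is a hypothetical helper name and not part of dlt.

```python
from typing import Any, Tuple


def run_in_steps(pipeline: Any, data: Any, table_name: str) -> Tuple[Any, Any, Any]:
    """Run extract, normalize and load separately and return their info objects.

    `pipeline` is assumed to be a dlt Pipeline; this helper is illustrative only.
    """
    # extract does not need a destination or credentials
    extract_info = pipeline.extract(data, table_name=table_name)
    # normalize infers the schema and builds load packages
    normalize_info = pipeline.normalize()
    # load pushes the prepared packages into the destination dataset
    load_info = pipeline.load()
    return extract_info, normalize_info, load_info
```

With a real pipeline this might be called as run_in_steps(pipeline, [{"value": 1}], "numbers").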
run
@with_runtime_trace()
@with_config_section(("run", ))
def run(data: Any = None,
*,
destination: TDestinationReferenceArg = None,
staging: TDestinationReferenceArg = None,
dataset_name: str = None,
credentials: Any = None,
table_name: str = None,
write_disposition: TWriteDispositionConfig = None,
columns: TAnySchemaColumns = None,
primary_key: TColumnNames = None,
schema: Schema = None,
loader_file_format: TLoaderFileFormat = None,
table_format: TTableFormat = None,
schema_contract: TSchemaContract = None,
refresh: TRefreshMode = None) -> LoadInfo
Loads the data from the data argument into the destination specified in destination and the dataset specified in dataset_name.
Notes:
This method will extract the data from the data argument, infer the schema, normalize the data into a load package (i.e. jsonl or PARQUET files representing tables) and then load such packages into the destination.
The data may be supplied in several forms:
- a list or Iterable of any JSON-serializable objects, e.g. dlt.run([1, 2, 3], table_name="numbers")
- any Iterator or a function that yields (Generator), e.g. dlt.run(range(1, 10), table_name="range")
- a function or a list of functions decorated with @dlt.resource, e.g. dlt.run([chess_players(title="GM"), chess_games()])
- a function or a list of functions decorated with @dlt.source.
Please note that dlt deals with bytes, datetime, decimal and uuid objects, so you are free to load documents containing e.g. binary data or dates.
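As a rough illustration of the first two forms, the sketch below passes a plain list and a generator to run. It assumes `pipeline` is a dlt Pipeline with a destination configured; `players`, `load_plain_data` and the table name "numbers" are hypothetical names chosen for the example.

```python
from typing import Any, Dict, Iterator


def players() -> Iterator[Dict[str, Any]]:
    """Generator function; its name serves as the default table name."""
    yield {"name": "magnus", "title": "GM"}
    yield {"name": "hikaru", "title": "GM"}


def load_plain_data(pipeline: Any) -> None:
    """Pass a list and a generator to `run` (pipeline is assumed to be a dlt Pipeline)."""
    # a plain list has no __name__, so table_name must be given explicitly
    pipeline.run([{"value": 1}, {"value": 2}, {"value": 3}], table_name="numbers")
    # a generator carries the function name, so no table_name is needed
    pipeline.run(players())
```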
Execution:
The run method will first use the sync_destination method to synchronize pipeline state and schemas with the destination. You can disable this behavior with the restore_from_destination configuration option.
Next, it will make sure that data from the previous run is fully processed. If not, the run method normalizes and loads the pending data items and exits.
If there was no pending data, new data from the data argument is extracted, normalized and loaded.
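The order of operations described above can be outlined with public Pipeline methods. This is a simplified sketch and not dlt's actual implementation; `run_sketch` is a hypothetical name, `pipeline` is assumed to be a dlt Pipeline, and returning None when no data is passed is an assumption made for the example.

```python
from typing import Any, Optional


def run_sketch(pipeline: Any, data: Any = None, **extract_kwargs: Any) -> Optional[Any]:
    """Mirror the documented sequence of `run` (illustrative only)."""
    # 1. restore state and schemas from the destination
    pipeline.sync_destination()
    # 2. finish work left over from a previous run, then exit
    if pipeline.has_pending_data:
        pipeline.normalize()
        return pipeline.load()
    # 3. no pending data: process the new data end to end
    if data is None:
        # assumption: without data there is nothing left to do
        return None
    pipeline.extract(data, **extract_kwargs)
    pipeline.normalize()
    return pipeline.load()
```

In practice, calling pipeline.run(data) performs this sequence for you; the sketch only makes the early exit on pending data visible.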
Arguments:
- data (Any) - Data to be loaded to the destination.
- destination (TDestinationReferenceArg, optional) - A name of the destination to which dlt will load the data, or a destination module imported from dlt.destination. If not provided, the value passed to dlt.pipeline will be used.
- staging (TDestinationReferenceArg, optional) - A name of the staging destination to which dlt will load the data temporarily before it is loaded to the destination; can also be a module imported from dlt.destination.
- dataset_name (str, optional) - A name of the dataset to which the data will be loaded. A dataset is a logical group of tables, i.e. a schema in relational databases or a folder grouping many files. If not provided, the value passed to dlt.pipeline will be used. If not provided at all, defaults to the pipeline_name.
- credentials (Any, optional) - Credentials for the destination, e.g. a database connection string or a dictionary with Google Cloud credentials. In most cases should be set to None, which lets dlt use secrets.toml or environment variables to infer the right credential values.
- table_name (str, optional) - The name of the table to which the data should be loaded within the dataset. This argument is required for data that is a list/Iterable or an Iterator without a __name__ attribute. The behavior of this argument depends on the type of the data:
  - generator functions - the function name is used as the table name; table_name overrides this default.
  - @dlt.resource - the resource contains the full table schema, and that includes the table name. table_name will override this property. Use with care!
  - @dlt.source - the source contains several resources, each with a table schema. table_name will override all table names within the source and load the data into a single table.
- write_disposition (TWriteDispositionConfig, optional) - Controls how to write data to a table. Accepts a shorthand string literal or a configuration dictionary. Allowed shorthand string literals: "append" will always add new data at the end of the table. "replace" will replace existing data with new data. "skip" will prevent data from loading. "merge" will deduplicate and merge data based on "primary_key" and "merge_key" hints. Defaults to "append". Write behaviour can be further customized through a configuration dictionary. For example, to obtain an SCD2 table provide write_disposition={"disposition": "merge", "strategy": "scd2"}. Please note that in case of dlt.resource the table schema value will be overwritten, and in case of dlt.source the values in all resources will be overwritten.
- columns (TAnySchemaColumns, optional) - A list of column schemas. A typed dictionary describing column names, data types, write disposition and performance hints that gives you full control over the created table schema.
- primary_key (TColumnNames, optional) - A column name or a list of column names that comprise a primary key. Typically used with the "merge" write disposition to deduplicate loaded data.
- schema (Schema, optional) - An explicit Schema object in which all table schemas will be grouped. By default dlt takes the schema from the source (if passed in the data argument) or creates a default one itself.
- loader_file_format (TLoaderFileFormat, optional) - The file format the loader will use to create the load package. Not all file formats are compatible with all destinations. Defaults to the preferred file format of the selected destination.
- table_format (TTableFormat, optional) - Can be "delta" or "iceberg". The table format used by the destination to store tables. Currently you can select the table format on filesystem and Athena destinations.
- schema_contract (TSchemaContract, optional) - An override for the schema contract settings; this will replace the schema contract settings for all tables in the schema. Defaults to None.
- refresh (TRefreshMode, optional) - Fully or partially reset sources before loading new data in this run. The following refresh modes are supported:
  - drop_sources - Drop tables and source and resource state for all sources currently being processed in the run or extract methods of the pipeline. (Note: schema history is erased)
  - drop_resources - Drop tables and resource state for all resources being processed. Source level state is not modified. (Note: schema history is erased)
  - drop_data - Wipe all data and resource state for all resources being processed. Schema is not modified.
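To show how table_name, write_disposition and primary_key combine, here is a minimal sketch of a merge load. It assumes `pipeline` is a dlt Pipeline with a destination that supports merging; the helper name `upsert_customers`, the table name "customers" and the key column "id" are hypothetical.

```python
from typing import Any, Dict, List


def upsert_customers(pipeline: Any, rows: List[Dict[str, Any]]) -> Any:
    """Load rows into a "customers" table, deduplicating on the "id" column."""
    return pipeline.run(
        rows,
        table_name="customers",     # required: a list has no __name__
        write_disposition="merge",  # deduplicate and merge instead of appending
        primary_key="id",           # column used to match existing rows
    )
```

Replacing the shorthand with the dictionary {"disposition": "merge", "strategy": "scd2"} would request the SCD2 variant mentioned above.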
Raises:
PipelineStepFailed - when a problem happened during the extract, normalize or load steps.
Returns:
LoadInfo - Information on loaded data including the list of package ids and failed job statuses. Please note that dlt will not raise if a single job terminally fails. Such information is provided via LoadInfo.
sync_destination
@with_config_section(sections=(),
merge_func=ConfigSectionContext.prefer_existing)
def sync_destination(destination: TDestinationReferenceArg = None,
staging: TDestinationReferenceArg = None,
dataset_name: str = None) -> None
Synchronizes pipeline state with the destination's state kept in dataset_name
Notes:
Attempts to restore pipeline state and schemas from the destination. Requires the state that is present at the destination to have a higher version number than the state kept locally in the working directory. In such a situation the local state, schemas and intermediate files with the data will be deleted and replaced with the state and schema present in the destination.
A special case where the pipeline state exists locally but the dataset does not exist at the destination will wipe out the local state.
Note - this method is executed by the run method before any operation on data. Use the restore_from_destination configuration option to disable that behavior.
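The rules in these notes can be condensed into a small decision function. This is an illustrative sketch only, not dlt's implementation; `sync_action` and its return labels are hypothetical, and it assumes that a local state always exists and that a missing destination state is represented as None.

```python
from typing import Optional


def sync_action(
    local_version: int,
    destination_version: Optional[int],
    dataset_exists: bool,
) -> str:
    """Return which action the documented rules imply (illustrative only)."""
    if not dataset_exists:
        # special case: local state exists but the dataset is gone
        return "wipe_local"
    if destination_version is not None and destination_version > local_version:
        # destination is newer: local state, schemas and files are replaced
        return "restore_from_destination"
    # otherwise the local state is left untouched
    return "keep_local"
```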
activate
def activate() -> None
Activates the pipeline
The active pipeline is used as a context for several dlt components. It provides state to sources and resources evaluated outside of
the pipeline.run and pipeline.extract methods. For example, if the source you use is accessing state in a dlt.source decorated function, the state is provided
by the active pipeline.
The name of the active pipeline is used when resolving secrets and config values as the optional outermost section during value lookup. For example, if a pipeline
with the name chess_pipeline is active and dlt looks for BigQuery configuration, it will look in chess_pipeline.destination.bigquery.credentials first and then in
destination.bigquery.credentials.
The active pipeline also provides the current DestinationCapabilitiesContext to other components, e.g. Schema instances. Among others, it sets the naming convention and maximum identifier length.
Only one pipeline is active at a given time.
A pipeline created or attached with dlt.pipeline/dlt.attach is automatically activated. The run, load and extract methods also activate the pipeline.
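The lookup order from the BigQuery example above can be reproduced with a few lines of plain Python. This is only a sketch of the ordering described here; `lookup_order` is a hypothetical helper, and dlt's real resolution may consider further section combinations.

```python
from typing import List, Optional


def lookup_order(key: str, active_pipeline_name: Optional[str] = None) -> List[str]:
    """List the config keys to try, most specific first (illustrative only)."""
    candidates = []
    if active_pipeline_name:
        # the active pipeline's name is the optional outermost section
        candidates.append(f"{active_pipeline_name}.{key}")
    # the plain key is always tried as the fallback
    candidates.append(key)
    return candidates


print(lookup_order("destination.bigquery.credentials", "chess_pipeline"))
# ['chess_pipeline.destination.bigquery.credentials', 'destination.bigquery.credentials']
```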
deactivate
def deactivate() -> None
Deactivates the pipeline
Pipeline must be active in order to use this method. Please refer to activate() method for the explanation of active pipeline concept.
has_data
@property
def has_data() -> bool
Tells if the pipeline contains any data: schemas, extracted files, load packages or loaded packages in the destination
has_pending_data
@property
def has_pending_data() -> bool
Tells if the pipeline contains any pending packages to be normalized or loaded
state
@property
def state() -> TPipelineState
Returns a dictionary with the pipeline state
naming
@property
def naming() -> NamingConvention
Returns naming convention of the default schema
last_trace
@property
def last_trace() -> PipelineTrace
Returns or loads the last trace generated by the pipeline. The trace is loaded from the standard location.
list_extracted_resources
@deprecated(
"Please use list_extracted_load_packages instead. Flat extracted storage format got dropped"
" in dlt 0.4.0",
category=Dlt04DeprecationWarning,
)
def list_extracted_resources() -> Sequence[str]
Returns a list of all the files with extracted resources that will be normalized.
list_extracted_load_packages
def list_extracted_load_packages() -> Sequence[str]
Returns a list of all load package ids that are or will be normalized.
list_normalized_load_packages
def list_normalized_load_packages() -> Sequence[str]
Returns a list of all load package ids that are or will be loaded.
list_completed_load_packages
def list_completed_load_packages() -> Sequence[str]
Returns a list of all load package ids that are completely loaded
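The three listing methods can be combined into a quick overview of where packages currently sit. A minimal sketch, assuming `pipeline` is a dlt Pipeline; `package_counts` is a hypothetical helper name.

```python
from typing import Any, Dict


def package_counts(pipeline: Any) -> Dict[str, int]:
    """Count load packages per stage using the listing methods above."""
    return {
        # extracted packages are waiting for (or in) normalization
        "extracted": len(pipeline.list_extracted_load_packages()),
        # normalized packages are waiting for (or in) loading
        "normalized": len(pipeline.list_normalized_load_packages()),
        # completed packages are fully loaded
        "completed": len(pipeline.list_completed_load_packages()),
    }
```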
get_load_package_info
def get_load_package_info(load_id: str) -> LoadPackageInfo
Returns information on extracted/normalized/completed package with given load_id, all jobs and their statuses.
get_load_package_state
def get_load_package_state(load_id: str) -> TLoadPackageState
Returns the state of the extracted/normalized/completed package with the given load_id.
list_failed_jobs_in_package
def list_failed_jobs_in_package(load_id: str) -> Sequence[LoadJobInfo]
List all failed jobs and associated error messages for a specified load_id
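Because a terminally failed job does not necessarily raise, it can be useful to scan packages for failures after a load. The sketch below assumes `pipeline` is a dlt Pipeline and that the load ids come from, for example, list_completed_load_packages; `failed_jobs_by_package` is a hypothetical helper name.

```python
from typing import Any, Dict, List, Sequence


def failed_jobs_by_package(pipeline: Any, load_ids: Sequence[str]) -> Dict[str, List[Any]]:
    """Map each load_id to its failed jobs, skipping packages without failures."""
    failed: Dict[str, List[Any]] = {}
    for load_id in load_ids:
        jobs = list(pipeline.list_failed_jobs_in_package(load_id))
        if jobs:
            failed[load_id] = jobs
    return failed
```

A call such as failed_jobs_by_package(pipeline, pipeline.list_completed_load_packages()) would then return an empty dictionary when every job succeeded.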
drop_pending_packages
def drop_pending_packages(with_partial_loads: bool = True) -> None
Deletes all extracted and normalized packages. By default (with_partial_loads=True), this includes packages that are partially loaded.
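A common pattern is to drop pending packages only when there are any. A minimal sketch, assuming `pipeline` is a dlt Pipeline; `discard_pending` is a hypothetical helper name.

```python
from typing import Any


def discard_pending(pipeline: Any, with_partial_loads: bool = True) -> bool:
    """Call drop_pending_packages only if pending data exists.

    Returns True when the drop was requested, False when there was nothing pending.
    """
    if not pipeline.has_pending_data:
        return False
    pipeline.drop_pending_packages(with_partial_loads=with_partial_loads)
    return True
```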
sync_schema
@with_schemas_sync
def sync_schema(schema_name: str = None) -> TSchemaTables
Synchronizes the schema schema_name with the destination. If no name is provided, the default schema will be synchronized.
set_local_state_val
def set_local_state_val(key: str, value: Any) -> None
Sets value in local state. Local state is not synchronized with destination.
get_local_state_val
def get_local_state_val(key: str) -> Any
Gets value from local state. Local state is not synchronized with destination.
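Local state suits small machine-specific bookkeeping, such as a run counter. The sketch below assumes `pipeline` is a dlt Pipeline and that reading a missing key raises KeyError, which is an assumption not stated in this reference; `increment_local_counter` and the key "run_count" are hypothetical.

```python
from typing import Any


def increment_local_counter(pipeline: Any, key: str = "run_count") -> int:
    """Increment a counter kept in local state and return the new value."""
    try:
        current = pipeline.get_local_state_val(key)
    except KeyError:
        # assumption: a missing key raises KeyError, like a plain dict lookup
        current = 0
    pipeline.set_local_state_val(key, current + 1)
    return current + 1
```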
sql_client
@with_config_section(sections=(),
merge_func=ConfigSectionContext.prefer_existing)
def sql_client(schema_name: str = None) -> SqlClientBase[Any]
Returns a sql client configured to query/change the destination and dataset that were used to load the data.
Use the client in a with statement to manage opening and closing the connection to the destination:
with pipeline.sql_client() as client:
    with client.execute_query(
        "SELECT id, name, email FROM customers WHERE id = %s", 10
    ) as cursor:
        print(cursor.fetchall())
The client is authenticated and defaults all queries to the dataset_name used by the pipeline. You can provide an alternative
schema_name, which will be used to normalize the dataset name.
destination_client
@with_config_section(sections=(),
merge_func=ConfigSectionContext.prefer_existing)
def destination_client(schema_name: str = None) -> JobClientBase
Get the destination job client for the configured destination
Use the client in a with statement to manage opening and closing the connection to the destination:
with pipeline.destination_client() as client:
    client.drop_storage()  # removes storage which typically wipes all data in it
The client is authenticated. You can provide an alternative schema_name, which will be used to normalize the dataset name.
If no schema name is provided and no default schema is present in the pipeline, an ad hoc schema will be created and discarded after use.
managed_state
@contextmanager
def managed_state(*, extract_state: bool = False) -> Iterator[TPipelineState]
Puts pipeline state in managed mode, where changes to the yielded state will be persisted, or fully rolled back on exception.
Makes the state available via StateInjectableContext.
dataset
def dataset(schema: Union[Schema, str, None] = None) -> dlt.Dataset
Returns a dataset object for querying the destination data.
Arguments:
schema (Union[Schema, str, None]) - Schema name or Schema object to use. If None, uses the default schema if set.
Returns:
dlt.Dataset - A dataset object that supports querying the destination data.