Trel SDK#
Trel SDK is available as part of the CLI tool as well as most execution profiles.
treldev#
Contains code with functionality frequently needed by Trel jobs.
Classes:
| ClockBasedSensor | Inherit from here to have get_new_datasetspecs call get_new_datasetspecs_with_cron_and_precision. |
| Sensor | Inherit from this class to simplify sensor creation. |
| StreamToLogger | Fake file-like stream object that redirects writes to a logger instance. |
| ValidatableURI | Parent class for URIs that can be validated |
Functions:
| get_args | Fetches arguments relevant to the job in multiple scenarios. |
| instance_ts_str_to_ts_precision | Convert an instance_ts_str to instance_ts and instance_ts_precision. |
- class treldev.ClockBasedSensor(config, credentials, *args, **kwargs)#
Inherit from here to have get_new_datasetspecs call get_new_datasetspecs_with_cron_and_precision.
Methods:
__init__(config, credentials, *args, **kwargs): Sets instance_ts_precision.
get_new_datasetspecs(datasets, **kwargs): Calls get_new_datasetspecs_with_cron_and_precision.
- __init__(config, credentials, *args, **kwargs)#
Sets instance_ts_precision. Sets backfill_newest_first with default True.
- get_new_datasetspecs(datasets, **kwargs)#
Calls get_new_datasetspecs_with_cron_and_precision
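A minimal sketch of a ClockBasedSensor subclass. Only the constructor pass-through, save_data_to_path, and init_and_run follow the documented interface; the helper that copies data to the URI is hypothetical:

import treldev

class MyClockSensor(treldev.ClockBasedSensor):

    def __init__(self, config, credentials, *args, **kwargs):
        super().__init__(config, credentials, *args, **kwargs)
        # Sensor-specific configuration can be read from self.config here.

    def save_data_to_path(self, load_info, uri, dataset=None, **kwargs):
        # load_info carries 'instance_ts' (the period start) and 'period_end'.
        start, end = load_info['instance_ts'], load_info['period_end']
        # Hypothetical helper: pull data for [start, end) from the source and write it to uri.
        self.copy_source_data_to_uri(start, end, uri)

if __name__ == '__main__':
    MyClockSensor.init_and_run()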
- class treldev.Sensor(config, credentials, logger, debug)#
Inherit from this class to simplify sensor creation. You need to implement the following functions:
__init__ (Optional)
get_search_criteria (Optional)
get_new_datasetspecs
save_data_to_path
Methods:
__init__(config, credentials, logger, debug): When inheriting, call super __init__ with the same parameters.
get_dataset_classes(load_info): Get all the dataset classes that can be loaded for this load_info.
get_new_datasetspecs(datasets, **kwargs): Given a list of relevant datasets, this returns the dataset specs that need to be added.
get_new_datasetspecs_with_cron_and_precision(datasets, **kwargs): This function provides the get_new_datasetspecs you need if the data availability is determined only by the clock.
get_search_criteria(): Generates a search criteria from the sensor configuration for finding datasets that already exist in the catalog.
init_and_run([class_obj]): Initializes the sensor and runs it.
save_data_to_path(load_info, uri[, dataset]): Here, you load the data specified by load_info into uri.
- __init__(config, credentials, logger, debug)#
When inheriting, call super().__init__ with the same parameters. Set any sensor-specific configurations here. If you do not, you will have to access them as self.config['...'].
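For example, a sketch of a subclass constructor, where 'source_path' is a hypothetical sensor-specific configuration key:

import treldev

class MySensor(treldev.Sensor):

    def __init__(self, config, credentials, logger, debug):
        super().__init__(config, credentials, logger, debug)
        # Store the hypothetical key once instead of reading self.config['source_path'] everywhere.
        self.source_path = self.config['source_path']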
- get_dataset_classes(load_info)#
Get all the dataset classes that can be loaded for this load_info.
- get_new_datasetspecs(datasets, **kwargs)#
Given a list of relevant datasets, this returns the dataset specs that need to be added. It should yield tuples of the form (load_info, dataset_spec).
Accept kwargs: Make sure you accept **kwargs in your implementation for forward compatibility.
The former is a string. The latter is a dict that can contain:
dataset_class (optional: default is from config)
instance_prefix
instance_ts
instance_ts_precision
label (optional: default is from config)
repository (optional: default is from config)
alt_uri (optional)
If alt_uri is provided, this sensor is only “marking” the data where it is, and the dataset inserted into the catalog will be visible immediately. If it is not provided, then it means the sensor is doing the data loading, not just the sensing. Subsequently, the save_data_to_path method will be called with load_info and the uri. It will load the data to the URI.
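A minimal sketch of an implementation without alt_uri, so save_data_to_path will be called later. The helper that lists source partitions and the assumption that each element of datasets exposes its instance_ts are illustrative only:

def get_new_datasetspecs(self, datasets, **kwargs):
    # Timestamps already present in the catalog, derived from the matching datasets (assumption).
    known_ts = {d['instance_ts'] for d in datasets}
    for instance_ts in self.list_source_partitions():  # hypothetical helper returning datetimes
        if instance_ts in known_ts:
            continue
        load_info = instance_ts.strftime('%Y%m%d')      # a string, as described above
        dataset_spec = {
            'instance_ts': instance_ts,
            'instance_ts_precision': 'D',
            # No alt_uri: this sensor loads the data itself via save_data_to_path.
        }
        yield load_info, dataset_spec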
- get_new_datasetspecs_with_cron_and_precision(datasets, **kwargs)#
This function provides the get_new_datasetspecs you need if the data availability is determined only by the clock. E.g.,
Real-time data such as tweets from Twitter, or stock price information from Finnhub.
Periodic pull of a transactional table such as sales.
Real-time drop of a dataset in some location. This is not supported yet. Use a sensor that will actually check for the data.
In such cases, there is no need to check if data is present. We can assume it is by looking at the clock.
This function is able to do this and return the datasets that must be pulled from the source and loaded.
It uses existing attributes:
cron_constraint
instance_ts_precision
max_instance_age_seconds
delay_seconds
and new attributes:
backfill_newest_first     # Default True. If true, load the newest period first.
period_cron_constraint    # Optional. See below.
along with the current time to automatically figure out which periods need to be loaded into datasets. It provides the following as load_info to your save_data_to_path function:
{
    'instance_ts': instance_ts,   # equivalent to period_start
    'period_end': period_end,
}
period_cron_constraint allows you to not load data corresponding to the full time duration specified by the normal period. E.g., if you specify just cron_constraint to be weekly on Sundays, then each period is for a week and period_end is 7 days away from instance_ts. But if you also provide period_cron_constraint as daily, then each period is for a day and we only load the periods starting on Sundays. This way, period_end is 24 hours after instance_ts.
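For illustration (2022-01-02 was a Sunday; the dates are only an example):

cron_constraint weekly on Sundays, no period_cron_constraint:
    instance_ts = 2022-01-02 00:00, period_end = 2022-01-09 00:00  (one week)
cron_constraint weekly on Sundays, period_cron_constraint daily:
    instance_ts = 2022-01-02 00:00, period_end = 2022-01-03 00:00  (24 hours)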
- get_search_criteria()#
Generates a search criteria from the sensor configuration for finding datasets that already exist in the catalog. The sensor will not load these datasets again.
This function must return a search criteria with the following syntax:
{ "dataset_classes": [ <dataset_class> ], "instance_ts_precisions": [ ... ], "instance_prefix": <instance_prefix> , "instance_prefix_is_null": True|False, "labels": [ <label> ], "repositories": [ <repository> ], "min_instance_ts": <ts>, # If instance_ts of dataset is older than this, ignore "min_ready_ts": <ts>, # If dataset is older than this, ignore }
Default behavior:
Passes the parameters dataset_class, label, and repository from config.
instance_ts_precision from config is not passed and is filtered for later.
If instance_prefix is provided in config, instance_prefix is set in the criteria. instance_prefix_is_null is set to True if it is None.
If not provided, instance_prefix is set to None and instance_prefix_is_null is set to False.
max_age_seconds is used to compute min_ready_ts.
max_instance_age_seconds is used to compute min_instance_ts. If min_instance_ts is also provided in the config, the larger one is picked.
Usually, the default behavior is enough and you do not need to re-implement this function.
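If you do need to override it, a sketch could extend the default criteria rather than rebuild them (the extra restriction shown is hypothetical):

def get_search_criteria(self):
    criteria = super().get_search_criteria()  # start from the default behavior described above
    # Hypothetical restriction: only consider datasets with this instance prefix.
    criteria['instance_prefix'] = 'us_east'
    criteria['instance_prefix_is_null'] = False
    return criteria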
- classmethod init_and_run(class_obj=None)#
Initializes the sensor and runs it. Call this from your sensor code.
class_obj is not required and is deprecated.
Example:
import treldev

class MySensor(treldev.Sensor):
    ...

if __name__ == '__main__':
    MySensor.init_and_run()
- save_data_to_path(load_info, uri, dataset=None, **kwargs)#
Here, you load the data specified by load_info into uri. The format of load_info depends on the get_new_datasetspecs function. Implement this for sensors where data needs to be loaded by the sensor. In such cases, get_new_datasetspecs will return without alt_uri.
Accept kwargs: Make sure you accept **kwargs in your implementation for forward compatibility.
The first two parameters are positional parameters, but anything else will be named parameters, even if always provided. Make sure you always put **kwargs at the end to catch any new parameters without breaking your sensor.
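A minimal sketch that pairs with the get_new_datasetspecs sketch above, where load_info is a date string; both helpers are hypothetical:

def save_data_to_path(self, load_info, uri, dataset=None, **kwargs):
    # load_info is whatever get_new_datasetspecs yielded, e.g. a date string such as '20220101'.
    rows = self.fetch_rows_for_date(load_info)  # hypothetical helper that reads from the source
    self.write_rows_to_uri(rows, uri)           # hypothetical helper that writes the data to uri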
- class treldev.StreamToLogger(logger, level)#
Fake file-like stream object that redirects writes to a logger instance.
Methods:
__init__(logger, level)
flush(): Flush the stream, which involves logging any data remaining in the buffer.
- __init__(logger, level)#
- flush()#
Flush the stream, which involves logging any data remaining in the buffer. This is typically called when the stream is being closed or needs to be forcibly flushed.
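A sketch of the usual pattern for this kind of class, redirecting stdout into a logger (the logger name is arbitrary):

import logging
import sys

from treldev import StreamToLogger

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('my_job')

# Anything printed to stdout is now forwarded to the logger.
sys.stdout = StreamToLogger(logger, logging.INFO)
print('this message goes through the logger')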
- class treldev.ValidatableURI#
Parent class for URIs that can be validated
Methods:
validate(job_args[, save, sodacore_check, ...]): This function can pull the schema and validation information from the args and run it against sodacore.
- validate(job_args, save=True, sodacore_check=False, sodacore_schema_check=False, sodacore_extra_checks=None, return_folder='return')#
This function can pull the schema and validation information from the args and run it against sodacore.
- treldev.get_args(default_args=None)#
Fetches arguments relevant to the job in multiple scenarios. Try to use this for every job.
This function looks for arguments in a number of locations:
1. If the --_args <yml file> is provided, the file is parsed and returned.
2. If default_args is provided, that is returned.
3. If the --_creds <yml file> is provided, the file is parsed and added to the args under args['credentials'].
4. If the --_creds has a .encrypted file, decrypt it using the TREL_CREDENTIAL_KEY environment variable. This is what happens when you run the job through the Trel platform.
5. If the --_creds option is not provided, try to load '~/.trel_credentials.yml' for credentials instead.
To the contents of args (if any), _input_paths and _output_paths are added along with _cli_args. See example below for format:
python3 job.py --_i log:s3:/.../ users:s3:/..../ \
    --_o users:s3:/......./ \
    --_cli_args '{"temp_paths":{"s3":"s3:/.../"}}'
This way, you can switch between debugging and production runs without changing code while keeping the job code clean. E.g., consider the following job code:
import treldev

args = treldev.get_args()
sc.textFile(args['inputs']['log'][0]['uri'])\
    .distinct()\
    .saveAsTextFile(args['outputs']['distinct_log'][0]['uri'])
Trel in production will run this as
python3 distinct.py --_args args.yml
and it will work fine. But for testing, you can construct a command line as follows:
pyspark distinct.py --_i log:s3:/.../ --_o distinct_log:s3:/.../
This allows you to develop and deploy with convenience. The full list of possible arguments accepted on the CLI is:
parameter | description
--_cli_args | A JSON that is applied first to the result
--_i | Input paths. This and the ones below are applied to the _cli_args
--_o | Output paths.
Note that if you are using any other attributes of inputs and outputs such as schema, you have to insert everything into _cli_args.
- treldev.instance_ts_str_to_ts_precision(ts_str)#
Convert an instance_ts_str to instance_ts and instance_ts_precision.
Example:
20220101        -> datetime(2022,1,1), 'D'
20220101_01     -> datetime(2022,1,1,1), 'H'
20220101_0101   -> datetime(2022,1,1,1,1), 'M'
20220101_010101 -> datetime(2022,1,1,1,1,1), 'S'
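A short usage sketch, assuming the function returns the pair shown above:

import treldev

instance_ts, precision = treldev.instance_ts_str_to_ts_precision('20220101_01')
# instance_ts == datetime(2022, 1, 1, 1) and precision == 'H'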
treldev.gcputils#
GCP Utils
Classes:
| BigQuery | Connects to bigquery by wrapping credentials management. |
| BigQueryURI | Parses BigQuery URIs. |
| | Parses Google Storage URIs |
- class treldev.gcputils.BigQuery#
Connects to bigquery by wrapping credentials management.
Methods:
get_client([credentials]): Returns an authenticated bigquery client object
- classmethod get_client(credentials=None)#
Returns an authenticated bigquery client object
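A usage sketch, assuming the returned object behaves like a standard google.cloud.bigquery client:

from treldev.gcputils import BigQuery

client = BigQuery.get_client()           # credentials are resolved by the wrapped credentials management
datasets = list(client.list_datasets())  # assumption: standard google.cloud.bigquery.Client methods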
- class treldev.gcputils.BigQueryURI(bq_uri)#
Parses BigQuery URIs. Provides functionality for common tasks in a Trel data pipeline such as querying to table, loading file, exporting to GS etc.
URI Format: bq://<project>/<dataset>/<table>
Methods:
__init__(bq_uri): Initializes this class
export_to_gs(gs_path, extract_job_config[, ...]): Exports table to Google Storage at gs_path.
get_table([return_none_on_missing]): Returns the table object for the URI
load_file(file_path, job_config[, kwargs]): Loads a given file to this table.
save_sql_as_view(sql): Saves the given SQL as a view at the URI location
save_sql_results(sql[, query_job_config, quiet]): Saves SQL results to this table.
- __init__(bq_uri)#
Initializes this class
- Parameters
bq_uri (str) – The URI to parse
- export_to_gs(gs_path, extract_job_config, kwargs={}, file_suffix='')#
Exports table to Google Storage at gs_path. Uses client.extract_table from bigquery Python API.
Example:
BigQueryURI('bq://myproject/mydataset/my_source_table')\
    .export_to_gs('gs://mybucket/path/')
- Parameters
gs_path (str) – The path to export to
extract_job_config (dict) – provided to ExtractJobConfig and then client.extract_table.
kwargs (dict) – Provided to extract_table
file_suffix (str) – Unused
- get_table(return_none_on_missing=True)#
Returns the table object for the URI
- load_file(file_path, job_config, kwargs={})#
Loads a given file to this table. Uses client.load_table_from_file of bigquery python API.
Example:
BigQueryURI('bq://myproject/mydataset/my_source_table')\
    .load_file('data_file.json', job_config={
        'source_format': bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        'schema': [
            bigquery.SchemaField('id', 'INT64', "REQUIRED"),
            bigquery.SchemaField('name', 'STRING', "REQUIRED"),
        ]
    })
- Parameters
file_path (str) – The path to the file to load
job_config (dict) – provided to client.load_table_from_file. First sets write_disposition to bigquery.job.WriteDisposition.WRITE_TRUNCATE
- save_sql_as_view(sql)#
Saves the given SQL as a view at the URI location
- Parameters
sql (str) – The SQL of the view
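A usage sketch in the style of the other examples; the project, dataset, table, and SQL are placeholders:

BigQueryURI('bq://myproject/mydataset/my_view')\
    .save_sql_as_view('select id, name from mydataset.my_source_table')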
- save_sql_results(sql, query_job_config={}, quiet=False)#
Saves SQL results to this table. Uses client.query of the bigquery Python API.
Raises exceptions if:
TODO: Query cancelled
Got exit events (cancelled from Trel)
Query failed
Example:
BigQueryURI('bq://myproject/mydataset/my_target_table')\
    .save_sql_results('select * from my_source_table')
- Parameters
sql (string) – The SQL to run
query_job_config (dict) – provided as job_config to client.query. If write_disposition is not provided, this function will set it to bigquery.job.WriteDisposition.WRITE_TRUNCATE.
quiet (bool) – If true, does not print the SQL. This printed SQL shows up in the stdout of the job and can be useful for debugging.
treldev.awsutils#
Classes:
| Athena | Connects to Athena by wrapping credentials management. |
| AthenaURI | Parses Athena URIs. |
| S3 | |
| | Parses S3 URIs |
- class treldev.awsutils.Athena#
Connects to Athena by wrapping credentials management.
Methods:
get_client(region[, module_name]): Returns an authenticated Athena boto3 client object
- classmethod get_client(region, module_name='athena')#
Returns an authenticated Athena boto3 client object
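A usage sketch; the region is a placeholder:

from treldev.awsutils import Athena

athena_client = Athena.get_client('us-east-1')  # authenticated boto3 Athena client, per the docstring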
- class treldev.awsutils.AthenaURI(athena_uri)#
Parses Athena URIs. Provides functionality for common tasks in a Trel data pipeline such as querying to table, etc.
URI Format: athena://<region>/<catalog>/<database>/<table>
Methods:
__init__(athena_uri): Initializes this class
create_from_s3_uri(s3_uri, schema[, ...]): Saves the data to the specified s3_uri.
save_sql_results(s3_uri, sql[, ...]): Runs the query, saves it to the S3 location, and returns the query execution_id.
- __init__(athena_uri)#
Initializes this class
- Parameters
athena_uri (str) – The URI to parse
- create_from_s3_uri(s3_uri, schema, overwrite=True, quiet=False)#
Saves the data to the specified s3_uri.
- Parameters
s3_uri (string) – Point the table to this data
source_format (string) – This is used as part of the SQL
overwrite (bool) – If true, run a drop table first.
quiet (bool) – If true, does not print the SQL. This printed SQL shows up in the stdout of the job and can be useful for debugging.
- save_sql_results(s3_uri, sql, result_configuration={}, format='Parquet', write_compression='SNAPPY', partitioned_by=None, bucketed_by=None, bucket_count=None, quiet=False)#
Runs the query, saves it to the S3 location, and returns the query execution_id. Raises exceptions if 1) Query cancelled, 2) Got exit events (cancelled from Trel) 3) Query failed
- Parameters
s3_uri (string) – The path to store the results in.
sql (string) – The SQL to run
query_job_config (dict) – provided as job_config to client.query. First sets write_disposition to bigquery.job.WriteDisposition.WRITE_TRUNCATE.
quiet (bool) – If true, does not print the SQL. This printed SQL shows up in the stdout of the job and can be useful for debugging.
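A usage sketch following the signature above; the region, catalog, database, table, S3 path, and SQL are placeholders:

from treldev.awsutils import AthenaURI

execution_id = AthenaURI('athena://us-east-1/AwsDataCatalog/mydb/my_target_table')\
    .save_sql_results('s3://mybucket/results/',
                      'select * from my_source_table',
                      format='Parquet',
                      write_compression='SNAPPY')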
- class treldev.awsutils.S3#
Methods:
get_client(region): Returns an authenticated S3 boto3 client object
- classmethod get_client(region)#
Returns an authenticated S3 boto3 client object
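A usage sketch, assuming the returned object is a standard boto3 S3 client; the region is a placeholder:

from treldev.awsutils import S3

s3_client = S3.get_client('us-east-1')
s3_client.list_buckets()  # assumption: standard boto3 S3 client methods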