Plugins#
Execution Profile Plugins#
python#
Enables you to execute Python scripts as a Trel job.
Script Contract#
As explained in Execution Profiles, a file args.yml
will be created with all the job details in the same folder as the user code. It has the following keys:
| Name | Description |
|---|---|
| inputs | A list of dictionaries, each corresponding to an input dataset. See below for contents. |
| outputs | A list of dictionaries, each corresponding to an output dataset. |
| | The Attempt ID of this job. |
| | The Task ID of this job. |
| | The full set of parameters for this Task. |
| | Attempt-specific parameters. |
| | Path to the credentials file. |
| | See Scheduling Classes for this and the ones below. |
| | |
| | |
| | |
| | |
| | A mapping from repositories to temporary folders. If you want to keep your code free of assumptions about repository names, select the temp paths you need using execution.temp_repositories in the registration file. This allows you to use the provided paths directly without checking the repository. |
The dataset dictionaries will have the following keys. See Datasets.
id_
uri
name
instance_prefix
instance_ts_str
label
repository
schema_version
Additionally, a command line will be provided. It has the following parameters.

| Name | Description |
|---|---|
| | The paths of all the datasets with this name are provided, with no guarantee of order. E.g. |
| | The filename of the args file. Always args.yml. |
| | The filename of the credentials file. Always credentials.yml.encrypted when run by the profile. |
| | The attempt ID of this job. |
| | See Scheduling Classes for this and the ones below. |
| | |
| | |
When executed with these parameters, the script should return 0 if and only if it was completely successful; otherwise, it should return a value greater than 0.
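For orientation, here is a minimal sketch of a job that satisfies this contract. It assumes args.yml sits in the working folder and that the inputs/outputs entries are indexed by dataset class with a uri key, as in the sample notebook later on this page; the dataset class names server_log and server_log_distinct are hypothetical.

import sys
import yaml

def main():
    # args.yml is created in the same folder as the user code (see the contract above).
    with open("args.yml") as f:
        args = yaml.safe_load(f)

    # Each input/output entry is a dataset dictionary exposing the keys listed above (uri, name, ...).
    input_uri = args['inputs']['server_log'][0]['uri']              # hypothetical input dataset class
    output_uri = args['outputs']['server_log_distinct'][0]['uri']   # hypothetical output dataset class

    # ... read from input_uri, transform, and write to output_uri here ...

    return 0  # 0 if and only if the job was completely successful

if __name__ == "__main__":
    sys.exit(main())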
emr_pyspark#
Enables you to execute PySpark script commands as a Trel job. Uses a resource of class EMRMapReduceResource.
This profile is very easy to use. Simply set the parameters defined below and Trel will run your PySpark script on a cluster.
Script Contract#
Same as that of python.
Profile Default Parameters#
These are to be entered under profile_default_parameters
in .trel.config on the Trel server.
aws.s3_temp_paths_by_region
: Maps regions to temp S3 folders. This is required because the S3 folder may store bootstrap files, and they have to come from the same region as the cluster you are starting for the job.
aws.s3_temp_paths_by_region:
  us-east-2: s3://<bucket in us-east-2>/<prefix>/{attempt_id}
aws.availablity_zone_subnets
: This specifies the subnet for each zone. Used by EMRMapReduceResource.
aws.availablity_zone_subnets:
  us-east-2a: subnet-????????  # (subnet for us-east-2a)
  us-east-2b: subnet-????????
emr_spark.ami_version
emr_spark.ami_version: emr-5.28.0
emr_spark.starting_wait_max_seconds
: How long the profile will wait for the cluster to leave the “STARTING” phase.
emr_spark.starting_wait_max_seconds: 1200
emr_spark.executor_memory
: How much memory will be allocated to each Spark executor for the various memory specifications.
emr_spark.executor_memory:
  low: 12
  normal: 24
  normal_high: 36
  high: 48
emr.jobflow_role
: Default: EMR_EC2_DefaultRole.
emr.service_role
: Default: EMR_DefaultRole.
emr_spark.bootstrap_actions
: A list of scripts that will be used to initialize the EMR cluster. Example:
emr_spark.bootstrap_actions:
  - ['default', 's3://...../bootstrap.sh', []]
  - ['datascience', 's3://...../ds_bootstrap.sh', []]
Each entry consists of a name, the path to the script, and the script parameters.
Profile Parameters#
emr_spark.bootstrap_actions
: These will be applied after the bootstrap commands from the previous section. Note that if you have entries here, it will complicate sharing clusters with other jobs.
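For instance, a job's registration file could append its own bootstrap action under profile_parameters, using the same entry format as above (name, script path, script parameters); the name and path below are illustrative:

profile_parameters:
  emr_spark.bootstrap_actions:
    - ['job_setup', 's3://...../job_bootstrap.sh', []]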
Credentials required: emr.access_key
, aws.instance_key_name
bigquery#
This is to be used when you have a BigQuery job. As of now, it is identical to the python
profile, but that may change.
Credentials required: gcp.service_json
Sample code:
# input_bq and output_bq are dataset URIs taken from args.yml (same script contract as python).
# BigQueryURI is provided by the Trel SDK; the import is not shown here.
input = BigQueryURI(input_bq)
BigQueryURI(output_bq).save_sql_results(f'''
    SELECT distinct user_id from {input.path}''')
Primarily, writing a BigQuery job relies on the SDK, not the profile. The following SDK classes help with writing jobs for this profile:
athena#
This is to be used when you have an Athena job. As of now, it is identical to the python
profile, but that may change.
One important oddity to note: because Athena does not store data itself, all Athena queries require two output paths, one for S3 and one for Athena. This requires special behavior in the output specification.
execution.output_generator:
class: default
outputs:
- name: output_s3
dataset_class: logs
repository: dl_s3
- name: output_athena
repository: dw_athena
link_to_previous: True
This way, two outputs are generated, both are cataloged, and the Athena dataset is registered as dependent on the S3 dataset.
Credentials required: aws.access_key
Sample code:
# input_athena, output_athena, and output_s3 are dataset URIs taken from args.yml.
# AthenaURI is provided by the Trel SDK; the import is not shown here.
input = AthenaURI(input_athena)
AthenaURI(output_athena).save_sql_results(output_s3, f'''
    SELECT distinct user_id from {input.path}''')
Primarily, writing an Athena job relies on the SDK, not the profile. The following SDK classes help with writing jobs for this profile:
databricks_notebookjob#
Enables you to execute a Databricks notebook job as a Trel job. Sample registration file:
name: request_log_distinct_dbn
execution.profile: databricks_notebookjob
execution.source_code.main:
class: databricks_notebookjob
path: https://<something>.cloud.databricks.com#<job_id>
branch: master
repository_map:
- request_log: s3-us-east2
scheduler:
class: single_instance
depends_on: [ request_log ]
labels: [ demo ]
instance_ts_precisions: [ D ]
cron_constraint: "0 0 * * *"
schedule_ts_min: "2021-01-01 00:00:00"
execution.output_generator:
class: default
outputs:
- dataset_class: request_log_distinct
Credentials required: databricks.access_token
Script Contract#
A task information dict, similar to what is found in args.yml for the python profile, is provided to the notebook under the key args
as a YAML string. Parse it and look for the input and output paths. Transform the inputs to the outputs. Sample notebook:
import yaml

# dbutils and sc are provided by the Databricks notebook environment
args = yaml.safe_load(dbutils.widgets.get("args"))
input_path = args['inputs']['<dataset_class>'][0]['uri']
output_path = args['outputs']['<dataset_class>'][0]['uri']
sc.textFile(input_path).distinct().saveAsTextFile(output_path)
Profile parameters#
These can be put in the registration file under profile_parameters
.
databricks_notebookjob.branch_job_map
: Maps branches to jobs. This is required if using the branch feature in Trel. You are only allowed to specify a branch that exists in this parameter, because each notebook has fixed code.
profile_parameters:
  databricks_notebookjob.branch_job_map:
    master: 'https://<something>.databricks.com#<job_id>'
    qa: 'https://<something>.databricks.com#<job_id2>'
ec2_single#
Enables you to execute a Python script on an EC2 instance. Uses a resource of class EC2Resource.
Script Contract#
Same as that of python.
Job parameters#
resource.static_resource_id
: If provided, this instance will be used to run the attempt instead of spinning up a new one.
Profile Default Parameters#
These are to be entered under profile_default_parameters
in .trel.config on the Trel server.
ec2.python
The path to the python command to run
Default:
/usr/bin/python3
ec2.allocate_if_static_resource_is
If the static resource is found to be in any of the states listed here, a new resource will be allocated.
Default: ['_MISSING', 'terminated']
ec2.starting_wait_max_seconds
Terminate the EC2 request if an instance has not been allocated within this many seconds.
Default: 1200
ec2.bootstrap_script
Run these shell commands provided as a list of strings when initializing an instance
Default: []
Profile Parameters#
These are to be set in profile_parameters in job registration.
ec2.job_setup_script
Run these shell commands provided as a list of strings just before starting the job.
Default: []
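For example, a registration file might install job dependencies just before the job starts; this is a hedged sketch and the commands are illustrative:

profile_parameters:
  ec2.job_setup_script:
    - 'sudo yum install -y git'
    - 'pip3 install pyyaml'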
Credentials required: emr.access_key
, aws.instance_key
Execution APIs#
Azure Synapse#
To use Trel for running SQL queries in a Synapse workspace, you need to configure credentials. Provide the username and password for the SQL pool, and the driver information.
Azure URI Format#
The Azure URI format in Trel is used to specify the location of data objects in Azure services. The format is as follows:
azsa://<server_url>/<db_name>/<schema>/<table_name>
Where:
<server_url>: URL of the Synapse server.
<db_name>: Name of the database.
<schema>: Name of the schema.
<table_name>: Name of the table.
Running SQL Queries:
This sample code demonstrates how to run SQL queries in a Synapse workspace using Trel.
# azureutils comes from the Trel SDK (treldev.azureutils, listed below)
from treldev import azureutils

# Define the Azure URI for input data
input_uri = 'azsa://<server_url>/<db_name>/<schema>/<table_name>'
# Initialize a SynapseURI object with the input Azure URI
input = azureutils.SynapseURI(input_uri)
# Save the results of the SQL query to a Blob Storage location
input.save_sql_results(
azureutils.BlobURI('abfss://container@storage_ac_name.dfs.core.windows.net/path/to/table/'),
f"SELECT distinct user_id from {input.path}")
Creating External Tables from Blob Storage:
This sample code demonstrates how to create external tables in a Synapse workspace from Blob Storage using Trel.
# Define the Azure URI for input data
input_uri = 'azsa://<server_url>/<db_name>/<schema>/<table_name>'
# Initialize a SynapseURI object with the input Azure URI
input = azureutils.SynapseURI(input_uri)
# Create an external table from a Blob Storage location
input.create_from_blob_uri(
azureutils.BlobURI('abfss://container@storage_ac_name.dfs.core.windows.net/file/path'),
schema={"columns":[{"name":"column_name","type":"datatype"}], "format":"parquet"})
Validation Example#
This example demonstrates how to validate data and schema integrity using Trel.
schema:
columns:
- mode: required
name: name
type: nvarchar
format: parquet
sodacore_checks: |
- row_count:
fail: when < 1
warn: when < 2
Validation Code#
This code snippet validates the data and schema integrity.
# 'input' is a SynapseURI (as above); 'request_log' is the output dataset class in args.yml
res = input.validate_and_return(
    args, args['outputs']['request_log'][0],
    sodacore_check=True, sodacore_schema_check=True)
Ensure you properly configure credentials, understand the Azure URI format, and use the provided sample code for running SQL queries and creating external tables. Additionally, utilize the validation code example for ensuring data quality and schema integrity.
The following SDK classes help with writing jobs for this profile:
treldev.azureutils.Synapse
treldev.azureutils.SynapseURI
treldev.azureutils.BlobURI
Repository Plugins#
This section provides plugin specific information. See Repositories for details about the concept.
athena: AWS Athena (Hive metastore)#
Supports Athena URIs. Config format:
{"catalog":"...", "database":"...","prefix":"...",
"region":"...", "temp_database":"...",
"staging_dir":"s3://...", "validation_schema":"tmp"}
Here prefix is mandatory, as Athena cannot have table names that are just numbers.
The repository will generate default URIs of the format athena://<region>/<catalog>/<database>/<prefix><dataset_id>
Temp URIs of the format athena://<region>/<catalog>/<temp_database>/<attempt_id>
Because the Athena UI has no URL for a specific table, the dataset access URLs can only take you to the Athena homepage.
Credentials required: aws.access_key
trino: In-house Trino cluster#
Supports Trino URIs from an on-premise cluster. Config format:
{"server":"","port":"", "catalog":"...", "schema":"...","prefix":"...",
"temp_prefix":"...", "validation_schema":"tmp", "credential_key":""}
Here prefix is mandatory, as Trino cannot have table names that are just numbers.
The repository will generate default URIs of the format trino://<server>:<port>/<catalog>/<schema>/<prefix><dataset_id>
Temp URIs of the format trino://<server>:<port>/<catalog>/<temp_schema | schema>/zattempt_<attempt_id>_
Credentials required: trino
or as per credential_key
parameter.
s3: AWS S3#
Supports S3 URIs. Config format:
{'bucket':'...', 'prefix':'...',
'region':'...', 'temp_prefix':'...'}
Paths must end in a /
if the path is a folder.
The repository will generate default URIs of the format s3://<bucket>/<prefix><dataset_id>
Temp URIs of the format s3://<bucket>/<temp_prefix><attempt_id>
Credentials required: aws.access_key
bq: BigQuery#
Supports Google BigQuery URIs. Config format:
{'project':'...', 'dataset':'...','prefix':'...',
'region':'...', 'temp_dataset':'...'}
The repository will generate default URIs of the format bq://<project>/<dataset>/<prefix><dataset_id>
Temp URIs of the format bq://<project>/<temp_dataset>/<attempt_id>
Credentials required: gcp.service_json
gs: Google Storage#
Supports Google Storage URIs. Config format:
{'bucket':'...', 'prefix':'...',
'region':'...', 'temp_prefix':'...'}
Paths must end in a /
if the path is a folder.
The repository will generate default URIs of the format gs://<bucket>/<prefix><dataset_id>
Temp URIs of the format gs://<bucket>/<temp_prefix><attempt_id>
Credentials required: gcp.service_json
Scheduling Class Plugins#
BaseSchedule#
All scheduling classes inherit from BaseSchedule and most of them let the parent class initialize certain common configuration. They are given below.
| Configuration | Comment |
|---|---|
| depends_on | A main list of dataset classes to depend on. |
| labels | The list of labels that are acceptable in input combinations. All the datasets in the input combination must share the same label. This behavior can be modified using Prefix label selection. |
| | If no label is present in the schedule_id, what is the schedule_label? By default, it is the first label in labels. |
| | |
| instance_ts_precisions | A list of valid instance_ts_precisions. All input datasets must share the precision. |
| input_cron_constraint | All input datasets must satisfy this cron constraint. |
| output_cron_constraint | All output datasets must satisfy this cron constraint. Some scheduling classes require that this is a subset of input_cron_constraint. |
| cron_constraint | Use this if both the input and output cron constraints are the same. |
| schedule_ts_min | Ignore schedule_instance_ts older than this timestamp. |
| | Ignore schedule_instance_ts older than this duration from now. |
| | A specification of additional datasets required for the job, which are not strictly constrained. See Other datasets. |
Implementation note: This class has lots of default code that is useful to a new child class that needs cron constraints. The functions that are left to be implemented are clearly marked in code. For new scheduling classes that can support having no cron constraints like single_instance
, more functions need to be re-implemented.
single_instance#
The main property of this scheduling class is that all input combinations must consist of datasets that share the same instance. That means the same (instance_prefix
, instance_ts
, instance_ts_precision
). It adds the following configurations on top of those supported by BaseSchedule.
| Configuration | Comment |
|---|---|
| | A specific value for the instance prefix to take. Default behavior is to not constrain the instance prefix. All schedule inputs with the same instance prefix are valid. |
| | A regex for the instance prefix. All schedule inputs must still share the same instance prefix to be valid. |
For example, consider the following scheduler configuration:
scheduler:
class: single_instance
depends_on: [ server_log, customer_list ]
instance_ts_precisions: [ D ]
labels: [ prod ]
cron_constraint: "0 0 * * *"
schedule_ts_min: "2020-01-01 00:00:00"
The following is a valid schedule input for this scheduler:
schedule_id: 20210101
inputs:
- ('server_log', None, '2021-01-01 00:00:00', 'D', 'prod', 's3-us-east2')
- ('customer_list', None, '2021-01-01 00:00:00', 'D', 'prod', 's3-us-east2')
schedule_instance_prefix: None
schedule_instance_ts: '2021-01-01 00:00:00'
schedule_instance_ts_precision: 'D'
schedule_label: 'prod'
Use case#
This is used extensively to convert datasets from one form to another. E.g., convert a day of server_log_raw
to the same day’s server_log
.
Also used for joins. E.g., server_log
and customer_list
is used to produce server_log_extended
.
This is also used to process datasets where instance_ts
is None. However, such an architecture is not recommended.
multi_period#
In addition to single instance behavior, this allows additional instances as inputs. The instances added can have different instance_ts
only. Unlike single_instance, this scheduling class is no longer capable of handling datasets that have no instance_ts
.
The ability of this schedule to handle additional instances comes from a new configuration given below:
| Configuration | Comment |
|---|---|
| period | There are three options. |
The periods that the key periods
refers to are defined by the input_cron_constraint
. E.g., if input_cron_constraint
is 0 0 * * *
, then the period is a day. If 2/15/2021 is the current period, then the next period is 2/16/2021.
For example, consider the following scheduler configuration:
scheduler:
class: multi_period
depends_on: [ server_log, customer_list ]
period:
server_log: # current, previous, and next periods
periods: 3
offset: -1
customer_list: 1
instance_ts_precisions: [ D ]
labels: [ prod ]
cron_constraint: "0 0 * * *"
schedule_ts_min: "2020-01-01 00:00:00"
The following is a valid schedule input for this scheduler:
schedule_id: 20210101
inputs:
- ('server_log', None, '2020-12-31 00:00:00', 'D', 'prod', 's3-us-east2')
- ('server_log', None, '2021-01-01 00:00:00', 'D', 'prod', 's3-us-east2')
- ('server_log', None, '2021-01-02 00:00:00', 'D', 'prod', 's3-us-east2')
- ('customer_list', None, '2021-01-01 00:00:00', 'D', 'prod', 's3-us-east2')
schedule_instance_prefix: None
schedule_instance_ts: '2021-01-01 00:00:00'
schedule_instance_ts_precision: 'D'
schedule_label: 'prod'
In this case, even though the schedule_id
is for 1/1/2021, this schedule input cannot execute until the 1/2/2021 dataset for server_log
is ready.
Here, output_cron_constraint
and input_cron_constraint
can be different. E.g., if the former is changed to 0 0 * * 0
, then a schedule is created only when the schedule_ts is a Sunday. However, the inputs will still be Saturday, Sunday, and Monday.
If the output_cron_constraint
is not a subset of the input_cron_constraint
, no exception will be thrown, but the behavior of the class is not specified.
Use case#
This is used where a sequence of days or hours needs to be processed to generate the output. E.g., reading 30 days of server_log
to generate kpi_30day
.
periodic#
In addition to single instance behavior, this allows the same dataset class to be in both the input and the output. This is typically required for datasets that should be updated using another dataset.
E.g., assume \(A\) is a dataset that needs to be updated weekly using daily datasets of \(B\) and \(C\). A dataset of \(D\) is also required at the schedule instance_ts to generate the new \(A\). In other words, the output is the new \(A\), and the inputs are the previous \(A\), the daily \(B\) and \(C\) datasets, and the \(D\) dataset at the schedule instance_ts.
\(B\) and \(C\) can be indicated in the configuration under the depends_on
field. \(A\) and \(D\) can be specified using two new configurations.
| Configuration | Comment |
|---|---|
| self_depends_on | A list of dataset classes that will be updated, viz. \(A\) from the above example. |
| depends_on_for_schedule_ts_only | A list of dataset classes for which a single dataset is required with the schedule instance_ts. |
| | Same as before, but restricted to one value. |
| depends_on_instance_ts_precisions | Provide a separate precision for depends_on datasets. This way you can have a job that combines hourly datasets into a daily one. Also restricted to exactly one value. |
For example, consider the following scheduler configuration:
scheduler:
class: periodic
depends_on: [ server_log ] # B,C from example
self_depends_on: [ kpis ] # A from example
depends_on_for_schedule_ts_only: [ customer_list ] # D from example
instance_ts_precisions: [ D ]
labels: [ prod ]
input_cron_constraint: "0 0 * * *"
output_cron_constraint: "0 0 * * 0"
schedule_ts_min: "2020-01-01 00:00:00"
The following is the first valid schedule input for this scheduler in 2021:
schedule_id: 20210103
inputs:
- ('kpis', None, '2020-12-27 00:00:00', 'D', 'prod', 's3-us-east2')
- ('server_log', None, '2020-12-28 00:00:00', 'D', 'prod', 's3-us-east2')
- ('server_log', None, '2020-12-29 00:00:00', 'D', 'prod', 's3-us-east2')
- ('server_log', None, '2020-12-30 00:00:00', 'D', 'prod', 's3-us-east2')
- ('server_log', None, '2020-12-31 00:00:00', 'D', 'prod', 's3-us-east2')
- ('server_log', None, '2021-01-01 00:00:00', 'D', 'prod', 's3-us-east2')
- ('server_log', None, '2021-01-02 00:00:00', 'D', 'prod', 's3-us-east2')
- ('server_log', None, '2021-01-03 00:00:00', 'D', 'prod', 's3-us-east2')
- ('customer_list', None, '2021-01-03 00:00:00', 'D', 'prod', 's3-us-east2')
schedule_instance_prefix: None
schedule_instance_ts: '2021-01-03 00:00:00'
schedule_instance_ts_precision: 'D'
schedule_label: 'prod'
This class is flexible in terms of how it can be configured. The only constraint is that at least one of depends_on
or depends_on_for_schedule_ts_only
must not be empty. By leaving self_depends_on
as empty, this allows the following input selection as well.
The following scheduler configuration can combine hourly datasets of server_log
into daily:
scheduler:
class: periodic
depends_on: [ server_log ] # B,C from example
instance_ts_precisions: [ D ]
depends_on_instance_ts_precisions: [ H ]
labels: [ prod ]
input_cron_constraint: "0 * * * *"
output_cron_constraint: "0 0 * * *"
schedule_ts_min: "2020-01-01 00:00:00"
The output_cron_constraint
and input_cron_constraint
behave similarly to how they do in multi_period.
Use case#
When self_depends_on is not empty, this schedule can be used to update a dataset with new data. The above example is this case.
When self_depends_on is empty, this schedule can be used to combine and process data from multiple periods where the number of periods is better expressed as a cron rather than a number of days. E.g., process all the data for each month (input_cron_constraint: 0 0 * * *, output_cron_constraint: 0 0 L * *), as sketched below.
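A hedged sketch of such a monthly configuration, modeled on the earlier scheduler examples (the dataset class, labels, and dates are illustrative):

scheduler:
  class: periodic
  depends_on: [ server_log ]
  instance_ts_precisions: [ D ]
  labels: [ prod ]
  input_cron_constraint: "0 0 * * *"
  output_cron_constraint: "0 0 L * *"
  schedule_ts_min: "2020-01-01 00:00:00"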
Resource Plugins#
ec2: EC2 Instance#
Use this resource to get access to an EC2 instance. The ec2_single
execution profile plugin uses this resource. This class can generate the parameters for run_instances
in the boto3 library. It leaves out MinCount and MaxCount.
Known parameters#
creation_extra_args
This dictionary will be used to update profile_default_parameters/ec2.creation_extra_args. The update is done using the
.update
method of a Python dictionary, so only the top level will be updated; inner dictionaries and lists are fully replaced. The profile will pass the updated dictionary to run_instances. This profile will add the information from the following parameters into this dictionary before passing it to the function.
The following are in-built defaults that have the lowest precedence:
/BlockDeviceMappings/0/DeviceName /dev/xvda
/BlockDeviceMappings/0/Ebs/DeleteOnTermination True
/BlockDeviceMappings/0/Ebs/VolumeType gp3
The following are enforced values. These have the highest precedence:
IF SPOT:
/InstanceMarketOptions/SpotOptions/SpotInstanceType one-time
/InstanceMarketOptions/SpotOptions/InstanceInterruptionBehavior terminate
region
Uses
profile_default_parameters/ec2.region
as backup.
instance_type
Uses
profile_default_parameters/ec2.instance_type
as backup. E.g.: r5.25xlarge.
ami
Uses
profile_default_parameters/ec2.ami
as backup.
ebs_storage
Specified in GB. Uses
profile_default_parameters/ec2.ebs_storage
as backup.
subnet_id
Uses
profile_default_parameters/ec2.subnet_id
as backup. Default: Configured default subnet for the default VPC.
spot
Uses
profile_default_parameters/ec2.spot
as backup. Default: False
tags
Here is the syntax:
tags:
  resource_type:
    key1: value1
    key2: value2
These tags update profile_default_parameters/ec2.tags.
bootstrap
These commands are run on the instance after profile_default_parameters/ec2.bootstrap
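For illustration, a hedged sketch of how these parameters might be set in a job registration, following the pattern of the emr_spark resource sample later on this page (the resource name key and all values are assumptions):

resource.name: ec2
resource.args:
  region: us-east-2
  instance_type: r5.xlarge
  ebs_storage: 100
  spot: true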
Dynamic Virtual resources#
{region}_{instance_type}: 1
{region}_ec2_instance: 1
{region}_ec2_job: 1
Retry Strategy Plugins#
default#
First, let us establish the behavior when the attempts only return ERROR
or CANCELLED
as the return code (see Return Codes).
execution.retry_strategy.attempts is the maximum number of attempts.
execution.retry_strategy.delay is the delay in seconds between two attempts.
Now let us examine how the behavior changes for various return codes.
ALL_ATTEMPTS_FAILING_ERROR and TREL_ERROR
: No more retries will happen. Ignores execution.retry_strategy.attempts.
TRANSIENT_USER_CODE_EXCEPTION
: These failures do not count towards the attempt count, so the attempt will definitely retry after the usual delay.
RESOURCE_DID_NOT_ALLOCATE, RESOURCE_LOST_DURING_EXECUTION, FAILED_DUE_TO_LOST_SLAVE, READ_LOCK_DENIED, and WRITE_LOCK_DENIED
: These are resource failures and also do not count towards the attempt count, so the attempt will definitely retry, but with the delay from execution.retry_strategy.resource_error_delay.
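As a hedged sketch, assuming these keys are set in the job registration file alongside the other execution.* keys shown on this page (the values are illustrative):

execution.retry_strategy.attempts: 3
execution.retry_strategy.delay: 300
execution.retry_strategy.resource_error_delay: 600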
Output Generator Plugins#
This plugin system decides which outputs are generated for each task. Trel comes with the default
plugin, which should be enough for almost all cases.
default#
The default
plugin is the output generator that comes with Trel. To configure this, we provide a key outputs
, which is a list of entries, one for each output dataset. Each entry is a dict which can consist of the following fields:
1. dataset_class: This is mandatory.
2. instance_prefix: This is optional. Default value _from_schedule_info.
3. instance_ts: This is optional. Default value _from_schedule_info.
4. instance_ts_precision: This is optional. Default value _from_schedule_info.
5. label: This is optional. Default value _from_schedule_info.
6. repository: This is optional. The default rule is _inherit (see Output Generator Rules) if all the inputs have the same repository. Otherwise, it is mandatory.
7. name: This is an optional parameter. Default is the same as dataset_class. See Naming Datasets.
8. uri_formatter: This is an optional string that can be used to construct the URI for the output. See Default Output URI.
9. schema_version: This is optional. The default value will be the value specified for the dataset_class at the system level. You can also specify a time-series (see example below).
10. copy_of_input_with_dataset_class: This is optional. This indicates that this job will copy the data of the input with the specified dataset_class into this output.
11. link_to_input_with_dataset_class: This is optional. This indicates that this job will link the output to the input.
12. link_to_previous: This is optional. This indicates that this output is to be linked to the previous output specification.
Given this configuration, the inputs and the scheduling info of those inputs, this output generator will come up with all the information required to register the outputs.
Output Generator Rules#
All fields 2 through 6 can be explicitly specified. But in general, we need some of them to change based on the inputs. So, certain rules are specified that can be applied to the inputs.
_max#
For this field, pick the maximum from the inputs. E.g.,
instance_ts: _max
You can also specify it as a list where the second argument is the dataset_class
you want to apply the rule on:
instance_ts: [_max, server_log]
This picks the maximum instance_ts
of the server_log
inputs and ignores other inputs.
_min#
Pick the minimum from the inputs.
_inherit#
For this field, assert that all the values are the same for the inputs and pick that value. This is the default for the repository field.
_from_schedule_info#
This rule will get the value from the schedule info of the task which is determined by the scheduler. The scheduler’s choice is called schedule_info.
This rule is available only for fields 2 through 4.
Examples#
execution.output_generator:
class: default
outputs:
- dataset_class: "user_history"
name: "output_user_history"
schema_version:
- [_until, "20210105", "1"]
- [null, null, "2"]
Here, we have one entry in outputs, indicating one output dataset. For this dataset, in addition to the mandatory dataset_class
, the name is specified, which decides how the path to this output will be provided to the user code. Everything else except the schema uses defaults, so it will be based on the schedule info, etc. The schema version is provided as a time-series, switching the schema from one version to another based on the output instance_ts.
Copying and Linking datasets#
If you used copy_of_input_with_dataset_class
, link_to_input_with_dataset_class
or link_to_previous
, special situations apply.
Trel allows you to treat copying of datasets as a special case. This is because copying does not change the data. Trel is then able to ensure the output inherits the version of the input. You can use this functionality by specifying copy_of_input_with_dataset_class
and specifying the dataset class of the input you are copying. You are required to specify the repository in this case. No other dimension is allowed, as everything inherits from the input. Example:
execution.output_generator:
class: default
outputs:
- copy_of_input_with_dataset_class: "user_history"
repository: bq_us
Trel also allows you to create datasets that depend on other datasets for the data. E.g., a bigquery table is an external table with paths from a google storage path that is its own dataset. In such cases, you use link_to_input_with_dataset_class
. Here any and all dimensions are allowed. However, version inheritance only works if no dimensions other than repository are provided.
Sometimes, you need to make two datasets: one with the data and one that is a link to that data. E.g., you are running a query in Athena. The output necessarily goes to S3, but you also want it registered as a table in Athena. How do you do both together?
execution.output_generator:
class: default
outputs:
- dataset_class: quarterly_report
repository: dl-s3
- link_to_previous: True
repository: dw-athena
name: quarterly_report_dw
Configured like this, the job gets two paths, one for S3 and one for Athena. It should run the query and write to S3 using the first path and register it into Athena using the second. The treldev.awsutil
package in the Trel SDK has functions that do this for you.
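As a hedged sketch of such a job body, reusing the save_sql_results pattern from the athena profile sample earlier on this page (the args lookups, the source table, and the SQL are illustrative; args and AthenaURI are assumed to be available as in that sample):

# Outputs are looked up by the names given in the output generator config above.
output_s3_uri = args['outputs']['quarterly_report'][0]['uri']
output_athena_uri = args['outputs']['quarterly_report_dw'][0]['uri']

# Write the query results to S3 and register the table in Athena,
# following the athena profile sample shown earlier.
AthenaURI(output_athena_uri).save_sql_results(output_s3_uri, '''
    SELECT * FROM some_source_table''')  # some_source_table is hypothetical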
Multiple repository configurations and frozen branch#
Trel supports the ability for users to retrieve scripts from multiple branches, facilitating the download of content from various repositories. Additionally, Trel provides support for frozen branches.
execution.source_code.additional:
utils:
class: github
branch: main
path: [email protected]:urllib3/urllib3.git
frozen_branch: True
helper:
class: github
branch: main
path: [email protected]:urllib3/urllib3.git
In the given example, by setting the frozen_branch
parameter to True, the system ignores branch overrides. If no frozen_branch
parameter is provided, it defaults to False. The “utils” and “helper” represent repository home directories where the checked-out code will be stored.
Source Code Downloader Plugins#
A plugin from this plugin system can be used to download the sources that are required. The relevant parameters are execution.source_code.main and execution.source_code.additional (for an example of the latter, see the s3 plugin below).
The primary repository is specified using execution.source_code.main
, and will be placed in a folder called _code
. Additional code can be specified in execution.source_code.additional
as a dictionary and will be placed in a folder named after the corresponding key.
All plugins take the following parameters:
| Configuration | Description |
|---|---|
| class | The class of the downloader. |
| path | The path where the code is located. |
| branch | A string that should be interpreted as the branch. |
github#
This class can download a repository from GitHub to a specific folder. It typically relies on a credential called github.private_key
(see Known Credentials), which should consist of the private key that can authenticate against GitHub.com. You can use a credential stored under a different credential_name by specifying it in the configuration as credential_name.
Example#
execution.source_code.main:
class: github
path: [email protected]:urllib3/urllib3.git
branch: main
credential_name: github.private_key_alt1
This will download to a folder named _code
in the working directory for the task using the private key in credential name github.private_key_alt1
.
s3#
This class can download an S3 folder to a specific local folder. It uses a credential called aws.access_key
(see Known Credentials).
Example#
execution.source_code.additional:
util:
class: s3
path: s3://bucket/prefix
branch: main
There is no need to put a trailing slash in the path; it will be added. This config will download s3://bucket/prefix/main/
to a folder named util
in the working directory for the task. If no branch is specified, it downloads directly from s3://bucket/prefix/
.
Resource Plugins#
For an introduction to the concept, see Resource.
emr_spark#
This resource corresponds to an EMR cluster configured to run Spark or PySpark jobs.
It reads the following parameters:
It also reads the following sub-parameters from profile_default_parameters.
| sub-parameter | Description |
|---|---|
| aws.availablity_zone_subnets | A dictionary mapping from AWS zone to subnet. Required for spinning up the resource using the correct subnet. See the example below. |

default_params:
  profile_default_parameters:
    aws.availablity_zone_subnets:
      us-east-2a: subnet-0996a561
      us-east-2b: subnet-9b274fe1
Finally, this resource can be configured with the following:
| sub-parameter | Description |
|---|---|
| region | The AWS region to spin up the cluster in. E.g.: us-east-2. |
Given below is a sample configuration of the resource:
resource.name: emr_spark
resource.memory_level: normal
resource.num_cores: 1
resource.args:
region: us-east-2
spot: true
When the resource is allocated, it will request the following virtual resources:
| virtual resource | Examples |
|---|---|
| | |
| | |

If these virtual resources are configured and given quantities by the Trel admin, then they will be allocated.