Plugins#

Execution Profile Plugins#

python#

Enables you to execute Python scripts as Trel jobs.

Script Contract#

As explained in Execution Profiles, a file named args.yml containing all the job details will be created in the same folder as the user code. It has the following keys:

args.yml keys#

  • inputs: A list of dictionaries, each corresponding to an input dataset. See below for contents.

  • outputs: A list of dictionaries, each corresponding to an output dataset.

  • attempt_id: The Attempt ID of this job.

  • task_id: The Task ID of this job.

  • parameters: The full set of parameters for this Task.

  • attempt_parameters: Attempt-specific parameters.

  • credentials: Path to the credentials file.

  • schedule_instance_prefix, schedule_instance_ts, schedule_instance_ts_precision, schedule_label, schedule_id: See Scheduling Classes.

  • temp_paths: A mapping from repositories to temporary folders. To keep your code free of assumptions about repository names, select the temp paths you need using execution.temp_repositories in the registration file. This allows you to use the provided paths directly without checking the repository.

The dataset dictionaries will have the following keys. See Datasets.

  • id_

  • uri

  • name

  • instance_prefix

  • instance_ts_str

  • label

  • repository

  • schema_version

Additionally, a command line will be provided with the following parameters.

User script command line parameters#

  • --<dataset_name>: The paths of all the datasets with this name, provided with no guarantee of order. E.g., --server_log s3://bucket/path/date1/ s3://bucket/path/date2/

  • --_args: The filename of the args file. Always args.yml.

  • --_creds: The filename of the credentials file. Always credentials.yml.encrypted when run by profile.

  • --attempt_id: The attempt ID of this job.

  • --schedule_instance_prefix, --schedule_instance_ts, --schedule_instance_ts_precision, --schedule_label: See Scheduling Classes.

When executed with these parameters, the script must return 0 if and only if it was completely successful. Otherwise, it must return a value greater than 0.
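For illustration, here is a minimal sketch of a user script that follows this contract. It only assumes the flags and args.yml keys described above; the dataset flag --server_log is a hypothetical example.

import argparse
import sys

import yaml


def main():
    parser = argparse.ArgumentParser()
    # Flags provided by the python execution profile (described above).
    parser.add_argument("--_args", required=True)
    parser.add_argument("--_creds")
    parser.add_argument("--attempt_id")
    parser.add_argument("--schedule_instance_prefix")
    parser.add_argument("--schedule_instance_ts")
    parser.add_argument("--schedule_instance_ts_precision")
    parser.add_argument("--schedule_label")
    # Hypothetical dataset flag; one such flag is passed per dataset name,
    # with one or more paths and no guarantee of order.
    parser.add_argument("--server_log", nargs="*", default=[])
    args, _ = parser.parse_known_args()

    # args.yml carries the full task details described above.
    with open(args._args) as f:
        task = yaml.safe_load(f)

    input_uris = [d["uri"] for d in task["inputs"]]
    output_uris = [d["uri"] for d in task["outputs"]]

    # ... read from input_uris and write to output_uris here ...

    return 0  # 0 if and only if the job was completely successful


if __name__ == "__main__":
    sys.exit(main())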

emr_pyspark#

Enables you to execute PySpark scripts as Trel jobs. Uses a resource of class EMRMapReduceResource.

This profile is very easy to use. Simply set the parameters defined below and Trel will run your pyspark script on a cluster.

Script Contract#

Same as that of python.

Profile Default Parameters#

These are to be entered under profile_default_parameters in .trel.config on the Trel server.

  1. aws.s3_temp_paths_by_region: Maps regions to temp S3 folders. This is required because the S3 folder may store bootstrap files, and they have to come from the same region as the cluster you are starting for the job.

    aws.s3_temp_paths_by_region:
      us-east-2: s3://<bucket in us-east-2>/<prefix>/{attempt_id}
    
  2. aws.availablity_zone_subnets: This specifies the subnet for each zone. Used by EMRMapReduceResource.

    aws.availablity_zone_subnets:
      us-east-2a: subnet-???????? # (subnet for us-east-2a)
      us-east-2b: subnet-????????
    
  3. emr_spark.ami_version

    emr_spark.ami_version: emr-5.28.0
    
  4. emr_spark.starting_wait_max_seconds: How long the profile will wait for the cluster to leave the "STARTING" phase.

    emr_spark.starting_wait_max_seconds: 1200
    
  5. emr_spark.executor_memory: How much memory will be allocated to each Spark executor, for each memory specification.

    emr_spark.executor_memory:
      low: 12
      normal: 24
      normal_high: 36
      high: 48
    
  6. emr.jobflow_role: Default EMR_EC2_DefaultRole.

  7. emr.service_role: Default EMR_DefaultRole.

  8. emr_spark.bootstrap_actions: A list of scripts that will be used to initialize the EMR cluster. Example:

    emr_spark.bootstrap_actions:
    - ['default','s3://...../bootstrap.sh', []]
    - ['datascience','s3://...../ds_bootstrap.sh', []]
    

    The entries consist of name, path to the script, and script parameters.

Profile Parameters#

  1. emr_spark.bootstrap_actions: These will be applied after the bootstrap commands from the previous section. Note that if you have entries here, it will complicate sharing clusters with other jobs.

Credentials required: emr.access_key, aws.instance_key_name

bigquery#

This is to be used when you have a BigQuery job. As of now, it is identical to the python profile, but that may change.

Credentials required: gcp.service_json

Sample code:

# BigQueryURI is provided by the Trel SDK; input_bq and output_bq hold the
# input and output BigQuery URIs (for example, taken from args.yml).
input = BigQueryURI(input_bq)
BigQueryURI(output_bq).save_sql_results(f'''
SELECT distinct user_id from {input.path}''')

Primarily, writing a BigQuery job relies on the SDK, not the profile. The following SDK classes help with writing jobs for this profile:

athena#

This is to be used when you have an Athena job. As of now, it is identical to the python profile, but that may change.

One important oddity to note: because Athena does not store data, all Athena queries require two output paths, one for S3 and one for Athena. This requires special behavior in the output specification.

execution.output_generator:
  class: default
  outputs:
  - name: output_s3
    dataset_class: logs
    repository: dl_s3
  - name: output_athena
    repository: dw_athena
    link_to_previous: True

This way, two outputs are generated, both are cataloged, and the Athena dataset is registered as dependent on the S3 dataset.

Credentials required: aws.access_key

Sample code:

# AthenaURI is provided by the Trel SDK; input_athena, output_athena, and
# output_s3 hold the corresponding URIs (for example, taken from args.yml).
input = AthenaURI(input_athena)
AthenaURI(output_athena).save_sql_results(output_s3, f'''
SELECT distinct user_id from {input.path}''')

Primarily, writing an Athena job relies on the SDK, not the profile. The following SDK classes help with writing jobs for this profile:

databricks_notebookjob#

Enables you to execute a Databricks notebook job as a Trel job. Sample registration file:

name: request_log_distinct_dbn

execution.profile: databricks_notebookjob

execution.source_code.main:
  class: databricks_notebookjob
  path: https://<something>.cloud.databricks.com#<job_id>
  branch: master

repository_map:
  - request_log: s3-us-east2

scheduler:
  class: single_instance
  depends_on: [ request_log ]
  labels: [ demo ]
  instance_ts_precisions: [ D ]
  cron_constraint: "0 0 * * *"
  schedule_ts_min: "2021-01-01 00:00:00"

execution.output_generator:
  class: default
  outputs:
  - dataset_class: request_log_distinct

Credentials required: databricks.access_token

Script Contract#

A task information dict, similar to the contents of args.yml in the python profile, is provided to the notebook under the widget key args as a YAML string. Parse it and look for the input and output paths, then transform the inputs into the outputs. Sample notebook:

import yaml
args = yaml.safe_load(dbutils.widgets.get("args"))

input_path = args['inputs']['<dataset_class>'][0]['uri']
output_path = args['outputs']['<dataset_class>'][0]['uri']

sc.textFile(input_path).distinct().saveAsTextFile(output_path)

Profile parameters#

These can be put in the registration file under profile_parameters.

  1. databricks_notebookjob.branch_job_map: Maps branches to jobs. This is required if you use the branch feature in Trel. You may only specify a branch that exists in this parameter, because each notebook is tied to fixed code.

    profile_parameters:
      databricks_notebookjob.branch_job_map:
        master: 'https://<something>.databricks.com#<job_id>'
        qa: 'https://<something>.databricks.com#<job_id2>'
    

ec2_single#

Enables you to execute a Python script on an EC2 instance. Uses a resource of class EC2Resource.

Script Contract#

Same as that of python.

Job parameters:#

  • resource.static_resource_id: If provided, this resource will be used to run the attempt instead of spinning up a new instance.

Profile Default Parameters#

These are to be entered under profile_default_parameters in .trel.config on the Trel server.

  • ec2.python: The path to the python command to run. Default: /usr/bin/python3

  • ec2.allocate_if_static_resource_is: If the static resource is found to be in any of these states, a new resource will be allocated. Default: ['_MISSING', 'terminated']

  • ec2.starting_wait_max_seconds: Terminate the EC2 request if the instance has not been allocated within this many seconds. Default: 1200

  • ec2.bootstrap_script: Shell commands, provided as a list of strings, run when initializing an instance. Default: []

Profile Parameters#

These are to be set in profile_parameters in job registration.

  • ec2.job_setup_script: Shell commands, provided as a list of strings, run just before starting the job. Default: []

Credentials required: emr.access_key, aws.instance_key

Execution APIs#

Azure Synapse#

To use Trel for running SQL queries in a Synapse workspace, you need to configure credentials: provide the username and password for the SQL pool, and the driver information.

Azure URI Format#

The Azure URI format in Trel is used to specify the location of data objects in Azure services. The format is as follows:

azsa://<server_url>/<db_name>/<schema>/<table_name>

Where:

  • <server_url>: URL of the Synapse server.

  • <db_name>: Name of the database.

  • <schema>: Name of the schema.

  • <table_name>: Name of the table.

Running SQL Queries:

This sample code demonstrates how to run SQL queries in a Synapse workspace using Trel.

# SynapseURI and BlobURI are provided by the Trel SDK.
from treldev import azureutils

# Define the Azure URI for input data
input_uri = 'azsa://<server_url>/<db_name>/<schema>/<table_name>'

# Initialize a SynapseURI object with the input Azure URI
input = azureutils.SynapseURI(input_uri)

# Save the results of the SQL query to a Blob Storage location
input.save_sql_results(
    azureutils.BlobURI('abfss://container@storage_ac_name.dfs.core.windows.net/path/to/table/'),
    f"SELECT distinct user_id from {input.path}")

Creating External Tables from Blob Storage:

This sample code demonstrates how to create external tables in a Synapse workspace from Blob Storage using Trel.

# SynapseURI and BlobURI are provided by the Trel SDK.
from treldev import azureutils

# Define the Azure URI for input data
input_uri = 'azsa://<server_url>/<db_name>/<schema>/<table_name>'

# Initialize a SynapseURI object with the input Azure URI
input = azureutils.SynapseURI(input_uri)

# Create an external table from a Blob Storage location
input.create_from_blob_uri(
    azureutils.BlobURI('abfss://container@storage_ac_name.dfs.core.windows.net/file/path'),
    schema={"columns":[{"name":"column_name","type":"datatype"}], "format":"parquet"})

Validation Example#

This example demonstrates how to validate data and schema integrity using Trel.

schema:
  columns:
   - mode: required
     name: name
     type: nvarchar
format: parquet
sodacore_checks: |
  - row_count:
     fail: when < 1
     warn: when < 2

Validation Code#

This code snippet validates the data and schema integrity.

res = input.validate_and_return(
    args, args['outputs']['request_log'][0],
    sodacore_check=True, sodacore_schema_check=True)

Ensure credentials are configured correctly, follow the Azure URI format above, and use the provided sample code for running SQL queries and creating external tables. Use the validation example to check data quality and schema integrity.

The following SDK classes help with writing jobs for this profile:

  • treldev.azureutils.Synapse

  • treldev.azureutils.SynapseURI

  • treldev.azureutils.BlobURI

Repository Plugins#

This section provides plugin specific information. See Repositories for details about the concept.

athena: AWS Athena (Hive metastore)#

Supports Athena URIs. Config format:

{"catalog":"...", "database":"...","prefix":"...",
 "region":"...", "temp_database":"...",
 "staging_dir":"s3://...", "validation_schema":"tmp"}

Here prefix is mandatory, as Athena cannot have tables that are just numbers.

The repository will generate default URIs of the format athena://<region>/<catalog>/<database>/<prefix><dataset_id>

Temp URIs have the format athena://<region>/<catalog>/<temp_database>/<attempt_id>

Because the Athena UI has no URL for a specific table, the dataset access URLs can only take you to the Athena homepage.

Credentials required: aws.access_key

trino: In-house Trino cluster#

Supports Trino URIs from an on-premises cluster. Config format:

{"server":"","port":"", "catalog":"...", "schema":"...","prefix":"...",
 "temp_prefix":"...", "validation_schema":"tmp", "credential_key":""}

Here prefix is mandatory, as Trino cannot have tables that are just numbers.

The repository will generate default URIs of the format trino://<server>:<port>/<catalog>/<schema>/<prefix><dataset_id>

Temp URIs have the format trino://<region>/<catalog>/<temp_schema | schema>/zattempt_<attempt_id>_

Credentials required: trino or as per credential_key parameter.

s3: AWS S3#

Supports AWS S3 URIs. Config format:

{'bucket':'...', 'prefix':'...',
 'region':'...', 'temp_prefix':'...'}

Paths must end in a / if the path is a folder.

The repository will generate default URIs of the format s3://<bucket>/<prefix><dataset_id>

Temp URIs have the format s3://<bucket>/<temp_prefix><attempt_id>

Credentials required: aws.access_key

bq: BigQuery#

Supports Google BigQuery URIs. Config format:

{'project':'...', 'dataset':'...','prefix':'...',
 'region':'...', 'temp_dataset':'...'}

The repository will generate default URIs of the format bq://<project>/<dataset>/<prefix><dataset_id>

Temp URIs have the format bq://<project>/<temp_dataset>/<attempt_id>

Credentials required: gcp.service_json

gs: Google Storage#

Supports Google Storage URIs. Config format:

{'bucket':'...', 'prefix':'...',
 'region':'...', 'temp_prefix':'...'}

Paths must end in a / if the path is a folder.

The repository will generate default URIs of the format gs://<bucket>/<prefix><dataset_id>

Temp URIs have the format gs://<bucket>/<temp_prefix><attempt_id>

Credentials required: gcp.service_json

Scheduling Class Plugins#

BaseSchedule#

All scheduling classes inherit from BaseSchedule, and most of them let the parent class initialize certain common configuration options, which are given below.

  • depends_on: The main list of dataset classes to depend on.

  • labels: The list of labels that are acceptable in input combinations. All the datasets in the input combination must share the same label. This behavior can be modified using Prefix label selection.

  • canonical_label: The schedule_label to use when no label is present in the schedule_id. By default, it is the first label in labels.

  • prefix_label_selection: See Prefix label selection.

  • instance_ts_precisions: A list of valid instance_ts_precisions. All input datasets must share the precision.

  • input_cron_constraint: All input datasets must satisfy this cron constraint.

  • output_cron_constraint: All output datasets must satisfy this cron constraint. Some scheduling classes require that this is a subset of input_cron_constraint.

  • cron_constraint: Use this if the input and output cron constraints are the same.

  • schedule_ts_min: Ignore schedule_instance_ts older than this timestamp.

  • schedule_ts_max_seconds_old: Ignore schedule_instance_ts older than this many seconds before now.

  • other_datasets: A specification of additional datasets required for the job, which are not strictly constrained. See Other datasets.

Implementation note: This class contains a lot of default code that is useful to a new child class that needs cron constraints. The functions left to be implemented are clearly marked in the code. New scheduling classes that support having no cron constraints, like single_instance, need to re-implement more functions.

single_instance#

The main property of this scheduling class is that all input combinations must consist of datasets that share the same instance. That means the same (instance_prefix, instance_ts, instance_ts_precision). It adds the following configurations on top of those supported by BaseSchedule.

  • instance_prefix: A specific value for the instance prefix to take. The default behavior is to not constrain the instance prefix; all schedule inputs sharing the same instance prefix are valid.

  • instance_prefix_regex: A regex for the instance prefix. All schedule inputs must still share the same instance prefix to be valid.

For example, consider the following scheduler configuration:

scheduler:
  class: single_instance
  depends_on: [ server_log, customer_list ]
  instance_ts_precisions: [ D ]
  labels: [ prod ]
  cron_constraint: "0 0 * * *"
  schedule_ts_min: "2020-01-01 00:00:00"

The following is a valid schedule input for this scheduler:

schedule_id: 20210101
inputs:
  -  ('server_log',    None, '2021-01-01 00:00:00', 'D', 'prod', 's3-us-east2')
  -  ('customer_list', None, '2021-01-01 00:00:00', 'D', 'prod', 's3-us-east2')
schedule_instance_prefix: None
schedule_instance_ts: '2021-01-01 00:00:00'
schedule_instance_ts_precision: 'D'
schedule_label: 'prod'

Use case#

This is used extensively to convert datasets from one form to another. E.g., convert a day of server_log_raw to the same day’s server_log.

Also used for joins. E.g., server_log and customer_list are joined to produce server_log_extended.

This is also used to process datasets where instance_ts is None. However, such an architecture is not recommended.

multi_period#

In addition to single_instance behavior, this allows additional instances as inputs. The additional instances may differ only in instance_ts. Unlike single_instance, this scheduling class can no longer handle datasets that have no instance_ts.

The ability of this schedule to handle additional instances comes from a new configuration, given below:

  • periods: There are three options, listed below.

  1. A whole number indicating the number of periods. Negative means towards the past.

    period: -3  # current and previous 2 periods
    
  2. OR a dictionary mapping from dataset class to the number of periods.

    period:
       server_log: 3  # current and next 2 periods
       customer_list: 1  # current period only
    
  3. OR a dictionary mapping from dataset class to a dictionary with keys periods and offset. Offset is a whole number denoting how many periods to shift the selection by. Negative means towards the past.

    period:
       server_log:  # current, previous, and next periods
          periods: 3
          offset: -1
       customer_list: 1  # current period only
    

The period that the periods key refers to is defined by input_cron_constraint. E.g., if input_cron_constraint is 0 0 * * *, then the period is a day. If 2/15/2021 is the current period, then the next period is 2/16/2021.
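To make this concrete, here is a small illustration using the croniter library (this only illustrates the cron arithmetic; it is not necessarily how Trel computes periods internally):

from datetime import datetime

from croniter import croniter

# With input_cron_constraint "0 0 * * *", each period is one day.
current_period = datetime(2021, 2, 15)
next_period = croniter("0 0 * * *", current_period).get_next(datetime)
print(next_period)  # 2021-02-16 00:00:00, the next period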

For example, consider the following scheduler configuration:

scheduler:
  class: multi_period
  depends_on: [ server_log, customer_list ]
  period:
    server_log:  # current, previous, and next periods
      periods: 3
      offset: -1
    customer_list: 1
  instance_ts_precisions: [ D ]
  labels: [ prod ]
  cron_constraint: "0 0 * * *"
  schedule_ts_min: "2020-01-01 00:00:00"

The following is a valid schedule input for this scheduler:

schedule_id: 20210101
inputs:
  -  ('server_log',    None, '2020-12-31 00:00:00', 'D', 'prod', 's3-us-east2')
  -  ('server_log',    None, '2021-01-01 00:00:00', 'D', 'prod', 's3-us-east2')
  -  ('server_log',    None, '2021-01-02 00:00:00', 'D', 'prod', 's3-us-east2')
  -  ('customer_list', None, '2021-01-01 00:00:00', 'D', 'prod', 's3-us-east2')
schedule_instance_prefix: None
schedule_instance_ts: '2021-01-01 00:00:00'
schedule_instance_ts_precision: 'D'
schedule_label: 'prod'

In this case, even though the schedule_id is for 1/1/2021, this schedule input cannot execute until the 1/2/2021 dataset for server_log is ready.

Here, output_cron_constraint and input_cron_constraint can be different. E.g., if the former is changed to 0 0 * * 0, then a schedule is created only when schedule_ts is a Sunday. However, the inputs will still be Saturday, Sunday, and Monday.

If the output_cron_constraint is not a subset of the input_cron_constraint, no exception will be thrown, but the behavior of the class is not specified.

Use case#

This is used where a sequence of days or hours need to be processed to generate the output. E.g., reading 30 days of server_log to generate kpi_30day.

periodic#

In addition to single_instance behavior, this allows the same dataset class to appear as both input and output. This is typically required for datasets that should be updated using another dataset.

E.g., assume \(A\) is a dataset that needs to be updated weekly using daily datasets of \(B\) and \(C\). A dataset of \(D\) is also required with the schedule instance ts to generate the new \(A\). Then the output and input can be shown as,

\[A_7 \leftarrow A_0 + \sum_{i=1}^7 B_i + \sum_{i=1}^7 C_i + D_7\]

\(B\) and \(C\) can be indicated in the configuration under the depends_on field. \(A\) and \(D\) can be specified using two new configurations.

  • self_depends_on: A list of dataset classes that will be updated, viz. \(A\) from the above example.

  • depends_on_for_schedule_ts_only: A list of dataset classes for which a single dataset is required, with the schedule_instance_ts as the instance_ts, viz. \(D\) from the above example.

  • instance_ts_precisions: Same as before, but restricted to exactly one value.

  • depends_on_instance_ts_precisions: A separate precision for the depends_on datasets. This way you can have a job that combines hourly datasets into a daily one. Also restricted to exactly one value.

For example, consider the following scheduler configuration:

scheduler:
  class: periodic
  depends_on: [ server_log ] # B,C from example
  self_depends_on: [ kpis ] # A from example
  depends_on_for_schedule_ts_only: [ customer_list ] # D from example
  instance_ts_precisions: [ D ]
  labels: [ prod ]
  input_cron_constraint:  "0 0 * * *"
  output_cron_constraint: "0 0 * * 0"
  schedule_ts_min: "2020-01-01 00:00:00"

The following is the first valid schedule input for this scheduler in 2021:

schedule_id: 20210103
inputs:
  -  ('kpis',          None, '2020-12-27 00:00:00', 'D', 'prod', 's3-us-east2')
  -  ('server_log',    None, '2020-12-28 00:00:00', 'D', 'prod', 's3-us-east2')
  -  ('server_log',    None, '2020-12-29 00:00:00', 'D', 'prod', 's3-us-east2')
  -  ('server_log',    None, '2020-12-30 00:00:00', 'D', 'prod', 's3-us-east2')
  -  ('server_log',    None, '2020-12-31 00:00:00', 'D', 'prod', 's3-us-east2')
  -  ('server_log',    None, '2021-01-01 00:00:00', 'D', 'prod', 's3-us-east2')
  -  ('server_log',    None, '2021-01-02 00:00:00', 'D', 'prod', 's3-us-east2')
  -  ('server_log',    None, '2021-01-03 00:00:00', 'D', 'prod', 's3-us-east2')
  -  ('customer_list', None, '2021-01-03 00:00:00', 'D', 'prod', 's3-us-east2')
schedule_instance_prefix: None
schedule_instance_ts: '2021-01-03 00:00:00'
schedule_instance_ts_precision: 'D'
schedule_label: 'prod'

This class is flexible in how it can be configured. The only constraint is that at least one of depends_on or depends_on_for_schedule_ts_only must be non-empty. Leaving self_depends_on empty allows the following input selection as well.

\[A_7 \leftarrow \sum_{i=1}^7 B_i\]

The following scheduler configuration can combine hourly datasets of server_log into daily:

scheduler:
  class: periodic
  depends_on: [ server_log ] # B,C from example
  instance_ts_precisions: [ D ]
  depends_on_instance_ts_precisions: [ H ]
  labels: [ prod ]
  input_cron_constraint:  "0 * * * *"
  output_cron_constraint: "0 0 * * *"
  schedule_ts_min: "2020-01-01 00:00:00"

The output_cron_constraint and input_cron_constraint behave similar to multi_period.

Use case#

  1. When self_depends_on is not empty, this schedule can be used to update a dataset with new data. The above example is this case.

  2. When self_depends_on is empty, this schedule can be used to combine and process data from multiple periods where the number of periods is better expressed as a cron constraint rather than a number of days. E.g., process all the data for each month: (input_cron_constraint: 0 0 * * *, output_cron_constraint: 0 0 L * *)

Resource Plugins#

ec2: EC2 Instance#

Use this resource to get access to an EC2 instance. The ec2_single execution profile plugin uses this resource. This class can generate the parameters for run_instances in the boto3 library, leaving out MinCount and MaxCount.

Known parameters:#

creation_extra_args

This dictionary will be used to update profile_default_parameters/ec2.creation_extra_args. The update is done using Python's dict.update(), so only the top level is updated; inner dictionaries and lists are fully replaced (see the sketch after this parameter list).

The profile will pass the updated dictionary to run_instances. This profile will add the information from the following parameters into this dictionary before passing it to the function.

The following are in-built defaults that have the lowest precedence:

/BlockDeviceMappings/0/DeviceName                                   /dev/xvda
/BlockDeviceMappings/0/Ebs/DeleteOnTermination                      True
/BlockDeviceMappings/0/Ebs/VolumeType                               gp3

The following are enforced values. These have the highest precedence:

IF SPOT:
/InstanceMarketOptions/SpotOptions/SpotInstanceType                one-time
/InstanceMarketOptions/SpotOptions/InstanceInterruptionBehavior    terminate

region

Uses profile_default_parameters/ec2.region as backup.

instance_type

Uses profile_default_parameters/ec2.instance_type as backup.

E.g.: r5.25xlarge.

ami

Uses profile_default_parameters/ec2.ami as backup.

ebs_storage

Specified in GB. Uses profile_default_parameters/ec2.ebs_storage as backup.

subnet_id

Uses profile_default_parameters/ec2.subnet_id as backup.

Default: Configured default subnet for the default VPC.

spot

Uses profile_default_parameters/ec2.spot as backup.

Default: False

tags

Here is the syntax:

tags:
  resource_type:
    key1: value1
    key2: value2

These tags update profile_default_parameters/ec2.tags.

bootstrap

These commands are run on the instance after profile_default_parameters/ec2.bootstrap.
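To illustrate the shallow merge described under creation_extra_args above, here is a small Python sketch (the parameter values are hypothetical and only demonstrate dict.update behavior):

# Hypothetical defaults from profile_default_parameters/ec2.creation_extra_args.
defaults = {
    "BlockDeviceMappings": [
        {"DeviceName": "/dev/xvda",
         "Ebs": {"DeleteOnTermination": True, "VolumeType": "gp3"}},
    ],
    "EbsOptimized": True,
}

# Hypothetical job-level creation_extra_args.
override = {
    "BlockDeviceMappings": [
        {"DeviceName": "/dev/xvda", "Ebs": {"VolumeType": "gp2"}},
    ],
}

merged = dict(defaults)
merged.update(override)

# Only the top level is merged: the entire BlockDeviceMappings list is replaced,
# so DeleteOnTermination is gone from the merged value, while EbsOptimized survives.
print(merged["BlockDeviceMappings"][0]["Ebs"])  # {'VolumeType': 'gp2'}
print(merged["EbsOptimized"])                   # True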

Dynamic Virtual resources#

{region}_{instance_type}: 1
{region}_ec2_instance:    1
{region}_ec2_job:         1

Retry Strategy Plugins#

default#

First, let us establish the behavior when the attempts only return ERROR or CANCELLED as the return code (see Return Codes).

  1. execution.retry_strategy.attempts is the maximum number of attempts.

  2. execution.retry_strategy.delay is the delay in seconds between two attempts.

Now let us examine how the behavior changes for various return codes.

  1. ALL_ATTEMPTS_FAILING_ERROR and TREL_ERROR

    No more retries will happen. Ignores execution.retry_strategy.attempts.

  2. TRANSIENT_USER_CODE_EXCEPTION

    These failures do not count towards attempt counts. So, the attempt will definitely retry after the usual delay.

  3. RESOURCE_DID_NOT_ALLOCATE, RESOURCE_LOST_DURING_EXECUTION, FAILED_DUE_TO_LOST_SLAVE, READ_LOCK_DENIED, and WRITE_LOCK_DENIED

    These are resource failures and also do not count towards attempt counts. So, the attempt will definitely retry, but with the delay from execution.retry_strategy.resource_error_delay.
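The decision logic described above can be summarized with the following conceptual sketch (it restates the rules in code; it is not Trel's actual implementation):

RESOURCE_FAILURES = {
    "RESOURCE_DID_NOT_ALLOCATE", "RESOURCE_LOST_DURING_EXECUTION",
    "FAILED_DUE_TO_LOST_SLAVE", "READ_LOCK_DENIED", "WRITE_LOCK_DENIED",
}


def retry_decision(return_code, counted_attempts, max_attempts):
    """Return (should_retry, which_delay). Conceptual sketch only."""
    if return_code in ("ALL_ATTEMPTS_FAILING_ERROR", "TREL_ERROR"):
        return False, None
    if return_code == "TRANSIENT_USER_CODE_EXCEPTION":
        # Does not count towards attempt counts.
        return True, "execution.retry_strategy.delay"
    if return_code in RESOURCE_FAILURES:
        # Does not count towards attempt counts either.
        return True, "execution.retry_strategy.resource_error_delay"
    # ERROR / CANCELLED count towards execution.retry_strategy.attempts.
    if counted_attempts < max_attempts:
        return True, "execution.retry_strategy.delay"
    return False, None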

Output Generator Plugins#

This plugin system decides which outputs are generated for each task. Trel comes with the default plugin, which should be enough for almost all cases.

default#

The default plugin is the output generator that comes with Trel. To configure this, we provide a key outputs, which is a list of entries, one for each output dataset. Each entry is a dict which can consist of,

  1. dataset_class: This is mandatory.

  2. instance_prefix: This is optional. Default value _from_schedule_info.

  3. instance_ts: This is optional. Default value _from_schedule_info.

  4. instance_ts_precision: This is optional. Default value _from_schedule_info.

  5. label: This is optional. Default value _from_schedule_info.

  6. repository: This is optional. Default rule is _inherit (see Output Generator Rules) if all the inputs have the same repository. Otherwise, mandatory.

  7. name: This is an optional parameter. The default is the same as dataset_class. See Naming Datasets.

  8. uri_formatter: This is an optional string that can be used to construct the uri for the output. See Default Output URI.

  9. schema_version: This is optional. The default value will be the value specified for the dataset_class at the system level. You can also specify a time-series (see example below).

  10. copy_of_input_with_dataset_class: This is optional. This indicates that this job will copy the data of the input with the specified dataset_class into this output.

  11. link_to_input_with_dataset_class: This is optional. This indicates that this job will link the output to the input with the specified dataset_class.

  12. link_to_previous: This is optional. This indicates that this output is to be linked to the previous output specification.

Given this configuration, the inputs and the scheduling info of those inputs, this output generator will come up with all the information required to register the outputs.

Output Generator Rules#

All fields 2 through 6 can be explicitly specified. In general, however, we need some of them to change based on the inputs, so certain rules can be specified that are applied to the inputs.

_max#

For this field, pick the maximum from the inputs. E.g.,

instance_ts: _max

You can also specify it as a list where the second argument is the dataset_class you want to apply the rule on:

instance_ts: [_max, server_log]

This picks the maximum instance_ts of the server_log inputs and ignores other inputs.

_min#

Pick the minimum from the inputs

_inherit#

For this field, assert that all the values are the same for the inputs and pick that value. This is the default for the repository field.

_from_schedule_info#

This rule will get the value from the schedule info of the task which is determined by the scheduler. The scheduler’s choice is called schedule_info.

This rule is available only for fields 2 through 4.

Examples#

execution.output_generator:
  class: default
  outputs:
    - dataset_class: "user_history"
      name: "output_user_history"
      schema_version:
      - [_until, "20210105", "1"]
      - [null, null, "2"]

Here, we have one entry in outputs, indicating one output dataset. For this dataset, in addition to the mandatory dataset_class, the name is specified, which decides how the path to this output will be provided to the user code. Everything else, except the schema version, uses defaults, so it will be based on the schedule info etc. The schema version is provided as a time-series, switching from one schema to another based on the output instance_ts.
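The schema_version time-series above can be read as: version "1" applies to outputs with instance_ts before 2021-01-05, and version "2" applies afterwards. The following hypothetical helper only illustrates that reading; the exact boundary handling inside Trel is not specified here.

from datetime import datetime


def pick_schema_version(time_series, instance_ts):
    # time_series example: [["_until", "20210105", "1"], [None, None, "2"]]
    for rule, boundary, version in time_series:
        if rule == "_until" and instance_ts < datetime.strptime(boundary, "%Y%m%d"):
            return version
        if rule is None:
            return version  # catch-all entry
    return None


print(pick_schema_version(
    [["_until", "20210105", "1"], [None, None, "2"]],
    datetime(2021, 1, 3)))  # -> "1"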

Copying and Linking datasets#

If you used copy_of_input_with_dataset_class, link_to_input_with_dataset_class, or link_to_previous, special situations apply.

Trel allows you to treat copying of datasets as a special case. Because copying does not change the data, Trel can ensure that the output inherits the version of the input. You can use this functionality by specifying copy_of_input_with_dataset_class with the dataset class of the input you are copying. You are required to specify the repository in this case. No other dimension is allowed, as everything else inherits from the input. Example:

execution.output_generator:
  class: default
  outputs:
    - copy_of_input_with_dataset_class: "user_history"
      repository: bq_us

Trel also allows you to create datasets that depend on other datasets for their data. E.g., a BigQuery external table whose paths come from a Google Storage location that is its own dataset. In such cases, use link_to_input_with_dataset_class. Here, any and all dimensions are allowed. However, version inheritance only works if no dimensions other than repository are provided.

Sometimes you need to make two datasets: one with the data and one that is a link to that data. E.g., you are running a query in Athena. The output necessarily goes to S3, but you also want it registered as a table in Athena. How do you do both together?

execution.output_generator:
  class: default
  outputs:
    - dataset_class: quarterly_report
      repository: dl-s3
    - link_to_previous: True
      repository: dw-athena
      name: quarterly_report_dw

Configured like this, the job gets two paths, one for S3 and one for Athena. It should run the query, write to S3 at the first path, and register it into Athena using the second. The treldev.awsutil package in the Trel SDK has functions that do this for you.

Multiple repository configurations and frozen branch#

Trel supports the ability for users to retrieve scripts from multiple branches, facilitating the download of content from various repositories. Additionally, Trel provides support for frozen branches.

execution.source_code.additional:
   utils:
      class: github
      branch: main
      path: git@github.com:urllib3/urllib3.git
      frozen_branch: True
   helper:
      class: github
      branch: main
      path: git@github.com:urllib3/urllib3.git

In the given example, setting the frozen_branch parameter to True makes the system ignore branch overrides. If no frozen_branch parameter is provided, it defaults to False. The keys "utils" and "helper" are the folders where the checked-out code will be stored.

Source Code Downloader Plugins#

A plugin from this plugin system downloads the source code required for the job. The relevant parameters are:

  1. execution.source_code.main

  2. execution.main_executable

  3. execution.source_code.additional. For more information, refer to s3.

The primary repository is specified using execution.source_code.main, and will be placed in a folder called _code. Additional code can be specified in execution.source_code.additional as a dictionary and will be placed in a folder named after the corresponding key.

All plugins take the following parameters:

  • class: The class of the downloader: github, s3, etc.

  • path: The path where the code is located.

  • branch: A string that should be interpreted as the branch.

github#

This class can download a repository from GitHub to a specific folder. It typically relies on a credential called github.private_key (see Known Credentials), which should consist of the private key used to authenticate against GitHub.com. You can use a credential stored under a different name by specifying it in the configuration as credential_name.

Example#

execution.source_code.main:
  class: github
  path: git@github.com:urllib3/urllib3.git
  branch: main
  credential_name: github.private_key_alt1

This will download to a folder named _code in the working directory for the task, using the private key stored under the credential name github.private_key_alt1.

s3#

This class can download an S3 folder to a specific local folder. It uses a credential called aws.access_key (see Known Credentials).

Example#

execution.source_code.additional:
  util:
    class: s3
    path: s3://bucket/prefix
    branch: main

There is no need to put a trailing slash in the path; it will be added. This config will download s3://bucket/prefix/main/ to a folder named util in the working directory for the task. If no branch is specified, it downloads directly from s3://bucket/prefix/.

Resource Plugins#

For an introduction to the concept, see Resource.

emr_spark#

This resource corresponds to an EMR cluster configured to run spark or pyspark jobs.

It reads the following parameters:

  1. resource.memory_level

  2. resource.num_cores

It also reads the following sub-parameters from profile_default_parameters:

  • aws.availablity_zone_subnets: A dictionary mapping from AWS zone to subnet. Required for spinning up the resource using the correct subnet. E.g.:

default_params:
  profile_default_parameters:
    aws.availablity_zone_subnets:
      us-east-2a: subnet-0996a561
      us-east-2b: subnet-9b274fe1

Finally, this resource can be configured with the following:

  • region: The AWS region to spin up the cluster in. E.g.: us-east-2.

Given below is a sample configuration of the resource:

resource.name: emr_spark
resource.memory_level: normal
resource.num_cores: 1
resource.args:
  region: us-east-2
  spot: true

When the resource is allocated, it will request the following virtual resources:

  • <zone>_<instance_type> (e.g., us-east-2a_r3.xlarge)

  • <region>_emr_job (e.g., us-east-2_emr_job)

If these virtual resources are configured and given a quantity by the Trel admin, they will be allocated.