Concepts#
Datasets#
A Dataset in Trel is a URI stored in the dataset table, identified by the value of the id column. The following information is associated with a Dataset:
dataset_class
: This is a string that represents the type of the dataset, e.g., server_log. It should indicate what information this dataset is storing.
Important
Two datasets that are different enough that a job may be able to accept one as input but not the other should never share the same dataset_class.
Note, however, that the reverse is less of a concern.
instance_prefix
: Instance fields indicate the origin of a dataset. Typically, the origin is time-based. Use this field to indicate the non-time-based origin of a dataset. E.g., if the same type of dataset is being received from different partners daily, you can store the partner id in this field.
instance_ts
: When a dataset has a time-based origin, use this field to store the origin time. Note that this is ideally not the time the dataset was received or created, but the time that the data reflects.
instance_ts_precision
: Indicates the precision of the time that is specified. E.g., (2021-05-01 00:00:00, H) indicates that the data's origin is the first hour of 5/1, while (2021-05-01 00:00:00, D) indicates the data is for the entire day of 5/1.
label
: Trel provides unlimited environments called labels that allow you to experiment without interfering with each other. Every dataset is assigned a specific label. Based on the frequent use of master for the release branch, the recommended label for production is master.
repository
: A Repository is an entity in Trel that represents a storage location that is meaningfully distinct from another. See Repositories for more details.
Fields 2 through 4 together can be referred to as the instance. Fields 3 and 4 together can be referred to as instance_ts_str. E.g.:
(datetime(2021,5,1), 'H') -> '20210501_00'
(datetime(2021,5,1), 'D') -> '20210501'
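As a rough sketch of this formatting (the helper below is hypothetical and not part of the SDK; only the D and H precisions shown above are covered), the mapping can be thought of as:

from datetime import datetime

def instance_ts_str(instance_ts, precision):
    """Hypothetical helper: render (instance_ts, instance_ts_precision) as shown above."""
    formats = {'D': '%Y%m%d', 'H': '%Y%m%d_%H'}
    return instance_ts.strftime(formats[precision])

instance_ts_str(datetime(2021, 5, 1), 'H')  # -> '20210501_00'
instance_ts_str(datetime(2021, 5, 1), 'D')  # -> '20210501'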
Trel can use all six fields for scheduling.
While best avoided, it is normal and allowed for multiple datasets to share the same URI. Trel implements file-system primitives for datasets, such as read/write locks. (Soon: these locks will be shared at the URI level rather than restricted to the dataset level.)
Naming convention
To be safe, restrict everything to alphanumeric characters, _, ., and -. Start with a letter. Labels can also contain a /.
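For illustration only (Trel's actual validation is not shown here), names following this convention match patterns along these lines:

import re

NAME_RE = re.compile(r'^[A-Za-z][A-Za-z0-9_.-]*$')    # general identifiers
LABEL_RE = re.compile(r'^[A-Za-z][A-Za-z0-9_./-]*$')   # labels may additionally contain '/'

assert NAME_RE.match('server_log')
assert LABEL_RE.match('feature/exp-1')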
Repositories#
These are the essential facts:
Each dataset has a repository associated with it.
Jobs restrict their inputs to specific repositories. E.g., a BigQuery job will force the scheduler to find input datasets from BigQuery repositories. It would not know what to do with an S3 path.
Repositories also provide unique paths for each dataset to facilitate immutability.
Repositories facilitate data governance by tracking the possible states and storage classes of the data and the actions that can be taken on it.
If possible, repositories provide a link to access the data associated with a dataset.
These are registered entities that have code associated with them. These will typically be registered ahead of time within Trel by the administrator. In addition, each repository is related to a location within which datasets tied to that repository can be stored (see Default Output URI).
treladmin has subcommands for managing repositories. Here is an example of how to add a repository called s3-us-east2:
treladmin repository_add s3-us-east2 s3 '{"bucket":"mybucket","prefix":"trel",
"region":"us-east-2","temp_prefix":"tmp"}'
This repository will create URIs such as s3://mybucket/trel/.... See s3: AWS S3.
Typically, we will register a repository for every place where we store our data. However, we do not need to make separate repositories if two places are of the same type and have no data transfer costs between them. So, we will typically not register another repository whose default URI is also in the us-east-2 region, but we may register an s3-us-west2 repository.
We don't need more repositories in us-east2 because there is no restriction that a URI you register with s3-us-east2 has to come from its default location. So, let us look at some possible paths for datasets that you want to register under it:
| Path | Comment |
|---|---|
|  | Default URI, OK |
|  | Default bucket, OK |
|  | Bucket in same region, OK |
|  | Bucket in a different region. Not recommended, but still acceptable. Consider |
|  | Wrong protocol. Not OK |
A repository represents the entire region, not just the bucket.
A repository has code for uploading and downloading data. It is also aware of the various states the data can be in and the various actions we can take on the data depending on its state. E.g., delete, archive, etc.
Default Output URI#
In Trel, each repository is configured with a default URI formatter for that repository. Output Generators (see Output Generator Plugins) will often use this to generate the URI that will store the data for this dataset. This URI formatter always places the dataset_id of the dataset in the URI. Thus, the default URI for any dataset is unique.
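For illustration only (the actual pattern depends on the repository configuration), the default URI produced by the s3-us-east2 repository above might combine the configured bucket and prefix with the dataset's class, instance, and id, along these lines:

s3://mybucket/trel/<dataset_class>/<instance_ts_str>/<dataset_id>/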
In Trel, by convention, it is highly recommended to write to the default output URI as much as possible. Why? Outputs written to a URI unique to the dataset allow you to treat the dataset as immutable. Immutable datasets greatly simplify pipeline design and maintenance.
Jobs#
Trel jobs are a combination of registration information and business logic code. In most jobs, the business logic is responsible for, and only responsible for, transforming the inputs into the outputs.
Based on the registration file, Trel will provide the business logic with detailed information about what needs to happen. Trel will then require the business logic to return success or failure accurately.
Trel guidelines for the business logic are:
Do not modify any dataset other than the output datasets
Avoid having existing datasets as output datasets.
Registration file#
The registration file summarizes the following information:
What is the name of the job?
What technology does the business logic use?
Where is the source code located?
What are all the possible valid sets of inputs for this job? (See Scheduling Classes)
In which repositories must those inputs reside?
Which resources are required to run this job?
Which output datasets does this job create?
The format of the registration file is YAML. The keys present in the YAML file are from a set of formally defined parameters. See Parameters for all the choices.
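A hedged sketch of a minimal registration file is given below. The top-level keys are the documented parameters; the nested values (job name, paths, and the scheduler and output generator settings) are hypothetical and plugin-specific:

name: format_log
job_description: Convert raw server logs into the standard format
execution.profile: python
execution.source_code.main:
  # source code downloader arguments; structure depends on the downloader plugin
  class: s3
  path: s3://trel-demo/code/format_log
execution.main_executable: _code/format_log.py
execution.output_generator:
  # plugin-specific configuration; see Output Generator Plugins
repository_map:
  server_log_raw: s3-us-east2
scheduler:
  # plugin-specific configuration; see Scheduling Class Plugins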
Executing a Job: Tasks and Attempts#
When you execute a job, Trel first creates a task. To create a task, you only need to provide the minimum amount of information the scheduler needs to determine a set of inputs, e.g., the date for the task to process.
Once the task is created, Trel will immediately create an Attempt, which will try to fulfill the task. The attempt will be executed by a worker process. Trel will expect the business logic to return success or failure.
Upon success, the attempt and task are marked successful and all the corresponding outputs are marked ready. Upon failure, the output datasets stay invalid, and the retry logic determines whether to create a new attempt. If not, the task is marked as a failure as well.
Execution Profiles#
An execution profile is responsible for setting up and executing a certain type of job. E.g., the python profile is responsible for:
Setting up the working directory
Checking out the code
Read-locking the input datasets.
Figuring out and creating the output datasets.
Building the final args.yml (see Business Logic Contract).
Executing the business logic.
Enforcing timeout duration.
Storing the outcome of the attempt and task.
Closing the output datasets with success or failure.
Releasing the read locks on the input datasets
The emr_pyspark profile is additionally responsible for:
Spinning up the cluster
Monitoring cluster and step status.
Shutting down the cluster.
See a list of available Execution Profile Plugins.
Execution profiles are given a lot of responsibility. This enables Trel to support new data processing technologies with a profile plugin for it. Due to the very limited assumptions that Trel makes regarding the profile, it can support a wide variety of technologies.
Business Logic Contract#
Trel makes a general contract with the business logic through the execution profile.
Trel will construct (done by profile) an args.yml file and provide it to the business logic. This file will contain various details such as
What are the inputs? What are their names? (see Naming Datasets).
Which are the outputs
What resources are available for execution
All the relevant configuration information
Trel will also optionally construct (done by profile) a command-line based on the contents of args.yml for convenience. E.g.
python3 format_log.py --_args args.yml \
--server_log_raw s3://trel-demo/adhoc/server_log_raw/20190601/ \
--server_log s3://trel-demo/data/server_log/20190601/342/
Given this, the business logic must follow the instructions and generate the outputs from the inputs
The business logic must report at least a boolean (success or failure) outcome.
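To make the contract concrete, here is a hedged sketch of business logic that consumes args.yml. The exact layout of the file is profile-specific, so the inputs/outputs keys and dataset names below are assumptions:

import argparse, sys
import yaml

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument('--_args', required=True)   # path to args.yml, as in the CLI above
    opts, _ = parser.parse_known_args()              # ignore the convenience per-dataset arguments
    with open(opts._args) as f:
        args = yaml.safe_load(f)
    # Hypothetical layout: inputs and outputs keyed by dataset name.
    in_uri = args['inputs']['server_log_raw']['uri']
    out_uri = args['outputs']['server_log']['uri']
    # ... read from in_uri and write the results under out_uri ...
    return 0                                         # 0 = success; anything else = failure

if __name__ == '__main__':
    sys.exit(main())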
Naming Datasets#
By default, the name of the dataset is identical to its dataset_class. But, in Trel, there is the option of providing a different name.
Naming your datasets covers some cases where there is ambiguity in how to proceed with the information about inputs and outputs given to the business logic. E.g., say job new_customers has to compute the difference between two customer_list datasets. Given the inputs on the command line, as the arg parser does not reliably propagate the order of paths, it is hard to determine which is the minuend and which is the subtrahend. However, we can remove this ambiguity by naming one dataset minuend and the other subtrahend. This situation applies during output generation as well.
While these cases may not seem typical, often the same dataset classes appear in both the input and output. While not ambiguous when using the args.yml
file, it can be unclear if the business logic uses its command-line arguments only (see Business Logic Contract). Naming the output differently from the input removes this ambiguity.
Return Codes#
If user code runs successfully, it must return 0. However, if it fails, it must return something other than 0, and it has several options.
- class trel.util.ReturnCode#
- OK = 0#
- ERROR = 1#
- TREL_ERROR = 2#
- TIMEOUT_ERROR = 3#
- CANCELLED = 4#
- ALL_ATTEMPTS_FAILING_ERROR = 32#
- PREEXEC_FAIL = 33#
- RESOURCE_DID_NOT_ALLOCATE = 64#
- RESOURCE_LOST_DURING_EXECUTION = 65#
- TRANSIENT_USER_CODE_EXCEPTION = 66#
- FAILED_DUE_TO_LOST_SLAVE = 67#
- READ_LOCK_DENIED = 68#
- WRITE_LOCK_DENIED = 69#
The profile can directly return some of these return codes. However, if any of these outcomes applies to the user code, it may return the corresponding value. E.g., if it returns 32 (ALL_ATTEMPTS_FAILING_ERROR), the default retry strategy will not try another attempt for this job, even if multiple retries are configured in the registration.
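Since the outcome is communicated through the process return code, user code can signal these conditions by exiting with the corresponding value. A minimal sketch (the exception type is hypothetical):

import sys

class PermanentConfigurationProblem(Exception):
    """Hypothetical: an error that retries cannot fix."""

def run():
    # ... business logic ...
    return True

if __name__ == '__main__':
    try:
        sys.exit(0 if run() else 1)   # 0 = OK, 1 = ERROR (retries may still happen)
    except PermanentConfigurationProblem:
        sys.exit(32)                  # ALL_ATTEMPTS_FAILING_ERROR: suppress further retries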
Retry Strategies#
If an attempt to complete a task fails, a RetryStrategy plugin decides whether to make a new attempt and whether to add some delay. The default retry strategy makes these decisions based on the parameters starting with execution.retry_strategy and the return code from the attempt.
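For instance, a registration file could configure retries along these lines (the keys are the documented execution.retry_strategy.* parameters; the values are illustrative):

execution.retry_strategy.attempts: 3                  # allow up to 3 attempts per task
execution.retry_strategy.delay: 600                   # wait 10 minutes between retries
execution.retry_strategy.resource_error_delay: 3600   # wait longer after resource failures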
Scheduling Classes#
A scheduling class strictly defines the valid inputs for a job. The class itself implements a high-level pattern. A job can select a class and provide parameters representing a specific set of valid inputs. A parameterized scheduling class is simply called the scheduler of the job. One of the input combinations for the scheduler is called a schedule input or sometimes just a schedule.
More formally, a scheduler along with the repository_map defines a (potentially infinite) set of input combination tuples where each tuple consists of the following:
1. List of input datasets' (dataset_class, instance_prefix, instance_ts, instance_ts_precision, label, repository).
2. Schedule ID: This string uniquely defines this schedule input given the parameters and the repository_map. It is concise because the parameters are used to interpret it. E.g., 20210501 can mean different inputs to different jobs.
3. Schedule instance_prefix: the instance_prefix tied to the schedule id.
4. Schedule instance_ts: the instance_ts tied to the schedule id. Sometimes also called schedule_ts.
5. Schedule instance_ts_precision: the instance_ts_precision tied to the schedule id.
6. Schedule label: the label tied to the schedule id.
Take a look at the single_instance scheduling class for an example.
See Scheduling Class Plugins for more details.
schedule_info#
Elements 3 through 6 form the schedule_info of this schedule. If possible, use these to specify fields for output instances (see Output Generator Plugins) and other inputs (see Other datasets).
Prefix label selection#
A schedule has an instance_prefix and a label; by default, all the inputs must share this prefix and label. However, using a special configuration key, prefix_label_selection, it is possible to specify a different label (or prefix) for certain dataset classes than the schedule label (or schedule prefix). Given below is an example:
prefix_label_selection:
  - _schedule_prefix: 'thirdparty'
    _schedule_label: 'qa'
    customer_list: [['partner1','prod'],['partner2','qa']]
    server_log: [null,'prod']
Here, we specify that a schedule prefix of thirdparty and label qa requires two datasets of dataset class customer_list, with prefixes partner1 and partner2. Also, server_log must come with no instance prefix. Furthermore, the partner2 dataset must have the qa label and all other labels must be prod. This is a potential use case where you want to QA adding the partner2 dataset to a job that already runs in production with only partner1.
If prefix_label_selection is present, it supersedes all other label and instance_prefix specifications.
Other datasets#
The other datasets specification allows you to define inputs to a job whose dimensions depend on the datasets available at the time of execution. E.g., you want the latest customer_list as input, no matter the schedule_ts of the task. We can specify this as follows:
scheduler:
  other_datasets:
    - [ "customer_list", null, "_max_in_catalog", "D", "prod" ]
other_datasets is a list of entries where each entry selects a dataset. The entry specifies the five dimensions of the input: (dataset_class, instance_prefix, instance_ts, instance_ts_precision, label). You can specify the exact values you need for the input, but you can also specify rules that Trel will apply to pick an appropriate dimension value.
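For instance, an entry that pins every dimension to an exact (hypothetical) value would look like this; the rules available for non-exact selection are listed in the table that follows:

scheduler:
  other_datasets:
    - [ "customer_list", "partner1", "2021-05-01 00:00:00", "D", "prod" ]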
| Rule | Comment |
|---|---|
|  | For: all except |
|  | For: |
|  | For: |
|  | For: |
|  | For: |
|  | For: |
|  | For: |
|  | For: |
|  | For: |
Sensors#
Sensors are how data from outside of Trel gets registered within Trel so that it can trigger jobs and be processed. They are deployed as live processes that frequently check whether new data is available and act if it is. Sensors perform EtL, with a focus on keeping the transform within sensors as light as possible. So, along with the jobs, it becomes EtLT.
Sensors and jobs are the only two kinds of user-defined code possible in Trel. Plugins are limited to what the platform provides.
There are 4 major types of sensors:
Sense real-time data, has to upload the data: Here, the data is made available in real-time, either as folders or through some API. The key thing about real-time is that there is no need to check whether data is present. We only have to check the clock. If the time is correct, data is ready to be used. A helper function, get_new_datasetspecs_with_cron_and_precision, covers a lot of this functionality. See the finnhub sensor for how to use it. Also, the data has to be uploaded to be useful for jobs. E.g., Twitter data: a job can only process it from a repository. We do not write Trel jobs to read directly from Twitter.
Sense real-time data, no upload: Here, the data is made available in real-time, but it does not have to be uploaded. E.g., logs are available in S3 5 minutes after the hour (no need to check). The path is known, and it can be directly registered with an appropriate S3 repository.
Crawl for data, has to upload the data: Here, the sensor will check for data at a pre-defined location. E.g., crawling a website for the drop of a specific page. There is an expected drop time, so you must verify before you can catalog the path. This data also has to be uploaded to S3 or somewhere else. Trel jobs typically do not process data directly from websites.
Crawl for data, no upload: E.g., crawling a pre-defined S3 location for drops from a partner with frequent delays. Before cataloging you must verify, but you can directly use the data for jobs.
All four combinations are based on whether or not you must implement both of these functions (checking for data and uploading it), just one, or neither.
Sensor creation#
The treldev.Sensor class from the SDK simplifies sensor creation.
Here is a simple sensor:
#!/usr/bin/env python3
import argparse, datetime, os, sys
import treldev
from os import listdir
from os.path import isfile, join, isdir

class LocalFileSensor(treldev.Sensor):

    def __init__(self, config, credentials, *args, **kwargs):
        super().__init__(config, credentials, *args, **kwargs)
        self.folder_to_monitor = os.path.expanduser(self.config['folder_to_monitor'])
        self.insert_instance_ts = self.config['insert_instance_ts']
        self.instance_ts_precision = self.config['instance_ts_precision']
        self.locking_seconds = self.config.get('locking_seconds', 86400)
        self.credentials = credentials
        self.known_contents = set([])
        self.s3_commands = treldev.S3Commands(credentials=self.credentials)

    def find_monitored_files(self):
        only_files = [f for f in listdir(self.folder_to_monitor)
                      if isfile(os.path.join(self.folder_to_monitor, f))]
        return only_files

    def get_new_datasetspecs(self, datasets):
        ''' load_info is the file name '''
        only_files = self.find_monitored_files()
        matching_prefixes = set([ds['instance_prefix'] for ds in datasets])
        for filename in only_files:
            if filename in self.known_contents:
                continue
            if filename in matching_prefixes:
                self.known_contents.add(filename)
                self.logger.debug(f"file {filename} has a matching dataset")
                continue
            instance_ts = datetime.datetime.now() if self.insert_instance_ts else None
            yield filename, {'instance_prefix': filename,
                             'instance_ts': instance_ts,
                             'instance_ts_precision': self.instance_ts_precision,
                             'locking_seconds': self.locking_seconds}

    def save_data_to_path(self, load_info, uri, **kwargs):
        ''' load the file to S3 '''
        filename = load_info
        print(f"Uploading {filename} to {uri}", file=sys.stderr)
        self.s3_commands.upload_file(self.folder_to_monitor + '/' + filename, uri + filename)
        sys.stderr.flush()

if __name__ == '__main__':
    treldev.Sensor.init_and_run(LocalFileSensor)
Clock based sensor#
These are a frequent type of sensor where data availability depends entirely on the clock. In such cases, get_new_datasetspecs should follow a predefined logic that is implemented in get_new_datasetspecs_with_cron_and_precision. Alternatively, you can inherit from treldev.ClockBasedSensor and implement only save_data_to_path.
There are two common use cases for these:
Real-time data accessible through API. E.g., Twitter data, stock tick data from Finnhub.
Periodic pulls from a transactional database. E.g., loading a sales table from an ODBC data source every hour into the data warehouse.
Check the documentation for treldev.Sensor.get_new_datasetspecs_with_cron_and_precision() for more details.
See odbc_table_load for an example. Review its code to see how to implement one.
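Below is a rough sketch of such a sensor, assuming only save_data_to_path needs to be implemented; the API-fetching helper is hypothetical and would be filled in for your source:

#!/usr/bin/env python3
import sys
import treldev

class HourlyApiSensor(treldev.ClockBasedSensor):
    """Hypothetical sensor: pulls one hour of data from an API and uploads it."""

    def __init__(self, config, credentials, *args, **kwargs):
        super().__init__(config, credentials, *args, **kwargs)
        self.s3_commands = treldev.S3Commands(credentials=credentials)

    def fetch_from_api(self, load_info):
        ''' Hypothetical helper: call the upstream API for this instance and
            return the path of a local file holding the response. '''
        raise NotImplementedError

    def save_data_to_path(self, load_info, uri, **kwargs):
        # load_info identifies the instance to pull, e.g., its instance_ts.
        local_file = self.fetch_from_api(load_info)
        self.s3_commands.upload_file(local_file, uri + 'data.json')
        sys.stderr.flush()

if __name__ == '__main__':
    treldev.Sensor.init_and_run(HourlyApiSensor)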
Registering a sensor#
Sensor registration requires writing the config into a file and using the command trel sensor_register to register it.
The first type of configuration for a sensor relates to its name, code, etc.:
| Config | Description |
|---|---|
|  | The name of the sensor. |
|  |  |
|  |  |
|  | Additional files needed for the sensor. |
|  |  |
|  | default |
|  | Get credentials from this key. Default varies based on the sensor. |
The next set relates to the fields of the datasets that this sensor inserts:
| Config | Description |
|---|---|
|  |  |
|  |  |
|  |  |
|  |  |
|  | The schema version of the output |
Some extra fields:
| Config | Description |
|---|---|
|  |  |
|  |  |
|  | Don't load any instance older than this. |
|  | Compute min_instance_ts from current timestamp. |
Handle any additional parameters in your code.
Sample config file for the above sensor:
sensor_id: local_file_demo
sensor.source_code.main:
  class: s3
  path: s3://trel-demo/code/sensor2_local_file
sensor.main_executable: _code/sensor2_local_file.py
checked_out_files_to_use: []
manager_name: sm_test
credentials.requested_name: default
dataset_class: dropped_file
instance_ts_precision: D
label: prod
repository: s3-us-east2
schema_version: "1"
sleep_duration: 10
folder_to_monitor: ~/trel/samples/folder_to_monitor
max_age_in_seconds_to_insert: 86400000
insert_instance_ts: false
Running a sensor#
You must also run the following command to activate the sensor:
trel sensor_update <sensor_id> --activate
Once activated, the process becomes live and immediately starts looking for data and populating the catalog.
See trel_contrib@GIT for some examples.
Now is a great time to look at how we can decompose a problem into jobs and sensors using catalog-based dependency.
Plugins#
Trel has multiple plugin systems. Each plugin system fulfills a specific role within Trel. The plugin frameworks allow the core Trel framework to make the absolute minimum assumptions about what is required for your pipeline, expanding its applicability. They also allow functionality to be added later on.
| Name | Description |
|---|---|
| Execution Profile | Responsible for supporting the execution of the business logic. E.g., python, emr_pyspark. |
| Scheduling Class | Responsible for defining all valid sets of inputs for some job. Each plugin covers a pattern. The parameters to the class define the precise set of valid inputs. |
| Repository | Implements support for specific storage technologies, like AWS S3, Google Cloud Storage, Google BigQuery, MySQL, etc. |
| Output Generator | Given configuration and input information, this will figure out the outputs the job can create. |
| Resource | Represents a specific resource that a job needs to execute. E.g., an EMR cluster. |
| Source Code Downloader | Downloads source code from specific locations. |
| Retry Strategy | Decides whether a new attempt should follow a failed attempt, and when. Typically, default. |
Schema Management#
The schema of a dataset is an identification string (say, 1). This string may be associated with a schema specification for that dataset_class, but that is optional.
The schema of a dataset is decided as follows:
if the dataset is being inserted manually:
    if the schema is provided in the CLI:
        return schema
elif the dataset is the output of a job:
    if the schema is specified in the output generator:
        return schema
elif the dataset is coming from a sensor:
    if the schema is specified in the sensor:
        return schema
return the default schema for the dataset_class
During execution, Trel will provide a task with the schema for every dataset and the schema specification if present. This is for both inputs and outputs. After that, it is up to the individual job to follow the directions.
Data Quality Checks#
To guarantee the integrity of your output dataset, you can add sodacore (an open source data validation library) checks to the schema entry for a dataset class. This approach re-uses the schema system and empowers you to define specific checks tailored to your dataset schema.
To add validation rules, follow these steps:
Right-click on the output dataset class in your UI.
Choose “Show Schema” to view the schema details.
Edit the currently active version. If there is none, create a new schema and set it as default.
Add a validation rule as follows. This can be appended to the existing schema.
sodacore_checks: |1
 - row_count > 0
 - missing_count(column_name) = 0
 # Add additional checks as needed
Given below is an example of a schema entry with both schema and validation information.
columns:
  - name: CustomerID
    type: integer
  - name: FirstName
    type: string
  - name: LastName
    type: string
format: parquet
sodacore_checks: |1
 - row_count > 0
 - missing_count(FirstName) = 0
 - missing_count(CustomerID) = 0
 # Add additional checks as needed
Once the validation information is provided, you can ask the treldev SDK to apply the rules by calling validate() on the URI classes that implement the validate_and_return function, such as AthenaURI in athena.
The following code snippet demonstrates the usage of the AthenaURI class to create a uri object based on the path provided in the output dictionary. The validate method is then called with specific arguments: the args object, enabling saving, and performing a sodacore check.
uri = AthenaURI(output['uri'])
uri.validate(args, save=True, sodacore_check=True)
The validate() function checks if the data meets the specified rules for the expected output.
These checks, incorporated directly into the output dataset schema, play a crucial role in ensuring the integrity and quality of the generated output dataset through a centrally governed mechanism.
Temp Table Creation and Configuration#
Many complex jobs require temp tables. Without platform support for temporary tables, problems related to data integrity and cleanup can occur. Trel's support for temporary tables avoids these.
When you register a job with Trel, you can request temp paths from specific repositories. Trel will then generate temporary paths while running the job, and once the job is done, it automatically deletes those paths for you.
To configure temporary table support, you need to specify the repository name in the job registration file:
execution.temp_repositories:
  key_name: repository_name
When using the temporary paths in the code, utilize the following logic:
args['temp_paths']['key_name'] + str(suffix)
Adjust the prefix and suffix as needed to create temporary paths according to your requirements.
Parameters#
Parameters in Trel can be stored in the following places:
The main configuration file is .trel.config, located in /home/trel_admin on the Trel instance. This is a YAML file that has the following structure:
<platform parameters>
default_params:
  <job parameters>
profile_default_parameters:
  <execution profile parameters>
The job registration file consists of job parameters with this structure:
<job parameters>
Every user has a config file in their home directory called .trel.user.config. There is an equivalent field in the WebUI.
<UI/CLI parameters>
user_config_parameters:
  <job parameters>
user_profile_parameters:
  <execution profile parameters>
Additionally, the user can override the parameters on the command line as part of an execute command.
Let us look at what is involved in each of these groups.
UI/CLI parameters#
Trel has the following UI/CLI parameters.
api
This informs the Trel CLI where to connect for access to Trel. Due to tunneling, this will be as follows:
api:
  host: localhost
  port: 5440
api_public_key
This contains the public key used by you to authenticate against the Trel instance.
api_user
This should be your e-mail address
Platform parameters#
Trel has the following platform parameters.
db_credentials
This gives the treladmin command and the owner of trel_admin login access to the underlying database of the Trel platform. E.g.:

db_credentials:
  username: ---
  password: ---
  db: trel
resource_pool
The administrator can control the number of simultaneous attempts by defining a set of virtual resources. See Virtual Resource pool.
Default:
{}
server
Some configurations for the REST API
- result_limit
The number of results to be returned by the REST API by default
Default: 50
admin_email
This is primarily used in communications as a point of contact.
machine_name (Unused)
In case the Trel services are spread out over multiple machines, this parameter can be used to differentiate them.
Default:
trel_main
mail.mechanism
This controls how the e-mails from the Trel platform will be sent. The only option now is smtp. This will cause the server to look for credentials with key smtp for sending e-mails.
Default:
smtp
sla.send_emails
Allows you to disable SLA emails globally. You can still see SLA from the UI. Mostly used for QA purposes.
Default: True
credentials.rule
See Approval rule
Default:
default
sdk_path
This will be inserted into the authentication request email by the treladmin command.
lifecycle.repository_action_storage_map
A mapping from repositories to storage repositories where actions can be stored. E.g.:

lifecycle.repository_action_storage_map:
  bq-us: gs-us

Here, lifecycle action items for the BigQuery repository bq-us are stored in gs-us.
Warning
If a repository has no entry here, it will not be considered for lifecycle. This is how you prevent lifecycle management for a repository.
python_executable
This python will be used to run all the user-space code such as sensors and jobs. Some profiles may support this at the job level.
attempt_folder_backup_path
Attempt folders will be backed up here, attempt_folder_backup_delay seconds after attempt completion.
attempt_folder_backup_format (xz, gz, folder)
Use this compression to compress the folders. gz creates tar.gz files and xz creates tar.xz files.
attempt_folder_backup_lookback
If the attempt was completed more than this many seconds ago, it will no longer be backed up.
attempt_folder_backup_delay
How long to wait before an attempt folder is backed up.
attempt_folder_backup_repo
Backed up paths will be registered with this repo.
attempt_folder_backup_credentials_name
Which credential to use for backing up.
All these parameters are mandatory. They are configured with their default values.
Job parameters mechanism#
Trel has a defined set of job parameters. Each of these parameters is pre-defined to have up to 4 places where they can be set and overridden. In order, they are:
config: Trel configuration file in the server. See Job parameters: .trel.config.
registration: In the registration file of the job. See Job parameters: Registration.
user_config: In the user_config file of the user. See Job parameters: User config.
execute: On the command line where the user is executing the job. See Job parameters: Execute CLI.
In the documentation given below, each parameter is listed with the places where it can be specified. If allowed, the configuration is overridable in the above order. That means if a parameter is allowed in config and registration, and specified in both, the registration value will be the final one.
Each job will execute using the final set of parameters after merging.
Trel will verify these parameters at all stages. It will prevent any unknown parameter or attempts to change a parameter when not allowed.
Note
user_config and execute params should be optional for task execution as the scheduler will not have access to those.
Job parameters: .trel.config#
The Trel system config file can contain the following parameters. The Trel administrator specifies these:
credentials.force_jobuser_permissions (config, registration)
If this is turned on, it forces the job execution to use the credentials of the user who is executing the job, not the user who created the job.
default: False
credentials.key_blacklist (config)
These credentials are not provided to the user.
default: [“smtp”,”github.private_key”,”aws.access_key.trel”,”openai”]
credentials.requested_name (config, registration, user_config, execute)
This parameter will be used to select the set of credentials used for this job. You can set this to override the default, and subject to the credential policy set by the administrator, it may be approved.
default: default
execution.cli_execute_files_path_root (config)
This is the root path where files specific to an attempt, provided by the user, will be stored. A new subfolder named with the attempt id will be used for this.
execution.dataset_locking_seconds (config)
The output datasets will be locked for this many seconds. The attempt will by default refresh the locks. If the attempt dies, the locks will expire this long after the last refresh.
default: 300
execution.priority (config, registration, user_config, execute)
As of now, not used. However, if you set it to 2 in your job, the workers will process your job earlier than other jobs in the queue. One intended use-case is to set this value in the
trel execute
command so that the job skips the queue and minimizes your waiting time.default: 1
execution.registration_files_path_root (config)
If your registration specifies local files, they will be uploaded to this path and made accessible to the job.
execution.retry_strategy (config, registration, execute)
The configuration for the retry strategy of this job.
execution.retry_strategy.attempts (config, registration, user_config, execute)
How many attempts can a task of this job have?
default: 1
execution.retry_strategy.delay (config, registration, user_config, execute)
What delay to introduce between retries for this job. This can be specified in multiple places for convenience.
default: 0
execution.retry_strategy.resource_error_delay (config, registration, user_config, execute)
What delay to introduce between retries if the failure occurred due to resource and not the code.
default: 3600
execution.temp_repositories (config, registration)
You may need temp locations for a job to hold some temporary data. Each repository can generate a temp location for each attempt. Here you can provide a mapping of temp_name to repositories to generate temp locations.
The profile will place the temp paths from each temp_name in args.yml under temp_paths.
default: {}
execution.timeout_seconds (config, registration, execute)
After this duration, the profile will terminate the job and mark it as FAILED.
default: 5000
execution.workspace_folder (config)
Each attempt will get a new dedicated subfolder of this folder to work in.
lifecycle.repository_action_storage_map (config)
A repository will be a key in this dictionary if and only if lifecycle should act on it. The value for each key is the storage repository where Trel will store actions for the job to read. This is because, sometimes, the key repository is not a suitable location to store a list of actions for a job. E.g., table-based stores.
default: {}
mail.mechanism (config)
What mechanism to use for sending emails. Options: mail_command, smtp. The latter requires appropriate credentials. See Known Credentials.
default: mail_command
profile_default_parameters (config)
Use this to store information the profiles need to function but are not suitable to be provided by the registration
default: {}
profile_parameters (config, registration, execute)
Use this to store information the profiles need to function and are suitable to be provided by the registration
default: {}
python_command_path (config, registration, execute)
The profile provides this from the user config to the task.
default: None
resource.C_dataproc_spark.reduce_multiplier (config, registration, execute)
This many reducers will be configured per core of the execution cluster in (Py)Spark jobs.
default: 32
resource.allocate_if_static_resource_is (config, registration, user_config, execute)
The possible values here are from (busy, missing, terminated). If the status of the static resource matches what is specified in this list, the profile will choose to make a new resource.
Not all profiles or resources may support the busy option.
default: [“terminated”,”_MISSING”]
resource.memory_level (config, registration, execute)
This is the required memory level. The memory levels themselves are configured in the Trel system config. Ask your admin for details.
default: normal
resource.multi_config (config, registration)
If you need multiple resources, they are specified here.
default: None
resource.name (config, registration)
The name of the resource class required for this job.
default: NoResource
resource.num_cores (config, registration, execute)
How many cores does the (Py)Spark job need?
default: 1
resource.static_resource_id (config, registration, user_config, execute)
If this job should be executed in an existing resource, specify the id here. This will be interpreted by the profile and used for executing the business logic.
default: None
resource.storage_level (config, registration, execute)
This is the required storage level. The memory levels themselves are configured in the Trel system config. Ask your admin for details.
default: normal
What tags to apply to the resource
default: {}
scheduler (config, registration)
The config for the scheduler
default: {}
Job parameters: Registration#
Next are the parameters that can be specified only in the registration file and later.
execution.additional_arguments (registration)
The profile adds these to the CLI of the main executable. E.g., --debug. This parameter may not apply to some profiles.
default: []
execution.checked_out_files_to_use (registration)
If you need files other than the execution.main_executable file in the working directory, specify them as a list here. The paths must be relative to the attempt directory. E.g.: [‘_code/myutils.py’]
default: []
execution.custom_workspace_initializer (registration)
Provide config for an instance of WorkspaceInitializer. This will be run after the default initializer. This feature is DEPRECATED.
default: {}
execution.file_map (registration, execute)
This parameter specifies the source of the file for each entry in execution.file_names. Trel CLI has mechanisms where you can specify a local file against an entry in execution.file_names, and the CLI will upload that file to a temp folder and set the path correctly in the file_map.
default: {}
execution.file_names (registration)
This parameter provides a mechanism to provide un-committed files to the job. Specify a list of file names required. These files will be downloaded and kept in the working directory before execution. Note that only file names should be specified, not the path to the files. execution.file_map will cover that.
default: []
execution.initial_attempt_parameters (registration, execute)
These parameters are passed to the first attempt as attempt parameters. The attempt parameters allow the job to change behavior based on the number and nature of previous attempts.
default: {}
execution.main_executable (registration)
This file is executed to run this job. This is relevant only for some execution profiles.
default: None
execution.main_sql_file (registration)
If the main executable is a wrapper file, it can use this file and execute this SQL.
default: None
execution.output_generator (registration, execute)
This specifies the class and configuration that figures out the outputs of this job.
execution.preexec_executable (registration)
This python file is called just before your business logic is executed. This has to be a python file. But this can be used for any execution profile.
The typical behavior is to update the args.yml file. This script runs in the Trel worker, which can be needed for cluster based jobs.
default: None
execution.profile (registration)
The registered name of the execution profile that is attached to this job. Example: python.
execution.profile_must_close_datasets (registration)
If this is set to false, the user code must close the datasets based on what is valid and what is not.
default: True
execution.report_incidents (registration)
If a task fails, should Trel create an incident? Options: none,scheduled,all
default: scheduled
execution.source_code.additional (registration, execute)
Additional source codes can be provided using this argument. See Source Code Downloader Plugins.
default: {}
execution.source_code.main (registration, execute)
These are the arguments to initialize the source code downloader. See Source Code Downloader Plugins.
job_description (registration)
A one-line description of this job.
default: None
name (registration)
The name of the job
owner (registration)
repository_map (registration, execute)
This maps input dataset classes to repositories required by the job. The scheduler respects this constraint. See Register the first job: format_log for an example.
resource.args (registration, execute)
This is used to initialize the resource class and is specific to the resource.
default: {}
task.comment (registration, execute)
Let others know what this task was for. Don’t set this for normal production jobs. Set for manual execution using the CLI. Also set for experimental pipelines in the registration file.
default: None
Job parameters: User config#
These parameters can be in the user_config or the execute CLI.
execution.branch_override (user_config, execute)
Use this to override the branch in the registration file. Can be provided for all source codes or just the ones you choose. E.g., feature1, utils:feature2.
default: None
execution.output_label_override (user_config, execute)
This allows you to override the output label from any job's registration at execution time. Can be provided for the whole task or each dataset separately. Example: qa, request_log:qa,stats:master.
default: None
job_user (user_config, execute)
This will be set by the profile during execution. See credentials.force_jobuser_permissions.
default: None
user_profile_parameters (user_config, execute)
The profile provides this from the user config to the task.
default: {}
Job parameters: Execute CLI#
These parameters can only be in the execute CLI.
execution.additional_arguments_cli (execute)
The profile adds these to the CLI of the main executable after the additional arguments from registration. You can set these directly or use -a parameter.
default: []
execution.are_inputs_canonical (execute)
Specifies whether this task was run with canonical inputs or not.
default: True
Execution profile parameters#
Execution of jobs in Trel may require many parameters specific to the execution profile. There are three job parameter keys under which you can file these parameters:
profile_default_parameters
: Use this to store parameters in .trel.config.

default_params:
  profile_default_parameters:
    emr.availablity_zone_subnets:
      us-east-2a: subnet-xxxxxxxx
      us-east-2b: subnet-xxxxxxxx
profile_parameters
: Use this to provide parameters in registration. E.g.:

profile_parameters:
  emr_spark.ami_version: emr-6.1.0
user_profile_parameters
: Some parameters can be provided here. E.g.:

user_profile_parameters:
  aws.athena.workspace: DataScience
Users can override these parameters from the CLI.
Warning
Trel cannot validate these parameters during registration time.
In the end, it is up to the execution profile to decide whether it wants to respect or ignore this parameter. For security reasons, it may choose to ignore some parameters. Refer to the plugin documentation to know precisely which execution profile parameters will be respected from each location.
All these parameters, respected or not, are available to the job in args.yml.
Parameters on the command line#
Parameters can be overridden on the CLI as follows:
trel execute ... -p execution.retry_strategy.attempts:2
trel execute ... -p repository_map:'{"tweets_condensed":"s3-us-east2"}'
Even the profile parameters can be overridden. E.g.:
trel execute ... -p profile_parameters:'{"emr_spark.ami_version":"emr-6.0.1"}'
You can do this because profile_parameters can be overridden on the CLI. So can user_profile_parameters. E.g.:
trel execute ... -p user_profile_parameters:'{"aws.athena.workspace":"DataEngg"}'
Some parameters have dedicated CLI options. E.g., -a is specifically for execution.additional_arguments_cli. For example, the following two commands are identical:
trel execute ... -a " --force" " --verbose"
trel execute ... -p execution.additional_arguments_cli:'["--force", "--verbose"]'
For convenience, this option will strip the leading spaces in the arguments.
Resource#
Resource management offered by Trel primarily involves two aspects. Resource specification and virtual resource pools.
Virtual Resource pool#
Trel allows the administrator to specify the availability of a static pool of virtual resources in the config file. This is an abstract, platform-independent mechanism to throttle resource usage. The system-wide pool of virtual resources is simply a list of (string, integer) tuples where the string is the name of the virtual resource and the integer is how much of it is available. Here is how it looks in .trel.config
resource_pool:
  us-west-1b_r3.xlarge: 10
  us-west-1_emr_job: 5
  us-east-2_emr_job: 5
  prod_mysql_slot: 5
  prod_redshift_slot: 8
Each attempt can require some virtual resources from the pool. Therefore, the attempt will block until all those virtual resources can be allocated. When the attempt completes, the virtual resources will be freed. The manager.py process performs the allocation and de-allocation.
An attempt can require virtual resources through two methods. First are the virtual resources automatically requested by the real resource (see Resource Specifications). Next are the explicit virtual resources that are requested in the registration file. They are also made part of the resource specification to keep it simple. Here is the relevant portion from a registration file:
resource.name: NoResource
resource.args:
  extra_virtual_resources:
    prod_mysql_slot: 1
    prod_redshift_slot: 1
Resource Specifications#
Trel allows you to specify the real resources needed to execute the business logic. This usually requires a Resource plugin. E.g., an EMR cluster to execute a pyspark job (see emr_spark). The specification needs to be in the registration file.
The profile uses the specification to create the resource (if the resource does not exist). It also helps the profile understand the existing resource and properly configure the business logic for execution. In the case of pyspark jobs, the profile is responsible for spinning up the cluster as well as deleting it at the end of the job.
Additionally, the resource specification is an alternate way to specify some Virtual Resource pool requirements. By default, every resource also requires some virtual resources. If and only if the admin makes these virtual resources part of the system pool, they will be required to be allocated for the job to proceed.
Several parameters that start with the prefix resource. are relevant to resource specifications (see resource.name).
Credential management#
Credentials are sets of (key, value) pairs. The key identifies the credential, and its value is the actual credentials.
Each credential set also has a name, which can be the user_id of the associated user or something else like default.
Known Credentials#
The following keys can be used to specify credentials for specific services.
| Credential Key | Description | Example |
|---|---|---|
| aws.access_key | Credentials for accessing AWS services. |  |
|  | Credentials for accessing EC2 instances. |  |
|  | Service account credentials for Google Cloud. |  |
| github.private_key | Contents of your private SSH key that can access Github. |  |
|  | Credentials to access MySQL/MariaDB. |  |
| smtp | Credentials for sending e-mails using SMTP. Required for authenticating users and the SLA module. |  |
|  | API access token for Databricks. |  |
The command given below adds the aws.access_key credential to the default credential set:
treladmin credential_add default aws.access_key '{"key":"AK..","skey":"tb.."}'
Credentials specific to users can be added as follows:
treladmin credential_add email@email.com aws.access_key '{"key":"AK..","skey":"tb.."}'
Users can add credentials to the set with the same name as their user_id as follows:
trel credential_set aws.access_key '{"key":"AK..","skey":"tb.."}'
Credential use#
For running a task, Trel will provide an entire credential set. Service-specific profiles know which key from the credential set to use. E.g., emr_pyspark will use the key aws.access_key.
Job code can also get access to the credentials through credentials.yml in its working directory.
Approval rule#
Which credential set does Trel provide to the job?
By default, Trel chooses to provide a task with the credential set named default. This is automatically approved if the approval policy credentials.rule is set to approve_default (see table below).
| Rule | Description |
|---|---|
| approve_default | Approve all requests to the default credential set name. Reject anything else. |
Users can change the set by setting credentials.requested_name. A user always has access to the credential set named the same as their user_id.
Permissions Management#
Permissions allow specific actions using the Trel platform. They are complementary to restrictions on the credentials. E.g., permissions can prevent a user from creating Trel jobs. The following permissions are defined in Trel:
| Rule | Description |
|---|---|
|  | Access to the data catalog. Can see and search through dataset metadata. Can see sensors, jobs and tasks. |
|  | Can create and run jobs and sensors, and otherwise affect them. |
|  | Can publish dataset classes in some repository for other people to use. |
|  | Can make changes for other users. E.g., see other users' SLA report. Can also add/update/delete datasets in the catalog. Can also create schemas for dataset_classes. Can create global SLA rules. |
|  | Can create global lifecycle rules. |
|  | Allows modification of datasets with this label, directly or indirectly. X can be a regex. |
|  | Prevents modification of datasets with this label, directly or indirectly. Lower precedence than /label. X can be a regex. |
|  | All permissions |
Permissions can be added to and removed from a group using the treladmin command. Users can be assigned to a group to get the related permissions.
SLA - service-level agreement#
Trel allows you to specify some time limit within which datasets that match the criteria must exist. If the dataset is not ready within that time, the SLA module will send out alerts.
This subsystem has three commands:
| Command | Description |
|---|---|
|  | List the ruleset currently active. |
| trel sla_rules_update | Update the SLA ruleset from a file. |
|  | List the datasets violating the SLA that you are monitoring. |
Here is a sample ruleset that explains how it works.
watchers:
  - [email protected]  # These users get all the SLA notifications.
tag_watchers:
  reporting:
    - [email protected]  # These users get SLAs with these tags
sla:
  - dataset_class: request_log       # Mandatory
    instance_prefix_regex: "^.*$"    # DEFAULT: None
    instance_prefix_is_null: True    # DEFAULT: False
    instance_ts_precision: D         # Mandatory
    cron_constraint: "0 0 * * *"     # Mandatory. This applies to instance_ts. SLA can be applied only to datasets with instance_ts.
    label: master                    # Mandatory
    repository: s3-us-east2          # Mandatory
    wait_hours: 3                    # Mandatory. Defines how long to wait from instance_ts + period (from instance_ts_precision).
    ignore_hours: 72                 # Mandatory. How long to wait from instance_ts + period to ignore this dataset.
    watchers:                        # These users watch only this SLA.
      - [email protected]
    tags:                            # These are the tags for this SLA
      - reporting

# The SLAs are grouped by tags. The ordering is specified below.
# Put the important ones first for visibility.
tag_order:
  - deliverable
  - reporting
To use the SLA,
Create a specification using the template given above and save to a file.
Run trel sla_rules_update <filename>. Note that you need the permission /sla or /sla:update for this to work.
Once this is done, the emails will be sent as soon as some datasets become delayed.
By default, a maximum of one e-mail is sent every hour. The exception is when the SLA clears up, in which case, you get an email immediately letting you know that the SLAs cleared up. This way you never have to wonder whether it has cleared up since the last notification of SLA misses. If it had cleared up, you would have gotten an e-mail letting you know.
This is a sample subject line of an email letting you know of an SLA miss:
SLA: 4 datasets missing as of 2021-08-11 07:06:32
This is the subject line if the SLA cleared up:
SLA: No missed datasets as of 2021-08-11 07:27:08
The SLA module is a powerful component of Trel. Used effectively, it ensures peace of mind for the operations team, removing any need to “check if everything is fine”.
Lifecycle Management#
As the scale of data that we deal with grows, so does the storage cost. Many companies struggle to develop effective strategies to control the size of their data. The immutable datasets in Trel amplify this situation.
On the other hand, thanks to the catalog, Trel can do powerful lifecycle management on the data. Trel's lifecycle management module does precisely this. How it works:
Trel allows you to specify lifecycle rules. These rules create lifecycle action requests on datasets.
Lifecycle action requests can also be made manually.
The lifecycle module studies these requests for a time frame (typically an hour). It converts them to actions.
Each repository's actions are uploaded to a repository specified by lifecycle.repository_action_storage_map and registered with label lifecycle. These datasets are per repository and are named lifecycle.actions. The instance_prefix is set to the repository, the label is set to lifecycle, and the instance_ts_precision is set to H.
The actual lifecycle process consists of jobs that accept these datasets as inputs and produce equivalent lifecycle.actions.complete datasets after executing these actions. You can find appropriate jobs in trel_contrib. Please find the ones for your repository and register them appropriately.
The Trel lifecycle module monitors the lifecycle.actions.complete datasets and collects the outcomes of lifecycle actions. These outcomes are used to close the loop on the requested lifecycle actions.
Warning
Every lifecycle.actions dataset must have a corresponding lifecycle.actions.completed dataset. Without this, the lifecycle process will be stuck.
This restriction is to ensure the correctness of the lifecycle process and to avoid inconsistencies in the catalog due to an incomplete lifecycle process.
While this mechanism is definitely complicated, it has a number of benefits:
Decouples Trel from the actual lifecycle action process.
Can plug in support for new repositories by registering jobs.
Lifecycle action code is open-source.
Lifecycle tracking code is reused between repositories.
Lifecycle tracking code can have features specific to repositories.
Seamlessly merge automatic lifecycle with manual data lifecycle actions.
Lifecycle action can be performed using a suitable and scalable technology, allowing the process not to be limited by the capabilities of the Trel instance.
Setting up#
These are the steps for setting up lifecycle management for a repository:
Decide on an action storage repository for your repository. E.g., for BigQuery repository, you may choose Google storage to store the actions.
Find an appropriate lifecycle job code for your repository type. Look through trel_contrib.
Register it with the registration file updated as required. You will need the repository name and the action storage repository name.
Place the repository and the action storage repository into lifecycle.repository_action_storage_map in the .trel.config file.
Update the rules to include this repository.
Restart the lifecycle management system.
Lifecycle rules#
Lifecycle rules define the conditions under which lifecycle actions will be taken on a dataset. Here is an example:
# These rules are applied to all invalid datasets
rules_for_invalid:
  - action: delete
    condition:
      - [ AND, [LabelIs, prod], [NotAccessedFor, 5d] ]
      - [ AND, [NOT, [LabelIs, prod]], [NotAccessedFor, 3d] ]

# These rules are applied to all valid but inactive datasets
rules_for_inactive:
  - action: delete
    condition:
      - [ AND, [LabelIs, prod], [InactiveForOver, 21d] ]
      - [ AND, [NOT, [LabelIs, prod]], [InactiveForOver, 14d] ]

# These rules are applied to valid datasets
rules_for_active:
  - dataset_classes: [ request_log ]
    rules:
      - action: delete
        condition:
          - [ InstanceTsAgeOver, 365d ]
          - [ AND, [NOT, [LabelIs, prod]], [InstanceTsAgeOver, 180d] ]
The structure of rules_for_active is as follows:
It consists of a list of rules and dataset_classes pairs. The rule applies to every dataset_class in the list.
Each rule consists of a list of action and condition pairs. The action must be a valid action for this dataset's repository.
The pairs are evaluated sequentially, and the action for the first valid condition is taken.
The condition consists of a list of boolean expressions. The list is effectively an OR expression on the boolean expressions. If any of the boolean expression is true, the condition is true.
The boolean expression consists of several predicates and boolean operators viz. AND, OR and NOT.
Supported predicates#
The following predicates are available:
LabelIs, label
Return True if the label of the dataset matches label.
InstanceTsPrecisionIs, precision
Return True if the instance_ts_precision of the dataset matches precision.
RepositoryIs, repository
Return True if the repository of the dataset matches repository.
PrefixRegexIs, regex
Return True if the instance_prefix of the dataset matches regex.
PrefixIsNull
Return True if the instance_prefix of the dataset is None.
InactiveForOver, duration
Return True if the dataset has been inactive for duration. This value can be a string with suffixes w, d, h, or m. Without any suffix, it will be treated as seconds.
InstanceTsAgeOver, duration
Return True if the dataset’s instance_ts is older by more than duration. This value can be a string with suffixes w, d, h, or m. Without any suffix, it will be treated as seconds.
NotAccessedFor, duration
Return True if the dataset has not been accessed for more than duration. This value can be a string with suffixes w, d, h, or m. Without any suffix, it will be treated as seconds.
Supported actions#
As of now, lifecycle supports the following actions:
s3: delete
This command will delete your dataset. It can take the following parameters.
- all_versions (true, false)
Delete the data for all versions.
- effective_immediately (true, false)
If true, hide the dataset entry from the catalog immediately instead of hiding it when the actions start being processed.
bq: delete
Delete the BigQuery table.
Behaves identically to (s3, delete)
gs: delete
Delete the Google Storage dataset.
Behaves identically to (s3, delete)
Manual operations#
You can request lifecycle actions through the UI or the CLI. Here is an example command:
trel dataset_do_action 453 delete --params '{"all_versions":true}'
You can get suggestions for possible actions if you do not provide one:
trel dataset_do_action 453