Tutorial#
Tutorial scenario#
In this tutorial, we build a data pipeline for a web portal. We start by informing Trel about one day’s log from the portal’s server by adding its path to the Trel data catalog. Next, we build a job that transforms this data to extract the information we need. Finally, we review a troubleshooting process for fixing bad data caused by buggy code.
To try out commands in this tutorial, you are expected to have access to a Trel instance. The tutorial is broken down into exercises with explanations and commands.
Trel has six main concepts that are used to build a data pipeline.
Datasets: Your dataset paths, each associated with a point in a multi-dimensional space similar to a spreadsheet. They live in the catalog.
Repositories: Each dataset is associated with one of these registered entities, and each repository is attached to a repository class.
Jobs: Transform data. They read from the catalog and create (or replace) datasets.
Execution Profiles: Help a job's tasks execute.
Scheduling Classes: Decide if a task is ready to run and with what inputs and outputs.
Sensors: Crawl and optionally load data into one of the repositories and add the path to the catalog.
Of these, datasets, repository entities, jobs, and sensors are user-definable. Execution profiles, scheduling classes, repository classes, and other plugins are limited to what is available with the platform.
This tutorial covers all but sensors. For sensors, please refer to the Sensors section as well as the documentation for the sensor itself.
Register a server log as a dataset within Trel#
The data pipeline processes logs from the web server. The web server loads these logs to Amazon S3 on a daily basis. In production, each daily log will be added to Trel as a dataset as soon as it is ready. This will be done using Trel's Sensors.
For now, we will enter the path to a day’s log manually. When we do so, we have to provide Trel some context for the path. This context consists of six dimensions:
dataset_class: A string that represents the type of the dataset.
instance_prefix: Indicates the non-time-based origin of the dataset.
instance_ts: The time that this dataset corresponds to.
instance_ts_precision: Whether it is hourly, daily, etc.
label: Environment for the data. E.g., prod, qa, etc. Another interpretation is that this represents the code that generated the data.
repository: Repositories represent broad locations where data is stored. E.g., AWS S3 Ohio region (s3-us-east2). They have to be pre-registered with Trel.
We combine the third and fourth dimensions, instance_ts and instance_ts_precision, into a string called instance_ts_str for convenience of input. The command can then be written as follows:
trel dataset_add server_log_raw "" "20210101" demo s3-us-east2 \
"s3://trel-demo/adhoc/server_log_raw/20210101/"
Let us look at the explanation for each value.
dataset_class = server_log_raw: We call it server_log_raw as this is unstructured, and we will later process it.
instance_prefix = '': We leave this blank for now.
instance_ts_str = 20210101: The log is for the day of 1/1/2021.
label = demo: We make up a demo label to isolate this pipeline from production.
repository = s3-us-east2: We registered s3-us-east2 as a repository for Amazon S3 in the Ohio region.
uri: We make up a path. In this tutorial, the path does not matter as we will not actually process the data. It is only important that a unique path for the day is provided.
Next let us double-check whether the entry is correct:
trel datasets server_log_raw
We confirm that the entry exists.
Interpretation of the dimensions#
The 6 dimensions that we provide Trel for each path are similar to cell coordinates in Excel. Let us explore that more.
Excel provides us a set of 2D sheets. This can be thought of as a 3 dimensional space. Let us say we put a number 1500, which is a total in E26. Here, the meaning is “total”, the value is 1500. But what is E26? It is tempting to think it is just the location, but frankly, the location is somewhere in the RAM. E26 is the identity of the total, an identity granted to 1500 by the fact that it was placed in an Excel sheet.
By giving it the identity E26, we are now free to relate it to other cells in the sheet. E.g., E25 can be the total for a previous date. D26 can be one of the components that make up the total.
Similarly, the path to a dataset is just the location of the dataset. Using Trel, you give it an identity using 6 dimensions. Unlike Excel, where the dimensions are general purpose, Trel’s dimensions are specifically created for data pipelines. These dimensions offer you various implicit relationships.
E.g., datasets that differ only in repository have identical data. Datasets that differ only in label are generated from alternative algorithms that we are exploring. Datasets with all the instance_ts values for a month can be combined to get a dataset for the entire month.
These relationships give us the information we need to easily express the data flow we need for the business requirements.
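The implicit relationships described above can be illustrated with a toy in-memory catalog. This is only a sketch, not how Trel stores or queries its catalog: the `Dataset` tuple, the helper functions, the `s3-us-west2` repository, the `experiment` label, and the bucket paths are all made up for this example.

```python
from collections import namedtuple

# Hypothetical stand-in for a catalog entry: the six dimensions plus the path.
Dataset = namedtuple(
    "Dataset",
    "dataset_class instance_prefix instance_ts instance_ts_precision label repository uri",
)

# Made-up entries; none of these paths exist.
catalog = [
    Dataset("server_log", "", "20210101", "D", "demo", "s3-us-east2", "s3://bucket-a/1/"),
    Dataset("server_log", "", "20210101", "D", "demo", "s3-us-west2", "s3://bucket-b/1/"),
    Dataset("server_log", "", "20210101", "D", "experiment", "s3-us-east2", "s3://bucket-a/2/"),
    Dataset("server_log", "", "20210102", "D", "demo", "s3-us-east2", "s3://bucket-a/3/"),
    Dataset("server_log", "", "20210201", "D", "demo", "s3-us-east2", "s3://bucket-a/4/"),
]


def differing_dimensions(a, b):
    """Return the names of the dimensions in which two entries differ."""
    dims = Dataset._fields[:-1]  # every field except the uri
    return [d for d in dims if getattr(a, d) != getattr(b, d)]


def month_of(entries, dataset_class, label, repository, yyyymm):
    """Collect the daily entries of one class, label and repository for a month."""
    return [
        d for d in entries
        if d.dataset_class == dataset_class
        and d.label == label
        and d.repository == repository
        and d.instance_ts_precision == "D"
        and d.instance_ts.startswith(yyyymm)
    ]


# Differ only in repository: copies of the same data.
print(differing_dimensions(catalog[0], catalog[1]))
# Differ only in label: outputs of alternative code.
print(differing_dimensions(catalog[0], catalog[2]))
# All daily entries for January 2021, ready to be combined.
print([d.instance_ts for d in month_of(catalog, "server_log", "demo", "s3-us-east2", "202101")])
```

Because the identity is separate from the path, each of these questions is answered by comparing dimensions and never by parsing a URI.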
Register the first job: format_log#
Trel allows you to register jobs that process datasets to generate new datasets. This involves using the register sub-command with a YAML registration file. See Jobs.
By convention, we will store the registration information for format_log in trel_regis_format_log.yml. The contents are given below.
name: format_log
execution.profile: python
execution.source_code.main:
  class: s3
  path: s3://trel-demo/tutorial_demo
  branch: master
execution.main_executable: _code/src/do_nothing.py
repository_map:
  - server_log_raw: s3-us-east2
scheduler:
  class: single_instance
  depends_on: [ server_log_raw ]
  labels: [ demo ]
  instance_ts_precisions: [ D ]
  cron_constraint: "0 0 * * *"
  schedule_ts_min: "2021-01-01 00:00:00"
execution.output_generator:
  class: default
  outputs:
    - dataset_class: server_log
Let us go through this line by line.
name: The name of the job, which we call format_log.
execution.profile: A string representing a Profile plugin (See Plugins). In Trel, Profile classes are responsible for encapsulating details regarding specific types of jobs and how to run them (See Execution Profiles). We chose the python profile, which is suitable for running Python scripts.
execution.source_code.main: Specifies how to get the source code for this job. This uses a plugin. See Plugins.
execution.main_executable: This specifies the main executable for this job. Here, the file do_nothing.py is functionally blank. That means all it will do is return successfully and will not actually process any data.
repository_map: This parameter is part of how input datasets are selected. Here, it says that input datasets of class server_log_raw must come from the s3-us-east2 repository.
scheduler: Review Scheduling Classes first. Here, the single_instance scheduler will choose datasets of class server_log_raw and daily instance_ts with the demo label as input. Every input can form a separate task.
execution.output_generator: The output generator chosen is default (see Output Generator Plugins). It is configured to create a dataset of class server_log with everything else from schedule_info, which should be identical to the input.
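To make the input selection described above more concrete, here is a toy model of which catalog entries would qualify as inputs for this registration. It is a sketch under stated assumptions and not the single_instance plugin itself: the dictionary layout, the `eligible_inputs` helper, and the `qa` entry are hypothetical, `repository_map` is simplified to a plain dictionary, and `cron_constraint` is ignored.

```python
from datetime import datetime

# Hypothetical, simplified view of the scheduler-related registration fields.
registration = {
    "repository_map": {"server_log_raw": "s3-us-east2"},
    "depends_on": ["server_log_raw"],
    "labels": ["demo"],
    "instance_ts_precisions": ["D"],
    "schedule_ts_min": datetime(2021, 1, 1),
}

# Made-up catalog entries as plain dictionaries.
catalog = [
    {"dataset_class": "server_log_raw", "instance_ts": datetime(2021, 1, 1),
     "instance_ts_precision": "D", "label": "demo", "repository": "s3-us-east2"},
    {"dataset_class": "server_log_raw", "instance_ts": datetime(2021, 1, 2),
     "instance_ts_precision": "D", "label": "qa", "repository": "s3-us-east2"},
    {"dataset_class": "server_log_raw", "instance_ts": datetime(2020, 12, 31),
     "instance_ts_precision": "D", "label": "demo", "repository": "s3-us-east2"},
    {"dataset_class": "server_log", "instance_ts": datetime(2021, 1, 1),
     "instance_ts_precision": "D", "label": "demo", "repository": "s3-us-east2"},
]


def eligible_inputs(entries, reg):
    """Return the entries that this toy scheduler would accept as inputs."""
    selected = []
    for entry in entries:
        cls = entry["dataset_class"]
        if (cls in reg["depends_on"]
                and entry["label"] in reg["labels"]
                and entry["instance_ts_precision"] in reg["instance_ts_precisions"]
                and entry["repository"] == reg["repository_map"].get(cls)
                and entry["instance_ts"] >= reg["schedule_ts_min"]):
            selected.append(entry)
    return selected


# In this toy model, each eligible input forms its own task.
for entry in eligible_inputs(catalog, registration):
    print(entry["dataset_class"], entry["instance_ts"].strftime("%Y%m%d"), entry["label"])
```

Only the first entry passes: the second has the wrong label, the third predates schedule_ts_min, and the fourth is of a different dataset class.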
Before you can register, you have to create do_nothing.py, an empty file:
touch do_nothing.py
Place it in a suitable location. If you put it in <path>/master/src/do_nothing.py, you can put <path> as path in the registration.
Finally, to register format_log, run the command below:
trel register trel_regis_format_log.yml
If no error is shown, it was successful.
Run format_log#
Now that the job is successfully registered, we want to run it. There are three ways to run a job in Trel.
Manually, with inputs provided. In this case, we explicitly provide the inputs for the job on the command line.
trel execute <job_name> -i \
    <dataset1.dataset_class>,<instance_prefix>,<instance_ts>,<label> \
    ...
    <datasetN.dataset_class>,<instance_prefix>,<instance_ts>,<label>
Manually, with scheduling info provided. This allows the scheduler associated with the job to figure out the inputs automatically and execute the job against those inputs. The schedule_id for single_instance is just the instance_ts of the input.
trel execute <job_name> -s <schedule_id>
Ask the scheduler to run all possible jobs. The first command activates the scheduler, which will start monitoring all new datasets. The second command asks the scheduler to run all possible tasks using existing datasets.
trel job_update <job_name> --enable_scheduler
trel job_update <job_name> --trigger_scheduler
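The two manual command templates above can also be assembled programmatically, which helps when a job takes several inputs. The following is a minimal sketch: the helper names are hypothetical, and the code only builds and prints the argument lists without invoking trel.

```python
def input_spec(dataset_class, instance_prefix, instance_ts, label):
    """Format one input as <dataset_class>,<instance_prefix>,<instance_ts>,<label>."""
    return ",".join([dataset_class, instance_prefix, instance_ts, label])


def execute_with_inputs(job_name, inputs):
    """Build the argument list for a manual run with explicit inputs."""
    return ["trel", "execute", job_name, "-i"] + [input_spec(*i) for i in inputs]


def execute_with_schedule(job_name, schedule_id):
    """Build the argument list for a manual run with scheduling info."""
    return ["trel", "execute", job_name, "-s", schedule_id]


# Joining with spaces is for display only; no shell quoting is applied.
print(" ".join(execute_with_inputs(
    "format_log", [("server_log_raw", "", "20210101", "demo")])))
print(" ".join(execute_with_schedule("format_log", "20210101")))
```

An empty instance_prefix simply leaves nothing between the first two commas.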
Manual Run using inputs#
Let us try all the ways. Try the command given in method 1. We add a -w so that Trel waits for the job to complete and shows the final outcome.
trel execute format_log -i server_log_raw,,20210101,demo -w
The output clearly indicates the outcome, inputs, attempts and outputs. Let us examine the visible datasets.
trel datasets server_log
Manual Run using schedule info#
trel execute format_log -s 20210101 -w
Note that we ran the same job again, this time in a different way. Observe how the final output differs. The input dataset is the same down to the id and path. But for the output, while dataset_class, instance_prefix, instance_ts, label, and repository are the same, the id (in square brackets) is different. This indicates that a new dataset entry is created for the second task, even though the scheduling info is the same.
Furthermore, as the default uri_formatter has the id in the path, a new path is used for the output. In this case, outputs of the first and second runs exist simultaneously within Trel. However, due to the fact that these two datasets share dataset_class, instance_prefix, instance_ts, label, and repository, the newer one suppresses the older one, and only the new one is visible and active.
trel datasets server_log
Confirm that the dataset.id in the visible dataset matches the output of the second task.
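The suppression rule described above can be sketched as follows. This is a toy model rather than Trel's implementation: the `Entry` tuple, the ids, and the paths are made up, and it assumes that the newer entry is the one with the higher id.

```python
from collections import namedtuple

# Hypothetical catalog entry: an id, the identifying dimensions, and the path.
Entry = namedtuple(
    "Entry", "id dataset_class instance_prefix instance_ts label repository uri"
)


def visible(entries):
    """Keep only the newest entry (assumed: highest id) for each identity."""
    newest = {}
    for e in entries:
        key = (e.dataset_class, e.instance_prefix, e.instance_ts, e.label, e.repository)
        if key not in newest or e.id > newest[key].id:
            newest[key] = e
    return list(newest.values())


# Two runs produced two entries with the same identity but different ids and paths.
entries = [
    Entry(2, "server_log", "", "20210101", "demo", "s3-us-east2", "s3://example/2/"),
    Entry(3, "server_log", "", "20210101", "demo", "s3-us-east2", "s3://example/3/"),
    Entry(5, "server_log", "", "20210102", "demo", "s3-us-east2", "s3://example/5/"),
]

for e in visible(entries):
    print(e.id, e.instance_ts, e.uri)
```

Both entries for 20210101 still exist, but only the one with id 3 is returned as visible.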
Automatic run using scheduler#
To observe the scheduler working under various circumstances, let us first add another dataset.
trel dataset_add server_log_raw "" "20210102" demo s3-us-east2 \
"s3://trel-demo/adhoc/server_log_raw/20210102/"
Now, let us enable the scheduler.
trel job_update format_log --enable_scheduler
Observe that no tasks have started.
trel tasks -j format_log
This is because the scheduler is only monitoring new datasets. Let us add another dataset to see if it catches that.
trel dataset_add server_log_raw "" "20210103" demo s3-us-east2 \
"s3://trel-demo/adhoc/server_log_raw/20210103/"
Wait 2 seconds and run
trel tasks -j format_log
We should be seeing a new task for 1/3. Next, let us make the scheduler process 1/2 as well.
trel job_update format_log --trigger_scheduler
Wait 2 seconds and run
trel tasks -j format_log
We should be seeing a new task for 1/2. But why did it not run 1/3 again?
The concept of schedule_id#
schedule_id is an important concept in Trel. Once a scheduling plugin is selected and configured in the registration, it defines a set of sets of inputs. Each set of inputs is called a schedule and has a unique identity defined by the schedule_id. The plugin can convert between a set of inputs and its id in either direction.
Furthermore, by providing the schedule_id, you are defining a precise set of points from the catalog space as inputs to your task. This is the source of Trel’s strictness. Every scheduled run of a Trel job must match a schedule_id. Trel guarantees it. If the exact inputs are not present in the catalog, the task will not run.
To answer the question a few paragraphs above, the mechanism that Trel uses to avoid running the same task again is also schedule_id. Trel’s schedulers will not create a task with the same schedule_id as an existing one. If you need it rerun, the typical solution is to run it manually using the schedule_id. You can also explicitly set that task to be ignored and ask the scheduler to run any possible task.
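The de-duplication behaviour can be replayed with a toy model of the steps taken so far in this tutorial. This is a sketch, not Trel's scheduler: the `ToyScheduler` class and its methods are hypothetical, and it assumes that a manual run always creates a task while a scheduled run is created only for a schedule_id that has no task yet.

```python
class ToyScheduler:
    """Toy model: de-duplication applies to scheduled runs only."""

    def __init__(self):
        # One schedule_id per task, in creation order.
        self.tasks = []

    def run_manually(self, schedule_id):
        # A manual run creates a task even if the schedule_id was seen before.
        self.tasks.append(schedule_id)

    def trigger(self, available_schedule_ids):
        # Create tasks only for schedule_ids that do not have a task yet.
        created = [s for s in sorted(available_schedule_ids) if s not in self.tasks]
        self.tasks.extend(created)
        return created


scheduler = ToyScheduler()
scheduler.run_manually("20210101")        # first manual run
scheduler.run_manually("20210101")        # second manual run, same schedule_id
print(scheduler.trigger({"20210103"}))    # new dataset noticed by the scheduler
print(scheduler.trigger({"20210101", "20210102", "20210103"}))  # trigger on everything
```

The final trigger creates a task for 20210102 only, because 20210101 and 20210103 already have tasks.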
Fixing a failed job#
Suppose a task failed. E.g., format_log for schedule_id 20210103. What needs to happen to get things back on track?
Trel comes with a set of design guidelines for the sensors and the jobs. If properly followed, the data warehouse gains a few properties that come in very handy for this.
The first property is accurate state tracking and dependency. If a task fails, all the tasks that directly or indirectly need its outputs will be unable to proceed. They will wait automatically for the task to finish successfully and the required entry to show up in the catalog.
So the only step you need to take is to fix the failed task. What does this involve? First, of course, is to fix the code and commit it. What do we need to do to safely repeat the task?
Trel warehouses also ensure immutability of data and have pure jobs. These properties, coupled with the catalog-based dependency, ensure idempotent tasks by default. That means any task can be repeated safely with no negative impacts. So, once the code fix is committed, just run the task again.
But what if there is a time gap between when the task failed and when it was re-run? Let us say it was re-run 1 week later. How do you ensure that the outcome is correct?
Designed properly, Trel warehouses also have a unique property: execution-time invariance. This is a guarantee for the sensors and the jobs (within certain configurable time limits) that the outcome of a task is independent of when it is run. Coupled with idempotency, this means a Trel task can be run at any time, and any number of times, with no side effect or undesirable behavior.
In conclusion, if a task fails, fix the code and run it again. If a sensor is failing, fix the code and restart it.
This works every time for Sense, ETL, Transform and copy. It typically does not work for reverse-ETL jobs as they are often not idempotent.
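One way to picture execution-time invariance is to compare a job that derives a date window from the wall clock with one that derives it from the task's own instance_ts. This is an illustrative sketch only: both functions are hypothetical and are not taken from Trel's design guidelines.

```python
from datetime import datetime, timedelta


def window_from_clock(days, now):
    """NOT invariant: the window depends on when the task runs.

    `now` stands in for datetime.now() so the effect can be shown deterministically.
    """
    return [(now - timedelta(days=i)).strftime("%Y%m%d") for i in range(days)]


def window_from_schedule(instance_ts, days):
    """Invariant: the window is derived only from the task's instance_ts."""
    end = datetime.strptime(instance_ts, "%Y%m%d")
    return [(end - timedelta(days=i)).strftime("%Y%m%d") for i in range(days)]


# The same task, run on time and then re-run one week later.
on_time = datetime(2021, 1, 3)
one_week_later = datetime(2021, 1, 10)

print(window_from_clock(3, on_time) == window_from_clock(3, one_week_later))
print(window_from_schedule("20210103", 3))
```

The clock-based window changes between the two runs, while the schedule-based window is the same no matter when the task is executed.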
Running a pyspark job#
Consider the pyspark code below, stored in a file called distinct.py:
import argparse

import yaml
from pyspark import SparkContext
from pyspark.conf import SparkConf
from pyspark.sql import SQLContext


def parse_args():
    # The path to the YAML arguments file is passed in through --_args.
    parser = argparse.ArgumentParser()
    parser.add_argument("--_args")
    # Ignore any other command-line arguments.
    args, _ = parser.parse_known_args()
    return args


if __name__ == '__main__':
    cli_args = parse_args()
    args_fname = cli_args._args
    with open(args_fname) as f:
        # safe_load avoids the Loader argument that yaml.load needs in recent PyYAML.
        args = yaml.safe_load(f)

    spark_conf = SparkConf()
    sc = SparkContext(conf=spark_conf)
    spark = SQLContext(sc)

    # Each dataset class maps to a list of datasets; take the first URI of each.
    input_uri = args['inputs']['request_log'][0]['uri']
    output_uri = args['outputs']['request_log_distinct'][0]['uri']

    # Read the lines, drop duplicates, and write a single output partition.
    sc.textFile(input_uri)\
        .distinct()\
        .coalesce(1)\
        .saveAsTextFile(output_uri)
    sc.stop()
It takes the input specification from a YAML file, loads the input data, removes duplicate lines, and writes the result to the specified output. This is the outline of typical Trel job code, pyspark or otherwise.
To run this code, we can write a registration file as follows:
name: request_log_distinct
execution.profile: emr_pyspark
execution.source_code.main:
  class: s3
  path: s3://trel-demo/code/pyspark_test
execution.main_executable: _code/distinct.py
repository_map:
  - request_log: s3-us-east2
resource.name: emr_spark
resource.memory_level: normal
resource.num_cores: 1
resource.args:
  region: us-east-2
  spot: true
scheduler:
  class: single_instance
  depends_on: [ request_log ]
  labels: [ demo ]
  instance_ts_precisions: [ D ]
  cron_constraint: "0 0 * * *"
  schedule_ts_min: "2021-01-01 00:00:00"
execution.output_generator:
  class: default
  outputs:
    - dataset_class: request_log_distinct
Save this to a file called trel_regis_request_log_distinct.yml. Make sure you replace the path to the code after you upload distinct.py to an appropriate place. Then execute:
trel register trel_regis_request_log_distinct.yml
Next, we need some data to process. We have placed the following data at s3://trel-demo/adhoc/request_log/20210101/part-00000:
line1
line1
line1
line2
line3
line1
After you upload this data to your bucket and change the path below, we can register this data as follows:
trel dataset_add request_log "" "20210101" demo s3-us-east2 s3://trel-demo/adhoc/request_log/20210101/
Finally, we can run the job as follows:
trel execute request_log_distinct -i request_log,,20210101,demo -w
Confirm through the AWS console that the job is running. Once the job finishes, it reports the input and output paths. Download them to confirm that the output consists of the distinct lines in the input.
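The final check can be done by eye for six lines, but a small script scales better. The following is a minimal sketch: the helper names and the local file handling are assumptions, the input lines are the sample data from this section, and the output lines are a hypothetical example of what a correct run could produce.

```python
def read_lines(path):
    """Read a downloaded part file into a list of lines without newlines."""
    with open(path) as f:
        return [line.rstrip("\n") for line in f]


def is_distinct_of(input_lines, output_lines):
    """True if the output has no duplicates and holds exactly the input's distinct lines."""
    no_duplicates = len(output_lines) == len(set(output_lines))
    same_values = set(output_lines) == set(input_lines)
    return no_duplicates and same_values


# Sample input from this section.
input_lines = ["line1", "line1", "line1", "line2", "line3", "line1"]
# Hypothetical output; the order of lines written by Spark is not guaranteed.
output_lines = ["line2", "line1", "line3"]

print(is_distinct_of(input_lines, output_lines))
```

With real files, pass the results of `read_lines` on the downloaded input and output part files to `is_distinct_of`.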
Deploying a fix for format_log#
Consider the scenario where there was a bug in the format_log code. Remember, format_log is running on the master branch. This has caused bad data to occur in server_log for 1/3, 1/4, and 1/5 (1/5 is the latest). We would like to:
Fix the problem in the code,
QA it in a code branch and separate label,
Deploy code by merging with master and
Fix the problem in the production data.
Let us set up this scenario:
trel dataset_add server_log_raw "" "20210104" demo s3-us-east2 \
"s3://trel-demo/adhoc/server_log_raw/20210104/"
trel dataset_add server_log_raw "" "20210105" demo s3-us-east2 \
"s3://trel-demo/adhoc/server_log_raw/20210105/"
Assume we are able to identify the problem and make the corrections locally. Then our steps are as follows:
First, we commit the changes to a hotfix branch, say hotfix_343.
Then we register a new job using this branch that accepts inputs with production label while outputting to a label called hotfix_343.
name: hotfix_343.format_log
execution.profile: python
execution.source_code.main:
  class: s3
  path: s3://trel-demo/tutorial_demo/hotfix_343/src
execution.main_executable: _code/do_nothing.py
repository_map:
  - server_log_raw: s3-us-east2
scheduler:
  name: single_instance
  depends_on: [ server_log_raw ]
  labels: [ demo ]
  instance_ts_precisions: [ D ]
  schedule_ts_min: "2021-01-01 00:00:00"
execution.output_generator:
  class: default
  outputs:
    #- dataset_class: server_log
    - [ "server_log", "_inherit", "_max", "_inherit", "hotfix_343", "_inherit" ]
Once this is done, you can run the branch as follows:
trel execute hotfix_343.format_log -s 20210103 -w
If the above command runs through, we can QA the data for server_log,,20210103,hotfix_343 and make sure it is satisfactory.
At this point, the code in hotfix_343 is validated and we have to deploy it. We make and approve a pull request to the master branch. This is deployment.
Finally, we fix the data. In Trel, thanks to the data catalog, we can just re-run the job to replace the data. This does not leave the catalog in an inconsistent state ever.
We choose not to re-run 1/3, as validated data for it is already available. Instead, we re-register that path with the production label.
trel dataset_add server_log "" 20210103 demo s3-us-east2 \
    s3://<uri_for_dataset_from_step_3>/
For 1/4 and 1/5, we re-run.
for date in 20210104 20210105
do
    trel execute format_log -s $date
done
Without the -w flag, the command will queue the jobs to be processed in parallel.
What we have learned#
In this tutorial, we have learned the following:
How the Trel data catalog organizes your data.
How to add a dataset to the catalog.
How to replace a dataset in the catalog.
How to create and register a python job.
How to execute a task from a job.
A simple scheduling pattern: single_instance.
How to schedule a job and what that does.
What schedule_id, the identity of a task, is.
How to fix a failed task.
How to create and register an emr_pyspark job.
How to QA and deploy a fix for buggy transformation code that resulted in bad data.
More advanced situations are covered in How to….