Design guidelines#

Trel is purpose-built for orchestrating and managing ETL, transform, and Reverse-ETL requirements, primarily OLAP. Therefore, it is not suitable for OLTP tasks.

Given a suitable requirement, to design a Trel solution, we have to figure out entities of 6 types:

  1. Repositories

  2. Sensors

  3. Dataset classes

  4. Jobs

  5. Execution profiles

  6. Scheduling classes

Design goals#

When we design jobs and sensors, we are trying to achieve the following:

Execution-time invariance

Tasks of any job that achieves this property will produce the same result no matter when it is run. In other words, the data that it generates is independent of the time of execution.

This is a complicated requirement for jobs and requires several other properties:

  1. Pure jobs

  2. Immutable datasets

  3. Idempotent jobs

  4. Strict data state tracking and cataloging with time

  5. Automatic time-travel

For sensors to be execution-time invariant, they require time travel for the data source and cataloging with time.

The Trel platform is geared towards these properties. But why focus on execution-time invariance?

Importance of execution time invariance#

In many data warehouses, delays are a significant source of incorrectness. This is because, data warehouse automation is geared towards correct behavior when everything is running on time. An experienced automation engineer can include several safety mechanisms in case of delays, but perfection is nearly impossible without the above properties.

Without execution time invariance, there is a lot of manual work to resolve the data issues and get things back on track, anytime there is a major disruption. A lot of specific training is needed for an operations person to be able to accurately recover from such disruptions.

As responsibilities change, it is easy to lose track of the corner cases. Furthermore, as more manual operations are needed, human error also increases. The error introduced into your data during such manual processes are the hardest to track down and catch.

With execution-time invariance, all these problems go away. Catalog-based dependency mechanism guarantees that failed jobs lead to only delays. Execution time invariance guarantees that delays cause no other issues.

Execution-time invariance leads to a highly automated, robust and predictable data warehouse that can recover from any disruptions with minimal manual involvement.

Recommendations#

  1. If you are moving data, don’t transform it. This is because transformations are requirements specific while moving data is not. You may be able to find a re-usable code just for the data transfer.

  2. If you are transforming data, avoid moving it. Use a separate moving job. Trel correctly catalogs copies and simplifies its automation.

  3. Jobs that write to locations where other systems will pick them up are reverse-ETL jobs. Don’t do data transformations in these.

  4. Transformation jobs can be of two types:

    1. Update type: Here, we want to update a dataset based on some other data. So, these jobs have at least two dataset classes as input, and the output’s dataset class will be one of the input dataset classes. You will typically use the periodic scheduling class for these types of jobs.

    2. Create type: An entirely new dataset class will be created from the inputs. You will typically use the single_instance or its generalization multi_period for these jobs.

  5. All transformation jobs should write to immutable paths. Writing to immutable paths is the default behavior.

  6. Completely avoid side effect in your transformation and copy jobs. Read from inputs only and write to outputs only. If you cannot achieve this, perhaps Trel is not the appropriate tool.

  7. If a job looks too complicated for the existing scheduling patterns, split up into smaller jobs.

With these best practices, Trel jobs are execution-time invariant.

Take the effort to make Trel sensors execution-time invariant as well. This means that even if the sensor is shut down for, a few days, when it resumes, the datasets it generates are identical to what it would have generated had it not been interrupted.

This may not be always possible, as it depends almost entirely on the capabilities of the data source. Try to do as best as possible.

There are a few more stylistic recommendations:

  1. If your job has only dataset class as output, name it the same as the output. The reason is, as the scale of the warehouse grows, this cuts the names you have to remember by almost half.

Next, let us take a look at some examples. We will highlight each choice we make.

Example 1: Show tweet and stock price analytics in a dashboard#

Our goal is to pull data from Twitter, and Finnhub for a company, say AMC. In addition, we want to derive frequency statistics for tweets and push it into a dashboard along with stock prices.

For a different perspective on this design, you can look at this blog post. Complete source code is also available on GitHub.

Bringing the data in#

We need to get Twitter data. For this, we use the tweepy package. We build a sensor using this that can read from Twitter. The information we receive is a set of large JSON, so we choose to keep it in a data lake technology.

Design

Repository s3-us-east2 of class s3

We need to pick a dataset class for the produced datasets. We also want to decide how frequently a new dataset will be generated. Finally, we want a reasonably responsive dashboard, so we choose 5 mins interval.

Design

Sensor twitter_amc pulls AMC tweets using the tweepy package.

It uploads data into paths given by s3-us-east2 and catalogs it as tweets.

Datasets have instance_ts as 2022-01-01 00:00:00, 2022-01-01 00:05:00 etc.

They contain tweets for 5 mins starting at instance_ts.

In the implementation, we are able to make this code execution-time invariant. Even if the sensor is shut down for a time, once started, it will study the catalog, find out all the missing entries, and start querying Twitter for all these 5 minute periods to create those datasets. Their contents will be the same as if they were created as soon as possible.

We have to do something similar for stock tick data. This data, however, is clean and tabular. So, we can load it directly into a data warehouse type technology. We choose BigQuery for this.

Design

Repository bq-us of class bq

Now we can build the sensor that pulls from Finnhub

Design

Sensor finnhub pulls AMC tweets using the finnhub package into bq-us and catalogs the dataset as tweets

instance_ts similar to twitter_amc.

At this point, the ETL portion is complete, and the pipeline is set up for landing the data in some repository and cataloging it. We are also able to achieve execution-time invariance for both the sensors.

Transforming the data#

Next, let us process the tweets. We have the tweets as unstructured JSON, and want them to be more usable. So, we write a job that extracts the information we want from JSON and loads it to BigQuery.

We choose to do this using Python.

What about the scheduling? Our requirement is simple: In each task, take a dataset of tweets and generate a dataset of tweets_condensed. Both the datasets have identical dimensions, except the dataset_class. The scheduling plugin that can get this done is single_instance which is best when the inputs and outputs all share the same instance dimensions.

This choice violates directive 3, but that is a soft suggestion, and we ignore it for now.

Design

Job tweets_condensed reads tweets and produces tweets_condensed in bq-us

Profile: python

Schedule: single_instance. Example:

tweets (2021-01-01 00:05)  ->  tweets_condensed (2021-01-01 00:05)

Next, we write a job to generate statistics. Again, BigQuery is the obvious choice. However, we have a complication: we want statistics for more than a single 5-minute interval. So, we maintain a larger set of statistics and use the latest 5-minute data to update them. This design follows the update pattern described in Recommendations. So, we use the periodic scheduling class.

Design

Job tweets_stats reads tweets_condensed and tweets_stats while producing tweet_stats in bq-us

Profile: bigquery

Schedule: periodic. Example:

tweets_condensed (2021-01-01 00:05)
                +                     ->  tweet_stats (2021-01-01 00:05)
tweet_stats      (2021-01-01 00:00)

We do something similar for the stock prices:

Design

Job stock_series_30day reads stock_ticks and stock_series_30day while producing stock_series_30day in bq-us

Profile: bigquery

Schedule: periodic

Now, we need a job to merge these two datasets and clean it up into a 7-day dataset that can be shown in a dashboard. This is a create type transformation job in BigQuery.

Design

Job dashboard_stats reads tweets_stats and stock_series_30day while producing dashboard_stats in bq-us

Profile: bigquery

Schedule: single_instance

At this point, we have the data we want to show in a dashboard cataloged as dashboard_stats. So next, let us push this to a dashboarding tool.

Exporting the data#

Let us choose a dashboarding tool for this. Given that we are using BigQuery, let us pick from the Google ecosystem.

Design

Google Data studio for dashboard

This platform can read directly from BigQuery. However, it is not compatible with our catalog or the way we organize the tables. Due to immutability, Trel will store the data in some table not known at the time of dashboard creation. How to connect the dashboard to the correct data?

The best solution here is to use a view to hand off the data. This view points to the right table, and the dashboard points to the view. We can write a job to update the view.

We would like one safety feature that we don’t want to update the view if the scheduler runs this job for an older dataset. So, we use the Other datasets mechanism for input selection to get the latest instance of dashboard_stats. In our code, we check the dates to make sure that we only update if the two datasets are the same.

This job also has an output dataset. However, it serves as a marker only and we are under no obligation to create any dataset at the specified location.

Design

Job publish.dashboard_stats reads dashboard_stats while producing dashboard_stats.published in bq-us

Profile: python

Schedule: single_instance

By pointing the dashboard at the view, we are able to get a live dashboard that updates every 5 mins.

Summary#

Finally, we have the following components as a result of this decomposition:

  1. Dataset classes: tweets, tweets_condensed, tweet_stats, stock_ticks, stock_series_30day, dashboard_stats, dashboard_stats.published

  2. Repositories:

    • s3-us-east2 (tweets)

    • bq-us (tweets_condensed, tweet_stats, stock_ticks, stock_series_30day, dashboard_stats, dashboard_stats.published)

  3. Sensors: twitter_amc, finnhub

  4. Jobs: tweets_condensed, tweet_stats, stock_series_30day, dashboard_stats, publish.dashboard_stats

  5. Execution profiles:

    • python (tweets_condensed, publish.dashboard_stats),

    • bigquery (tweet_stats, stock_series_30day, dashboard_stats)

  6. Scheduling classes:

    • single_instance (tweets_condensed, publish.dashboard_stats, dashboard_stats)

    • periodic (tweet_stats, stock_series_30day)

Implementation guidelines#

One of the strengths of Trel is this easy-to-understand break-down of work that can be used to assign responsibilities. We can safely task each sensor or job to a separate resource given the complete decomposition.

Given the catalog entries, Trel jobs are decoupled. To take advantage of this, create sample data for each dataset and register them in the catalog with a qa or dev label. Each resource is now able to develop its job separately. They use the test data and verify that the output matches the sample data provided. Once ready, they can directly deploy the job and even activate scheduling in the production label (master, main, or prod) without waiting on anyone.

This minimizes the need for collaboration and the reduction in productivity due to the lack of communication.

Once all the sensors and jobs are individually tested and are deployed, everything will start working correctly. This minimizes final integration problems.