
How to create incremental models in MATE

Creating an incremental model is as simple as this example demonstrates. We have a source table that is loaded once a day with new data. From it, we create an incremental model that processes only the "new" records. This keeps the model light: it inserts new rows rather than recreating the entire table from scratch.

Setup

Let's say we have the following setup:

  1. An ingestion table staging our daily load. We will call this SOURCE_TABLE

    [image: SOURCE_TABLE]

    The table is set up as a source:

    dataops/modelling/sources/source_table.yml

    version: 2

    sources:
      - name: staging
        tables:
          - name: source_table
            columns:
              - name: name
              - name: schema
              - name: load_date
              - name: load_count
  2. An incremental model history_inc:

    dataops/modelling/models/history_inc.sql

    {{ config(
        materialized='incremental'
    ) }}

    select
        name,
        schema,
        load_date,
        load_count
    from {{ source('staging', 'source_table') }}

    {% if is_incremental() %}

    -- this filter will only be applied on an incremental run
    where load_date > (select max(load_date) from {{ this }})

    {% endif %}

A couple of things happen here:

  • We configure our model with materialized = 'incremental'. The first run builds the model from scratch; every subsequent run that does not have the --full-refresh option set triggers the is_incremental() clause. More on that under Subsequent pipeline runs below.

  • We select from {{ source('staging', 'source_table') }} to read our SOURCE_TABLE.

  • We wrap our is_incremental() clause in a Jinja {% if ... %} block, defining how we grab new rows. The condition that identifies new rows is:

    where load_date > (select max(load_date) from {{ this }})

    which says, "give me only rows with a load_date more recent than the newest row currently in this table".
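Since the first bullet mentions --full-refresh: a run with that option set skips the is_incremental() branch and rebuilds the table from the full source. Conceptually (the analytics schema name here is illustrative, not from this project), it behaves like:

```sql
-- conceptual effect of a run with --full-refresh (schema names illustrative):
-- the incremental filter is dropped and the table is rebuilt in full
create or replace table analytics.history_inc as
select
    name,
    schema,
    load_date,
    load_count
from staging.source_table;
```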

If you are interested in reading more about Jinja, head to our template rendering section.

The initial pipeline run

The initial pipeline run will produce the history_inc model:

[image: initial pipeline run]

We can see that history_inc gets created. Checking the content, it is the same as the original table:

[image: source and incremental model tables]
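On this first build, is_incremental() evaluates to false, so the filter is omitted and the compiled SQL is roughly the following (the analytics schema name is illustrative):

```sql
-- rough compiled SQL for the initial run (schema names illustrative);
-- no where filter: is_incremental() is false on the first build
create table analytics.history_inc as
select
    name,
    schema,
    load_date,
    load_count
from staging.source_table;
```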

Subsequent pipeline runs

Now let's assume that ingestion has happened and our original table has changed: it contains an additional row, row 5:

[image: updated SOURCE_TABLE after ingestion]

The subsequent MATE job run triggers the where clause and selects only the rows that match it.

[image: subsequent MATE job run]

What happens is that the new row 5 gets added to the history_inc model:

[image: row 5 gets added to the history model from SOURCE_TABLE]
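On this incremental run, is_incremental() is true, so {{ this }} resolves to the existing history_inc table and the model query compiles roughly to the following (schema names illustrative):

```sql
-- rough compiled SQL for an incremental run (schema names illustrative);
-- only rows newer than the stored maximum load_date (here, row 5) match
select
    name,
    schema,
    load_date,
    load_count
from staging.source_table
where load_date > (select max(load_date) from analytics.history_inc);
```

The matching rows are then inserted into the existing table rather than replacing it.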

Things to consider

Be careful when using incremental runs. Placing the is_incremental() macro well can greatly reduce your model's size and execution time, but take extra caution in some cases, e.g., when using window functions.
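To illustrate the window-function pitfall (a sketch reusing the columns above; running_total is a hypothetical derived column, not part of this example's tables): a window function only sees the rows selected in the current run, so on an incremental run an all-time aggregate would be computed over the new rows alone and restart at every load:

```sql
-- CAUTION: on an incremental run this window spans only the newly
-- selected rows, so running_total restarts at each load instead of
-- continuing from the totals already stored in the table
select
    name,
    load_date,
    sum(load_count) over (order by load_date) as running_total
from {{ source('staging', 'source_table') }}

{% if is_incremental() %}
where load_date > (select max(load_date) from {{ this }})
{% endif %}
```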