3.4. Dataform#

3.4.1. Dataform Overview#

Dataform is a serverless service for data analysts to develop and deploy tables, incremental tables, and views in BigQuery. Dataform offers a web environment for SQL workflow development; connections to GitHub, GitLab, Azure DevOps Services, and Bitbucket; continuous integration and continuous deployment; and workflow execution.

Dataform lets you manage data transformation in the Extract, Load, Transform (ELT) process for data integration. After raw data is extracted from source systems and loaded into BigQuery, Dataform helps you transform it into a well-defined, tested, and documented suite of data tables.

Main features:

  • Develop and execute SQL workflows for data transformation.

  • Collaborate with team members on SQL workflow development through Git.

  • Manage a large number of tables and their dependencies.

  • Declare source data and manage table dependencies.

  • View a visualization of the dependency tree of your SQL workflow.

  • Manage data with SQL code in a central repository.

  • Reuse code with JavaScript.

  • Test data correctness with quality tests on source and output tables.

  • Version control SQL code.

  • Document data tables inside SQL code.

Repository Project

1. Types of files (each should be placed in the folder of the same name):

  • Config files (JSON or SQLX files): let you configure your SQL workflows. They contain general configuration, execution schedules, or schemas for creating new tables and views.

  • Definitions: SQLX and JavaScript files that define new tables, views, and additional SQL operations to run in BigQuery.

  • Includes: JavaScript files where you can define variables and functions to use in your project.

2. Workflow development and version control

In Dataform, workflow development works like local Git development: you can pull changes from the repository, commit all or selected changes, and push them to Git branches of the repository.

In Workflow development, you can:

  • Develop the following SQL workflow actions

    • Source data declarations

    • Tables and views

    • Incremental tables

    • Table partitions and clusters

    • Dependencies between actions

    • Documentation of tables

    • Custom SQL operations

    • BigQuery labels

    • BigQuery policy tags

    • Dataform tags

    • Data quality tests, called assertions

  • Use JavaScript to reuse your Dataform SQL workflow code.

    • Across a file with code encapsulation

    • Across a repository with includes

    • Across repositories with packages

3. Workflow compilation

Dataform compiles the code in your workspace in real time into a compilation result, a static representation of your SQL workflow that is then executed in BigQuery.

4. Workflow execution

  • You can schedule Dataform executions in BigQuery in the following ways:

    • Create workflow configurations to schedule executions of compilation results created in release configurations

    • Schedule executions with Cloud Composer

    • Schedule executions with Workflows and Cloud Scheduler

  • To debug errors, you can monitor executions in the following ways:

    • View detailed Dataform execution logs

    • View audit logs for Dataform

    • View Cloud Logging logs for Dataform

3.4.1.1. Terms#

  1. Release configuration: lets you configure how Dataform compiles the code of your repository. If your repository is connected to a remote Git repository, you can create release configurations from different branches. Dataform pulls code from your remote Git repository before compiling it.

  2. Workflow configuration: lets you schedule workflow executions.

  3. Development workspace: the Dataform equivalent of a local Git development branch, hosted in the Google Cloud web interface.

  4. Dataform core package: the version of the Dataform core framework that your repository uses, analogous to pinning a language version (for example, a Python version) in a Python project.

3.4.2. Administer & Control Access#

3.4.2.1. Setup Repository#

https://cloud.google.com/dataform/docs/create-repository

3.4.2.2. Connect to GIT repository#

https://cloud.google.com/dataform/docs/connect-repository

3.4.2.3. Config Dataform Settings#

3.4.2.3.1. workflow_settings.yaml#

The repository workflow settings file, workflow_settings.yaml, stores Dataform workflow settings in YAML format.

defaultProject: my-gcp-project-id             # BigQuery Google Cloud project ID
defaultDataset: dataform                      # BigQuery dataset in which Dataform creates assets
defaultLocation: asia-southeast1              # default BigQuery dataset region
defaultAssertionDataset: dataform_assertions  # BigQuery dataset in which Dataform creates views with assertion results
vars:
  executionSetting: dev
  environmentName: development

See the configuration reference for the full list of workflow settings.

Properties from workflow_settings.yaml are accessible in Dataform code through the dataform.projectConfig object. The following mapping from workflow_settings.yaml options to dataform.projectConfig options applies:

  • defaultProject => defaultDatabase.

  • defaultDataset => defaultSchema.

  • defaultAssertionDataset => assertionSchema.

  • projectSuffix => databaseSuffix.

  • datasetSuffix => schemaSuffix.

  • namePrefix => tablePrefix.
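
For example, you can read these settings in a SQLX file through interpolation. A minimal sketch, using the environmentName variable from the workflow_settings.yaml example above:

config { type: "view" }

SELECT
  "${dataform.projectConfig.defaultSchema}" AS default_dataset,    -- mapped from defaultDataset
  "${dataform.projectConfig.vars.environmentName}" AS environment  -- custom variable from vars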

To branch the compiled SQL on these settings, use the when clause:

${when(dataform.projectConfig.vars.YOUR_VARIABLE === "SET_VALUE", "CONDITION", "ELSE")}

config { type: "view" }
SELECT ${when(
  !dataform.projectConfig.tablePrefix,
  "table prefix is not set!",
  "table prefix is set!"
)}

select
  *
from ${ref("data")}
${when(
  dataform.projectConfig.vars.executionSetting === "staging",
  "where mod(farm_fingerprint(id), 10) = 0"
)}

3.4.2.4. Manage Core Packages#

  • Dataform core package only, no additional packages: declare the Dataform core version in workflow_settings.yaml:

dataformCoreVersion: "3.0.0"        # As a best practice, always use the latest available version of the Dataform core framework                
defaultProject: my-gcp-project-id   # BigQuery Google Cloud project ID
defaultDataset: dataform            # BigQuery dataset in which Dataform creates assets
  • Dataform core package plus additional packages: declare the Dataform core package and the additional packages in package.json:

{
  "name": "repository-name",
  "dependencies": {
    "@dataform/core": "3.0.0",
    "dataform-scd": "https://github.com/dataform-co/dataform-scd/archive/0.3.tar.gz"
  }
}

Then remove dataformCoreVersion from workflow_settings.yaml.

3.4.2.5. Control Access#

https://cloud.google.com/dataform/docs/required-access

3.4.3. Development#

3.4.3.1. Dataform Core (SQLX)#

Use Dataform core for the following purposes:

  • Defining tables, views, materialized views, or incremental tables.

  • Defining data transformation logic.

  • Declaring source data and managing table dependencies.

  • Documenting table and column descriptions inside code.

  • Reusing functions and variables across different queries.

  • Writing data assertions to ensure data consistency.

You can compile and run Dataform core locally through the Dataform CLI outside of Google Cloud.

A SQLX file consists of a config block and a body.

3.4.3.1.1. Config block#

In the config block, you can perform the following actions:

  • Specify query metadata: configure how Dataform materializes queries into BigQuery, for example the output table type, the target database, or labels using the config metadata.

  • Document data: document your tables and their fields directly

  • Define data quality tests (called assertions): check for uniqueness, null values, or custom conditions; assertions run after table creation. (You can also define assertions outside the config block, in a separate SQLX file.)

All config properties, and the config block itself, are optional.

config {
  type: "table",
  description: "This table joins orders information from OnlineStore & payment information from PaymentApp",
  columns: {
    order_date: "The date when a customer placed their order",
    id: "Order ID as defined by OnlineStore",
    order_status: "The status of an order e.g. sent, delivered",
    customer_id: "Unique customer ID",
    payment_status: "The status of a payment e.g. pending, paid",
    payment_method: "How the customer chose to pay",
    item_count: "The number of items the customer ordered",
    amount: "The amount the customer paid"
  },
  assertions: {
    uniqueKey: ["id"]
  }
}

3.4.3.1.2. SQLX body#

In the SQLX body, you can perform the following actions:

  • Define a table and its dependencies: use SQL SELECT statements and the ref function.

The ref function builds a dependency tree of all the tables to be created or updated, and lets you reference tables defined in your project instead of hard-coding schema and table names.

config { type: "table" }

SELECT
  order_date AS date,
  order_id AS order_id,
  order_status AS order_status,
  SUM(item_count) AS item_count,
  SUM(amount) AS revenue

FROM ${ref("store_clean")}

GROUP BY 1, 2, 3

After compilation, the SQL code is:

CREATE
OR REPLACE TABLE Dataform.orders AS
SELECT
    order_date AS date,
    order_id AS order_id,
    order_status AS order_status,
    SUM(item_count) AS item_count,
    SUM(amount) AS revenue
FROM
    Dataform_stg.store_clean
GROUP BY
    1,
    2,
    3
  • Define additional SQL operations: use pre_operations and post_operations blocks to run SQL statements before or after table creation.

SELECT * FROM ...

post_operations {
  GRANT `roles/bigquery.dataViewer` ON TABLE ${self()} TO "group:someusers@dataform.co"
}
  • Generate SQL code with a JavaScript block: define reusable functions to generate repetitive parts of SQL code.

Note: code defined in a JavaScript block can be reused only inside the SQLX file where the block is defined. To reuse code globally, across your entire repository, create includes.

js {
  const columnName = "foo";
}

SELECT 1 AS ${columnName} FROM "..."

3.4.3.2. Concept of workspace#

Compiled graph

Filter the graph by the following properties:

  • Name

  • Tag

  • Type

    • Assertion

    • Declaration

    • Incremental Table

    • Materialized view

    • Operations

    • Table

    • Unknown

    • View

You can select multiple filters at once. Dataform applies them with the OR condition.

Repository Structure

  • definitions/: a directory for asset definitions, in Dataform core or JavaScript.

  • includes/: an empty directory for scripts and variables that you can reuse across the repository.

  • workflow_settings.yaml (dataform.json in versions earlier than 3.0.0): the default Dataform configuration file containing the Google Cloud project ID and the BigQuery schema in which to publish assets. You can override the default settings to customize them to your needs, but it's not a requirement to begin using Dataform.

  • package.json: the default Dataform dependencies configuration file with the latest version of @dataform/core. You can use this file to import packages.

  • definitions/sample.sqlx: a sample SQLX file to help you get started.

3.4.3.3. Dataform Tables#

https://cloud.google.com/dataform/docs/tables

1. Types of table

  • table: a regular table.

  • incremental: a table updated by inserting new records (for example, by date) instead of being rebuilt from scratch; its definition must include a WHERE clause.

  • view: a table view.

    • materialized: a view that stores its underlying data (a combination of table and view that increases performance, at extra cost, and needs continuous refreshing).

Other values of type: operations, declaration, assertion, …

2. Partitions and clusters

3. Table/Field description

4. Assertions

  • Test and validate output table. Dataform runs assertions every time it updates your SQL workflow and alerts you if any assertions fail.

5. Config additional table settings

  • Override default table settings, such as database or schema, disable table creation, or execute a SQL statement before or after table creation.

6. Table labels

7. Setting column-level access control
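
Column-level access control is applied through BigQuery policy tags on individual columns. A minimal sketch, assuming an existing policy tag (the taxonomy path below is a placeholder):

config {
  type: "table",
  columns: {
    email: {
      description: "User email",
      bigqueryPolicyTags: ["projects/PROJECT_ID/locations/us/taxonomies/TAXONOMY_ID/policyTags/TAG_ID"]
    }
  }
}

SELECT "user@example.com" AS email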

3.4.3.3.1. Create table#

1. ref function: reference, and automatically depend on, objects defined in your Dataform SQL workflow instead of hard-coding schema and table names:

  • ${ref("database", "schema", "name")}: project_id.schema.name

  • ${ref("schema", "name")}: default_project_id.schema.name

  • ${ref("name")}: default_project_id.default_schema.name

2. resolve: similar to ref, but does not add the referenced table as a dependency of this action.
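
A minimal sketch of resolve, assuming source_data is defined elsewhere in the workflow:

config { type: "table" }

-- source_data is referenced in the query but not added to the dependency tree
SELECT * FROM ${resolve("source_data")}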

config { 
    type: "table",
    bigquery: {
        partitionBy: "DATETIME_TRUNC(order_date, DAY)",
        requirePartitionFilter : true,
        partitionExpirationDays: 14,
        clusterBy: ["order_id"]
    },
    dependencies: [ "store_clean", "some_other_table" ] 
}

SELECT
  order_date AS order_date,
  order_id AS order_id,
  order_status AS order_status,
  SUM(item_count) AS item_count,
  SUM(amount) AS revenue

FROM ${ref("store_clean")}

GROUP BY 1, 2, 3

3.4.3.3.2. Incremental table#

An incremental table is updated by inserting or merging new rows instead of being rebuilt from scratch on every execution:

  • Builds the incremental table from scratch only for the first time.

  • During subsequent executions, Dataform only inserts or merges new rows into the incremental table according to the conditions that you configure.

Dataform inserts new rows only into columns that already exist in the incremental table. If you change the incremental table definition query, for example by adding a new column, you must rebuild the table from scratch. To do so, the next time you trigger an execution of the table, select the Run with full refresh option.

Use cases for incremental tables

  • Performance optimization: process only new records instead of reprocessing the entire table (web logs or analytics data, …).

  • Latency reduction: execute workflows quickly but frequently, reducing the downstream latency of the output tables.

  • Daily snapshots: create daily snapshots of the table data, for example for longitudinal analysis of user settings stored in a production database (see the sketch after this list).
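
A minimal sketch of the daily-snapshot use case, assuming a user_settings table is defined in the workflow:

config { type: "incremental" }

SELECT
  CURRENT_DATE() AS snapshot_date,
  user_id,
  settings
FROM ${ref("user_settings")}
-- on incremental runs, add at most one new snapshot per day
${when(incremental(), `WHERE CURRENT_DATE() > (SELECT MAX(snapshot_date) FROM ${self()})`)}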

In the WHERE clause, specify an incremental condition and a non-incremental condition. Dataform applies:

  • the incremental condition during table execution without a full refresh

  • the non-incremental condition during execution with a full refresh

config { type: "incremental" }

<SELECT_STATEMENT>  -- the SELECT statement that defines your table

${when(
    incremental(),
    `WHERE <INCREMENTAL_CONDITION>`,    // rows Dataform processes during execution without a full refresh
    `WHERE <NON_INCREMENTAL_CONDITION>` // rows Dataform processes during execution with a full refresh
)}

When you select the full refresh option at execution time, Dataform applies the non-incremental branch of ${when(incremental(), ...)} and rebuilds the table from scratch, unless protected: true is set in the config block.

config { type: "incremental" }

-- Fetches the columns timestamp and message from the source table logs located in the database/schema productiondb.
SELECT timestamp, message FROM ${ref("productiondb", "logs")}

${when(incremental(),
    // Incremental mode: appends only rows with a date later than the maximum date in the current table, and with country = "UK".
   `WHERE date > (SELECT MAX(date) FROM ${self()}) AND country = "UK"`,

    // Full-refresh mode: the table is rebuilt from scratch, appending all rows with country = "UK".
   `WHERE country = "UK"`)}

Merge rows in an incremental table

When updating the table, Dataform merges rows that share the same uniqueKey instead of appending them.

config {
  type: "incremental",
  uniqueKey: ["transaction_id"]
}

SELECT timestamp, action FROM weblogs.user_actions
${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }

Filter rows in an incremental table

To avoid Dataform scanning the whole table to find matching rows, set updatePartitionFilter so that only a subset of records is considered.

config {
  type: "incremental",
  uniqueKey: ["transaction_id"],
  bigquery: {
    partitionBy: "DATE(timestamp)",
    // filter to update only the last 24 hours
    updatePartitionFilter:
        "timestamp >= timestamp_sub(current_timestamp(), interval 24 hour)"
  }
}

SELECT timestamp, action FROM weblogs.user_actions
${ when(incremental(), `WHERE timestamp > (SELECT MAX(timestamp) FROM ${self()})`) }

Avoid full table scans

config {
  type: "incremental",
}

pre_operations {
  DECLARE event_timestamp_checkpoint DEFAULT (
    ${when(incremental(),
    `SELECT max(event_timestamp) FROM ${self()}`,
    `SELECT timestamp("2000-01-01")`)}
  )
}

SELECT
  *
FROM
  ${ref("raw_events")}
WHERE event_timestamp > event_timestamp_checkpoint

Protect an incremental table from full refresh

In the config block, enter protected: true

config {
  type: "incremental",
  protected: true
}
SELECT ...

3.4.3.3.3. Document table#

config {
  type: "table",
  description: "Description of the table.",
  columns: {
    column1_name: "Description of the first column",
    column2_name: "Description of the second column",
    column3_name: "Description of the third column",
    record_name: {
      description: "Description of the record.",
      columns: {
       record_column1_name: "Description of the first record column",
       record_column2_name: "Description of the second record column",
      }
    }
  }
}
SELECT
  "first_column_value" AS column_1_name,
  "second_column_value" AS column_2_name,
  "third_column_value" AS column_3_name,
  STRUCT("first" AS record_column1_name,
    "second" AS record_column2_name) AS record_name

Reuse column documentation in Dataform with includes

// filename is includes/docs.js

const user_id = `A unique identifier for a user`;
const age = `The age of a user`;
const creation_date = `The date this user signed up`;
const user_tenure = `The number of years since the user's creation date`;
const badge_count = `The all-time number of badges the user has received`;
const questions_and_answer_count = `The all-time number of questions and answers the user has created`;
const question_count = `The all-time number of questions the user has created`;
const answer_count = `The all-time number of answers the user has created`;
const last_badge_received_at = `The time the user received their most recent badge`;
const last_posted_at = `The time the user last posted a question or answer`;
const last_question_posted_at = `The time the user last posted a question`;
const last_answer_posted_at = `The time the user last posted an answer`;

module.exports = {
   user_id,
   age,
   creation_date,
   user_tenure,
   badge_count,
   questions_and_answer_count,
   question_count,
   answer_count,
   last_badge_received_at,
   last_posted_at,
   last_question_posted_at,
   last_answer_posted_at,
};
-- Use the docs constants in a table definition SQLX file
config {
  type: "table",
  description: "Table description.",
  columns: {
    user_id: docs.user_id,
    column2_name: "Description of the second column",
    column3_name: "Description of the third column",
    age: docs.age,
  }
}

SELECT ...

Define documentation for a whole table

// filename is includes/docs.js

const columns = {
    user_id: `A unique identifier for a user`,
    age: `The age of a user`,
    creation_date: `The date this user signed up`,
    user_tenure: `The number of years since the user's creation date`,
    badge_count: `The all-time number of badges the user has received`,
    questions_and_answer_count: `The all-time number of questions and answers the user has created`,
    question_count: `The all-time number of questions the user has created`,
    answer_count: `The all-time number of answers the user has created`,
    last_badge_received_at: `The time the user received their most recent badge`,
    last_posted_at: `The time the user last posted a question or answer`,
    last_question_posted_at: `The time the user last posted a question`,
    last_answer_posted_at: `The time the user last posted an answer`,
};


module.exports = {
  columns
};
-- Use the whole-table docs in a table definition SQLX file
config {
  type: "table",
  description: "My table description",
  columns: docs.columns
}

SELECT 1 AS one

3.4.3.3.4. Table Settings#

Override the schema, database, and name of a selected table

By default, a table follows the schema and database configuration you set in dataform.json/workflow_settings.yaml. The name of a table is the same as the name of the table definition SQLX file.

To override the schema, database, and name of a selected table:

config {
  schema: "OVERRIDDEN_SCHEMA",
  database: "OVERRIDDEN_DATABASE",
  name: "OVERRIDDEN_NAME"
}

Define a SQL statement to be executed before table creation

Use pre_operations block

  pre_operations {
    CREATE TEMP FUNCTION AddFourAndDivide(x INT64, y INT64)
      RETURNS FLOAT64
      AS ((x + 4) / y);
  }

Define a SQL statement to be executed after table creation

Use post_operations block

    post_operations {
      GRANT `roles/bigquery.dataViewer`
      ON
      TABLE ${self()}
      TO "group:allusers@example.com", "user:otheruser@example.com"
    }

Disable table creation

  • keeps the disabled table in the dependency graph

  • but does not create or execute it

This is useful, for example, when a table fails and you don't want your whole workflow to fail while you fix the issue.

  config {
    type: "table",
    disabled: true
  }

  select * from ${ref("source_data")}

3.4.3.3.5. Dependencies#

  • use ref

// filename is incremental_table.sqlx

config { type: "incremental" }

SELECT * FROM ${ref("source_data")}

  • use the config block

config { dependencies: [ "some_table", "some_assertion" ] }
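
To make ref work for source tables created outside Dataform, declare them as data sources. A minimal sketch with placeholder database, schema, and table names:

-- filename is definitions/source_orders.sqlx
config {
  type: "declaration",
  database: "PROJECT_ID",
  schema: "raw_data",
  name: "orders"
}

Other actions can then reference the source with ${ref("orders")}, and Dataform tracks the dependency.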

3.4.3.3.6. Tags#

config {
  type: "view",
  name: "user_counts",
  tags: ["daily", "hourly"]
}

3.4.3.3.7. Table labels#

https://cloud.google.com/dataform/docs/labels

config {
  type: "table",
  bigquery: {
    partitionBy: "DATE(ts)",
    labels: {
      department: "shipping",
      "cost-center": "logistics"
    }
  }
}

SELECT CURRENT_TIMESTAMP() AS ts

3.4.3.4. SQL Workflow, Variables & Functions#

3.4.3.4.1. Variable & Function#

js {
 const foo = 1;
 function bar(number){
     return number+1;
 }
}

select
 ${foo} as one,
 ${bar(foo)} as two
3.4.3.4.1.1. Re-use across a single SQLX file#
// filename is includes/customize.js
const launch_date = "11.11.2011";
const PROJECT_ID = "my_project_name";

function renderScript(table, dimensions, metrics) {
return `
    select
    ${dimensions.map(field => `${field} as ${field}`).join(",")},
    ${metrics.map(field => `sum(${field}) as ${field}`).join(",\n")}
    from ${table}
    group by ${dimensions.map((field, i) => `${i + 1}`).join(", ")}
  `;
}

module.exports = { launch_date, PROJECT_ID, renderScript };
-- Use the constants in the SQL file
config {type: "table"}

SELECT *
FROM ${customize.PROJECT_ID}.my_schema_name.my_table_name
WHERE date > "${customize.launch_date}"
-- Use the renderScript function to generate a SQL query
config {
  type: "table",
  tags: ["advanced", "hourly"],
  disabled: true
}

${customize.renderScript(ref("source_table"),
                            ["country", "device_type"],
                            ["revenue", "pageviews", "sessions"]
                            )}
3.4.3.4.1.2. Reuse across a repository with includes#

https://cloud.google.com/dataform/docs/reuse-code-includes

Includes are JavaScript constants or functions global to your repository. You define includes in the includes directory of your repository. You can then reuse them across your repository in JavaScript and SQLX files.

3.4.3.4.2. SQL Operations#

A .sqlx file with config { type: "operations" } lets you run arbitrary SQL statements in BigQuery.

Use Case 1: Delete old data from a table

  • Scenario: You have a table of transaction records and want to delete all data older than 2 years to reduce storage costs.

  • Solution: Use a .sqlx file with type: operations to run SQL DELETE statements.

  • Why type: operations:

    • Deleting data is not a regular ETL step (such as creating a table or view).

    • You need to perform a data-management operation directly.

config { type: "operations" }

DELETE FROM dataset.table WHERE country = 'GB';

DELETE FROM dataset.table WHERE country = 'FR';

Use Case 2: Create a temporary table for use in other steps

  • Scenario: You want to prepare a temporary aggregate table to use in several other steps of your Dataform pipeline.

  • Solution: Create the table in a .sqlx file and set hasOutput: true.

  • Why type: operations: This table does not need to be defined as a standard Dataform target, but it must still exist to serve the other steps.

--file: definitions/your_file_name.sqlx

config { type: "operations", hasOutput: true }

CREATE OR REPLACE TABLE ${self()} AS
SELECT customer_id, COUNT(*) AS order_count
FROM my_project.my_dataset.orders
WHERE order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 1 YEAR)
GROUP BY customer_id;
-- In another file

SELECT *
FROM ${ref("your_file_name")}
WHERE order_count > 10;

Use Case 3: Run DDL (Data Definition Language) statements to create a new table

  • Scenario: You need to create a new table or modify the schema of an existing table, which cannot be done through Dataform's standard models.

  • Solution: Use a .sqlx file to run the DDL statements.

  • Why:

    • DDL statements are not suitable to model as tables or views in Dataform.

    • You need to design the schema yourself or run table-altering operations.

config { type: "operations" }

CREATE OR REPLACE TABLE my_project.my_dataset.new_table (
  id INT64,
  name STRING,
  created_at TIMESTAMP
);

Use Case 4: Integrate custom data checks

  • Scenario: You want to check whether the data in a table contains any NULL values in a required column.

  • Solution: Use type: operations to write the data-check logic.

  • Why: A data check produces no fixed output; it only runs validation logic and raises an error when needed.

config { type: "operations" }

DECLARE null_count INT64;

SET null_count = (
  SELECT COUNT(*)
  FROM my_project.my_dataset.users
  WHERE email IS NULL
);

IF null_count > 0 THEN
  RAISE USING MESSAGE = 'Validation failed: NULL values found in "email" column';
END IF;

Use Case 5: Run multiple SQL statements at once

  • Scenario: You need to run a series of SQL statements in a single step, for example deleting old data and then updating data status.

  • Solution: Use a .sqlx file to run multiple statements.

  • Why: You can easily perform several SQL operations without splitting them into separate steps.

config { type: "operations" }

-- Delete old data
DELETE FROM my_project.my_dataset.logs
WHERE log_date < DATE_SUB(CURRENT_DATE(), INTERVAL 1 YEAR);

-- Update status
UPDATE my_project.my_dataset.logs
SET status = 'archived'
WHERE log_date < CURRENT_DATE();

3.4.3.4.3. Assertion#

An assertion is a data quality test query that finds rows that violate one or more conditions specified in the query. If the query returns any rows, the assertion fails. Dataform runs assertions every time it updates your SQL workflow and it alerts you if any assertions fail.

Dataform automatically creates views (in the BigQuery assertions schema) that contain the results of compiled assertion queries.

You can add assertions to all Dataform table types: tables, incremental tables, views, and materialized views.

3.4.3.4.3.1. Built-in assertions#

Add built-in assertions to the config block of a table. Dataform runs these assertions after table creation.

A row condition asserts that all table rows follow the custom logic you define; the assertion fails if any table row evaluates to false.

config {
  type: "table",
  assertions: {
    // assert that a single column has no duplicated values
    uniqueKey: ["user_id"],

    // assert that combinations of columns have no duplicated values
    uniqueKeys: [
        ["user_id"],                   // unique "user_id" for the table
        ["signup_date", "customer_id"] // unique "signup_date" + "customer_id" for the table
        ],

    // assert that the columns are not null
    nonNull: ["user_id", "customer_id"],

    // assert custom conditions for each row
    rowConditions: [
      'signup_date is null or signup_date > "2019-01-01"',
      'email like "%@%.%"'
    ]
  }
}
SELECT ...
3.4.3.4.3.2. Manual assertions#

Add manual assertions in a separate SQLX file.

-- definitions/custom_assertion.sqlx

config { type: "assertion" }

SELECT
  *
FROM
  ${ref("sometable")}
WHERE
  a IS NULL
  OR b IS NULL
  OR c IS NULL
3.4.3.4.3.3. Set assertions as dependencies#

https://cloud.google.com/dataform/docs/assertions#add_assertions_as_dependencies

When workflow action B depends on workflow action A that has assertions, failure of assertions of action A does not block Dataform from executing action B. To execute action B only if assertions of action A pass, you need to set assertions of action A as dependencies of action B.

  1. Set selected assertions as dependencies: manually set selected assertions as dependencies by adding them to dependencies: [ "" ] in the config block

    config {
      dependencies: ["assertion_name"]
    }
    
  2. Set assertions of a selected dependency action as dependencies: set the includeDependentAssertions parameter to automatically set all direct assertions of a selected dependency workflow action as dependencies of the edited action

    config {
      dependencies: [{ action: "actionA", includeDependentAssertions: true }]
    }
    
  3. Set assertions of all dependency actions as dependencies: set the dependOnDependencyAssertions parameter to automatically set all direct assertions from all dependency actions of the edited action as additional dependencies of the edited action

    config {
      dependOnDependencyAssertions: true
    }
    

Note that when you set both dependOnDependencyAssertions and includeDependentAssertions in the same file, includeDependentAssertions takes precedence. This means that if you set dependOnDependencyAssertions to true but also set includeDependentAssertions to false for a specific dependency action, Dataform will not add that action's assertions to the dependencies.

-- Set selected assertions as dependencies
config {
  type: "table",
  dependencies: [ "manual_assertion",  "dataform_sometable_assertions_nonNull" ,  "dataform_sometable_assertions_rowConditions"]
}

SELECT * FROM ${ref("referenced_table")} LEFT JOIN ...
-- Set assertions of a selected dependency action as dependencies
config { type: "ACTION_TYPE" }

SELECT * FROM ${ref({name: "DEPENDENCY_ACTION_NAME", includeDependentAssertions: true})}
-- Set assertions of all dependency actions as dependencies
-- filename is sometableE.sqlx

config {
  type: "table",
  dependOnDependencyAssertions: true,
  dependencies: [ "sometableA", "sometableB" ]
}

SELECT * FROM ${ref("sometableC")}
SELECT * FROM ${ref("sometableD")}

3.4.3.5. Workspace compilation#

3.4.4. Execution & Monitoring#

3.4.4.1. Trigger Execution#

3.4.4.2. Schedule Execution#

3.4.4.3. Monitoring#

3.4.4.4. Best Practice#

3.4.4.4.1. Development#

  1. Have npm, Node.js, and gcloud installed

  2. Install @dataform/cli and @dataform/core

$ npm i -g @dataform/cli
$ npm i -g @dataform/core

  3. Install dependencies

# inside the project folder:
$ dataform install

  4. Open the source code with VS Code (install the Dataform extension for syntax highlighting)

3.4.4.4.2. Test and compile code#

To check your code:

$ dataform compile

# to view the output of compilation
$ dataform compile --json > compile.json

# to view the output of compilation with custom variables
$ dataform compile --vars={custom_var_name}={custom_var_value} --json > compile.json

To execute your code in your data warehouse:

# Init credentials
$ dataform init-creds  # uses ADC authentication by default
$ dataform run
$ dataform run --vars={custom_var_name}={custom_var_value}

# to rebuild all tables from scratch (including the incremental tables)
$ dataform run --full-refresh

# to see the final compiled SQL code without actually executing it
$ dataform run --dry-run

Remember to run dataform format before committing your code.

Notes:

  • Do not use database credential files, use gcloud auth login instead.

  • Table documentation should be defined separately, in the includes folder, with the naming convention docs_{table_name}.js.

  • Fields with date/datetime/timestamp types should have a clear suffix: {field_name}_date / {field_name}_datetime / {field_name}_timestamp

3.4.4.4.3. How to collaborate in GCP Dataform#

In Google Cloud Console:

  1. Open Dataform in Google Cloud Console: Dataform

  2. Click the repository that you intend to work on, for example dw-ods. Note that on dw-ods the default branch is master; you must not commit and push to master directly.

  3. Click the Create Development Workspace button (this is equivalent to creating a local branch, since a development workspace in Dataform corresponds to a local Git branch)

  4. Name your new development workspace (the name should be simple yet self-explanatory)

  5. Open your new development workspace, commit, and push it to GitHub

  6. Start coding, either directly in the Google Cloud Console (Dataform) or on your local machine

Otherwise, you can create a branch on your local machine, push it to the remote Git repository, and then create a Dataform development workspace (with the same name) later.

Rules:

  • During development, push only to your corresponding development branch, not the default branch. After development is complete, create a pull request on GitHub to the default branch. Your pull request must be reviewed and approved by another team member.

  • After go-live, manually delete your development branch in GCP Dataform to avoid redundant development workspaces in the GCP Dataform project.

  • Agree on how to manage the code lifecycle the right way at:

3.4.4.4.4. Naming conventions#

3.4.4.4.4.1. Objects Action#

Object Name Format:

<zone>_<business_domain>_<table_role>_<update_frequency>_<execution_type>_<table_name>
  1. Zone (<zone>): Indicates the stage of the data pipeline.

    • stg for staging.

    • prep for preprocessing.

    • ft for feature engineering.

    • hlp for helper datasets.

    • vld for assertions.

  2. Business Domain (<business_domain>): Represents the functional or business domain.

    • info: Account information

    • trm: Trading management

    • avb: App event behavior

    • tra: Trading activity

    • mki: Market information

    • oth: Others

  3. Table Role (<table_role>)

    • dim: Contain descriptive, categorical data

    • fact: Contain measurable, numerical data

    • lkp: Contain mappings or relationships.

    • agg: Contain pre-aggregated data for performance optimization.

  4. Frequency (<update_frequency>): Indicates the update cadence.

    • daily

    • weekly

    • monthly

    • adhoc

  5. Execution Type (<execution_type>): Specifies how the table is built.

    • latest for full data loads.

    • incr for incremental updates.

    • batch for batch processing.

    • stream for streaming pipelines.

  6. Table Name (<table_name>): Describes the content or purpose of the table. Use snake_case for readability.

    • customer_transactions

    • product_recommendations

Putting the parts together, a full object name might be: stg_trm_fact_daily_incr_customer_transactions.

3.4.4.4.4.2. Tags & Labels#

Dataform objects must have at least one tag for dynamic execution.

labels:
  loading-type: # {frequency}-{type of execution}
    frequency:
      - daily
      - weekly
      - monthly
      - quarterly
      - yearly
    type of execution:
      - full_load
      - incremental_load
  domain: 
    - trading_management
    - account_info
    - app_event
    - market_info
    - trading_activity
  model:
    - product_recommendations
  layer:
    - staging
    - preprocessing
    - features
tags:
  domain: # domain-{domain name}
    - domain-account_info
    - domain-app_event
    - domain-market_info
    - domain-trading_activity
    - domain-trading_management
    - model-product_recommendations
  env:
    - env-train_dataset
    - env-full_dev
    - env-full_production
    - `env-full_${constants.ENV}`
  layer:
    - layer-udf
    - layer-helper
    - layer-staging
    - layer-preprocessing
    - layer-feature
  schedule: # scheduler-{time}
    - "scheduler-after_dwh"
    - "scheduler-backdate"
    - "scheduler-initial_setup"

3.4.4.5. References#

  1. Use the open source dataform/cli

  2. How-to: Configure additional table settings in table definition files

Related to naming conventions and coding style guides:

  1. SQL style guide | Gitlab

  2. Best practices guides from dbt project

  3. dbt guides | Gitlab

  4. dbt style guide | dbt labs

3.4.4.6. Projects#

  1. Feature Engineering (Pinetree): datkt1998/trading-aiml-feature_engineering