Tutorial: Create your first pipeline using the Lakeflow Pipelines Editor

Learn how to create a new pipeline using Lakeflow Spark Declarative Pipelines (SDP) for data orchestration and Auto Loader. This tutorial extends the sample pipeline by cleaning the data and creating a query to find the top 100 users.

In this tutorial, you learn how to use the Lakeflow Pipelines Editor to:

  • Create a new pipeline with the default folder structure and start with a set of sample files.
  • Define data quality constraints using expectations.
  • Use the editor features to extend the pipeline with a new transformation to perform analysis on your data.

Requirements

Before you start this tutorial, you must:

  • Be logged into an Azure Databricks workspace.
  • Have Unity Catalog enabled for your workspace.
  • Have permission to create a compute resource or access to a compute resource.
  • Have permissions to create a new schema in a catalog. The required permissions are ALL PRIVILEGES or USE CATALOG and CREATE SCHEMA.

Step 1: Create a pipeline

In this step, you create a pipeline using the default folder structure and code samples. The code samples reference the users table in the wanderbricks sample data source.

  1. In your Azure Databricks workspace, click New, then ETL pipeline. This opens the pipeline editor with a default pipeline name like New Pipeline <date> <time>.

  2. (Optional) Select the name and enter a descriptive name for the pipeline.

  3. (Optional) To the right of the name, click the catalog and schema to set different defaults.

  4. (Optional) In the my_transformation source file created for you, select Python or SQL from the language drop-down list to set the language for the file.

  5. Click Use sample code.

    Sample code in your selected language appears in the my_transformation source file in the transformations folder. The output datasets have not yet been created, and the Pipeline graph on the right side of the screen is empty.

  6. To run the pipeline code (the code in the transformations folder), click Run pipeline in the upper right part of the screen.

    After the run completes, the bottom part of the workspace shows the two new tables that were created, sample_users_<date_time> and sample_aggregation_<date_time>. The Pipeline graph on the right side of the workspace now shows the two tables, including that sample_users is the source for sample_aggregation.

Step 2: Apply data quality checks

In this step, you add a data quality check to the sample_users table. You use pipeline expectations to constrain the data. In this case, you delete any user records that do not have a valid email address, and output the cleaned table as users_cleaned.

  1. In the pipeline asset browser, click +, and select Transformation.

  2. In the Create new transformation file dialog, make the following selections:

    • Choose either Python or SQL for the Language. This does not have to match your previous selection.
    • Give the file a name. In this case, choose users_cleaned.
    • For Destination path, leave the default.
    • For Dataset type, either leave it as None selected or choose Materialized view. If you select Materialized view, it generates sample code for you.
  3. Click Create to create the transformation code file.

  4. In your new code file, edit the code to match the following (use SQL or Python, based on your selection on the previous screen). Replace sample_users_<date_time> with the full name of your sample_users table.

    SQL

    -- Drop all rows that do not have an email address
    
    CREATE OR REFRESH MATERIALIZED VIEW users_cleaned
    (
      CONSTRAINT non_null_email EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW
    ) AS
    SELECT *
    FROM sample_users_<date_time>;
    

    Python

    from pyspark import pipelines as dp
    
    # Drop all rows that do not have an email address
    
    @dp.materialized_view
    @dp.expect_or_drop("non_null_email", "email IS NOT NULL")
    def users_cleaned():
        return (
            spark.read.table("sample_users_<date_time>")
        )
    
  5. Click Run pipeline to update the pipeline. It should now have three tables.
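Conceptually, `EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW` filters failing rows out of the data before it reaches the output table. Here is a minimal plain-Python sketch of that drop behavior, using made-up in-memory records (no Spark required):

```python
# Simulate EXPECT (email IS NOT NULL) ON VIOLATION DROP ROW
# on a small, hypothetical in-memory sample.
sample_users = [
    {"user_id": 1, "email": "ana@example.com"},
    {"user_id": 2, "email": None},  # violates the expectation
    {"user_id": 3, "email": "bo@example.com"},
]

def drop_row_on_violation(rows, predicate):
    """Keep only the rows that satisfy the expectation predicate."""
    return [row for row in rows if predicate(row)]

users_cleaned = drop_row_on_violation(
    sample_users, lambda r: r["email"] is not None
)
print(users_cleaned)  # the record with a NULL email is dropped
```

In the real pipeline, dropped rows are also counted in the pipeline's data quality metrics, so you can monitor how often the expectation is violated.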

Step 3: Analyze top users

Next, get the top 100 users by the number of bookings they have created. To do this, join the samples.wanderbricks.bookings table to the users_cleaned materialized view.

  1. In the pipeline asset browser, click +, and select Transformation.

  2. In the Create new transformation file dialog, make the following selections:

    • Choose either Python or SQL for the Language. This does not have to match your previous selections.
    • Give the file a name. In this case, choose users_and_bookings.
    • For Destination path, leave the default.
    • For Dataset type, leave it as None selected.
  3. In your new code file, edit the code to match the following (use SQL or Python, based on your selection on the previous screen).

    SQL

    -- Get the top 100 users by number of bookings
    
    CREATE OR REFRESH MATERIALIZED VIEW users_and_bookings AS
    SELECT u.name AS name, COUNT(b.booking_id) AS booking_count
    FROM users_cleaned u
    JOIN samples.wanderbricks.bookings b ON u.user_id = b.user_id
    GROUP BY u.name
    ORDER BY booking_count DESC
    LIMIT 100;
    

    Python

    from pyspark import pipelines as dp
    from pyspark.sql.functions import col, count, desc
    
    # Get the top 100 users by number of bookings
    
    @dp.materialized_view
    def users_and_bookings():
        return (
            spark.read.table("users_cleaned")
            .join(spark.read.table("samples.wanderbricks.bookings"), "user_id")
            .groupBy(col("name"))
            .agg(count("booking_id").alias("booking_count"))
            .orderBy(desc("booking_count"))
            .limit(100)
        )
    
  4. Click Run pipeline to update the datasets. When the run is complete, you can see in the Pipeline Graph that there are four tables, including the new users_and_bookings table.

    Pipeline graph showing four tables in pipeline
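The join, group-by, and top-N logic in users_and_bookings can be sketched in plain Python with made-up in-memory records (no Spark required), which makes the transformation easy to reason about:

```python
# Plain-Python sketch of the users_and_bookings logic:
# join bookings to users on user_id, count bookings per name,
# then take the top users by booking count. Records are hypothetical.
from collections import Counter

users_cleaned = [
    {"user_id": 1, "name": "Ana"},
    {"user_id": 2, "name": "Bo"},
]
bookings = [
    {"booking_id": 10, "user_id": 1},
    {"booking_id": 11, "user_id": 1},
    {"booking_id": 12, "user_id": 2},
]

# Inner join: only bookings whose user_id matches a cleaned user.
names_by_id = {u["user_id"]: u["name"] for u in users_cleaned}
counts = Counter(
    names_by_id[b["user_id"]] for b in bookings if b["user_id"] in names_by_id
)

# Order by booking_count descending and keep the top 100.
top_users = counts.most_common(100)
print(top_users)  # [('Ana', 2), ('Bo', 1)]
```

Because users_and_bookings reads from users_cleaned, which already dropped rows with NULL emails, the counts reflect only validated users.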

Next steps

Now that you have learned how to use some of the features of the Lakeflow Pipelines Editor and created a pipeline, here are some other features to learn more about: