Data Pipeline in Snowflake
Snowflake is emerging as one of the leaders in cloud data warehousing. Compared to traditional databases, it provides an enormous number of features to get an application started. Recently, I was involved in implementing a data pipeline framework in Snowflake for my ML models. The ETL team was busy with their day-to-day tasks, and I was looking at a long wait for the data to be moved in. I was supposed to execute my models on a series of JSON files that the sources were producing. Here I will only discuss moving the data from the S3 bucket into Snowflake.
There were six sources sending their data to the S3 bucket.
A few questions came up:
• How to parse the data?
• How to move the data to Snowflake?
Option 1 — Since Snowflake provides the ability to parse JSON with queries, we can move the raw data into Snowflake, parse the JSON there, transform it, and load it into my table.
Option 2 — Parse the JSON outside Snowflake, write the flattened output as another file in the S3 bucket, and then pull that file into Snowflake.
Since I did not have an ETL tool available, and Lambda does not allow more than 15 minutes of execution per invocation, I decided to go with Option 1.
Step 1 — Load data “as is” into Snowflake
An external (i.e. S3) stage specifies where data files are stored so that the data in the files can be loaded into a table. Data can be loaded directly from files in a specified S3 bucket, with or without a folder path (or prefix, in S3 terminology). If the path ends with /, all of the objects in the corresponding S3 folder are loaded. In addition to loading directly from files in S3 buckets, Snowflake supports creating named external stages, which encapsulate all of the required information for staging files, including:
• The S3 bucket where the files are staged.
• The named storage integration object or S3 credentials for the bucket (if it is protected).
• An encryption key (if the files in the bucket have been encrypted).
Step 1a — Create a table with only one column
CREATE OR REPLACE TABLE DB1.SCHEMA1.TBL_NM (JSON_COLUMN VARIANT);
Step 1b — Create the Storage Integration
CREATE STORAGE INTEGRATION ML1
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = S3
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::xxxx:role/snowflake_role'
STORAGE_ALLOWED_LOCATIONS = ('s3://xxxxxx/xxxx');
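Once the integration exists, Snowflake generates an IAM user ARN and an external ID that need to be added to the trust policy of the AWS role referenced above. They can be pulled from the integration description (the AWS-side role setup itself is outside the scope of this post):
DESC INTEGRATION ML1; -- look for STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID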
Step 1c — Create the File Format
CREATE FILE FORMAT DB1.SCHEMA1.JSON_FORMAT
TYPE = 'JSON'
COMPRESSION = 'AUTO'
ENABLE_OCTAL = FALSE
ALLOW_DUPLICATE = TRUE
STRIP_OUTER_ARRAY = FALSE
STRIP_NULL_VALUES = FALSE
IGNORE_UTF8_ERRORS = FALSE;
Step 1d — Create Stage
CREATE OR REPLACE STAGE STG1
URL = 's3://xxxxxx/xxxx' -- this folder should be the same as the Storage Integration folder (STORAGE_ALLOWED_LOCATIONS)
STORAGE_INTEGRATION = ML1;
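Before writing the COPY, it is worth checking that Snowflake can actually see the files through the stage:
LIST @STG1; -- should return the JSON files already sitting in the S3 folder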
Step 1e — Create the COPY statement and test in SNOWSQL (CLI)
COPY INTO TBL_NM FROM @STG1 FILE_FORMAT = (FORMAT_NAME = 'DB1.SCHEMA1.JSON_FORMAT');
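If you want to surface parsing problems before committing any rows, the same COPY can be dry-run first; this is just an option I find handy, not part of the original setup:
COPY INTO TBL_NM
FROM @STG1
FILE_FORMAT = (FORMAT_NAME = 'DB1.SCHEMA1.JSON_FORMAT')
VALIDATION_MODE = RETURN_ERRORS; -- reports errors in the staged files without loading anything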
Step 2 — Create an SQS notification on the S3 bucket and have it invoke a Lambda function. In the Lambda, put the COPY command to be executed. This loads the data into Snowflake as soon as a data file arrives from the application in the S3 bucket.
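To confirm that the Lambda-triggered loads are landing, the COPY_HISTORY table function can be queried (the 24-hour window below is just an example):
-- Recent load activity for the staging table
SELECT FILE_NAME, LAST_LOAD_TIME, ROW_COUNT, STATUS
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'TBL_NM',
    START_TIME => DATEADD('hour', -24, CURRENT_TIMESTAMP())));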
Step 3 — Once you have validated that the data is being loaded into the table, write SQL to parse the JSON from that table, for example:
SELECT
    JSON_COLUMN:ELEMENT1.ELEMENT11.ELEMENT111::INTEGER AS ID,
    JSON_COLUMN:ELEMENT1.ELEMENT11.ELEMENT112::STRING AS COL1,
    JSON_COLUMN:ELEMENT2[0].ELEMENT22[0]::STRING AS COL2
FROM
    DB1.SCHEMA1.TBL_NM;
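Note that COL2 hard-codes the first element of the ELEMENT2 array. If that array can hold more than one entry, LATERAL FLATTEN turns each entry into its own row; a sketch, assuming that structure:
-- One output row per element of the ELEMENT2 array
SELECT
    T.JSON_COLUMN:ELEMENT1.ELEMENT11.ELEMENT111::INTEGER AS ID,
    F.VALUE:ELEMENT22[0]::STRING AS COL2
FROM DB1.SCHEMA1.TBL_NM T,
    LATERAL FLATTEN(INPUT => T.JSON_COLUMN:ELEMENT2) F;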
Step 4 — Create the Target table
CREATE OR REPLACE TABLE INTG_TBL1 (ID INTEGER, COL1 STRING, COL2 STRING);
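The load from staging into this target is then just the parsing SQL from Step 3 wrapped in an INSERT, roughly like this:
-- Populate the target table from the raw JSON staging table
INSERT INTO INTG_TBL1 (ID, COL1, COL2)
SELECT
    JSON_COLUMN:ELEMENT1.ELEMENT11.ELEMENT111::INTEGER,
    JSON_COLUMN:ELEMENT1.ELEMENT11.ELEMENT112::STRING,
    JSON_COLUMN:ELEMENT2[0].ELEMENT22[0]::STRING
FROM DB1.SCHEMA1.TBL_NM;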
Step 5 — Enable the SQS
Once this was successful for one table, I created tables for the other sources and started loading their data. I created a separate folder for each app in the same S3 bucket; a small piece of code in the Lambda can map the different folder names to different tables. At this point, I was done loading the data into the stage tables.
Next I had to do data cleansing and feature selection on the data and put it into the semantic zone. I decided to go for an asynchronous approach: even if data files arrive every minute, I do not need the data refreshed that frequently in my semantic layer. So I created stored procedures to load the data and Snowflake Tasks to schedule the job from within Snowflake.
Step 6 — Created a JOB_PARAMETER_TABLE to hold all the SQL statements that load the data from staging to semantic. It contains the JSON-parsing SQL described in Step 3 (a possible layout for this table and the log table is sketched after the next step).
Step 7 — Created a JOB_LOG_TABLE to keep track of the data loads from staging to semantic.
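The exact layout of these two tables is not shown here, but as an assumption they could look roughly like this:
-- Assumed layout: one row per staging-to-semantic load statement
CREATE OR REPLACE TABLE JOB_PARAMETER_TABLE (
    JOB_ID       INTEGER,
    SOURCE_NAME  STRING,
    TARGET_TABLE STRING,
    LOAD_SQL     STRING   -- the JSON-parsing INSERT from Step 3
);
-- Assumed layout: one row per executed load, for auditing
CREATE OR REPLACE TABLE JOB_LOG_TABLE (
    JOB_ID      INTEGER,
    RUN_TS      TIMESTAMP_NTZ,
    ROWS_LOADED INTEGER,
    STATUS      STRING
);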
CREATE OR REPLACE PROCEDURE DATA_LOAD()
RETURNS VARCHAR
NOT NULL
LANGUAGE JAVASCRIPT
AS
$$
    var res_stmt;
    // Note: the SQL text is wrapped in back-quotes (a JavaScript template literal), not single quotes.
    var sql_cmd = `INSERT INTO …… ;`;
    var stmt = snowflake.createStatement( {sqlText: sql_cmd} );
    try {
        var result_set = stmt.execute();
        ……
        res_stmt = 'Load succeeded';
    } catch (err) {
        // Capture the error instead of swallowing it, so the task history shows why a load failed
        res_stmt = 'Load failed: ' + err.message;
    }
    return res_stmt;
$$;
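The procedure can be tested manually before it is scheduled:
CALL DATA_LOAD();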
Step 8 — Created the tasks to schedule the job. I had a few stored procedures, so I sequenced their execution across the tasks.
CREATE OR REPLACE TASK STAGING_TASK_1
WAREHOUSE = WH1
SCHEDULE = '120 MINUTE'
AS
CALL PROC_1();
-- Dependent Task
CREATE OR REPLACE TASK STAGING_TASK_2
WAREHOUSE = WH1
AFTER STAGING_TASK_1
AS
CALL PROC2();
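One thing that is easy to miss: tasks are created in a SUSPENDED state and will not run until they are resumed. Resume the dependent task first, then the root task:
ALTER TASK STAGING_TASK_2 RESUME;
ALTER TASK STAGING_TASK_1 RESUME;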