Apache Airflow as an ETL/ELT tool to Snowflake

Adrian Lee Xinhan
6 min readAug 16, 2021

--

In this post, we will be looking at how we can use Apache Airflow to allow us to perform ETL and ELT jobs into any database/data warehouse. In this case, we will be using Snowflake as our data warehouse to store our data.

A general architecture is shown as below.

Set up of environment

First, let’s get our environment set up. We would need to have Docker install on your local computer: https://www.docker.com/products/docker-desktop

We will now get the docker-compose file or Airflow. To do so lets do a curl of the file curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.1.2/docker-compose.yaml'

As we will be installing our snowflake connectors the pip packages for Snowflake, we would first need to create 2 additional files

  1. requirements.txt: This is requirements file which will contain the necessary snowflake libraries and connectors for airflow

requirements.txt

pandas
apache-airflow-providers-snowflake==2.1.0
snowflake-connector-python==2.5.1
snowflake-sqlalchemy==1.2.5

2. Dockerfile: This is requirements file which will contain the necessary snowflake libraries and connectors for airflow

Dockerfile

FROM apache/airflow:2.1.2
COPY requirements.txt .
RUN pip install -r requirements.txt

We would now need to create 2 folders files_to_analyse and files_to_upload

Your tree repository should look like this

Let’s run our docker-compose up and go to http://localhost:8080/. The default username is airflow and password is airflow

Adjusting our docker-compose file

We will be now adjusting our docker-compose file. There are 2 parts which we will be adjusting

  • We will be commenting out the image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.2} and include in the build: . in the line as below
version: '3'
x-airflow-common:
&airflow-common
build: . # add this in
#image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.2}
environment:
  • We will be add in our 2 folders as volumes. The files_to_upload is the folder where the csv are placed for Airflow to pick up and analyse. The files_to_analyse is the folder after the ETL workflow has processed the file and will stage the file to be uploaded to our S3 bucket
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
- ./files_to_upload:/files_to_upload # add this in
- ./files_to_analyse:/files_to_analyse # add this in

Preparing our Snowflake Environment and Employees CSV file

We will now prepare our Snowflake environment.

Go to your Snowflake account and then create the database, table and stage for our S3 bucket.

CREATE DATABASE AIRFLOW;
USE AIRFLOW;
CREATE TABLE PUBLIC.EMPLOYEES (NAME STRING, AGE INT, GENDER STRING);
COPY into AIRFLOW.PUBLIC.EMPLOYEES (NAME,AGE,GENDER)
from @airflow_stage/ force=true;
create or replace stage airflow_stage
url = 's3://<REPLACE_WITH YOUR S3 BUCKET/'
credentials = (aws_key_id = '<Your AWS Key>' aws_secret_key = 'Your AWS Secret Key')
file_format=(type = 'CSV');

We will also prepare our employees.csv file as below and put it inside our files_to_analyse folder that we created in the Set up of our environment

name,age,gender
Brian,23,M

This will be our updated folder structure

Preparing our Airflow environment

We will now prepare our Airflow environment. Head over to the http://localhost:8080/ and go to admin > Connections

We will create a snowflake_conns id of snowflake connection type as shown below and fill in the necessary details.

A sample connection is as shown below.

Key in the below details:

  • Conn id: Unique connection ID such as snowflake_conns
  • Conn Type: Choose Snowflake
  • Host: Snowflake host connection such as demo9.apsoutheast-1.snowflakingcomputing.com
  • Schema: The schema of the database such as Public
  • Login: You login id
  • Account: Account name such as demo9
  • Database: The database which we created earlier which is airflow
  • Region: The region of your snowflake account such as apsoutheast-1
  • Role : The role in your Snowflake account
  • Warehouse: The warehouse in your Snowflake account

We are also going to create variables in our Airflow to store our AWS secret key id and secret access key. Head over to Admin > Variables and create

  • aws_access_key_id: Your AWS access key id
  • aws_secret_access_key: Your AWS secret access key

Writing our DAG

A Dynamic Acyclic Graph (DAG) and it represents the collection of tasks that you want to run. When we have workers set up, our tasks will run on different tasks. We will be writing our DAG in Python.

When you ran your docker-compose up in the first step, there should be a folder called dag created as shown in the picture below.

Let’s go over to our dag folder and create our dag. In this case, I have named it as Process_Data_Snowflake.py

In this Python DAG, we are going to write 3 tasks

  1. process_transform_data_task: This will process our employees.csv file on the files_to_analyse folder and upper case our NAME column. It will then save the updated csv file on the files_to_upload folder.
  2. upload_to_S3_task: This will upload the CSV file from the files_to_upload folder into our S3 bucket which we created in the Set up of environment
  3. upload_to_Snowflake_task: This will perform the copy command to copy the data from the CSV file in the S3 bucket into Snowflake

ETL_Snowflake.py

import pandas as pd
import glob
import os
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
import boto3
import os
from datetime import datetime
import snowflake.connector as sf
from airflow.contrib.hooks.snowflake_hook import SnowflakeHook
from airflow.contrib.operators.snowflake_operator import SnowflakeOperator
from airflow.models import Variable
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2021, 8, 1),
'retries': 0
}
def aws_session(region_name='ap-southeast-1'):
return boto3.session.Session(aws_access_key_id=Variable.get("aws_access_key_id"),
aws_secret_access_key=Variable.get("aws_secret_access_key"),
region_name=region_name)
query1 = [
"""COPY into AIRFLOW.PUBLIC.EMPLOYEES (NAME,AGE,GENDER)
from @airflow_stage/;""",
]
def upload_files_to_S3(bucket_name,path):
session = aws_session()
print("=== Start uploading S3 files === ")
s3_resource = session.resource('s3')
bucket = s3_resource.Bucket(bucket_name)
for subdir, dirs, files in os.walk(path):
for file in files:
full_path = os.path.join(subdir, file)

with open(full_path, 'rb') as data:
print(full_path[len(path):])
bucket.put_object(Key=full_path[len(path):], Body=data)
print("=== Finish uploading S3 files === ")
def process_transform_data(path, upload_path):
print("=== Start processing files === ")
print(path)
for filename in glob.glob(os.path.join(path, '*.csv')):
with open(os.path.join(os.getcwd(), filename), 'r') as f:
root_ext = os.path.splitext(filename)
date = root_ext[0].rsplit('/', 1)[-1]
print(date)
df = pd.read_csv(f)
# Transform
df['NAME'] = df['NAME'].str.upper()
df.to_csv(upload_path + str(date) +'.csv', index=False, header=None)
print("=== End processing files === ")
# Using the context manager alllows you not to duplicate the dag parameter in each operator
with DAG('ETL_Snowflake', default_args=default_args, schedule_interval='@once') as dag:
process_transform_data_task = PythonOperator(
task_id='process_transform_data',
python_callable=process_transform_data,
op_kwargs={
'path': '/files_to_analyse/',
'upload_path': '/files_to_upload/'
},
dag=dag)
upload_to_S3_task = PythonOperator(
task_id='upload_to_S3',
python_callable=upload_files_to_S3,
op_kwargs={
'path': '/files_to_upload/',
'bucket_name': '<YOUR_BUCKET_NAME>',
},
dag=dag)
upload_to_Snowflake_task = SnowflakeOperator(
task_id="upload_to_Snowflake",
sql=query1,
snowflake_conn_id="snowflake_conns",
dag=dag
)
# Use arrows to set dependencies between tasks
process_transform_data_task >> upload_to_S3_task >> upload_to_Snowflake_task

Running our DAG

Now lets go to the DAG in our Airflow and search for ETL_Snowflake and click on the play button.

If all goes well, you should see that the DAG has run successfully

Our data is successfully uploaded into our table!

--

--