Apache Airflow as an ETL/ELT tool to Snowflake
In this post, we will look at how we can use Apache Airflow to perform ETL and ELT jobs into any database or data warehouse. In this case, we will be using Snowflake as the data warehouse to store our data.
A general architecture is shown below.
Set up of environment
First, let’s get our environment set up. We will need Docker installed on our local computer: https://www.docker.com/products/docker-desktop
We will now get the docker-compose file for Airflow. To do so, let’s curl the file:
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.1.2/docker-compose.yaml'
As we will be installing the pip packages for the Snowflake connectors, we first need to create 2 additional files:
1. requirements.txt: This is the requirements file which contains the necessary Snowflake libraries and connectors for Airflow.
requirements.txt
pandas
apache-airflow-providers-snowflake==2.1.0
snowflake-connector-python==2.5.1
snowflake-sqlalchemy==1.2.5
2. Dockerfile: This extends the official Airflow image and installs the packages listed in requirements.txt into it.
Dockerfile
FROM apache/airflow:2.1.2
COPY requirements.txt .
RUN pip install -r requirements.txt
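Once the image is built, a quick way to confirm that the packages from requirements.txt made it in is a small import check run with python inside one of the Airflow containers (a minimal sketch; how you exec into the container depends on your setup):
# Run inside an Airflow container to confirm the Snowflake packages are present
import snowflake.connector            # from snowflake-connector-python
import snowflake.sqlalchemy           # from snowflake-sqlalchemy
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator  # from the provider package

print(snowflake.connector.__version__)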
We will now need to create 2 folders: files_to_analyse and files_to_upload.
Your repository tree should look like this
Let’s run docker-compose up and go to http://localhost:8080/. The default username is airflow and the password is airflow.
Adjusting our docker-compose file
We will now adjust our docker-compose file. There are 2 parts which we will adjust:
1. We will comment out the image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.2} line and add build: . as shown below.
version: '3'
x-airflow-common:
&airflow-common
build: . # add this in
#image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.2}
environment:
2. We will add in our 2 folders as volumes. The files_to_analyse folder is where the CSV files are placed for Airflow to pick up and analyse. The files_to_upload folder is where the ETL workflow stages the processed files to be uploaded to our S3 bucket.
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
- ./files_to_upload:/files_to_upload # add this in
- ./files_to_analyse:/files_to_analyse # add this in
Preparing our Snowflake Environment and Employees CSV file
We will now prepare our Snowflake environment.
Go to your Snowflake account and create the database, table and stage for our S3 bucket.
CREATE DATABASE AIRFLOW;
USE AIRFLOW;
CREATE TABLE PUBLIC.EMPLOYEES (NAME STRING, AGE INT, GENDER STRING);
CREATE OR REPLACE STAGE airflow_stage
url = 's3://<REPLACE_WITH_YOUR_S3_BUCKET>/'
credentials = (aws_key_id = '<YOUR_AWS_KEY>' aws_secret_key = '<YOUR_AWS_SECRET_KEY>')
file_format = (type = 'CSV');
Once the stage exists, the COPY command below can be run manually to (re)load files from the bucket into the table; force=true reloads files even if they have been loaded before.
COPY INTO AIRFLOW.PUBLIC.EMPLOYEES (NAME, AGE, GENDER)
FROM @airflow_stage/ force=true;
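If you prefer to script this instead of using the Snowflake web UI, here is a minimal sketch using snowflake-connector-python (the same connector listed in requirements.txt); the account, user, password and warehouse values are placeholders you would replace with your own:
import snowflake.connector

setup_statements = [
    "CREATE DATABASE IF NOT EXISTS AIRFLOW",
    "CREATE TABLE IF NOT EXISTS AIRFLOW.PUBLIC.EMPLOYEES (NAME STRING, AGE INT, GENDER STRING)",
]

# Placeholder credentials -- replace with your own account details
conn = snowflake.connector.connect(
    account="<YOUR_ACCOUNT>",   # e.g. demo9.ap-southeast-1
    user="<YOUR_USER>",
    password="<YOUR_PASSWORD>",
    warehouse="<YOUR_WAREHOUSE>",
)
try:
    cursor = conn.cursor()
    for statement in setup_statements:
        cursor.execute(statement)   # run each DDL statement in turn
finally:
    conn.close()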
We will also prepare our employees.csv file as below and put it inside the files_to_analyse folder that we created in the Set up of environment.
name,age,gender
Brian,23,M
This will be our updated folder structure
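If you would rather generate the file than type it out, a small sketch like the one below (run from the project root, with pandas installed locally) writes the same sample CSV into files_to_analyse/:
import pandas as pd

# One sample employee row matching the EMPLOYEES table columns
df = pd.DataFrame([{"name": "Brian", "age": 23, "gender": "M"}])
df.to_csv("files_to_analyse/employees.csv", index=False)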
Preparing our Airflow environment
We will now prepare our Airflow environment. Head over to http://localhost:8080/ and go to Admin > Connections.
We will create a connection with the Conn Id snowflake_conns and the connection type Snowflake, and fill in the necessary details. A sample connection is shown below.
Key in the below details:
- Conn Id: Unique connection ID such as snowflake_conns
- Conn Type: Choose Snowflake
- Host: Snowflake host such as demo9.ap-southeast-1.snowflakecomputing.com
- Schema: The schema of the database, such as PUBLIC
- Login: Your login ID
- Account: Account name such as demo9
- Database: The database which we created earlier, which is AIRFLOW
- Region: The region of your Snowflake account, such as ap-southeast-1
- Role: The role in your Snowflake account
- Warehouse: The warehouse in your Snowflake account
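Once the connection is saved, you can sanity-check it from inside one of the Airflow containers using the Snowflake provider's hook; a minimal sketch (run with python inside, say, the scheduler container) is:
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

# Uses the snowflake_conns connection we just created in the Airflow UI
hook = SnowflakeHook(snowflake_conn_id="snowflake_conns")

# get_first runs the query and returns the first row of the result
print(hook.get_first("SELECT CURRENT_VERSION()"))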
We are also going to create variables in Airflow to store our AWS access key ID and secret access key. Head over to Admin > Variables and create:
- aws_access_key_id: Your AWS access key ID
- aws_secret_access_key: Your AWS secret access key
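If you prefer not to click through the UI, the same variables can be created programmatically from a Python shell inside an Airflow container; the key names must match what the DAG will later read with Variable.get():
from airflow.models import Variable

# Placeholder values -- replace with your own AWS credentials
Variable.set("aws_access_key_id", "<YOUR_AWS_ACCESS_KEY_ID>")
Variable.set("aws_secret_access_key", "<YOUR_AWS_SECRET_ACCESS_KEY>")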
Writing our DAG
A Directed Acyclic Graph (DAG) represents the collection of tasks that you want to run. When we have workers set up, our tasks can run on different workers. We will be writing our DAG in Python.
When you ran docker-compose up in the first step, a folder called dags should have been created, as shown in the picture below.
Let’s go over to our dags folder and create our DAG. In this case, I have named it ETL_Snowflake.py
In this Python DAG, we are going to write 3 tasks:
- process_transform_data_task: This will process our employees.csv file in the files_to_analyse folder and upper-case our NAME column. It will then save the updated CSV file in the files_to_upload folder.
- upload_to_S3_task: This will upload the CSV file from the files_to_upload folder into the S3 bucket which we created in the Set up of environment.
- upload_to_Snowflake_task: This will perform the COPY command to copy the data from the CSV file in the S3 bucket into Snowflake.
ETL_Snowflake.py
import glob
import os
from datetime import datetime

import boto3
import pandas as pd

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 8, 1),
    'retries': 0
}

# The COPY command that the Snowflake task will run against our stage
query1 = [
    """COPY into AIRFLOW.PUBLIC.EMPLOYEES (NAME,AGE,GENDER)
       from @airflow_stage/;""",
]

def aws_session(region_name='ap-southeast-1'):
    # Build a boto3 session from the credentials stored as Airflow Variables
    return boto3.session.Session(
        aws_access_key_id=Variable.get("aws_access_key_id"),
        aws_secret_access_key=Variable.get("aws_secret_access_key"),
        region_name=region_name)

def upload_files_to_S3(bucket_name, path):
    session = aws_session()
    print("=== Start uploading S3 files ===")
    s3_resource = session.resource('s3')
    bucket = s3_resource.Bucket(bucket_name)
    # Walk the files_to_upload folder and push every file to the bucket
    for subdir, dirs, files in os.walk(path):
        for file in files:
            full_path = os.path.join(subdir, file)
            with open(full_path, 'rb') as data:
                print(full_path[len(path):])
                bucket.put_object(Key=full_path[len(path):], Body=data)
    print("=== Finish uploading S3 files ===")

def process_transform_data(path, upload_path):
    print("=== Start processing files ===")
    print(path)
    for filename in glob.glob(os.path.join(path, '*.csv')):
        with open(os.path.join(os.getcwd(), filename), 'r') as f:
            # Keep the original file name (without extension) for the output file
            root_ext = os.path.splitext(filename)
            base_name = root_ext[0].rsplit('/', 1)[-1]
            print(base_name)
            df = pd.read_csv(f)
            # Normalise the header (name,age,gender) to upper case so it
            # matches the NAME column referenced below
            df.columns = df.columns.str.upper()
            # Transform
            df['NAME'] = df['NAME'].str.upper()
            # Write without a header row so COPY INTO loads every line as data
            df.to_csv(upload_path + base_name + '.csv', index=False, header=False)
    print("=== End processing files ===")

# Using the context manager allows you not to duplicate the dag parameter in each operator
with DAG('ETL_Snowflake', default_args=default_args, schedule_interval='@once') as dag:

    process_transform_data_task = PythonOperator(
        task_id='process_transform_data',
        python_callable=process_transform_data,
        op_kwargs={
            'path': '/files_to_analyse/',
            'upload_path': '/files_to_upload/'
        })

    upload_to_S3_task = PythonOperator(
        task_id='upload_to_S3',
        python_callable=upload_files_to_S3,
        op_kwargs={
            'path': '/files_to_upload/',
            'bucket_name': '<YOUR_BUCKET_NAME>',
        })

    upload_to_Snowflake_task = SnowflakeOperator(
        task_id="upload_to_Snowflake",
        sql=query1,
        snowflake_conn_id="snowflake_conns")

    # Use arrows to set dependencies between tasks
    process_transform_data_task >> upload_to_S3_task >> upload_to_Snowflake_task
Running our DAG
Now let’s go to the DAGs page in Airflow, search for ETL_Snowflake, and click on the play button.
If all goes well, you should see that the DAG has run successfully.
Our data is successfully uploaded into our table!
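As a final check, you can query the table from inside an Airflow container, reusing the same snowflake_conns connection; a minimal sketch:
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

hook = SnowflakeHook(snowflake_conn_id="snowflake_conns")

# get_records returns all rows of the query result as a list of tuples
rows = hook.get_records("SELECT NAME, AGE, GENDER FROM AIRFLOW.PUBLIC.EMPLOYEES")
print(f"{len(rows)} row(s) loaded:")
for row in rows:
    print(row)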