Using Glue to do ETL/ELT into Snowflake
Note: Opinions expressed are solely my own and do not express the views or opinions of Snowflake, AWS or any other technology involved in this post
Introduction
Extract, Transform and Load (ETL) is central to any data warehousing project, and there has long been debate about which approach is better: writing your own code or using off-the-shelf ETL/ELT integration tools.
With AWS Glue, software engineers get a combination of customisable code alongside easy-to-use, managed data transformations.
This guide showcases how to use Glue to connect to MySQL, perform a data transformation and store the result in Snowflake.
Pre-requisites
At the time of this deployment, I have
1. Deployed MySQL 8.0 onto an EC2 instance in AWS. For this you will need a custom JDBC driver JAR to connect to MySQL 8.0, as Glue does not yet ship with built-in support for the MySQL 8.0 driver; alternatively, you can use a MySQL RDS instance with a 5.x engine. Once you have deployed the MySQL database, create the table below and insert some data
CREATE DATABASE sample;
USE sample;
CREATE TABLE test_table1 (id INT, name VARCHAR(45));
INSERT INTO test_table1 VALUES (1, 'Joe'), (2, 'Jane'), (3, 'Jack'), (4, 'Jessica');
2. Deployed a Snowflake account and created the database and table below inside Snowflake. You can get a free trial account here: https://signup.snowflake.com/
CREATE DATABASE glue_demo;
USE SCHEMA public;
CREATE TABLE test_table1 (id INT, name VARCHAR(45));
3. Snowflake Spark connector : spark-snowflake_2.11-2.8.3-spark_2.4.jar
4. Snowflake JDBC driver : snowflake-jdbc-3.12.16.jar
5. MySQL JDBC connector : mysql-connector-java-8.0.25.jar
Overall Architecture
From the architecture above: we start with some data saved in test_table1 in MySQL. A Glue job extracts that data, performs some data manipulation (in our case, capitalising the Name column) and then saves the result into our Snowflake table test_table1.
Setting up MySQL on AWS EC2
On our MySQL instance on EC2, run the commands below
ubuntu@ip-xx-xx-xx-xx:~$ sudo apt update
ubuntu@ip-xx-xx-xx-xx:~$ sudo apt install mysql-server
ubuntu@ip-xx-xx-xx-xx:~$ sudo mysql -u root -p
Enter password:
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 9
Server version: 8.0.25-0ubuntu0.20.04.1 (Ubuntu)

mysql> CREATE USER 'adrian'@'%' IDENTIFIED BY '<YOUR_PASSWORD>';
Query OK, 0 rows affected (0.01 sec)

mysql> GRANT ALL ON *.* TO 'adrian'@'%';
Query OK, 0 rows affected (0.01 sec)

mysql> flush privileges;
Query OK, 0 rows affected (0.01 sec)

mysql> exit

ubuntu@ip-xx-xx-xx-xx:~$ sudo mysql -u adrian -p
Enter password:
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 15
Server version: 8.0.25-0ubuntu0.20.04.1 (Ubuntu)

mysql> create database sample;
Query OK, 1 row affected (0.00 sec)

mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| mysql              |
| mysql_test         |
| performance_schema |
| sample             |
| sys                |
+--------------------+

mysql> use sample;
Database changed

mysql> CREATE TABLE test_table1 (id INT, name VARCHAR(45));
Query OK, 0 rows affected (0.02 sec)

mysql> INSERT INTO test_table1 VALUES(1, 'Joe'), (2, 'Jane'), (3, 'Jack'), (4, 'Jessica');
Query OK, 4 rows affected (0.01 sec)

mysql> exit
Make sure you can connect to your MySQL instance using DBeaver or whichever tool you prefer.
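If you prefer to check connectivity from Python rather than a GUI client, a quick sketch like the one below works too. It assumes the mysql-connector-python package is installed locally and that the host and password placeholders are replaced with your own values; it is not part of the Glue job itself.

# connectivity_check.py - quick sanity check against the MySQL instance on EC2
# (illustrative helper, not part of the Glue job)
import mysql.connector

conn = mysql.connector.connect(
    host="<EC2_PUBLIC_IP>",      # replace with your EC2 public IP or DNS name
    port=3306,
    user="adrian",
    password="<YOUR_PASSWORD>",  # the password set for the 'adrian' user above
    database="sample",
)

cursor = conn.cursor()
cursor.execute("SELECT id, name FROM test_table1")
for row in cursor.fetchall():
    print(row)                   # expect (1, 'Joe'), (2, 'Jane'), ...

cursor.close()
conn.close()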
Setting up Glue on AWS
Uploading our JAR files onto our S3 Bucket
Create an S3 bucket and then upload the items below; a scripted alternative using boto3 is sketched after the list.
- Snowflake Spark connector : spark-snowflake_2.11-2.8.3-spark_2.4.jar
- Snowflake JDBC driver : snowflake-jdbc-3.12.16.jar
- MySQL JDBC connector : mysql-connector-java-8.0.25.jar
- Our Python script (gluescript.py)
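If you would rather script the upload than use the console, a minimal boto3 sketch is shown below. The bucket name and local file paths are placeholders for your own setup.

# upload_artifacts.py - copy the JARs and the Glue script to S3 (illustrative sketch)
import boto3

s3 = boto3.client("s3")
bucket = "adrian-bucket"  # placeholder: use your own bucket name

for local_file in [
    "spark-snowflake_2.11-2.8.3-spark_2.4.jar",
    "snowflake-jdbc-3.12.16.jar",
    "mysql-connector-java-8.0.25.jar",
    "gluescript.py",
]:
    # keys mirror the file names so the dependent JARs paths are easy to reference later
    s3.upload_file(local_file, bucket, local_file)
    print(f"uploaded s3://{bucket}/{local_file}")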
Create an IAM role
Create an IAM Policy called glue-policy.
Create a new IAM role named glue-mysql-role, choose the Glue service and attach the policy above. Your role should look as shown below.
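The exact permissions in glue-policy will depend on your environment, but at a minimum the role needs access to the S3 bucket holding the script and JARs, plus the standard Glue service permissions. The sketch below is one way to create this programmatically; the inline policy and bucket name are assumptions, not a reproduction of the policy used in this post.

# create_glue_role.py - illustrative sketch of creating glue-mysql-role with boto3
# The S3 policy below is a minimal assumption; your actual glue-policy may differ.
import json
import boto3

iam = boto3.client("iam")

assume_role_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Principal": {"Service": "glue.amazonaws.com"},  # allow Glue to assume the role
        "Action": "sts:AssumeRole",
    }],
}

bucket_policy = {
    "Version": "2012-10-17",
    "Statement": [{
        "Effect": "Allow",
        "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
        "Resource": ["arn:aws:s3:::adrian-bucket", "arn:aws:s3:::adrian-bucket/*"],  # placeholder bucket
    }],
}

iam.create_role(
    RoleName="glue-mysql-role",
    AssumeRolePolicyDocument=json.dumps(assume_role_policy),
)

policy = iam.create_policy(
    PolicyName="glue-policy",
    PolicyDocument=json.dumps(bucket_policy),
)

iam.attach_role_policy(RoleName="glue-mysql-role", PolicyArn=policy["Policy"]["Arn"])
# The AWS managed AWSGlueServiceRole policy covers the Glue service permissions
iam.attach_role_policy(
    RoleName="glue-mysql-role",
    PolicyArn="arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole",
)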
Running through our script
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext, SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from awsglue.context import GlueContext
from awsglue.job import Job
import time
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from py4j.java_gateway import java_import

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'mysql', 'mysqldbtable', 'mysqluser', 'mysqlpassword',
                                     'sfURL', 'sfUser', 'sfPassword', 'sfDatabase', 'sfSchema', 'sfWarehouse'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

java_import(spark._jvm, SNOWFLAKE_SOURCE_NAME)
spark._jvm.net.snowflake.spark.snowflake.SnowflakeConnectorUtils.enablePushdownSession(
    spark._jvm.org.apache.spark.sql.SparkSession.builder().getOrCreate())

columns = ['ID', 'NAME']
df_result = []
# Connection options for MySQL 8 (custom JDBC driver) and Snowflake
connection_mysql8_options_source_emp = {
    "url": args['mysql'],
    "dbtable": args['mysqldbtable'],
    "user": args['mysqluser'],
    "password": args['mysqlpassword'],
    "customJdbcDriverS3Path": "s3://adrian-jdbc-bucket/mysql-connector-java-8.0.25.jar",
    "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"
}

sfOptions = {
    "sfURL": args['sfURL'],
    "sfUser": args['sfUser'],
    "sfPassword": args['sfPassword'],
    "sfDatabase": args['sfDatabase'],
    "sfSchema": args['sfSchema'],
    "sfWarehouse": args['sfWarehouse']
}

# Read from MySQL database with custom driver
df_emp = glueContext.create_dynamic_frame.from_options(
    connection_type="mysql",
    connection_options=connection_mysql8_options_source_emp)

# Transform: upper-case the name column
df_transform = df_emp.toDF()
for iteration in df_transform.collect():
    result = (iteration['id'], iteration['name'].upper())
    df_result.append(result)

rdd = spark.sparkContext.parallelize(df_result)
dfFromRDD1 = rdd.toDF(columns)

# Load the transformed rows into the Snowflake table
dfFromRDD1.write.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("dbtable", "test_table1") \
    .mode("append") \
    .save()

job.commit()
In our script, we make a connection using the MySQL connection parameters and convert the resulting DynamicFrame into a Spark DataFrame. We then perform the transformation, converting the name column values to upper case. Finally, we connect to Snowflake via the Snowflake connection parameters and store the data.
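Collecting every row to the driver works fine for a four-row demo table, but it will not scale to larger tables. A more idiomatic alternative, sketched below under the same assumptions (it reuses df_emp, sfOptions and SNOWFLAKE_SOURCE_NAME from the script above), is to let Spark upper-case the column with its built-in upper function so the work stays distributed:

from pyspark.sql.functions import col, upper

# Same extract step as before, then transform without collecting to the driver
df_transform = df_emp.toDF()
df_upper = df_transform.select(
    col("id").alias("ID"),
    upper(col("name")).alias("NAME"),   # capitalise the name column in a distributed way
)

df_upper.write.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("dbtable", "test_table1") \
    .mode("append") \
    .save()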
Configuring our Glue job
Now head over to AWS and search for Glue. Click on ETL > Jobs as shown in the image below
Click Add job, fill in a name of your choice and choose the IAM role we created above. Leave the Type as Spark and the Glue version as Spark 2.4, Python 3 with improved job startup times.
Select the option for an existing script that you provide and browse to the gluescript.py script in the S3 bucket you created earlier.
Now browse over to Security configuration, script libraries, and job parameters (optional) and, under the Dependent jars path, place the paths of the JAR files in our S3 bucket. For example, in my case:
s3://adrian-bucket/mysql-connector-java-8.0.25.jar,s3://adrian-bucket/snowflake-jdbc-3.12.16.jar,s3://adrian-bucket/spark-snowflake_2.11-2.8.3-spark_2.4.jar
Under the Job parameters section, fill in the parameters below; the keys must match what the script resolves via getResolvedOptions.
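The parameter keys come straight from the getResolvedOptions call in the script; the values below are placeholders only, so substitute your own endpoints and credentials:

--mysql          jdbc:mysql://<MYSQL_HOST>:3306/sample
--mysqldbtable   test_table1
--mysqluser      adrian
--mysqlpassword  <YOUR_MYSQL_PASSWORD>
--sfURL          <your_account>.snowflakecomputing.com
--sfUser         <YOUR_SNOWFLAKE_USER>
--sfPassword     <YOUR_SNOWFLAKE_PASSWORD>
--sfDatabase     glue_demo
--sfSchema       public
--sfWarehouse    <YOUR_WAREHOUSE>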
Now click Run job under Action and the job should be triggered. If we toggle back to our Snowflake console, we can see that the data has been inserted and that the Name column values are capitalised.
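If you would rather verify the load outside the Snowflake web console, a short sketch using the snowflake-connector-python package (a separate pip install, not one of the JARs above) could look like the following; the account, user, password and warehouse values are placeholders:

# verify_load.py - illustrative check that the Glue job landed the rows in Snowflake
import snowflake.connector

conn = snowflake.connector.connect(
    account="<your_account>",        # the account identifier from your Snowflake URL
    user="<YOUR_SNOWFLAKE_USER>",
    password="<YOUR_SNOWFLAKE_PASSWORD>",
    warehouse="<YOUR_WAREHOUSE>",
    database="GLUE_DEMO",
    schema="PUBLIC",
)

cur = conn.cursor()
cur.execute("SELECT id, name FROM test_table1 ORDER BY id")
for row in cur.fetchall():
    print(row)    # expect (1, 'JOE'), (2, 'JANE'), (3, 'JACK'), (4, 'JESSICA')

cur.close()
conn.close()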