Processing unstructured data in Snowflake
Snowflake recently launched support for unstructured data. With this, Snowflake has truly become a central data warehouse where you can store your structured, semi-structured and unstructured data.
In this article, we are going to explore storing unstructured data in Snowflake. Whenever PDF files are loaded into an internal Snowflake stage, a stream will pick up the new files so that we can process them and save the extracted text into a table.
Note:
- At the time of writing, we need to manually refresh the stage's directory table by running alter stage <STAGE_NAME> refresh
Creating our stage and stream
After you have signed up for a Snowflake account, go to the console and run the statements below. What we are doing here is creating a database, a schema and a warehouse, then a stage with a directory table enabled (we will create the stream on this stage in a later section).
Finally, we are also going to create a jars_stage to host the JAR file from our Java function below.
CREATE OR REPLACE DATABASE UNSTRUCTURED_DEMO;
CREATE WAREHOUSE UNSTRUCTURED_DEMO_WH;
CREATE OR REPLACE SCHEMA UNSTRUCTURED_DEMO_SCHEMA;
CREATE OR REPLACE STAGE document_stage encryption = (type = 'SNOWFLAKE_SSE') directory = (ENABLE = true);
CREATE OR REPLACE STAGE jars_stage;
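If you want to sanity-check that the directory table is enabled, you can query it with the DIRECTORY table function; it will stay empty until files are uploaded and the stage is refreshed.
SELECT * FROM DIRECTORY(@document_stage);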
Writing our Java app and Java User Defined Function
Open up Visual Studio Code and let's write a simple function which takes in a PDF document and reads its text. We are going to use the pdfbox-app-2.0.24.jar file to load the PDF from an InputStream and extract the data.
I have written a simple Java function to read PDF documents.
In case you are not sure how to set up Java projects in VS Code, refer to this video: https://www.youtube.com/watch?v=2WeYJrGbyIg
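In place of the original project screenshot, here is a minimal sketch of what the handler could look like. It assumes Snowflake's SnowflakeFile helper for opening staged files and PDFBox's PDFTextStripper for text extraction; the package, class and method names are chosen to match the handler we register in the Snowflake console below (myapp.myapp.read_file).
package myapp;

import java.io.IOException;
import java.io.InputStream;
import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.text.PDFTextStripper;
import com.snowflake.snowpark_java.types.SnowflakeFile;

public class myapp {
    // Reads a staged PDF file and returns its text content
    public static String read_file(String filePath) throws IOException {
        SnowflakeFile file = SnowflakeFile.newInstance(filePath);
        try (InputStream stream = file.getInputStream();
             PDDocument document = PDDocument.load(stream)) {
            PDFTextStripper stripper = new PDFTextStripper();
            return stripper.getText(document);
        }
    }
}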
Once we are done, we are going to compile our Java code into a JAR file and load it into the jars_stage that we created earlier, using SnowSQL.
put file:///Users/adlee/java_udf_snowflake/java_udf_snowflake.jar @jars_stage AUTO_COMPRESS=false;
If all is done correctly, we should see our JAR file uploaded to our stage.
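A quick LIST in the console confirms the upload:
ls @jars_stage;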
Creating our Java UDF in the Snowflake console
We are going to create a function called par_doc that returns the text of a PDF file as a string.
CREATE OR REPLACE function par_doc(file string)
returns string
language java
handler = 'myapp.myapp.read_file'
imports = ('@jars_stage/java_udf_snowflake.jar');
Let’s try to see if it works.
Let's toggle back to our SnowSQL command line and upload a sample PDF document into the document_stage.
put file:///Users/Data_files/pdfs/sample.pdf @document_stage/files AUTO_COMPRESS=false;
After that, we will run our Java UDF:
SELECT par_doc('@document_stage/files/sample.pdf');
Great! Our Java UDF is running fine.
Creating our stream, task and procedure
Now, we want to make this as seamless as possible, since manually running the UDF every time is tedious. We also want to store the extracted data in a table.
First, we will create a stream on the stage.
CREATE OR REPLACE stream documents_stream on stage document_stage APPEND_ONLY = FALSE;
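If you want to peek at what the stream captures, you can query it directly (querying a stream does not consume it). Each row carries the file's relative_path plus metadata columns such as METADATA$ACTION, which our procedure below filters on:
SELECT relative_path, METADATA$ACTION FROM documents_stream;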
We will also create a table to hold the PDF text extracted by our UDF.
CREATE OR REPLACE TABLE pdf_data(TEXT VARCHAR(16777216));
Now we will write a stored procedure which reads the rows in documents_stream with an INSERT action, runs the UDF on each new file and inserts the extracted text into our pdf_data table, and finally consumes the stream by inserting its contents into a temporary table (this is so that the stream is cleared for new data).
CREATE OR REPLACE PROCEDURE insert_pdf_data()
returns ARRAY
language javascript
as
$$
var results_array = [];
// Find files newly added to the stage (INSERT actions in the stream)
var cmd = "select relative_path from documents_stream where METADATA$ACTION='INSERT'";
var rs = snowflake.createStatement({ sqlText: cmd }).execute();
while (rs.next()) {
    results_array.push(rs.getColumnValue(1));
}
// Run the Java UDF on each new file and store the extracted text
for (var i = 0; i < results_array.length; i++) {
    snowflake.execute({ sqlText: "insert into pdf_data (text) select par_doc(concat('@document_stage/', ?));", binds: [results_array[i]] });
}
// Consume the stream by inserting its rows into a temporary table,
// so that the stream is cleared for new data
var create_temp_table = "CREATE OR REPLACE temporary table document_stream_data(relative_path VARCHAR(50), METADATA_Action VARCHAR(10), METADATA_IsUpdate BOOLEAN, METADATA_ROWID VARCHAR(100));";
var insert_stream = "INSERT into document_stream_data(relative_path, METADATA_Action, METADATA_IsUpdate, METADATA_ROWID) SELECT relative_path, METADATA$Action, METADATA$IsUpdate, METADATA$ROW_ID from documents_stream;";
snowflake.createStatement({ sqlText: create_temp_table }).execute();
snowflake.createStatement({ sqlText: insert_stream }).execute();
return results_array;
$$;
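Before wiring this up to a task, we can test the procedure by hand after uploading a file and refreshing the stage:
call insert_pdf_data();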
Finally, we will create a task that calls the procedure above whenever there is data in the stream.
CREATE TASK analyse_pdf_data_task
WAREHOUSE = UNSTRUCTURED_DEMO_WH
SCHEDULE = '5 minute'
WHEN
SYSTEM$STREAM_HAS_DATA('documents_stream')
AS
call insert_pdf_data();
Newly created tasks start in a suspended state, so remember to resume it:
ALTER TASK analyse_pdf_data_task RESUME;
Remove the PDF file from document_stage and let's upload a new file. Once done, remember to run
alter stage document_stage refresh;
Once the task fires, we can see the files being picked up and their text saved into our table.
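A quick query confirms the extracted text has landed:
SELECT * FROM pdf_data;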
And we are done!