Using Snowflake’s new Snowpipe Streaming API with Kafka

Adrian Lee Xinhan
6 min read · Jul 31, 2023


Note: Opinions are my own and do not represent my employer in any form.

Snowpipe Streaming API is a new feature released by Snowflake (currently in preview). Its purpose is to write rowsets of data directly into Snowflake tables instead of using conventional Snowpipe (which does bulk data loads from a stage), which in turn results in lower latencies. To learn more about Snowpipe Streaming, head over to Snowflake's documentation.

Having had the opportunity to test the new Snowpipe Streaming API, I was curious how it would work in conjunction with Kafka. (Please note: as of the writing of this article, Snowpipe Streaming is implemented as a set of APIs in the Snowflake Ingest SDK and requires Java version 8 or higher.)

Overview of architecture

A summary of the steps is as follows:

  1. We will have Kafka running on our local desktop. You can deploy Kafka in any manner, but for the sake of simplicity I will be running Kafka with Docker from the conduktor repository (https://github.com/conduktor/kafka-stack-docker-compose)
  2. Next, we will create a Maven project in VS Code and write our Kafka producer code in Java
  3. Finally, we will write our consumer code with the Snowpipe Streaming API

For steps 2 and 3, I have pushed my repo here so you can reference it.

Running Kafka on Docker

To run Kafka with Docker, you need to have Docker installed.

  • Install Docker Desktop on Mac
  • Install Docker Desktop on Windows.

We will use Docker Compose to run Kafka. Please head over to the GitHub project https://github.com/conduktor/kafka-stack-docker-compose and do a git clone or download the files. Step-by-step instructions are available here: https://www.conduktor.io/kafka/how-to-start-kafka-using-docker

After cloning the repo, cd into it and run

docker-compose -f zk-single-kafka-single.yml up -d

Check the services by running the command below

docker-compose -f zk-single-kafka-single.yml ps

We now have our Kafka service up and running, and we can begin coding our producer and consumer.

Creating a Maven project and writing your producer code

In your IDE of choice (I use VS Code), create a Maven project. Inside your Maven project, remember to add the following dependencies:

Snowflake ingest SDK and Kafka Client

<!-- Add the Snowflake Ingest SDK to the Maven project's pom.xml -->
<dependency>
    <groupId>net.snowflake</groupId>
    <artifactId>snowflake-ingest-sdk</artifactId>
    <version>1.0.2-beta.2</version>
</dependency>
<!-- Add the Kafka client to the Maven project's pom.xml -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <!-- use a recent kafka-clients release; 3.5.x was current at the time of writing -->
    <version>3.5.0</version>
</dependency>

There are other key dependencies, which I have detailed in my GitHub repo: https://github.com/sfc-gh-adlee/snowpipe_streaming_ingest

Now, let's start writing our code. Our producer code is fairly simple: it writes JSON string messages to the Kafka bootstrap server at localhost:9092.

First, we create a Kafka producer by instantiating a KafkaProducer object, as shown below:

// create the producer 
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
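Note that the code above assumes a properties object has already been configured. A minimal sketch of that configuration, based on the single-broker Docker setup from earlier (the serializer choices match the String key/value types used in this example):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

// minimal producer configuration pointing at the local Docker broker
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());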

We then create a ProducerRecord to send the message to the required topic. In this example, we generate some random stock ticker values and send them to our Kafka topic. A ProducerRecord can be created as shown below:

// create a producer record with a randomly generated stock ticker payload
int desiredLength = 5;
int random_id = (int) (Math.random() * 50 + 1);
String random_name = UUID.randomUUID().toString().substring(0, desiredLength);
String[] stock_market = {"NASDAQ", "NYSE", "ASE", "IEX", "BSE"};
Random r = new Random();
int randomNumber = r.nextInt(stock_market.length);
String stock_market_name = stock_market[randomNumber];
// val is the message counter from the surrounding loop in the full example
String jsonString = "{'ID':'" + val + "','Details':{'ticker_number':" + random_id + ",'stock_market':'" + stock_market_name + "','ticker_name':'" + random_name + "'}}";
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("demo_java", jsonString);

To send the data, we simply hand the ProducerRecord to the producer's send method.

// send data - asynchronous
producer.send(producerRecord);
// flush data - synchronous
producer.flush();
// flush and close producer
producer.close();

The full sample code can be found in the attached GitHub link.

We are now ready to run our producer code. However, before we hit run, let us spin up a consumer to receive the messages. To get a shell inside the Kafka container, execute the command below:

docker exec -it kafka1 /bin/bash
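If the demo_java topic does not exist yet, you can optionally create it explicitly from inside the container (depending on your broker settings, topic auto-creation may already handle this when the producer first sends):

kafka-topics --bootstrap-server localhost:9092 --create --topic demo_java --partitions 1 --replication-factor 1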

To start a console consumer, execute the command

kafka-console-consumer --bootstrap-server localhost:9092 --topic demo_java --from-beginning

Our Kafka consumer looks good and is ready to start receiving messages from the producer! Let's run our producer code, and you should start seeing messages propagating to the consumer as shown below.

Writing our Kafka consumer code and using the Snowpipe Streaming API to save our data into Snowflake

In general, what we are doing is as follows. Our Kafka consumer code will read incoming messages from the Kafka topic. The code will then read a JSON file (called profile.json) containing details such as our Snowflake account, user, role and private key. (Please note that the Snowpipe Streaming API requires key-pair authentication; more details on generating a private and public key can be found in Snowflake's documentation.)
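For reference, generating an unencrypted key pair typically looks like this (file names are illustrative; see Snowflake's key-pair authentication docs for the encrypted variant):

openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

The contents of rsa_key.p8 (minus the header, footer and line breaks) go into the private_key field of profile.json, and the public key is attached to the Snowflake user, for example:

alter user john set rsa_public_key='MIIBIjAN...';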

Let's first create our Snowflake database and table, which is fairly straightforward.

create or replace database snowpipeingest;
create or replace schema public;
create or replace table stockcodes(c1 variant);

We will also update our profile.json with the relevant details, such as our Snowflake account URL:

{
  "user": "john",
  "url": "https://xxxx.snowflakecomputing.com:443",
  "private_key": "",
  "port": 443,
  "host": "xxx.snowflakecomputing.com",
  "scheme": "public",
  "role": "xxxrole"
}

Let us start writing our code.
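Before we can open a channel, we need a SnowflakeStreamingIngestClient built from the connection details in profile.json. Here is a minimal sketch, assuming the Jackson library is available for parsing the file (the parsing approach and the client name are illustrative):

import java.io.File;
import java.util.Iterator;
import java.util.Properties;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClient;
import net.snowflake.ingest.streaming.SnowflakeStreamingIngestClientFactory;

// load the connection details (url, user, role, private key, etc.) from profile.json
Properties props = new Properties();
JsonNode profile = new ObjectMapper().readTree(new File("profile.json"));
Iterator<String> fields = profile.fieldNames();
while (fields.hasNext()) {
    String field = fields.next();
    props.put(field, profile.get(field).asText());
}

// build the streaming ingest client; the client name is arbitrary
SnowflakeStreamingIngestClient client = SnowflakeStreamingIngestClientFactory.builder("MY_CLIENT")
    .setProperties(props)
    .build();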

We then create an open channel request for our table, which the SnowflakeStreamingIngestClient will use to open a channel.

OpenChannelRequest request1 = OpenChannelRequest.builder("MY_CHANNEL")
    .setDBName("snowpipeingest")
    .setSchemaName("public")
    .setTableName("stockcodes")
    .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
    .build();

We will then open a SnowflakeStreamingIngestChannel against the database, schema and table. (Please note that the database, schema and table are expected to already exist in Snowflake.)

// open a streaming ingest channel from the given client
SnowflakeStreamingIngestChannel channel1 = client.openChannel(request1);
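The poll loop below also assumes a KafkaConsumer that is already subscribed to the demo_java topic. A minimal sketch of that setup (the group id and offset-reset setting are assumptions):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

// minimal consumer configuration for reading the JSON strings produced earlier
Properties consumerProps = new Properties();
consumerProps.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "snowpipe-streaming-demo");
consumerProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Collections.singletonList("demo_java"));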

We parse each message's String value into a JSON object (to be stored as a variant) and then insert it into the channel using the insertRow API on the channel object.

// poll for new data
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        log.info("Key: " + record.key() + ", Value: " + record.value());
        log.info("Partition: " + record.partition() + ", Offset: " + record.offset());

        Map<String, Object> row = new HashMap<>();
        String jsonString = record.value();
        JsonElement jsonElement = JsonParser.parseString(jsonString);
        JsonObject jsonObject = jsonElement.getAsJsonObject();

        // c1 corresponds to the column name in the table
        row.put("c1", jsonObject);
        InsertValidationResponse response = channel1.insertRow(row, String.valueOf(record.offset()));
        if (response.hasErrors()) {
            throw response.getInsertErrors().get(0).getException();
        }
    }
}

The complete consumer code can be found in the attached GitHub link.

We are now ready to run the consumer code. In another terminal, run the consumer code, then re-run the producer code; we should see the consumer reading from the Kafka topic and sending the data to our Snowflake table.

Querying Snowflake, we can see that as soon as data is read from the Kafka topic, it is inserted into Snowflake.
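Since the table stores the raw JSON in a variant column, you can also flatten it with Snowflake's path syntax, for example (assuming the JSON shape produced by the producer above):

select
  c1:ID::string as id,
  c1:Details:ticker_number::int as ticker_number,
  c1:Details:stock_market::string as stock_market,
  c1:Details:ticker_name::string as ticker_name
from stockcodes;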
