Written by Todd Hunter
This tutorial provides a complete walkthrough of deploying a real-time data pipeline using Apache NiFi and Snowflake.
In this walkthrough, you will learn how to:
Complete an end-to-end deployment of the necessary components in a BYOC (Bring Your Own Cloud) Openflow environment on AWS.
Build and configure a NiFi workflow to consume the live Bluesky social media firehose.
Stream, transform, and load the ingested data into a Snowflake table using Snowpipe Streaming.
The objective is to equip you with a solid understanding of NiFi's data flow mechanics so you can confidently use and customise Snowflake's powerful data connectors.
Please note that this demo will create an m7i.2xlarge and an m7i.xlarge instance within your VPC; the associated cost will be approximately USD $20 per day.
Requirements
A Snowflake account with a user with sufficient privileges to create databases, roles, etc.
An AWS account in the same region, with a user who has sufficient permissions to create IAM roles, VPC, EKS, and EC2 resources and to deploy them via CloudFormation.
Part 1: Deploying the Openflow Environment
Initial Snowflake Configuration
Much of the initial Openflow setup is covered in the Snowflake documentation, located here: https://docs.snowflake.com/en/user-guide/data-integration/openflow/setup-openflow
First, we create the database and image repository for Openflow:
CREATE DATABASE IF NOT EXISTS OPENFLOW;
USE OPENFLOW;
CREATE SCHEMA IF NOT EXISTS OPENFLOW;
USE SCHEMA OPENFLOW;
CREATE IMAGE REPOSITORY IF NOT EXISTS OPENFLOW;
GRANT USAGE ON DATABASE OPENFLOW TO ROLE PUBLIC;
GRANT USAGE ON SCHEMA OPENFLOW TO ROLE PUBLIC;
GRANT READ ON IMAGE REPOSITORY OPENFLOW.OPENFLOW.OPENFLOW TO ROLE PUBLIC;
We also create an Openflow admin role and grant it the required set of privileges:
CREATE ROLE OPENFLOW_ADMIN;
GRANT USAGE ON DATABASE OPENFLOW TO ROLE OPENFLOW_ADMIN;
GRANT USAGE ON SCHEMA OPENFLOW TO ROLE OPENFLOW_ADMIN;
GRANT READ ON IMAGE REPOSITORY OPENFLOW.OPENFLOW.OPENFLOW TO ROLE OPENFLOW_ADMIN;
GRANT CREATE OPENFLOW DATA PLANE INTEGRATION ON ACCOUNT TO ROLE OPENFLOW_ADMIN;
GRANT CREATE OPENFLOW RUNTIME INTEGRATION ON ACCOUNT TO ROLE OPENFLOW_ADMIN;
This role then needs to be granted to the user who will administer Openflow:
GRANT ROLE OPENFLOW_ADMIN TO USER <username>;
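As an optional check before moving on, you can confirm that the image repository exists and that the admin role has picked up the expected privileges (standard Snowflake commands, shown here as a suggestion):
SHOW IMAGE REPOSITORIES IN SCHEMA OPENFLOW.OPENFLOW;
SHOW GRANTS TO ROLE OPENFLOW_ADMIN;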
One last thing we do for the demonstration is to create a schema and user that Openflow will use to connect to Snowflake. This user will have permissions on a separate schema named OPENFLOW_DEMO and will authenticate to Snowflake using a key pair; see the Snowflake documentation on how to generate the key pair.
USE DATABASE OPENFLOW;
CREATE SCHEMA IF NOT EXISTS OPENFLOW_DEMO;
CREATE ROLE IF NOT EXISTS OPENFLOW_DEMO_ROLE;
-- Grant database usage
GRANT USAGE ON DATABASE OPENFLOW TO ROLE OPENFLOW_DEMO_ROLE;
-- Grant schema permissions (full control)
GRANT ALL PRIVILEGES ON SCHEMA OPENFLOW.OPENFLOW_DEMO TO ROLE OPENFLOW_DEMO_ROLE;
-- Grant permissions on current and future tables
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA OPENFLOW.OPENFLOW_DEMO TO ROLE OPENFLOW_DEMO_ROLE;
GRANT ALL PRIVILEGES ON FUTURE TABLES IN SCHEMA OPENFLOW.OPENFLOW_DEMO TO ROLE OPENFLOW_DEMO_ROLE;
-- Grant permissions on current and future views
GRANT ALL PRIVILEGES ON ALL VIEWS IN SCHEMA OPENFLOW.OPENFLOW_DEMO TO ROLE OPENFLOW_DEMO_ROLE;
GRANT ALL PRIVILEGES ON FUTURE VIEWS IN SCHEMA OPENFLOW.OPENFLOW_DEMO TO ROLE OPENFLOW_DEMO_ROLE;
-- Grant permissions on current and future sequences
GRANT ALL PRIVILEGES ON ALL SEQUENCES IN SCHEMA OPENFLOW.OPENFLOW_DEMO TO ROLE OPENFLOW_DEMO_ROLE;
GRANT ALL PRIVILEGES ON FUTURE SEQUENCES IN SCHEMA OPENFLOW.OPENFLOW_DEMO TO ROLE OPENFLOW_DEMO_ROLE;
-- Grant permissions on current and future functions
GRANT ALL PRIVILEGES ON ALL FUNCTIONS IN SCHEMA OPENFLOW.OPENFLOW_DEMO TO ROLE OPENFLOW_DEMO_ROLE;
GRANT ALL PRIVILEGES ON FUTURE FUNCTIONS IN SCHEMA OPENFLOW.OPENFLOW_DEMO TO ROLE OPENFLOW_DEMO_ROLE;
-- Grant permissions on current and future procedures
GRANT ALL PRIVILEGES ON ALL PROCEDURES IN SCHEMA OPENFLOW.OPENFLOW_DEMO TO ROLE OPENFLOW_DEMO_ROLE;
GRANT ALL PRIVILEGES ON FUTURE PROCEDURES IN SCHEMA OPENFLOW.OPENFLOW_DEMO TO ROLE OPENFLOW_DEMO_ROLE;
-- Create the service account user
CREATE USER IF NOT EXISTS OPENFLOW_DEMO_SVC_USER
COMMENT = 'Service account for OpenFlow POC operations'
DEFAULT_ROLE = 'OPENFLOW_DEMO_ROLE'
DEFAULT_WAREHOUSE = 'OPENFLOW_WAREHOUSE' -- Adjust to your warehouse name
DEFAULT_NAMESPACE = 'OPENFLOW.OPENFLOW_DEMO'
MUST_CHANGE_PASSWORD = FALSE
RSA_PUBLIC_KEY = '-----BEGIN PUBLIC KEY-----
<public key material generated above>
-----END PUBLIC KEY-----'; -- Replace with actual public key
-- Grant the role to the service account
GRANT ROLE OPENFLOW_DEMO_ROLE TO USER OPENFLOW_DEMO_SVC_USER;
-- Optional - Grant role to other users/roles for administration
GRANT ROLE OPENFLOW_DEMO_ROLE TO ROLE SYSADMIN;
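Two optional housekeeping steps are worth considering here; both are sketches you can adapt rather than required parts of the setup. The CREATE USER statement above references a default warehouse, so if you don't already have one by that name you can create a small one and let the demo role use it. You can also confirm that the public key registered correctly by describing the user.
-- Optional: create the default warehouse referenced above (name and sizing are just suggestions)
CREATE WAREHOUSE IF NOT EXISTS OPENFLOW_WAREHOUSE
  WAREHOUSE_SIZE = 'XSMALL'
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE;
GRANT USAGE ON WAREHOUSE OPENFLOW_WAREHOUSE TO ROLE OPENFLOW_DEMO_ROLE;
-- Optional: confirm the key was registered; look for the RSA_PUBLIC_KEY_FP property in the output
DESC USER OPENFLOW_DEMO_SVC_USER;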
Creating an Openflow Environment on AWS
From the Snowsight console, under the Data menu, select Openflow and click "Launch Openflow". You will need to log in as the user who was granted the openflow_admin role above. This will launch the Openflow console. The first time you log in, you will be presented with the following:
You are required to create two components for an Openflow environment. The first is a Deployment: the base infrastructure required to run the Openflow environment. When running Openflow with a BYOC model, this creates all the infrastructure within AWS to support Openflow, including roles, VPCs, an EC2 management instance and an EKS cluster. The second component is the Runtime: an environment within the deployment where the actual NiFi components, scripts and pipelines will run.
When you select 'Create a deployment', a short workflow guides you through the setup:
Currently the only available option is to deploy with BYOC on AWS; support for other clouds and for Snowpark Container Services will be added in the future.
Finally, Managed VPC is the simplest deployment, as Snowflake will build all the infrastructure required; however, if you need to run within existing VPCs, that option is available as well. Also, select the role created above as the owner role of the deployment, in this case "OPENFLOW_ADMIN".
When you create a deployment, Openflow will generate a CloudFormation YAML template which will be used to deploy the required AWS infrastructure. Download the template.
Log into the AWS console, go to the CloudFormation console, and select 'Create Stack' -> With new resources.
Choose an existing template, select 'Upload a template file', and upload the YAML that you downloaded from Openflow.
On the 'Specify Stack details' screen, the only required field is the Stack name at the top of the page. Click 'Next' at the bottom of the screen.
On the 'Configure stack options' screen, acknowledge the capability to create IAM resources at the bottom of the screen and select 'Next'.
The final screen shows a summary to review; go to the bottom and select Submit.
This will take you back to the CloudFormation console, where the required resources will start to be deployed. From here it is an automatic process: CloudFormation will build everything required. Once CloudFormation has finished deploying, the EC2 instance running within the VPC will start configuring the EKS cluster to run Openflow.
In total, this will take between 45 and 60 minutes to complete. Once the Openflow service has started, the STATE within the Openflow console will change to Active.
Go to the Runtimes tab within Openflow and select 'Create Runtime'. Fill in the runtime name, make sure the Node type is 'small' and click 'Create'.
This will start up a new node, and deploy the runtime for Openflow.
Once the deployment of the runtime is complete, go to the dropdown menu and select 'View Canvas'.
You will be prompted to log in again, and congratulations: you now have a running Openflow environment.
Part 2: Building your first Flow
Snowflake ships Openflow with a number of connectors. A connector is simply a NiFi "process group" that Snowflake has pre-configured (we will explain more about NiFi objects and terminology, including process groups, later in this blog). Using the database connectors, we can replicate data in real time from many popular databases out of the box. These connector process groups are themselves editable, allowing us to tweak their default configuration.
However, we want to help you understand how to build connectors from scratch, so this tutorial will guide you through building a data pipeline to consume Bluesky's real-time firehose of public social network updates. We'll connect to a WebSocket stream providing JSON objects (representing "skeets"), filter these messages to retain only English-language content, and then transform the data into a structure compatible with Snowflake's Snowpipe Streaming service. The ultimate goal is to land this curated data into a Snowflake table for future analysis and use.
By the end of this tutorial, you should understand the main components of Openflow and NiFi, and be able to follow what is happening inside the connectors supplied by Snowflake.
Our goal is to consume messages from the social media site Bluesky. Bluesky sends messages using the binary AT Protocol, and the Jetstream service converts these into JSON objects and serves them over a WebSocket. Messages coming through the Jetstream WebSocket look like this:
{
"did" : "did:plc:2fw2iqlwc7hnwiwpvmspyz57",
"time_us" : 1749166646713618,
"kind" : "commit",
"commit" : {
"rev" : "3lqvigdrohe2i",
"operation" : "create",
"collection" : "app.bsky.feed.post",
"rkey" : "3lqvig2f42c2e",
"record" : {
"$type" : "app.bsky.feed.post",
"createdAt" : "2025-06-05T23:37:16.500Z",
"langs" : [ "en" ],
"reply" : {
"parent" : {
"cid" : "bafyreibb3fhgm3uiq3adsq7qvkreogqsouutalls66vpa76oiaeurfwte4",
"uri" : "at://did:plc:2fctsdpquhlikmnwfwnb3epm/app.bsky.feed.post/3lqvec5gq3s2l"
},
"root" : {
"cid" : "bafyreiaj2iqzgoo5kidsdvdwnnbdzxhgbfyhxef5ab73rqorrr4ejf5qne",
"uri" : "at://did:plc:2fw2iqlwc7hnwiwpvmspyz57/app.bsky.feed.post/3lqnd2dndrs24"
}
},
"text" : "I played and watched FNF on YouTube, so I know a few mods. Not many but a few."
},
"cid" : "bafyreihtxdyw2uf3v4bv5w6w6okcsvpgzqmv334qtkenxpcdcn4wnpx74m"
}
}
Before we start building, there are a few concepts that are useful to know when working with NiFi.
FlowFiles: Think of a FlowFile as a single piece of data moving through your NiFi data flow. It is made up of two parts: the content, which is the data traversing the system, and a set of attributes, key-value pairs that provide metadata about the content.
Processors: These are the core compute components within NiFi. A processor performs a specific action on a FlowFile: for example, fetching data from a remote system, transforming data, routing data, or sending data to other systems. A NiFi data flow consists of a series of processors acting on FlowFiles.
Controller Services: These are shared resources which processors can use, for example shared database connection pools or credential providers for cloud services.
Process Groups: These are a way to organise a segment of the data flow. They can be used to keep complex flows tidy by grouping related processors together. In some ways they can be seen as a reusable template for a flow, or a black box hiding some complexity behind it. They can also be managed with version control and can be supplied by third parties. Snowflake connectors are supplied as process groups, and Snowflake has committed to expanding the 20 or so pre-packaged process groups that are available now.
I have highlighted the two main components of the UI which we will be using for the majority of the flow creation. The open area underneath is called the canvas. Adding components involves dragging them from the toolbar at the top of the screen onto the canvas.
Begin by dragging a process group onto the canvas; a 'Create Process Group' dialog will appear. Name it Bluesky Demo
and click Add. Double-click the process group to enter and edit it. At the bottom of the window there is a breadcrumb trail back to the parent group. Process groups can be nested to any depth, and FlowFiles can pass in and out of them via Input and Output ports. They form a logical grouping of processors and controllers and allow you to keep complicated flows tidy.
Configure Controller Services
The first thing we are going to configure is the set of Controller Services required by the processors. Remember, these are the components which manage connections to other services and shared resources across the flow. We require three of them for this flow.
To configure these, right-click on the canvas to bring up the menu and select 'Controller Services'. In the Controller Services view, click the + to add them. The three we need to add are:
JettyWebSocketClient
JsonTreeReader
StandardPrivateKeyService
NiFi is helpful here in that it makes visible any required configuration for each component. You should see that the JettyWebSocketClient and StandardPrivateKeyService require some extra configuration.
Starting with the JettyWebSocketClient, click the menu on the right and 'Edit' it. This will bring up the configuration screen for this controller. Here you can rename the component to make its name more specific to its use. On the middle tab, 'Properties', set the WebSocket URI to wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post
and click the Verification (✔️) button to make sure that it is correctly configured. If everything is good, click Apply and close the window.
The second controller we need to configure is the StandardPrivateKeyService. This service holds the private key used to authenticate to Snowflake as the user created in the previous step. Again, click the menu and Edit the controller; in the Properties, paste the private key of the Snowflake user you created in the first step into the 'Key' field and click OK. Note that the key is stored as a Sensitive value: users are not able to retrieve it once it has been set here. Verify that the configuration is correct and click Apply.
The third controller we need is the JsonTreeReader, which gives processors a way of understanding JSON data. It is used within the flow to read a JSON structure and convert it into whatever a processor requires; no extra configuration is needed here.
One last step we need to perform is enabling these services. Because Controller Services often manage resource-intensive assets like database connections, we may not want them to activate the moment they are created. This design allows us to build and configure an entire flow before we commit to connecting to external services. The 'enable' step, therefore, acts as our deliberate decision to start the service.
Furthermore, once a service is running, its configuration is locked. This is a key safety feature. It ensures changes are managed in a controlled and deliberate manner—by disabling, modifying, and re-enabling the service—rather than potentially disrupting a flow while it's live.
Services can be enabled one-by-one from the menu next to each service; however, a quicker way to enable them all is to return to the canvas (via the 'Back to Process Group' link), right-click the canvas and select 'Enable All Services'.
Configuring the Flow
Now we need to build the flow itself, which consists of five processors.
ConnectWebSocket
Let's start with the WebSocket connection. This first processor acts as the endpoint for interacting with the remote WebSocket. Drag the processor icon from the toolbar onto the canvas and search for ConnectWebSocket. Double-click the processor to enter the configuration. The configuration here is relatively straightforward, as the controller we configured earlier provides all the connection information. For the Client Controller service, select the WebSocketClient that was created earlier, and add your own name for the WebSocket Client Id. Validate that the configuration is correct and click Apply.
One thing you will notice back on the canvas is the warning next to the name of the processor. If you hover over it you will see the message below; this reflects one of the core design principles of NiFi, which is no data loss by accident.
Each processor may have multiple relationships for its output. For example, the ConnectWebSocket will emit FlowFiles to a number of different relationships depending on events which occur within it: connected, disconnected, failure, success, binary message and text message. The warning is NiFi's way of saying "A FlowFile may come out of the 'text message' relationship but I haven't been told how to deal with it, and I'm not going to guess because I REALLY don't want to lose data". This warning will actually block the processor from starting, so it needs to be dealt with.
There are two options for dealing with each relationship: connect it to a downstream component, or, if you don't need the data, explicitly terminate it. As we only care about the JSON text messages in this tutorial, we will terminate relationships such as 'binary message' and 'connected'. Go back into the configuration of the ConnectWebSocket processor and open the 'Relationships' tab. From here we can terminate all the relationships we don't require.
EvaluateJsonPath
Now that we have a connection to a stream of JSON messages from the WebSocket, our next goal is to filter them based on language. The problem is that the language information is buried deep inside the JSON content of the FlowFile. To make decisions based on the language, we first need to pull that specific piece of data out and make it easily accessible. To do this we will use the EvaluateJsonPath processor. Its purpose is to look inside a FlowFile's JSON content, find specific values using a path, and then do something with them.
Drag a new processor onto the canvas and select the EvaluateJsonPath.
Now hover over the ConnectWebSocket processor and a blue arrow will appear.
Drag the arrow from the ConnectWebSocket and drop it onto the EvaluateJsonPath processor, and a new dialog will appear to create the connection.
Select 'text message' and add the connection. The warning on the ConnectWebSocket will now go away, as all of its relationships have been either connected or terminated, and the icon becomes a red stop symbol. Your flow should now look like this.
Now we need to configure the EvaluateJsonPath processor. Open the processor, go to the Properties tab and make the following changes:
Change 'Destination' from 'flowfile-content' to 'flowfile-attribute'
Click the + to add a new attribute. Name this langs and set its value to
$.commit.record.langs[0]
Setting the Destination to 'flowfile-attribute' configures the processor to leave the original JSON content of the FlowFile unchanged and instead write the extracted value as an attribute in the FlowFile's metadata. This is useful because that metadata can then be used downstream, for example for filtering or routing, without changing the data contained within the FlowFile.
We also define the attribute we are creating, in this case the langs
attribute. As the data within the file is JSON, we need to describe where to get the value from, and this processor uses a JSONPath expression to find it. For the sample message shown earlier, $.commit.record.langs[0] evaluates to "en", the first element of the langs array. The full structure of JSONPath is a little outside the scope of this tutorial, but I find https://jsonpath.com/ and https://jsonpathfinder.com/ both useful tools for determining the correct expressions.
In addition, we need to update the relationships: on the Relationships tab, terminate the 'failure' and 'unmatched' relationships, as we are not interested in messages that cannot be parsed as JSON or from which we cannot extract a language.
RouteOnAttribute
The next step in our pipeline is to select only the messages written in English. To accomplish this, we will use the RouteOnAttribute processor. While NiFi offers many routing options, RouteOnAttribute is perfect for demonstrating a powerful concept: English "skeets" can be sent directly to the database, while non-English ones could be diverted to a translation service before being re-introduced to the flow. For this tutorial, we will set up the routing logic for both paths, but leave the implementation of the translation service as an exercise for the reader.
Drag a RouteOnAttribute processor into the canvas and configure it as follows.
The settings are:
Routing strategy, set to 'Route to Property name'
Add a new property named 'english' with the value
${langs:equals('en')}
Add a new property named 'non-english' with the value
${langs:equals('en'):not()}
These property values are written in what is called the NiFi Expression Language, a powerful and lightweight mini-language that lets you reference, manipulate, and evaluate FlowFile attributes and system properties directly within processor properties. As we've demonstrated with our routing rule ${langs:equals('en')}, it is the key to creating dynamic and intelligent data flows. It can be used for everything from simple attribute retrieval to complex comparisons and modifications. A full exploration of its capabilities is beyond the scope of this tutorial, but understanding its basics is essential for unlocking NiFi's full potential. For a comprehensive guide on all available functions, I highly recommend reviewing the official documentation: Apache NiFi Expression Language Guide.
The outcome of this is that all messages whose langs attribute is set to "en" are routed to the english relationship, and the rest to non-english. If you jump to the Relationships tab, you will see that NiFi has picked up the new relationships which have just been generated. As we will not be using the unmatched or non-english relationships, we can terminate them here. Click Apply and return to the canvas.
Now we will create the connection between the EvaluateJsonPath and the RouteOnAttribute processors. Drag the connection between them; we want FlowFiles which have been successfully matched to pass through, so select 'matched' and click Add.
Troubleshooting
At this point you may be wondering, "How do I know if everything I have set up is working correctly?". One way to check is to temporarily set the english route to terminate and run the pipeline by right-clicking the canvas and selecting 'Start'. If everything is set up correctly, you should start seeing messages travelling through all the processors; in this case the ConnectWebSocket has sent out 932 messages, which have passed through each of the subsequent processors.
If you right-click on the RouteOnAttribute processor, there is an option to 'View Data Provenance'. Selecting this shows all the FlowFiles which have been processed by that processor, giving you insight into what is happening at each stage of the flow. If you select the menu on the right side of one of the files and choose 'Show Lineage', you can see each step that the FlowFile has passed through.
If you right-click on each of the nodes you can see the before and after state of both the Content and Attributes of the individual FlowFile. For example, if you right-click the 'Attributes Modified' node, the green + next to the 'langs' attribute shows that it was added, along with the value determined by the JSONPath query.
On the 'Content' tab we can see the input and output of the processor, which is useful if you want to see what effect any transforms have had on the content of the file.
One thing to be aware of when viewing the content of FlowFiles is that NiFi is not aware of the type of data unless you explicitly set an attribute on the FlowFile to indicate its MIME type. By default the content will be displayed in a 'hex' format; if you select the View option you can force NiFi to show the data correctly. An example of the View of the content of a FlowFile through the flow is:
The end goal of the pipeline is to push this data into a Snowflake table via Snowpipe Streaming. For Snowpipe Streaming to be able to process the data, we need to take this nested JSON structure and flatten it out. To do this we will use the JoltTransformJSON processor.
JoltTransformJSON
This processor is used to reshape JSON from one structure to another. Rather than writing procedural code, you provide a declarative JSON "specification" that acts as a blueprint, mapping elements from the input document to their desired locations and structures in the output. This makes Jolt ideal for complex tasks such as flattening or nesting objects, renaming fields, and restructuring deep JSON structures to match a required target schema.
JOLT uses a DSL to specify how JSON data should be transformed. In general, the specification below describes where the input data (the keys) should be placed in the output JSON (the values). For example, the parent cid mapping takes the field located at $.commit.record.reply.parent.cid in the source and places it at the root of the output in a field named parent_cid.
{
"did": "did",
"commit": {
"record": {
"\\$type": "type",
"createdAt": "createdAt",
"text": "text",
"reply": {
"parent": {
"cid": "parent_cid",
"uri": "parent_uri"
},
"root": {
"cid": "root_cid",
"uri": "root_uri"
}
}
}
}
}
Further details on JOLT transforms are outside the scope of this tutorial, but more documentation can be found on the Jolt library GitHub page. Another useful tool I have found is the Jolt Transform Demo, where you can quickly try out transforms.
Drag a JoltTransformJSON processor onto the canvas under the RouteOnAttribute processor. Drag a connection from RouteOnAttribute to JoltTransformJSON and select the 'english' relationship.
Configure the JoltTransformJSON processor with the following settings:
Jolt Transform - set this to 'Shift'; there are other transform types, see the documentation for their usage.
Jolt Specification - copy the transform from above into this field
Pretty Print - set to true.
The transform will turn a message like this:
{
"did" : "did:plc:7ex7jt6hu5qcl33qcjhbsuu3",
"time_us" : 1749370767788843,
"kind" : "commit",
"commit" : {
"rev" : "3lr3gjmv5cc2p",
"operation" : "create",
"collection" : "app.bsky.feed.post",
"rkey" : "3lr3gjmes2s2n",
"record" : {
"$type" : "app.bsky.feed.post",
"createdAt" : "2025-06-08T08:19:26.975Z",
"langs" : [ "en" ],
"reply" : {
"parent" : {
"cid" : "bafyreihndrgbduc6ixzsnwo4fxiycmlkwa5wdlng73mv7ypwv4barussra",
"uri" : "at://did:plc:e6bx3v3wtu76sc7uknsqrz2z/app.bsky.feed.post/3lr3gbzbcpc2r"
},
"root" : {
"cid" : "bafyreieevuk62wa3y43a3ufcqwtlzy4weagxpbaryrpyy4iiotnetpv5ui",
"uri" : "at://did:plc:ttnjjwyfpplpl6qyynvt3tbw/app.bsky.feed.post/3lr3anyti3k2t"
}
},
"text" : "I appreciate the enthusiasm but that last sentence is the kind of thing that goes without saying haha"
},
"cid" : "bafyreicdo4j6aa245w5jcnefyi4sfcaepul7rx4crbffqn4tno74g5ffpe"
}
}
to this:
{
"did" : "did:plc:7ex7jt6hu5qcl33qcjhbsuu3",
"type" : "app.bsky.feed.post",
"createdAt" : "2025-06-08T08:19:26.975Z",
"text" : "I appreciate the enthusiasm but that last sentence is the kind of thing that goes without saying haha",
"parent_cid" : "bafyreihndrgbduc6ixzsnwo4fxiycmlkwa5wdlng73mv7ypwv4barussra",
"parent_uri" : "at://did:plc:e6bx3v3wtu76sc7uknsqrz2z/app.bsky.feed.post/3lr3gbzbcpc2r",
"root_cid" : "bafyreieevuk62wa3y43a3ufcqwtlzy4weagxpbaryrpyy4iiotnetpv5ui",
"root_uri" : "at://did:plc:ttnjjwyfpplpl6qyynvt3tbw/app.bsky.feed.post/3lr3anyti3k2t"
}
This flattened structure is then suitable for inserting into Snowflake via Snowpipe Streaming.
PutSnowpipeStreaming
Before configuring this final step in Openflow, we need to create the target table within Snowflake. Go back to the Snowflake console and create the table as follows:
USE ROLE OPENFLOW_DEMO_ROLE;
USE DATABASE OPENFLOW;
USE SCHEMA OPENFLOW_DEMO;
CREATE TABLE BLUESKY_POSTS (
DID STRING,
TYPE STRING,
CREATEDAT TIMESTAMP,
TEXT STRING,
PARENT_CID STRING,
PARENT_URI STRING,
ROOT_CID STRING,
ROOT_URI STRING
);
Drag a PutSnowpipeStreaming processor onto the canvas, and connect from the JoltTransformJSON to the PutSnowpipeStreaming processor with the 'success' relationship.
Configure the processor as follows:
Authentication Strategy - Key Pair
Account - your Snowflake account identifier (see the lookup query after this list if you're unsure)
User - The Snowflake username of the user you created above -
OPENFLOW_DEMO_SVC_USER
Role - The Snowflake role which you created above -
OPENFLOW_DEMO_ROLE
PrivateKeyService - This dropdown should be populated with the private key controller which you configured at the start of the tutorial.
Database - Set to
OPENFLOW
Schema - Set to
OPENFLOW_DEMO
Table - Set to
BLUESKY_POSTS
Record Reader - set to the JsonTreeReader. This is a drop down and should be populated by the controller which was created at the start of the tutorial.
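If you are unsure of your account identifier, the following query returns the organization and account names; the identifier is typically written as <org_name>-<account_name>, though you should check the Snowflake documentation on account identifiers if your account uses a different format:
-- Look up the pieces of the account identifier
SELECT CURRENT_ORGANIZATION_NAME() AS ORG_NAME,
       CURRENT_ACCOUNT_NAME()      AS ACCOUNT_NAME;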
When you run verification on this processor, it checks both that the processor is configured correctly and that the user can authenticate to Snowflake.
As we will not be using any of the outputs from this processor, terminate both the failure and success relationships.
Once you have configured all the processors, the flow should look like this:
You should be able to right-click on the canvas and select 'Run' to start the flow. After a couple of seconds you will see the counters start going up; these counters show the number of events over the last five minutes. You should also be able to see records streaming into Snowflake. In our testing we have seen roughly 10 to 20 messages per second being written to Snowflake.
And we should start seeing the messages flow into the Snowflake table.
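A quick way to confirm this from Snowsight is to query the table directly; the queries below are just a suggested sanity check:
-- Most recent posts that have landed
SELECT CREATEDAT, TEXT
FROM OPENFLOW.OPENFLOW_DEMO.BLUESKY_POSTS
ORDER BY CREATEDAT DESC
LIMIT 10;
-- Approximate ingest rate per minute
SELECT DATE_TRUNC('minute', CREATEDAT) AS POST_MINUTE, COUNT(*) AS POSTS
FROM OPENFLOW.OPENFLOW_DEMO.BLUESKY_POSTS
GROUP BY 1
ORDER BY 1 DESC
LIMIT 10;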
Summary
In this tutorial, we have successfully constructed a complete, end-to-end data pipeline using Snowflake's Openflow, powered by Apache NiFi. Starting from scratch, we configured the necessary roles, permissions, and objects within Snowflake. We then provisioned the required cloud infrastructure in AWS using a Bring-Your-Own-Cloud (BYOC) model, deploying the Openflow environment via a CloudFormation template. This provided us with a powerful, managed NiFi canvas running within our own VPC, ready for development.
The core of our project was building the data flow itself. We established a real-time connection to the Bluesky social media firehose using a WebSocket, capturing a continuous stream of JSON-formatted data. Within our NiFi flow, we demonstrated key data processing techniques: extracting a specific attribute (langs) from the nested JSON, routing the data based on this attribute to filter for only English-language posts, and finally, performing a significant structural transformation using a Jolt specification. This last step was crucial to flatten the complex JSON into a clean, relational format suitable for direct ingestion into our target Snowflake table via the high-throughput Snowpipe Streaming service.
Next Steps and Further Exploration
The pipeline you've built can serve as a foundation, and there are many directions you could take it from here. The most obvious next step is to build out the "non-english" route we terminated in the tutorial. This branch could be routed to a translation service (many of which have REST APIs that NiFi can call) before being transformed and merged back into the main flow to be loaded into Snowflake. This would create a more complete dataset for global sentiment analysis.
You could also enhance the existing flow. For instance, consider adding processors to enrich the data further, perhaps by calling an external API to perform sentiment analysis on the text of each post and adding that score as a new field. You could also explore more advanced error handling and monitoring. NiFi provides processors to send alerts via email or Slack if a part of the flow fails, ensuring the pipeline is robust and production-ready. On the Snowflake side, you could now build Materialized Views or Dynamic Tables on the incoming BLUESKY_POSTS data to perform real-time aggregations and analytics.
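As a sketch of that last idea, a minimal Dynamic Table that maintains a per-minute post count might look like the following; the warehouse name is an assumption, so substitute whichever warehouse you have available:
CREATE OR REPLACE DYNAMIC TABLE BLUESKY_POSTS_PER_MINUTE
  TARGET_LAG = '1 minute'
  WAREHOUSE = OPENFLOW_WAREHOUSE -- assumed warehouse name
  AS
  SELECT
    DATE_TRUNC('minute', CREATEDAT) AS POST_MINUTE,
    COUNT(*) AS POSTS
  FROM OPENFLOW.OPENFLOW_DEMO.BLUESKY_POSTS
  GROUP BY DATE_TRUNC('minute', CREATEDAT);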
Snowflake also offers a set of connectors from the Openflow administration console. These are pre-defined NiFi flows created by Snowflake. You can add them to your runtime and they will show up as new process groups. They provide a great way to see full examples of NiFi flows in different contexts, as well as providing connectors to services such as Kafka, SharePoint and Slack.
We encourage you to continue experimenting with this powerful toolset and discover what else you can build.