Prerequisites
- Install IoT Bridge for Snowflake into your Azure account
  - Before you can access the Virtual Machine, you must have completed the installation process here.
- Install an MQTT Server
  - Chariot MQTT Server can be installed using this guide. However, any Sparkplug-compliant MQTT Server will work. Note that Azure IoT Hub is not Sparkplug compliant.
Summary
IoT Bridge for Snowflake (IBSNOW) is an application that connects to an MQTT Server (such as Chariot MQTT Server) and consumes MQTT Sparkplug messages from Edge devices. These messages must be formatted as Sparkplug Templates, which are defined in the Sparkplug Specification. The Templates are used to create the data structures in Snowflake automatically, with no additional coding or configuration. Multiple instances of these Templates then generate the Assets, which are populated with real-time data published on change only, significantly reducing the amount of data sent to the cloud. For further details on Snowflake, refer to the documentation here. For further details on Eclipse Sparkplug, refer to the Eclipse Sparkplug resources.
This Quickstart document covers how IoT Bridge can be used to consume MQTT Sparkplug data and create and update data in Snowflake. It shows how to configure IoT Bridge, and how to use Inductive Automation's Ignition platform along with Cirrus Link's MQTT modules to publish device data to an MQTT Server. This data is ultimately consumed by IoT Bridge to create and update the Snowflake components. This tutorial uses the Cirrus Link Chariot MQTT Server, but IBSNOW works with any MQTT v3.1.1 compliant MQTT Server, including Cirrus Link's MQTT Servers.
It is also important to note that Ignition in conjunction with Cirrus Link's MQTT Transmission module converts Ignition User Defined Types (UDTs) to Sparkplug Templates. This is done automatically by the MQTT Transmission module. So, much of this document will refer to UDTs rather than Sparkplug Templates since that is what they are in Ignition. More information on Inductive Automation's Ignition platform can be found here. Additional information on Cirrus Link's MQTT Transmission module can be found here.
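To make the Template-to-asset flow concrete, the sketch below shows an illustrative JSON rendering of a Sparkplug NBIRTH payload carrying a Template definition. This is not the authoritative wire format (Sparkplug B is a protobuf encoding); the shape here simply mirrors what the provisioning scripts later in this guide parse out of the raw message column: metrics with dataType 'Template' become asset models, and their nested metrics become the model's attributes. The template and metric names are hypothetical.

```python
# Illustrative (not authoritative) JSON rendering of a Sparkplug NBIRTH payload
# carrying a Template definition named "Motor". Metrics with dataType
# 'Template' become asset models in Snowflake; their nested metrics become
# the model's attributes.
nbirth = {
    "timestamp": 1688071366546,
    "seq": 0,
    "metrics": [
        {
            "name": "_types_/Motor",   # hypothetical template name
            "timestamp": 1688071366546,
            "dataType": "Template",
            "value": {
                "version": "1.0",
                "reference": "",
                "metrics": [
                    {"name": "Temperature", "dataType": "Int32", "value": 0},
                    {"name": "Amps", "dataType": "Int32", "value": 0},
                    {"name": "RPMs", "dataType": "Int32", "value": 0},
                ],
            },
        }
    ],
}

# A view like node_machine_registry_vw (defined in the provisioning scripts)
# effectively selects the Template-typed metrics and reads their names:
templates = [m for m in nbirth["metrics"] if m["dataType"] == "Template"]
print([t["name"] for t in templates])
```

DBIRTH messages then reference a Template by name to declare an instance (an Asset), and subsequent DDATA messages carry only the metric values that changed.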
Snowflake Setup
If you don't have a Snowflake account, open a Web Browser and go to https://www.snowflake.com. Follow the instructions there to start a free trial. After creating an account, log in to Snowflake via the Web Console. You should see something like what is shown below.
![](/download/attachments/154796084/image2023-4-19_13-59-21.png?version=1&modificationDate=1688071366546&api=v2)
Create a new 'SQL Worksheet' by clicking the blue + button in the upper right hand corner of the window as shown below.
![](/download/thumbnails/154796084/image2023-4-19_14-3-36.png?version=1&modificationDate=1688071366563&api=v2)
Copy and paste the following SQL script into the center pane.
SQL Script 01
After pasting the code into the center pane of the SQL Worksheet, click the drop down arrow next to the blue play button in the upper right corner of the window and click 'Run All' as shown below.
![](/download/thumbnails/154796084/image2023-4-19_14-13-53.png?version=1&modificationDate=1688071366577&api=v2)
After doing so, you should see a message in the 'Results' pane denoting the SPARKPLUG_RAW table was created successfully as shown below.
![](/download/attachments/154796084/image2023-4-19_14-14-55.png?version=1&modificationDate=1688071366595&api=v2)
Now, repeat the process for each of the following scripts in order. Each time, fully replace the contents of the SQL script with the new script and click the 'Run All' button after pasting each script. Make sure no errors are displayed in the Results window after running each script.
Script 02
-- =========================
-- In this script, we are setting up assets related to the node database
-- ,which would eventually contain all the device specific views and tables.
-- At the very core, the following assets are created:
-- - Node Database
-- - Staging schema
-- The database & schema will be owned by SYSADMIN
-- REPLACE THE SESSION VARIABLE ACCORDING TO YOUR ENVIRONMENT
-- =========================
set staged_sparkplug_raw_table = 'cl_bridge_stage_db.stage_db.sparkplug_raw';
set cl_bridge_node_db = 'cl_bridge_node_db';
set staging_schema = 'stage_db';
-- >>>>>>>>>>>>>>>>>>>>>> DATABASE >>>>>>>>>>>>>>>>>>>>>>>>>
use role sysadmin;
create database if not exists identifier($cl_bridge_node_db)
-- DATA_RETENTION_TIME_IN_DAYS = 90
-- MAX_DATA_EXTENSION_TIME_IN_DAYS = 90
comment = 'used for storing flattened messages processed from the staging database'
;
-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA >>>>>>>>>>>>>>>>>>>>>>>>>
use database identifier($cl_bridge_node_db);
create schema if not exists identifier($staging_schema)
with managed access
-- data_retention_time_in_days = 90
-- max_data_extension_time_in_days = 90
comment = 'used for storing flattened messages processed from the staging database';
-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA ASSETS >>>>>>>>>>>>>>>>>>>>>>>>>
use schema identifier($staging_schema);
-- =========================
-- Define tables
-- =========================
-- NOTE: 'cl_bridge_stage_db.stage_db.sparkplug_raw' is hardcoded here, as substituting
-- the $staged_sparkplug_raw_table identifier does not work in this context.
create or replace view sparkplug_messages_vw
change_tracking = true
comment = 'parses out the core attributes from the message and topic.'
as
select
msg_id
,namespace
,group_id
,message_type
,edge_node_id
,device_id
,parse_json(msg) as message
,message:seq::int as message_sequence
,message:timestamp::number as message_timestamp
,inserted_at
from cl_bridge_stage_db.stage_db.sparkplug_raw
;
-- -- >>>>>>>>>>>>>>>>>>>>>>
create or replace view nbirth_vw
change_tracking = true
comment = 'filtered to nbirth messages. This is a mirror'
as
select
group_id ,edge_node_id
from sparkplug_messages_vw
where message_type = 'NBIRTH'
;
create or replace view node_machine_registry_vw
comment = 'Used to retrieve the latest template definitions for a given group and edge_node'
as
with base as (
select
group_id ,edge_node_id
,max_by(message ,message_timestamp) as message
,max(message_timestamp) as latest_message_timestamp
from sparkplug_messages_vw
where message_type = 'NBIRTH'
group by group_id ,edge_node_id
)
select
group_id ,edge_node_id
,f.value as template_definition
,template_definition:name::varchar as machine
,template_definition:reference::varchar as reference
,template_definition:version::varchar as version
,template_definition:timestamp::int as timestamp
from base as b
,lateral flatten (input => b.message:metrics) f
where template_definition:dataType::varchar = 'Template'
;
-- -- >>>>>>>>>>>>>>>>>>>>>>
create or replace view node_birth_death_vw
comment = 'shows the latest node birth & death messages for each device'
as
select
b.* exclude(namespace)
,message_type as nbirth_or_ndeath_raw
,iff((message_type = 'NBIRTH') ,f.value:value ,null)::number as nbirth_bdSeq_raw
,iff((message_type = 'NDEATH') ,f.value:value ,null)::number as ndeath_bdSeq_raw
,inserted_at as nbirth_ndeath_inserted_at_raw
from sparkplug_messages_vw as b
,lateral flatten (input => b.message:metrics) as f
where message_type in ('NBIRTH' ,'NDEATH')
and f.value:name::varchar = 'bdSeq'
;
create or replace view device_records_vw
change_tracking = true
as
select
b.* exclude(namespace)
,null as nbirth_or_ndeath_raw
,null as nbirth_bdSeq_raw
,null as ndeath_bdSeq_raw
,null as nbirth_ndeath_inserted_at_raw
from sparkplug_messages_vw as b
where message_type in ('DBIRTH' ,'DDATA')
;
create or replace stream device_records_stream
on view device_records_vw
show_initial_rows = true
comment = 'used for monitoring latest device messages'
;
create or replace view sparkplug_msgs_nodebirth_contextualized_vw
as
with device_node_unioned as (
select *
from node_birth_death_vw
union all
select * exclude(METADATA$ROW_ID ,METADATA$ACTION ,METADATA$ISUPDATE)
from device_records_stream
)
select
-- group_id ,message_type ,edge_node_id ,device_id
-- ,message ,message_sequence ,inserted_at
* exclude(nbirth_or_ndeath_raw ,nbirth_bdSeq_raw ,ndeath_bdSeq_raw ,nbirth_ndeath_inserted_at_raw)
,nvl(nbirth_or_ndeath_raw
,lag(nbirth_or_ndeath_raw) ignore nulls over (order by inserted_at ,message_sequence)
) as nbirth_or_ndeath
,nvl(nbirth_bdSeq_raw
,lag(nbirth_bdSeq_raw) ignore nulls over (order by inserted_at ,message_sequence)
) as nbirth_bdSeq
,nvl(ndeath_bdSeq_raw
,lag(ndeath_bdSeq_raw) ignore nulls over (order by inserted_at ,message_sequence)
) as ndeath_bdSeq
,nvl(nbirth_ndeath_inserted_at_raw
,lag(nbirth_ndeath_inserted_at_raw) ignore nulls over (order by inserted_at ,message_sequence)
) as nbirth_ndeath_inserted_at
,case true
when (nbirth_or_ndeath = 'NBIRTH') then false
when ( (nbirth_or_ndeath = 'NDEATH') and (nbirth_bdSeq != ndeath_bdSeq) ) then false
when ( (nbirth_or_ndeath = 'NDEATH') and (nbirth_bdSeq = ndeath_bdSeq) ) then true
else true
end as is_last_known_good_reading
,case lower(message_type)
when lower('NBIRTH') then 1
when lower('DBIRTH') then 2
when lower('DDATA') then 3
when lower('DDEATH') then 4
when lower('NDEATH') then 5
else 99
end as message_type_order
,(nbirth_or_ndeath = 'NBIRTH') as is_node_alive
from device_node_unioned
;
create or replace view sparkplug_messages_flattened_vw
as
with base as (
select
-- sparkplugb message level
msg_id ,group_id, edge_node_id ,device_id ,message_type
,message_sequence ,inserted_at
,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq
,nbirth_ndeath_inserted_at ,is_last_known_good_reading
,message_type_order ,is_node_alive
,message_timestamp as root_message_timestamp
-- attributes related to device data (ddata / dbirth)
,f.value:name::varchar as device_name
,f.value:value:reference::varchar as template_reference
,f.value:value:version::varchar as template_version
,f.value:timestamp::number as device_metric_timestamp
,f.value as ddata_msg
-- attributes related to device level metrics
,concat(msg_id ,'^' ,f.index ,'::',d.index) as device_measure_uuid
,d.value:name::varchar as measure_name
,d.value:value as measure_value
,d.value:timestamp::number as measure_timestamp
from sparkplug_msgs_nodebirth_contextualized_vw as b
,lateral flatten(input => b.message:metrics) as f
,lateral flatten(input => f.value:value:metrics) as d
where message_type in ('DBIRTH' ,'DDATA')
and template_reference is not null
)
select
group_id, edge_node_id ,device_id ,message_type
,message_sequence ,inserted_at
,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq
,nbirth_ndeath_inserted_at ,is_last_known_good_reading
,message_type_order ,is_node_alive ,root_message_timestamp
,device_name ,template_reference ,template_version ,device_metric_timestamp ,ddata_msg
,null as is_historical
,device_measure_uuid
,object_agg(distinct measure_name ,measure_value) as measures_info
,measure_timestamp
,to_timestamp(measure_timestamp/1000) as measure_ts
,to_date(measure_ts) as measure_date
,hour(measure_ts) as measure_hour
from base
group by group_id, edge_node_id ,device_id ,message_type
,message_sequence ,inserted_at
,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq
,nbirth_ndeath_inserted_at ,is_last_known_good_reading
,message_type_order ,is_node_alive ,root_message_timestamp
,device_name ,template_reference ,template_version ,device_metric_timestamp ,ddata_msg
,is_historical ,device_measure_uuid
,measure_timestamp
;
create or replace transient table sparkplug_device_messages (
group_id varchar
,edge_node_id varchar
,device_id varchar
,message_type varchar
,message_sequence number
,inserted_at number
,nbirth_or_ndeath varchar
,nbirth_bdseq number
,ndeath_bdseq number
,nbirth_ndeath_inserted_at number
,is_last_known_good_reading boolean
,message_type_order number
,is_node_alive boolean
,root_message_timestamp number
,device_name varchar
,template_reference varchar
,template_version varchar
,device_metric_timestamp number
,ddata_msg variant
,is_historical boolean
,device_measure_uuid varchar
,measures_info variant
,measure_timestamp number
,measure_ts timestamp
,measure_date date
,measure_hour number
)
cluster by ( group_id ,edge_node_id ,device_id
,template_reference ,template_version ,device_name
,measure_date ,measure_hour)
comment = 'materialized version of the sparkplug_messages_flattened_vw for easier downstream pipelines.'
;
-- -- >>>>>>>>>>>>>>>>>>>>>>
-- ================
-- NODE BIRTH related assets
-- ================
create or replace stream nbirth_stream
on view nbirth_vw
show_initial_rows = true
comment = 'stream to monitor for nbirth messages, so that assets are created automatically'
;
- Expected Result: Stream NBIRTH_STREAM successfully created.
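The subtlest part of Script 02 is sparkplug_msgs_nodebirth_contextualized_vw, which carries the most recent NBIRTH/NDEATH context forward onto every device row (the SQL's lag(...) ignore nulls) and flags a reading as "last known good" when an NDEATH arrives whose bdSeq matches the preceding NBIRTH's bdSeq. The following is a minimal Python sketch of that same logic, for illustration only; it is not part of the product, and the sample rows are hypothetical.

```python
# Illustrative sketch of the gap-fill in sparkplug_msgs_nodebirth_contextualized_vw:
# the latest non-null NBIRTH/NDEATH context is carried forward onto each row
# (SQL: nvl(raw, lag(raw) ignore nulls over (order by inserted_at, message_sequence))),
# and a reading is flagged "last known good" once an NDEATH arrives whose bdSeq
# matches the preceding NBIRTH's bdSeq.

def contextualize(rows):
    """rows: list of dicts ordered by (inserted_at, message_sequence)."""
    ctx = {"nbirth_or_ndeath": None, "nbirth_bdSeq": None, "ndeath_bdSeq": None}
    out = []
    for r in rows:
        # Prefer the row's own raw value, else fall back to the last one seen.
        for k in ctx:
            raw = r.get(k + "_raw")
            if raw is not None:
                ctx[k] = raw
        nbod, nb, nd = ctx["nbirth_or_ndeath"], ctx["nbirth_bdSeq"], ctx["ndeath_bdSeq"]
        if nbod == "NBIRTH":
            last_known_good = False      # node is alive; data is current
        elif nbod == "NDEATH" and nb != nd:
            last_known_good = False      # stale NDEATH from an older session
        else:
            last_known_good = True       # matching NDEATH: node went offline
        out.append({**r, **ctx, "is_last_known_good_reading": last_known_good})
    return out

rows = [
    {"message_type": "NBIRTH", "nbirth_or_ndeath_raw": "NBIRTH", "nbirth_bdSeq_raw": 7},
    {"message_type": "DDATA"},   # inherits the NBIRTH context via the gap-fill
    {"message_type": "NDEATH", "nbirth_or_ndeath_raw": "NDEATH", "ndeath_bdSeq_raw": 7},
]
result = contextualize(rows)
print([r["is_last_known_good_reading"] for r in result])  # [False, False, True]
```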
Script 03
SQL Script 03
- Expected Result: Function GENERATE_DEVICE_ASOF_VIEW_DDL successfully created.
Script 04
SQL Script 04
- Expected Result: Function CREATE_EDGE_NODE_SCHEMA successfully created.
Script 05
SQL Script 05
- Expected Result: Function CREATE_ALL_EDGE_NODE_SCHEMAS successfully created.
Script 10
SQL Script 10
- Expected Result: Statement executed successfully.
Script 11
SQL Script 11
- Expected Result: Statement executed successfully.
After all of the scripts have executed successfully, create a new user in Snowflake. This user will be used by IoT Bridge for Snowflake to push data into Snowflake. In the Snowflake Web UI, go to Admin → Users & Roles and then click '+ User' in the upper right hand corner. Give the user a name of your choice and a secure password. For this example we're calling the user IBSNOW_INGEST to make clear it is used for ingest purposes. See below for an example, then click 'Create User'.
![](/download/attachments/154796084/image2023-4-20_14-4-2.png?version=2&modificationDate=1715105446184&api=v2)
In addition, the user must have a specific role to be able to stream data into Snowflake. Click the newly created user to see the following.
![](/download/attachments/154796084/image2023-4-21_7-54-15.png?version=1&modificationDate=1688071366646&api=v2)
In the bottom of the center 'Granted Roles' pane you will see this user has no roles. Click 'Grant Role' to set up a new role. Then, select the 'CL_BRIDGE_PROCESS_RL' role and click 'Grant' as shown below.
![](/download/attachments/154796084/image2023-4-21_7-55-54.png?version=1&modificationDate=1688071366668&api=v2)
After this has been done successfully you will see the role now associated with the new user as shown below.
![](/download/attachments/154796084/image2023-4-21_7-57-0.png?version=1&modificationDate=1688071366690&api=v2)
Now a key pair must be generated and uploaded to Snowflake. It will be used by the IoT Bridge for Snowflake application to authenticate when pushing data to Snowflake via the Snowflake Streaming API. See this document for details on how to generate the key pair and assign it to a user in your Snowflake account: https://docs.snowflake.com/en/user-guide/key-pair-auth. Step 6 (Configuring the Snowflake Client to Use Key-Pair Authentication) in the linked tutorial can be skipped; this tutorial covers configuring IoT Bridge for Snowflake with the generated key. Attach the public key to the user we just created for Snowflake ingest purposes.
The generated key MUST NOT be encrypted
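A quick way to sanity-check this: an unencrypted PKCS#8 PEM file begins with "-----BEGIN PRIVATE KEY-----", while an encrypted one begins with "-----BEGIN ENCRYPTED PRIVATE KEY-----". The helper below is an illustrative check (the file name in the usage comment is hypothetical), not part of IBSNOW.

```python
# Illustrative sanity check that a generated PKCS#8 private key is unencrypted
# before handing it to IBSNOW. Unencrypted PKCS#8 PEM files start with
# "-----BEGIN PRIVATE KEY-----"; encrypted ones start with
# "-----BEGIN ENCRYPTED PRIVATE KEY-----".

def is_unencrypted_pkcs8(pem_text: str) -> bool:
    first_line = pem_text.strip().splitlines()[0]
    return first_line == "-----BEGIN PRIVATE KEY-----"

# Example usage against a key file (path is hypothetical):
# with open("rsa_key.p8") as f:
#     assert is_unencrypted_pkcs8(f.read()), "key is encrypted; regenerate it unencrypted"
```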
IoT Bridge Setup
First you will need access to the Azure Virtual Machine via SSH. See this document for information on how to access the VM.
Modify the file /opt/ibsnow/conf/ibsnow.properties. Set the following:
- mqtt_server_url
  - The URL of the MQTT Server (e.g. ssl://55.23.12.33:8883)
- mqtt_server_name
  - Give it a meaningful name if desired
- mqtt_username
  - The username for the MQTT connection, if required
- mqtt_password
  - The password for the MQTT connection, if required
- mqtt_ca_cert_chain_path
  - The path to the Root Certificate, if required
- mqtt_client_cert_path
  - The path to the Client Certificate, if required
- mqtt_client_private_key_path
  - The path to the Client Private Key, if required
- primary_host_id
  - Set it to a text string such as 'IamHost'
- snowflake_streaming_client_name
  - Some text string such as 'MY_CLIENT'
- snowflake_streaming_table_name
  - This must be 'SPARKPLUG_RAW' based on the scripts we previously used to provision Snowflake
- snowflake_notify_db_name
  - This must be 'cl_bridge_node_db' based on the scripts we previously used to provision Snowflake
- snowflake_notify_schema_name
  - This must be 'stage_db' based on the scripts we previously used to provision Snowflake
- snowflake_notify_warehouse_name
  - This must be 'cl_bridge_ingest_wh' based on the scripts we previously used to provision Snowflake
When complete, it should look similar to what is shown below.
ibsnow.properties
mqtt_server_url = ssl://55.23.12.33:8883
mqtt_server_name = Chariot MQTT Server
#mqtt_ca_cert_chain_path =
#mqtt_client_cert_path =
#mqtt_client_private_key_path =
sequence_reordering_timeout = 5000
primary_host_id = IamHost
snowflake_streaming_client_name = MY_CLIENT
snowflake_streaming_table_name = SPARKPLUG_RAW
snowflake_notify_db_name = cl_bridge_node_db
snowflake_notify_schema_name = stage_db
snowflake_notify_warehouse_name = cl_bridge_ingest_wh
snowflake_notify_task_enabled = true
snowflake_notify_nbirth_task_delay = 10000
snowflake_notify_data_task_delay = 5000
Now, modify the file /opt/ibsnow/conf/snowflake_streaming_profile.json. Set the following:
- user
  - This must be 'IBSNOW_INGEST' based on the user we provisioned in Snowflake earlier in this tutorial
- url
  - Replace 'ACCOUNT_ID' with your Snowflake account id. Leave the other parts of the URL the same.
- account
  - Replace 'ACCOUNT_ID' with your Snowflake account id
- private_key
  - Replace with the text string that is the private key you generated earlier in this tutorial
- host
  - Replace 'ACCOUNT_ID' with your Snowflake account id. Leave the other parts of the hostname the same.
- schema
  - Set this to 'stage_db' based on the scripts we previously used to provision Snowflake
- database
  - Set this to 'cl_bridge_stage_db' based on the scripts we previously used to provision Snowflake
- connect_string
  - Replace 'ACCOUNT_ID' with your Snowflake account id. Leave the other parts of the connection string the same.
- warehouse
  - Set this to 'cl_bridge_ingest_wh' based on the scripts we previously used to provision Snowflake
- role
  - Set this to 'cl_bridge_process_rl' based on the scripts we previously used to provision Snowflake
When complete, it should look similar to what is shown below.
snowflake_streaming_profile.json
{
"user" : "IBSNOW_INGEST" ,
"url" : "https://RBC48284.snowflakecomputing.com:443" ,
"account" : "RBC48284" ,
"private_key" : "MIIEwAIBADANBgkqhkiG9w0BAQEFAASCBKowggSmAgEAAoIBAQDN6NOoaoVVZSz/AIUohNn9oJThwDZg2/qASsIRYFjy0pSNKh+XsG6yp4kteII900lEgt5koroU+8oQrX7vnTI/69mvc5o+xJBfGogd+qcdw9tEEUZHEfBxBtlpZvfMY/HHyrilQBvWVrFqB3hYt9n15lE/wVi1LDII378yh2p+QcwEaKhKD1aWBYUlpOoA0d2214/UQiU6ytI18jJNPN3yQv9Jx3+/DRldlYh5fLIQ0AWbBqRnQoyLvLaYRIgynxDhrQpVtw8QN2M/XQErT3OxZzti7CKeI9M4xLchO3VZozsde5kcQwCIcop05PX6dtdSsqheQBRrhytf1K9GnfGdAgMBAAECggEBAKO8auLXoaMgS0GTlk98JSRL11gU0qj/BBmUWPIcXV7qGPqP7oNe5wfltW2VEGw9YVu7fUElLTeWaT4N2IyNwfGWiIm+MX+MKwmVPXwpX06J+ggMfIfzOfGG8sef+5hqOU8YYu/1JK2yTm3z9r0FpaqmNSGvi+y1ciwgUBfMGuC93pySQCHuNXkWw2njxvaltpOSm9H08aQtXsA+1JL31kP4WZAexk2EqzRzEka8hrGYfNIs9qQimKI9XznNoqmlSN6ZIO+A5e+kSUg0viyY/cZLwVj5FYV/wKN49WDDkB3dthCbx1Z1VuIw7rub9VU609eoTXDxEgMMBEDqbE5BXkECgYEA/ZsySGO2QouCihpPp0VtNrNh7PhA/OLch5zZt2iJBMhbjPn4SAyrzgi6lQc7b4oZOznOK5jOL4NQijt7SGz8rfrwfTd5Rl84lHlN7Cu3V0lBC8IN6JcXWBuTedmF7ShlL5ATbpXTsgaaqPm3H8VCS4fkoQ54bDZnCjI5/GtI5OkCgYEAz9pgvqXCXyJQxj5bM0uihZV3lZzvwpqlEuT0GvT9XqHM+LNKtf2kQ958qRq5Nh381oeRLyVbZTFrr0eNCCEA5YesbmxE7d/5vlWszfW5e70TUJEWbk0rrGNmqVUlAfEZKfK6ms87W3peHqqMyXqnmMwwecMl2c013XZaLKYxRpUCgYEAnWgHdKDXDkSTGG6uQ882sz3xqOiJRaz1XgK/qzPp35sQH9dDAE1FEZOfY0Ji5J8dfAIr8ilcyGbDxZiXs2NaDg5z1/RnhIMzlgwYjl6v5DBmfArNITEuXxR2m6mkk4eADl5pgTjjdVreAcVEoSaJOGI3SLO3kMrPd6enEAHy84kCgYEAsL2BjDtI1zpHsvqs9CY5URuybt7epPx4p2NWCmIN3Fz6/PL/8VZ3SlqyZ9zYZqMDLqxiENPULmzio03VJ3dg2swOHGsmBZtxMp6JbSyoBwbUmKp2h15JZ7GyRwSmjksj2Z6TfDYAxB1+UNc3Fc+dGXlvMup0kgpD5kfQD61Vsy0CgYEAn9QCQG+lcPG5GXXu3EAeVzqgy+gOXpyd4ys0fdssFF93AM+/Dd9F31sSSfdasEQ8+jFKjunEeQAOiecVQA4Vu9GGQAykrK/m8nD0zf02L1QpADTBA8TymkpD1yFEMo+T5DrZ24SRCl/zREb0hLn//ZOA=" ,
"port" : 443,
"host" : "RBC48284.snowflakecomputing.com" ,
"schema" : "stage_db" ,
"scheme" : "https" ,
"database" : "cl_bridge_stage_db" ,
"connect_string" : "jdbc:snowflake://RBC48284.snowflakecomputing.com:443" ,
"ssl" : "on" ,
"warehouse" : "cl_bridge_ingest_wh" ,
"role" : "cl_bridge_process_rl"
}
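In the example above, the "private_key" value is the bare base64 body of the PKCS#8 key: the BEGIN/END header and footer lines and all newlines are removed so the key fits on a single JSON line. A small illustrative helper for producing that string (the file name in the usage comment is hypothetical):

```python
# Illustrative helper: turn a PEM-encoded, unencrypted PKCS#8 key into the
# single-line base64 string used for the "private_key" field in
# snowflake_streaming_profile.json (header, footer, and newlines removed,
# matching the example above).

def pem_to_single_line(pem_text: str) -> str:
    lines = [ln.strip() for ln in pem_text.strip().splitlines()]
    body = [ln for ln in lines if ln and not ln.startswith("-----")]
    return "".join(body)

# Example usage (file name is hypothetical):
# with open("rsa_key.p8") as f:
#     print(pem_to_single_line(f.read()))
```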
Now the service can be restarted to pick up the new configuration. Do so by running the following command.
sudo /etc/init.d/ibsnow restart
At this point, IBSNOW should connect to the MQTT Server and be ready to receive MQTT Sparkplug messages. Verify by running the following command.
tail -f /opt/ibsnow/log/wrapper.log
After doing so, you should see something similar to what is shown below. Note the last line is 'MQTT Client connected to ...'. That denotes we have successfully configured IBSNOW and properly provisioned the MQTT Server.
INFO|7263/0||23-06-29 20:19:32|20:19:32.932 [Thread-2] INFO org.eclipse.tahu.mqtt.TahuClient - IBSNOW-8bc00095-9265-41: Creating the MQTT Client to tcp://54.236.16.39:1883 on thread Thread-2
INFO|7263/0||23-06-29 20:19:33|20:19:33.275 [MQTT Call: IBSNOW-8bc00095-9265-41] INFO org.eclipse.tahu.mqtt.TahuClient - IBSNOW-8bc00095-9265-41: connect with retry succeeded
INFO|7263/0||23-06-29 20:19:33|20:19:33.280 [MQTT Call: IBSNOW-8bc00095-9265-41] INFO org.eclipse.tahu.mqtt.TahuClient - IBSNOW-8bc00095-9265-41: Connected to tcp://54.236.16.39:1883
INFO|7263/0||23-06-29 20:19:33|20:19:33.294 [MQTT Call: IBSNOW-8bc00095-9265-41] INFO o.eclipse.tahu.host.TahuHostCallback - This is a offline STATE message from IamHost - correcting with new online STATE message
FINEST|7263/0||23-06-29 20:19:33|20:19:33.297 [MQTT Call: IBSNOW-8bc00095-9265-41] INFO o.eclipse.tahu.host.TahuHostCallback - This is a offline STATE message from IamHost - correcting with new online STATE message
FINEST|7263/0||23-06-29 20:19:33|20:19:33.957 [Thread-2] INFO org.eclipse.tahu.mqtt.TahuClient - IBSNOW-8bc00095-9265-41: MQTT Client connected to tcp://54.236.16.39:1883 on thread Thread-2
Edge Setup with Ignition and MQTT Transmission
At this point IoT Bridge is configured and ready to receive data. To get data flowing into IBSNOW we'll set up Inductive Automation's Ignition platform along with the MQTT Transmission module from Cirrus Link. Begin by downloading Ignition here.
https://inductiveautomation.com/downloads
Installation of Ignition is very straightforward and fast. There is a guide to do so here.
https://docs.inductiveautomation.com/display/DOC80/Installing+and+Upgrading+Ignition
With Ignition installed, MQTT Transmission must be installed as well as a plugin to Ignition. Get MQTT Transmission for your version of Ignition here.
https://inductiveautomation.com/downloads/third-party-modules
Now use the procedure below to install the MQTT Transmission module.
https://docs.inductiveautomation.com/display/DOC80/Installing+or+Upgrading+a+Module
With Ignition and MQTT Transmission installed, we can configure the MQTT Transmission module to connect to the Chariot MQTT Server that we provisioned earlier. Begin by clicking 'Get Designer' in the upper right hand corner of the Ignition Gateway Web UI as shown below.
![](/download/attachments/154796084/image2020-10-19_16-51-46.png?version=1&modificationDate=1688071366735&api=v2)
Now launch the Ignition Designer using the Designer Launcher as shown below.
![](/download/attachments/154796084/image2020-10-19_16-53-4.png?version=1&modificationDate=1688071366755&api=v2)
Once it is launched, you should see something similar to what is shown below. Note the Tag Browser has been expanded and the automatically created example tags have been highlighted.
![](/download/attachments/154796084/image2020-10-19_16-50-49.png?version=1&modificationDate=1688071366771&api=v2)
Begin by deleting these two tags (Example Tag and MQTT Quickstart). Then click the 'UDT Definitions' tab as shown below. We will use this view to create a very simple UDT definition.
![](/download/attachments/154796084/image2020-10-19_16-56-33.png?version=1&modificationDate=1688071366792&api=v2)
Now, click the '+' icon in the upper left corner of the tag browser as shown below and select 'New Data Type'
![](/download/attachments/154796084/image2020-10-19_16-58-18.png?version=1&modificationDate=1688071366807&api=v2)
This will open the following dialog box.
![](/download/attachments/154796084/image2020-10-19_16-59-36.png?version=1&modificationDate=1688071366819&api=v2)
Change the name of the tag to Motor as shown below. Also, note the highlighted 'new member tag' icon in the middle of the dialog. We'll use this to create some member tags.
![](/download/attachments/154796084/image2020-10-19_17-1-3.png?version=1&modificationDate=1688071366838&api=v2)
Now use the 'new member tag' button to create a new 'memory tag' as shown below.
![](/download/thumbnails/154796084/image2020-10-19_17-6-56.png?version=1&modificationDate=1688071366853&api=v2)
Then, set the following parameters for the new memory tag.
- Name
- Data Type
- Engineering Units
![](/download/attachments/154796084/image2020-10-19_17-7-46.png?version=1&modificationDate=1688071366867&api=v2)
Now create two additional member tags with the following configuration.
- Amps
- Memory tag
- Data Type = Integer
- RPMs
- Memory tag
- Data Type = Integer
When complete, the UDT definition should look as follows.
![](/download/attachments/154796084/image2020-10-19_17-38-15.png?version=1&modificationDate=1688071366883&api=v2)
Now switch back to the 'Tags' tab of the Tag Browser. Right click on the 'PLC 1' folder and select 'New Tag → Data Type Instance → Motor' as shown below.
![](/download/attachments/154796084/image2020-10-19_17-39-24.png?version=1&modificationDate=1688071366895&api=v2)
Now set the name to 'My Motor' as shown below and click OK.
![](/download/attachments/154796084/image2020-10-19_17-40-56.png?version=1&modificationDate=1688071366906&api=v2)
Now, set some values under the instance as shown below.
![](/download/attachments/154796084/image2020-10-19_17-42-53.png?version=1&modificationDate=1688071366916&api=v2)
At this point, our tags are configured. A UDT definition will map to an asset model in Snowflake, and UDT instances in Ignition will map to assets in Snowflake. Before this happens, however, we need to point MQTT Transmission at the Chariot MQTT Server. To do so, browse back to the Ignition Gateway Web UI and select MQTT Transmission → Settings from the left navigation panel as shown below.
![](/download/attachments/154796084/image2020-10-19_17-45-16.png?version=1&modificationDate=1688071366928&api=v2)
Now select the 'Transmitters' tab as shown below.
![](/download/attachments/154796084/image2020-10-19_17-55-36.png?version=1&modificationDate=1688071366942&api=v2)
Now click the 'edit' button to the right of the 'Example Transmitter'. Scroll down to the 'Convert UDTs' option and uncheck it as shown below. This will also enable the previously greyed-out 'Publish UDT Definitions' option. Leave it selected as shown below.
![](/download/attachments/154796084/image2020-10-19_17-56-27.png?version=1&modificationDate=1688071366970&api=v2)
Now switch to the 'Servers' → 'Settings' tab. Delete the existing pre-seeded 'Chariot SCADA' MQTT Server definition. Then create a new one with the following configuration.
- Name
  - A meaningful name for the connection, such as 'Chariot'
- URL
  - The URL of your Chariot MQTT Server
- Username
  - Your username for the Chariot MQTT Server connection
- Password
  - Your password for the Chariot MQTT Server connection
When complete, you should see something similar to the following. The 'Connected' state should show '1 of 1' if everything was configured properly.
![](/download/attachments/154796084/image2023-6-29_14-23-6.png?version=1&modificationDate=1688073786859&api=v2)
At this point, data should be flowing into Snowflake. By tailing the log in IBSNOW you should see something similar to what is shown below. This shows IBSNOW receiving the messages published from Ignition/MQTT Transmission. When IBSNOW receives the Sparkplug MQTT messages, it creates and updates asset models and assets in Snowflake. The log below is also a useful debugging tool if things don't appear to work as they should.
Successful Insert
FINEST|199857/0||23-04-21 15:46:22|15:46:22.951 [TahuHostCallback--3deac7a5] INFO o.e.tahu.host.TahuPayloadHandler - Handling NBIRTH from My MQTT Group/Edge Node ee38b1
FINEST|199857/0||23-04-21 15:46:22|15:46:22.953 [TahuHostCallback--3deac7a5] INFO o.e.t.host.manager.SparkplugEdgeNode - Edge Node My MQTT Group/Edge Node ee38b1 set online at Fri Apr 21 15:46:22 UTC 2023
FINEST|199857/0||23-04-21 15:46:23|15:46:23.072 [TahuHostCallback--3deac7a5] INFO o.e.tahu.host.TahuPayloadHandler - Handling DBIRTH from My MQTT Group/Edge Node ee38b1/PLC 1
FINEST|199857/0||23-04-21 15:46:23|15:46:23.075 [TahuHostCallback--3deac7a5] INFO o.e.t.host.manager.SparkplugDevice - Device My MQTT Group/Edge Node ee38b1/PLC 1 set online at Fri Apr 21 15:46:22 UTC 2023
FINEST|199857/0||23-04-21 15:46:23|15:46:23.759 [ingest-flush-thread] INFO n.s.i.s.internal.FlushService - [SF_INGEST] buildAndUpload task added for client=MY_CLIENT, blob=2023/4/21/15/46/rth2hb_eSKU3AAtxudYKnPFztPjrokzP29ZXzv5JFbbj0YUnqUUCC_1049_48_1.bdec, buildUploadWorkers stats=java.util.concurrent.ThreadPoolExecutor@32321763[Running, pool size = 2, active threads = 1, queued tasks = 0, completed tasks = 1]
FINEST|199857/0||23-04-21 15:46:23|15:46:23.774 [ingest-build-upload-thread-1] INFO n.s.i.i.a.h.io.compress.CodecPool - Got brand-new compressor [.gz]
FINEST|199857/0||23-04-21 15:46:23|15:46:23.822 [ingest-build-upload-thread-1] INFO n.s.i.streaming.internal.BlobBuilder - [SF_INGEST] Finish building chunk in blob=2023/4/21/15/46/rth2hb_eSKU3AAtxudYKnPFztPjrokzP29ZXzv5JFbbj0YUnqUUCC_1049_48_1.bdec, table=CL_BRIDGE_STAGE_DB.STAGE_DB.SPARKPLUG_RAW, rowCount=2, startOffset=0, uncompressedSize=5888, compressedChunkLength=5872, encryptedCompressedSize=5888, bdecVersion=THREE
FINEST|199857/0||23-04-21 15:46:23|15:46:23.839 [ingest-build-upload-thread-1] INFO n.s.i.s.internal.FlushService - [SF_INGEST] Start uploading file=2023/4/21/15/46/rth2hb_eSKU3AAtxudYKnPFztPjrokzP29ZXzv5JFbbj0YUnqUUCC_1049_48_1.bdec, size=5888
FINEST|199857/0||23-04-21 15:46:24|15:46:24.132 [ingest-build-upload-thread-1] INFO n.s.i.s.internal.FlushService - [SF_INGEST] Finish uploading file=2023/4/21/15/46/rth2hb_eSKU3AAtxudYKnPFztPjrokzP29ZXzv5JFbbj0YUnqUUCC_1049_48_1.bdec, size=5888, timeInMillis=292
FINEST|199857/0||23-04-21 15:46:24|15:46:24.148 [ingest-register-thread] INFO n.s.i.s.internal.RegisterService - [SF_INGEST] Start registering blobs in client=MY_CLIENT, totalBlobListSize=1, currentBlobListSize=1, idx=1
FINEST|199857/0||23-04-21 15:46:24|15:46:24.148 [ingest-register-thread] INFO n.s.i.s.i.SnowflakeStreamingIngestClientInternal - [SF_INGEST] Register blob request preparing for blob=[2023/4/21/15/46/rth2hb_eSKU3AAtxudYKnPFztPjrokzP29ZXzv5JFbbj0YUnqUUCC_1049_48_1.bdec], client=MY_CLIENT, executionCount=0
FINEST|199857/0||23-04-21 15:46:24|15:46:24.301 [ingest-register-thread] INFO n.s.i.s.i.SnowflakeStreamingIngestClientInternal - [SF_INGEST] Register blob request returned for blob=[2023/4/21/15/46/rth2hb_eSKU3AAtxudYKnPFztPjrokzP29ZXzv5JFbbj0YUnqUUCC_1049_48_1.bdec], client=MY_CLIENT, executionCount=0
Data will also be visible in Snowflake at this point. See below for an example. Changing data values in the UDT tags in Ignition produces DDATA Sparkplug messages, and every time the Edge Node connects it produces NBIRTH and DBIRTH messages. All of these now appear in Snowflake with their values, timestamps, and qualities.
![](/download/attachments/154796084/image2023-4-21_9-23-6.png?version=1&modificationDate=1688071367029&api=v2)
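Each flattened row that lands in sparkplug_device_messages carries all of a device message's metric values in a single measures_info object (the object_agg in Script 02), alongside timestamp columns derived from the millisecond Sparkplug timestamp. A minimal Python sketch of those derivations follows; the metric names and timestamp value are hypothetical.

```python
from datetime import datetime, timezone

# Hypothetical metrics from one DDATA message.
ddata_metrics = [
    {"name": "Temperature", "value": 78},
    {"name": "Amps", "value": 12},
    {"name": "RPMs", "value": 1800},
]

# Mirrors object_agg(distinct measure_name, measure_value) in Script 02:
# one object per device message holding all of its metric values.
measures_info = {m["name"]: m["value"] for m in ddata_metrics}

# Mirrors the derived columns measure_ts, measure_date, measure_hour:
# to_timestamp(measure_timestamp/1000), to_date(measure_ts), hour(measure_ts).
measure_timestamp = 1682091983072  # milliseconds since epoch (hypothetical)
measure_ts = datetime.fromtimestamp(measure_timestamp / 1000, tz=timezone.utc)
measure_date, measure_hour = measure_ts.date(), measure_ts.hour

print(measures_info)
```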
Additional Resources