Contents
Cirrus Link Resources
Cirrus Link Website
Contact Us (Sales/Support)
Forum
Cirrus Link Modules Docs for Ignition 7.9.x
Inductive Resources
Ignition User Manual
Knowledge Base Articles
Inductive University
Forum
...
Script 02
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
-- =========================
-- Script 02: node database setup (Snowflake).
--
-- In this script, we are setting up assets related to the node database,
-- which will eventually contain all the device-specific views and tables.
-- At the very core, the following assets are created:
--  - Node database
--  - Staging schema
--  - Views, streams and a transient table that flatten the raw Sparkplug
--    messages landed in the staging database
--
-- The database & schema are created with (and therefore owned by) SYSADMIN.
--
-- REPLACE THE SESSION VARIABLES ACCORDING TO YOUR ENVIRONMENT
-- =========================

-- NOTE: staged_sparkplug_raw_table is declared for reference only; the view
-- sparkplug_messages_vw below hardcodes the same table name (see note there).
set staged_sparkplug_raw_table = 'cl_bridge_stage_db.stage_db.sparkplug_raw';
set cl_bridge_node_db = 'cl_bridge_node_db';
set staging_schema = 'stage_db';

-- >>>>>>>>>>>>>>>>>>>>>> DATABASE >>>>>>>>>>>>>>>>>>>>>>>>>

use role sysadmin;

create database if not exists identifier($cl_bridge_node_db)
    -- Optional retention settings; uncomment and adjust if required.
    -- DATA_RETENTION_TIME_IN_DAYS = 90
    -- MAX_DATA_EXTENSION_TIME_IN_DAYS = 90
    comment = 'used for storing flattened messages processed from the staging database'
;

-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA >>>>>>>>>>>>>>>>>>>>>>>>>

use database identifier($cl_bridge_node_db);

create schema if not exists identifier($staging_schema)
    with managed access
    -- Optional retention settings; uncomment and adjust if required.
    -- data_retention_time_in_days = 90
    -- max_data_extension_time_in_days = 90
    comment = 'used for storing flattened messages processed from the staging database'
;

-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA ASSETS >>>>>>>>>>>>>>>>>>>>>>>>>

use schema identifier($staging_schema);

-- =========================
-- Define tables / views
-- =========================

-- Base view over the raw staged messages: parses the msg payload as JSON and
-- exposes the message sequence and timestamp as typed columns.
--
-- NOTE: 'cl_bridge_stage_db.stage_db.sparkplug_raw' is hardcoded here, as the
-- identifier($staged_sparkplug_raw_table) replacement does not work.
create or replace view sparkplug_messages_vw
    change_tracking = true
    comment = 'parses out the core attributes from the message and topic.'
as
select
    msg_id
    ,namespace
    ,group_id
    ,message_type
    ,edge_node_id
    ,device_id
    ,parse_json(msg) as message
    ,message:seq::int as message_sequence
    ,message:timestamp::number as message_timestamp
    ,inserted_at
from cl_bridge_stage_db.stage_db.sparkplug_raw
;

-- -- >>>>>>>>>>>>>>>>>>>>>>

-- One row per NBIRTH message, reduced to the (group_id, edge_node_id) pair.
-- Change tracking is enabled so that nbirth_stream (end of script) can be
-- created on top of this view.
create or replace view nbirth_vw
    change_tracking = true
    comment = 'filtered to nbirth messages. This is a mirror'
as
select
    group_id
    ,edge_node_id
from sparkplug_messages_vw
where message_type = 'NBIRTH'
;

-- For each (group_id, edge_node_id), takes the NBIRTH message with the
-- greatest message_timestamp and emits one row per metric in that message
-- whose dataType is 'Template'.
create or replace view node_machine_registry_vw
    comment = 'Used to retreive the latest template definitions for a given group and edge_node'
as
with base as (
    -- latest NBIRTH payload per group / edge node
    select
        group_id
        ,edge_node_id
        ,max_by(message ,message_timestamp) as message
        ,max(message_timestamp) as latest_message_timestamp
    from sparkplug_messages_vw
    where message_type = 'NBIRTH'
    group by
        group_id
        ,edge_node_id
)
select
    group_id
    ,edge_node_id
    ,f.value as template_definition
    ,template_definition:name::varchar as machine
    ,template_definition:reference::varchar as reference
    ,template_definition:version::varchar as version
    ,template_definition:timestamp::int as timestamp
from base as b
    ,lateral flatten (input => b.message:metrics) f
where template_definition:dataType::varchar = 'Template'
;

-- -- >>>>>>>>>>>>>>>>>>>>>>

-- NBIRTH / NDEATH messages together with the value of their 'bdSeq' metric.
-- The *_raw columns are populated only on these node-level rows; they are
-- carried forward onto device rows in
-- sparkplug_msgs_nodebirth_contextualized_vw.
create or replace view node_birth_death_vw
    comment = 'shows the latest node birth & death messages for each device'
as
select
    b.* exclude(namespace)
    ,message_type as nbirth_or_ndeath_raw
    ,iff((message_type = 'NBIRTH') ,f.value:value ,null)::number as nbirth_bdSeq_raw
    ,iff((message_type = 'NDEATH') ,f.value:value ,null)::number as ndeath_bdSeq_raw
    ,inserted_at as nbirth_ndeath_inserted_at_raw
from sparkplug_messages_vw as b
    ,lateral flatten (input => b.message:metrics) as f
where message_type in ('NBIRTH' ,'NDEATH')
    and f.value:name::varchar = 'bdSeq'
;

-- DBIRTH / DDATA messages, padded with NULL *_raw columns so that the column
-- list lines up positionally with node_birth_death_vw for the UNION ALL in
-- sparkplug_msgs_nodebirth_contextualized_vw.
create or replace view device_records_vw
    change_tracking = true
as
select
    b.* exclude(namespace)
    ,null as nbirth_or_ndeath_raw
    ,null as nbirth_bdSeq_raw
    ,null as ndeath_bdSeq_raw
    ,null as nbirth_ndeath_inserted_at_raw
from sparkplug_messages_vw as b
where message_type in ('DBIRTH' ,'DDATA')
;

create or replace stream device_records_stream
    on view device_records_vw
    show_initial_rows = true
    comment = 'used for monitoring latest device messages'
;

-- Combines node birth/death rows with the device rows currently in
-- device_records_stream and carries the most recent node birth/death
-- attributes forward onto every following row.
--
-- The carry-forward windows are partitioned by (group_id, edge_node_id) so
-- that the state of one edge node is never applied to rows belonging to a
-- different edge node; within a node, rows are ordered by inserted_at and
-- then message_sequence.
create or replace view sparkplug_msgs_nodebirth_contextualized_vw
as
with device_node_unioned as (
    -- both branches must expose the same columns in the same order
    select *
    from node_birth_death_vw
    union all
    select * exclude(METADATA$ROW_ID ,METADATA$ACTION ,METADATA$ISUPDATE)
    from device_records_stream
)
select
    * exclude(nbirth_or_ndeath_raw ,nbirth_bdSeq_raw ,ndeath_bdSeq_raw ,nbirth_ndeath_inserted_at_raw)
    ,nvl(nbirth_or_ndeath_raw
        ,lag(nbirth_or_ndeath_raw) ignore nulls over (
            partition by group_id ,edge_node_id
            order by inserted_at ,message_sequence)
    ) as nbirth_or_ndeath
    ,nvl(nbirth_bdSeq_raw
        ,lag(nbirth_bdSeq_raw) ignore nulls over (
            partition by group_id ,edge_node_id
            order by inserted_at ,message_sequence)
    ) as nbirth_bdSeq
    ,nvl(ndeath_bdSeq_raw
        ,lag(ndeath_bdSeq_raw) ignore nulls over (
            partition by group_id ,edge_node_id
            order by inserted_at ,message_sequence)
    ) as ndeath_bdSeq
    ,nvl(nbirth_ndeath_inserted_at_raw
        ,lag(nbirth_ndeath_inserted_at_raw) ignore nulls over (
            partition by group_id ,edge_node_id
            order by inserted_at ,message_sequence)
    ) as nbirth_ndeath_inserted_at
    ,case true
        when (nbirth_or_ndeath = 'NBIRTH') then false
        when ( (nbirth_or_ndeath = 'NDEATH') and (nbirth_bdSeq != ndeath_bdSeq) ) then false
        when ( (nbirth_or_ndeath = 'NDEATH') and (nbirth_bdSeq = ndeath_bdSeq) ) then true
        else true
    end as is_last_known_good_reading
    ,case lower(message_type)
        when lower('NBIRTH') then 1
        when lower('DBIRTH') then 2
        when lower('DDATA') then 3
        when lower('DDEATH') then 4
        when lower('NDEATH') then 5
        else 99
    end as message_type_order
    ,(nbirth_or_ndeath = 'NBIRTH') as is_node_alive
from device_node_unioned
;

-- Flattens DBIRTH / DDATA messages two levels deep: message:metrics (f) and,
-- for each of those, value:metrics (d). The measures are then re-aggregated
-- into a single measures_info object per group-by key.
create or replace view sparkplug_messages_flattened_vw
as
with base as (
    select
        -- sparkplugb message level
        msg_id
        ,group_id
        ,edge_node_id
        ,device_id
        ,message_type
        ,message_sequence
        ,inserted_at
        ,nbirth_or_ndeath
        ,nbirth_bdseq
        ,ndeath_bdseq
        ,nbirth_ndeath_inserted_at
        ,is_last_known_good_reading
        ,message_type_order
        ,is_node_alive
        ,message_timestamp as root_message_timestamp

        -- attributes related to device data (ddata / dbirth)
        ,f.value:name::varchar as device_name
        ,f.value:value:reference::varchar as template_reference
        ,f.value:value:version::varchar as template_version
        ,f.value:timestamp::number as device_metric_timestamp
        ,f.value as ddata_msg

        -- attributes related to device level metrics
        ,concat(msg_id ,'^' ,f.index ,'::' ,d.index) as device_measure_uuid
        ,d.value:name::varchar as measure_name
        ,d.value:value as measure_value
        ,d.value:timestamp::number as measure_timestamp
    from sparkplug_msgs_nodebirth_contextualized_vw as b
        ,lateral flatten(input => b.message:metrics) as f
        ,lateral flatten(input => f.value:value:metrics) as d
    where message_type in ('DBIRTH' ,'DDATA')
        and template_reference is not null
)
select
    group_id
    ,edge_node_id
    ,device_id
    ,message_type
    ,message_sequence
    ,inserted_at
    ,nbirth_or_ndeath
    ,nbirth_bdseq
    ,ndeath_bdseq
    ,nbirth_ndeath_inserted_at
    ,is_last_known_good_reading
    ,message_type_order
    ,is_node_alive
    ,root_message_timestamp
    ,device_name
    ,template_reference
    ,template_version
    ,device_metric_timestamp
    ,ddata_msg
    ,null as is_historical
    ,device_measure_uuid
    ,object_agg(distinct measure_name ,measure_value) as measures_info
    ,measure_timestamp
    -- NOTE(review): the /1000 presumes measure_timestamp is epoch milliseconds - confirm
    ,to_timestamp(measure_timestamp/1000) as measure_ts
    ,to_date(measure_ts) as measure_date
    ,hour(measure_ts) as measure_hour
from base
group by
    group_id
    ,edge_node_id
    ,device_id
    ,message_type
    ,message_sequence
    ,inserted_at
    ,nbirth_or_ndeath
    ,nbirth_bdseq
    ,ndeath_bdseq
    ,nbirth_ndeath_inserted_at
    ,is_last_known_good_reading
    ,message_type_order
    ,is_node_alive
    ,root_message_timestamp
    ,device_name
    ,template_reference
    ,template_version
    ,device_metric_timestamp
    ,ddata_msg
    ,is_historical
    ,device_measure_uuid
    ,measure_timestamp
;

-- Target table with the same column list as sparkplug_messages_flattened_vw.
-- WARNING: "create or replace" drops any rows already in this table when the
-- script is re-run.
create or replace transient table sparkplug_device_messages (
    group_id varchar
    ,edge_node_id varchar
    ,device_id varchar
    ,message_type varchar
    ,message_sequence number
    ,inserted_at number
    ,nbirth_or_ndeath varchar
    ,nbirth_bdseq number
    ,ndeath_bdseq number
    ,nbirth_ndeath_inserted_at number
    ,is_last_known_good_reading boolean
    ,message_type_order number
    ,is_node_alive boolean
    ,root_message_timestamp number
    ,device_name varchar
    ,template_reference varchar
    ,template_version varchar
    ,device_metric_timestamp number
    ,ddata_msg variant
    ,is_historical boolean
    ,device_measure_uuid varchar
    ,measures_info variant
    ,measure_timestamp number
    ,measure_ts timestamp
    ,measure_date date
    ,measure_hour number
)
cluster by (
    group_id
    ,edge_node_id
    ,device_id
    ,template_reference
    ,template_version
    ,device_name
    ,measure_date
    ,measure_hour)
comment = 'materialized version of the sparkplug_messages_flattened_vw for easier downstream pipelines.'
;

-- -- >>>>>>>>>>>>>>>>>>>>>>
-- ================
-- NODE BIRTH related assets
-- ================

create or replace stream nbirth_stream
    on view nbirth_vw
    show_initial_rows = true
    comment = 'stream to monitor for nbirth messages, so that assets are created automatically'
;
Script 03
SQL Script 03
Script 04
SQL Script 04
Script 05
SQL Script 05
Script 10
SQL Script 10
Script 11
SQL Script 11
...