Contents
Cirrus Link Resources
Cirrus Link Website
Contact Us (Sales/Support)
Forum
Cirrus Link Modules Docs for Ignition 7.9.x
Inductive Resources
Ignition User Manual
Knowledge Base Articles
Inductive University
Forum
...
Set up the staging database and its associated assets. These are:
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
-- =========================
-- Staging database setup.
--
-- Creates the assets that receive raw messages from the CirrusLink Bridge:
--   - Database
--   - Staging schema
--   - sparkplug_raw landing table
-- The database & schema will be owned by SYSADMIN.
-- REPLACE THE SESSION VARIABLES ACCORDING TO YOUR ENVIRONMENT
-- =========================
set cl_bridge_staging_db = 'CL_BRIDGE_STAGE_DB';
set staging_schema = 'stage_db';

-- >>>>>>>>>>>>>>>>>>>>>> DATABASE >>>>>>>>>>>>>>>>>>>>>>>>>
use role sysadmin;

create database if not exists identifier($cl_bridge_staging_db)
    -- DATA_RETENTION_TIME_IN_DAYS = 90
    -- MAX_DATA_EXTENSION_TIME_IN_DAYS = 90
    comment = 'used for storing messages received from CirrusLink Bridge'
;

-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA >>>>>>>>>>>>>>>>>>>>>>>>>
use database identifier($cl_bridge_staging_db);

create schema if not exists identifier($staging_schema)
    with managed access
    -- data_retention_time_in_days = 90
    -- max_data_extension_time_in_days = 90
    comment = 'Used for staging data direct from CirrusLink Bridge'
;

-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA ASSETS >>>>>>>>>>>>>>>>>>>>>>>>>
use schema identifier($staging_schema);

-- =========================
-- Define tables
-- =========================

-- Landing table for the raw messages. The payload is kept as a VARIANT in
-- `msg`; the remaining columns carry the message id, topic and topic parts.
-- NOTE: this is CREATE OR REPLACE, so re-running the script re-creates the
-- table (unlike the database and schema above, which use IF NOT EXISTS).
create or replace table sparkplug_raw (
    msg_id varchar,
    msg_topic varchar,
    namespace varchar,
    group_id varchar,
    message_type varchar,
    edge_node_id varchar,
    device_id varchar,
    msg variant,
    inserted_at number
)
    change_tracking = true
    cluster by (message_type, group_id, edge_node_id, device_id)
    comment = 'Used for storing json messages from sparkplug bridge/gateway'
;
...
Set up assets related to the node database, which will eventually contain all the device-specific views and tables. The following assets are created:
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
-- =========================
-- Node database setup.
--
-- Creates the node database, which will eventually contain all the device
-- specific views and tables. At the very core, the following assets are
-- created:
--   - Node Database
--   - Staging schema
--   - Views, streams and tables that parse and flatten the staged messages
-- The database & schema will be owned by SYSADMIN.
-- REPLACE THE SESSION VARIABLES ACCORDING TO YOUR ENVIRONMENT
-- =========================
set staged_sparkplug_raw_table = 'cl_bridge_stage_db.stage_db.sparkplug_raw';
set cl_bridge_node_db = 'cl_bridge_node_db';
set staging_schema = 'stage_db';

-- >>>>>>>>>>>>>>>>>>>>>> DATABASE >>>>>>>>>>>>>>>>>>>>>>>>>
use role sysadmin;

create database if not exists identifier($cl_bridge_node_db)
    -- DATA_RETENTION_TIME_IN_DAYS = 90
    -- MAX_DATA_EXTENSION_TIME_IN_DAYS = 90
    comment = 'used for storing flattened messages processed from the staging database'
;

-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA >>>>>>>>>>>>>>>>>>>>>>>>>
use database identifier($cl_bridge_node_db);

create schema if not exists identifier($staging_schema)
    with managed access
    -- data_retention_time_in_days = 90
    -- max_data_extension_time_in_days = 90
    comment = 'used for storing flattened messages processed from the staging database'
;

-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA ASSETS >>>>>>>>>>>>>>>>>>>>>>>>>
use schema identifier($staging_schema);

-- =========================
-- Define views, streams and tables
-- =========================

-- NOTE: 'cl_bridge_stage_db.stage_db.sparkplug_raw' is hardcoded in the view
-- below, because the identifier($staged_sparkplug_raw_table) replacement does
-- not work here. If the staging database or schema is renamed, edit the FROM
-- clause by hand.
create or replace view sparkplug_messages_vw
    change_tracking = true
    comment = 'parses out the core attributes from the message and topic.'
as
select
    msg_id,
    namespace,
    group_id,
    message_type,
    edge_node_id,
    device_id,
    parse_json(msg) as message,
    message:seq::int as message_sequence,
    message:timestamp::number as message_timestamp,
    inserted_at
from cl_bridge_stage_db.stage_db.sparkplug_raw
;

-- -- >>>>>>>>>>>>>>>>>>>>>>

-- Group / edge node pairs of every NBIRTH message; source of nbirth_stream.
create or replace view nbirth_vw
    change_tracking = true
    comment = 'filtered to nbirth messages. This is a mirror'
as
select
    group_id,
    edge_node_id
from sparkplug_messages_vw
where message_type = 'NBIRTH'
;

-- For each group / edge node, takes the NBIRTH message with the highest
-- message_timestamp and emits one row per metric whose dataType is 'Template'.
create or replace view node_machine_registry_vw
    comment = 'Used to retreive the latest template definitions for a given group and edge_node'
as
with base as (
    select
        group_id,
        edge_node_id,
        max_by(message, message_timestamp) as message,
        max(message_timestamp) as latest_message_timestamp
    from sparkplug_messages_vw
    where message_type = 'NBIRTH'
    group by
        group_id,
        edge_node_id
)
select
    group_id,
    edge_node_id,
    f.value as template_definition,
    template_definition:name::varchar as machine,
    template_definition:reference::varchar as reference,
    template_definition:version::varchar as version,
    template_definition:timestamp::int as timestamp
from base as b,
    lateral flatten (input => b.message:metrics) f
where template_definition:dataType::varchar = 'Template'
;

-- -- >>>>>>>>>>>>>>>>>>>>>>

-- NBIRTH / NDEATH messages, one row per 'bdSeq' metric. The *_raw columns are
-- populated only here; device_records_vw carries them as NULL so that both
-- views share one column layout and can be unioned by position.
create or replace view node_birth_death_vw
    comment = 'shows the latest node birth & death messages for each device'
as
select
    b.* exclude(namespace),
    message_type as nbirth_or_ndeath_raw,
    iff((message_type = 'NBIRTH'), f.value:value, null)::number as nbirth_bdSeq_raw,
    iff((message_type = 'NDEATH'), f.value:value, null)::number as ndeath_bdSeq_raw,
    inserted_at as nbirth_ndeath_inserted_at_raw
from sparkplug_messages_vw as b,
    lateral flatten (input => b.message:metrics) as f
where message_type in ('NBIRTH', 'NDEATH')
    and f.value:name::varchar = 'bdSeq'
;

-- DBIRTH / DDATA messages with the same column layout as node_birth_death_vw.
create or replace view device_records_vw
    change_tracking = true
as
select
    b.* exclude(namespace),
    null as nbirth_or_ndeath_raw,
    null as nbirth_bdSeq_raw,
    null as ndeath_bdSeq_raw,
    null as nbirth_ndeath_inserted_at_raw
from sparkplug_messages_vw as b
where message_type in ('DBIRTH', 'DDATA')
;

create or replace stream device_records_stream
    on view device_records_vw
    show_initial_rows = true
    comment = 'used for monitoring latest device messages'
;

-- Attaches to every row the most recent preceding non-null node birth/death
-- state (message type, bdSeq values, insert time).
--
-- The look-back windows are partitioned by group_id / edge_node_id so that a
-- device row only inherits the state of its OWN edge node. Without the
-- partition, rows from one edge node would pick up the NBIRTH / NDEATH state
-- of whichever node happened to report last.
create or replace view sparkplug_msgs_nodebirth_contextualized_vw
as
with device_node_unioned as (
    select *
    from node_birth_death_vw
    union all
    select * exclude(METADATA$ROW_ID, METADATA$ACTION, METADATA$ISUPDATE)
    from device_records_stream
)
select
    * exclude(
        nbirth_or_ndeath_raw,
        nbirth_bdSeq_raw,
        ndeath_bdSeq_raw,
        nbirth_ndeath_inserted_at_raw
    ),
    nvl(
        nbirth_or_ndeath_raw,
        lag(nbirth_or_ndeath_raw) ignore nulls over (
            partition by group_id, edge_node_id
            order by inserted_at, message_sequence
        )
    ) as nbirth_or_ndeath,
    nvl(
        nbirth_bdSeq_raw,
        lag(nbirth_bdSeq_raw) ignore nulls over (
            partition by group_id, edge_node_id
            order by inserted_at, message_sequence
        )
    ) as nbirth_bdSeq,
    nvl(
        ndeath_bdSeq_raw,
        lag(ndeath_bdSeq_raw) ignore nulls over (
            partition by group_id, edge_node_id
            order by inserted_at, message_sequence
        )
    ) as ndeath_bdSeq,
    nvl(
        nbirth_ndeath_inserted_at_raw,
        lag(nbirth_ndeath_inserted_at_raw) ignore nulls over (
            partition by group_id, edge_node_id
            order by inserted_at, message_sequence
        )
    ) as nbirth_ndeath_inserted_at,
    -- false after an NBIRTH, or after an NDEATH whose bdSeq differs from the
    -- NBIRTH bdSeq; true otherwise.
    case true
        when (nbirth_or_ndeath = 'NBIRTH') then false
        when (
            (nbirth_or_ndeath = 'NDEATH')
            and (nbirth_bdSeq != ndeath_bdSeq)
        ) then false
        when (
            (nbirth_or_ndeath = 'NDEATH')
            and (nbirth_bdSeq = ndeath_bdSeq)
        ) then true
        else true
    end as is_last_known_good_reading,
    -- fixed rank per message type; unknown types get 99
    case lower(message_type)
        when lower('NBIRTH') then 1
        when lower('DBIRTH') then 2
        when lower('DDATA') then 3
        when lower('DDEATH') then 4
        when lower('NDEATH') then 5
        else 99
    end as message_type_order,
    (nbirth_or_ndeath = 'NBIRTH') as is_node_alive
from device_node_unioned
;

-- Flattens DBIRTH / DDATA messages twice: once over the message-level metrics
-- (alias f) and once over the nested metrics of each of those (alias d).
-- Only rows whose outer metric carries a template reference are kept.
create or replace view sparkplug_messages_flattened_vw
as
with base as (
    select
        -- sparkplugb message level
        msg_id,
        group_id,
        edge_node_id,
        device_id,
        message_type,
        message_sequence,
        inserted_at,
        nbirth_or_ndeath,
        nbirth_bdseq,
        ndeath_bdseq,
        nbirth_ndeath_inserted_at,
        is_last_known_good_reading,
        message_type_order,
        is_node_alive,
        message_timestamp as root_message_timestamp,
        -- attributes related to device data (ddata / dbirth)
        f.value:name::varchar as device_name,
        f.value:value:reference::varchar as template_reference,
        f.value:value:version::varchar as template_version,
        f.value:timestamp::number as device_metric_timestamp,
        f.value as ddata_msg,
        -- attributes related to device level metrics
        concat(msg_id, '^', f.index, '::', d.index) as device_measure_uuid,
        d.value:name::varchar as measure_name,
        d.value:value as measure_value,
        d.value:timestamp::number as measure_timestamp
    from sparkplug_msgs_nodebirth_contextualized_vw as b,
        lateral flatten(input => b.message:metrics) as f,
        lateral flatten(input => f.value:value:metrics) as d
    where message_type in ('DBIRTH', 'DDATA')
        and template_reference is not null
)
select
    group_id,
    edge_node_id,
    device_id,
    message_type,
    message_sequence,
    inserted_at,
    nbirth_or_ndeath,
    nbirth_bdseq,
    ndeath_bdseq,
    nbirth_ndeath_inserted_at,
    is_last_known_good_reading,
    message_type_order,
    is_node_alive,
    root_message_timestamp,
    device_name,
    template_reference,
    template_version,
    device_metric_timestamp,
    ddata_msg,
    null as is_historical,
    device_measure_uuid,
    object_agg(distinct measure_name, measure_value) as measures_info,
    measure_timestamp,
    -- measure_timestamp is divided by 1000 before conversion
    -- (presumably epoch milliseconds; verify against the publisher)
    to_timestamp(measure_timestamp/1000) as measure_ts,
    to_date(measure_ts) as measure_date,
    hour(measure_ts) as measure_hour
from base
group by
    group_id,
    edge_node_id,
    device_id,
    message_type,
    message_sequence,
    inserted_at,
    nbirth_or_ndeath,
    nbirth_bdseq,
    ndeath_bdseq,
    nbirth_ndeath_inserted_at,
    is_last_known_good_reading,
    message_type_order,
    is_node_alive,
    root_message_timestamp,
    device_name,
    template_reference,
    template_version,
    device_metric_timestamp,
    ddata_msg,
    is_historical,
    device_measure_uuid,
    measure_timestamp
;

-- Column layout mirrors the select list of sparkplug_messages_flattened_vw.
create or replace transient table sparkplug_device_messages (
    group_id varchar,
    edge_node_id varchar,
    device_id varchar,
    message_type varchar,
    message_sequence number,
    inserted_at number,
    nbirth_or_ndeath varchar,
    nbirth_bdseq number,
    ndeath_bdseq number,
    nbirth_ndeath_inserted_at number,
    is_last_known_good_reading boolean,
    message_type_order number,
    is_node_alive boolean,
    root_message_timestamp number,
    device_name varchar,
    template_reference varchar,
    template_version varchar,
    device_metric_timestamp number,
    ddata_msg variant,
    is_historical boolean,
    device_measure_uuid varchar,
    measures_info variant,
    measure_timestamp number,
    measure_ts timestamp,
    measure_date date,
    measure_hour number
)
    cluster by (
        group_id,
        edge_node_id,
        device_id,
        template_reference,
        template_version,
        device_name,
        measure_date,
        measure_hour
    )
    comment = 'materialized version of the sparkplug_messages_flattened_vw for easier downstream pipelines.'
;

-- -- >>>>>>>>>>>>>>>>>>>>>>
-- ================
-- NODE BIRTH related assets
-- ================
create or replace stream nbirth_stream
    on view nbirth_vw
    show_initial_rows = true
    comment = 'stream to monitor for nbirth messages, so that assets are created automatically'
;
...
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
-- =========================
-- Dynamic tables used for investigation and error tracking:
--   - D_ACTIVE_NBIRTH     : node birth/death sequences without a matching pair
--   - D_DEVICE_HEARTBEATS : latest DBIRTH / DDATA receipt times per device
-- REPLACE THE SESSION VARIABLES ACCORDING TO YOUR ENVIRONMENT
-- =========================
set cl_bridge_node_db = 'cl_bridge_node_db';
set staging_schema = 'stage_db';
-- NOTE(review): reader_role_warehouse is set but not referenced below; both
-- dynamic tables name the warehouse `compute_wh` directly. Keep the two in
-- sync if you change the warehouse.
set reader_role_warehouse = 'compute_wh';

use role sysadmin;
use database identifier($cl_bridge_node_db);
use schema identifier($staging_schema);

create or replace dynamic table D_ACTIVE_NBIRTH
    target_lag = '1 day'
    warehouse = compute_wh
as
-- Tabularize the active nbirth events for easier error handling during
-- investigations
with base as (
    select
        group_id,
        edge_node_id,
        device_id,
        nvl(nbirth_bdseq_raw, ndeath_bdseq_raw) as bdseq,
        message_type,
        to_timestamp(message_timestamp/1000) as MEASURE_TS
    from NODE_BIRTH_DEATH_VW
),
-- bdseq values that have both an NBIRTH and an NDEATH row
nbirth_ndeath_matched as (
    select distinct nb.bdseq
    from base as nb
    join base as nd
        on nb.bdseq = nd.bdseq
    where nb.message_type = 'NBIRTH'
        and nd.message_type = 'NDEATH'
)
-- keep only the bdseq values without such a pair; earliest timestamp per group
select
    b.* exclude(measure_ts, message_type),
    min(measure_ts) as nbirth
from base as b
where b.bdseq not in (select bdseq from nbirth_ndeath_matched)
group by all
;

create or replace dynamic table D_DEVICE_HEARTBEATS
    target_lag = '1 day'
    warehouse = compute_wh
as
-- tabularize the dbirth/data messages for each devices. Meant to be used
-- for active investigation on message receipt from devices and error tracking
with base as (
    select
        group_id,
        edge_node_id,
        device_id,
        DEVICE_NAME,
        message_type,
        MEASURE_TS,
        row_number() over (
            partition by group_id, edge_node_id, device_id, DEVICE_NAME, message_type
            order by MEASURE_TS desc
        ) as row_num
    from SPARKPLUG_DEVICE_MESSAGES
),
-- the two most recent rows per device / device name / message type
rows_filtered as (
    select *
    from base
    where row_num <= 2
),
object_constructed as (
    select
        group_id,
        edge_node_id,
        device_id,
        DEVICE_NAME,
        message_type,
        first_value(MEASURE_TS) ignore nulls over (
            partition by group_id, edge_node_id, device_id, DEVICE_NAME, message_type
            order by MEASURE_TS desc
            --ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
        ) as latest_message_received_at,
        NTH_VALUE(MEASURE_TS, 2) FROM FIRST IGNORE NULLS over (
            partition by group_id, edge_node_id, device_id, DEVICE_NAME, message_type
            order by MEASURE_TS desc
            --ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
        ) as prev_latest_message_received_at,
        timestampdiff(
            'second',
            prev_latest_message_received_at,
            latest_message_received_at
        ) as interval_time,
        -- DEVICE_NAME is part of the partition here as well, so this window
        -- covers the same rows as the three windows above and the row_num
        -- filter; previously it was the only one that omitted it.
        last_value(MEASURE_TS) ignore nulls over (
            partition by group_id, edge_node_id, device_id, DEVICE_NAME, message_type
            order by MEASURE_TS desc
            --ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
        ) as oldest_message_received_at,
        object_construct(
            'latest_message_received_at', latest_message_received_at,
            'prev_latest_message_received_at', prev_latest_message_received_at,
            'interval_seconds', interval_time,
            'oldest_message_received_at', oldest_message_received_at
        ) as obj
    from rows_filtered
),
-- UNION (not UNION ALL) is kept from the original: it also removes duplicate
-- rows coming out of object_constructed.
unioned as (
    select
        group_id,
        edge_node_id,
        device_id,
        DEVICE_NAME,
        null as dbirth,
        obj as ddata
    from object_constructed
    where message_type = 'DDATA'
    union
    select
        group_id,
        edge_node_id,
        device_id,
        DEVICE_NAME,
        obj as dbirth,
        null as ddata
    from object_constructed
    where message_type = 'DBIRTH'
)
-- NOTE(review): a device that has both a DBIRTH and a DDATA row in `unioned`
-- appears to yield two identical output rows here — confirm whether a
-- DISTINCT is wanted.
select
    group_id,
    edge_node_id,
    device_id,
    DEVICE_NAME,
    first_value(ddata) ignore nulls over (
        partition by group_id, edge_node_id, device_id, DEVICE_NAME
        order by ddata
    ) as ddata,
    first_value(dbirth) ignore nulls over (
        partition by group_id, edge_node_id, device_id, DEVICE_NAME
        order by dbirth
    ) as dbirth
from unioned
;
Set up roles that require the help of privileged roles such as SYSADMIN. These are:
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
-- =========================
-- Processor role setup.
--
-- Sets up the role that requires the help of privileged roles like
-- SYSADMIN. The steps are:
--   - Create a custom role
--   - Assign the custom role to create task and execute task
--   - Create warehouse specifically used for ingestion
--   - Grants
-- REPLACE THE SESSION VARIABLES ACCORDING TO YOUR ENVIRONMENT
-- =========================
set processor_role = 'cl_bridge_process_rl';
set cl_bridge_ingestion_warehouse = 'cl_bridge_ingest_wh';
set staging_db = 'cl_bridge_stage_db';
set staging_db_schema = 'cl_bridge_stage_db.stage_db';
set node_db = 'cl_bridge_node_db';
set node_db_staging_schema = 'cl_bridge_node_db.stage_db';

-- >>>>>>>>>>>>>>>>>>>>>>>>>>> ROLE CREATION >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
use role securityadmin;

create role if not exists identifier($processor_role)
    comment = 'role used by cirruslink bridge to ingest and process mqtt/sparkplug data';

grant role identifier($processor_role)
    to role sysadmin;

-- >>>>>>>>>>>>>>>>>>>>>>>>>>> WAREHOUSE >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
use role sysadmin;

create warehouse if not exists identifier($cl_bridge_ingestion_warehouse)
    with
    warehouse_type = standard
    warehouse_size = xsmall
    --max_cluster_count = 5
    initially_suspended = true
    comment = 'used by cirruslink bridge to ingest'
;

grant usage on warehouse identifier($cl_bridge_ingestion_warehouse)
    to role identifier($processor_role);
grant operate on warehouse identifier($cl_bridge_ingestion_warehouse)
    to role identifier($processor_role);

-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS STAGE DB >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
grant usage on database identifier($staging_db)
    to role identifier($processor_role);
grant usage on schema identifier($staging_db_schema)
    to role identifier($processor_role);

-- existing tables
grant select on all tables in schema identifier($staging_db_schema)
    to role identifier($processor_role);
grant insert on all tables in schema identifier($staging_db_schema)
    to role identifier($processor_role);

-- tables created later
grant select on future tables in schema identifier($staging_db_schema)
    to role identifier($processor_role);
grant insert on future tables in schema identifier($staging_db_schema)
    to role identifier($processor_role);

-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS NODE DB >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
-- NOTE(review): unlike the stage DB section, only "all" (existing) objects are
-- granted here, no "future" objects — confirm this is intended.
grant usage on database identifier($node_db)
    to role identifier($processor_role);
grant usage on schema identifier($node_db_staging_schema)
    to role identifier($processor_role);
grant select on all tables in schema identifier($node_db_staging_schema)
    to role identifier($processor_role);
grant insert on all tables in schema identifier($node_db_staging_schema)
    to role identifier($processor_role);
grant select on all views in schema identifier($node_db_staging_schema)
    to role identifier($processor_role);
grant usage on all functions in schema identifier($node_db_staging_schema)
    to role identifier($processor_role);
grant usage on all procedures in schema identifier($node_db_staging_schema)
    to role identifier($processor_role);

-- need for:
--  - creating edge node specific tables and views
grant create schema on database identifier($node_db)
    to role identifier($processor_role);
Set up roles that require the help of privileged roles such as SYSADMIN, SECURITYADMIN and ACCOUNTADMIN. These are:
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
-- =========================
-- Reader role setup.
--
-- Sets up the role that requires the help of privileged roles like
-- SYSADMIN, SECURITYADMIN, ACCOUNTADMIN. The steps are:
--   - Create a custom role
--   - Assign the custom role to create task and execute task
--   - Grants
-- REPLACE THE SESSION VARIABLES ACCORDING TO YOUR ENVIRONMENT
-- =========================
set reader_role = 'cl_bridge_reader_rl';
set reader_role_warehouse = 'compute_wh';
set staging_db = 'cl_bridge_stage_db';
set staging_db_schema = 'cl_bridge_stage_db.stage_db';
set node_db = 'cl_bridge_node_db';
set node_db_staging_schema = 'cl_bridge_node_db.stage_db';

-- >>>>>>>>>>>>>>>>>>>>>>>>>>> ROLE CREATION >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
use role securityadmin;

create role if not exists identifier($reader_role)
    comment = 'role used by user/process to query and operate on the views and tables managed by cirruslink bridge';

grant role identifier($reader_role)
    to role sysadmin;

-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS WAREHOUSE >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
-- Step 1: as ACCOUNTADMIN, give SYSADMIN the warehouse privileges with grant
-- option, so that SYSADMIN can pass them on in step 2.
use role accountadmin;

grant usage on warehouse identifier($reader_role_warehouse)
    to role sysadmin with grant option;
grant modify on warehouse identifier($reader_role_warehouse)
    to role sysadmin with grant option;
grant operate on warehouse identifier($reader_role_warehouse)
    to role sysadmin with grant option;

-- Step 2: as SYSADMIN, grant the same privileges to the reader role.
use role sysadmin;

grant usage on warehouse identifier($reader_role_warehouse)
    to role identifier($reader_role);
grant modify on warehouse identifier($reader_role_warehouse)
    to role identifier($reader_role);
grant operate on warehouse identifier($reader_role_warehouse)
    to role identifier($reader_role);

-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS TASK EXECUTION >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
-- Creating and executing tasks require escalated privileges that can be
-- granted only by the accountadmin. Hence we have to switch roles.
-- (Left disabled; uncomment if the reader role must run tasks.)
-- use role accountadmin;
-- grant execute managed task on account to role identifier($reader_role);
-- grant execute task on account to role identifier($reader_role);

-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS STAGE DB >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
grant usage on database identifier($staging_db)
    to role identifier($reader_role);
grant usage on schema identifier($staging_db_schema)
    to role identifier($reader_role);
grant select on all tables in schema identifier($staging_db_schema)
    to role identifier($reader_role);
grant select on future tables in schema identifier($staging_db_schema)
    to role identifier($reader_role);

-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS NODE DB >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
grant usage on database identifier($node_db)
    to role identifier($reader_role);
grant usage on schema identifier($node_db_staging_schema)
    to role identifier($reader_role);
grant select on all tables in schema identifier($node_db_staging_schema)
    to role identifier($reader_role);
grant select on all views in schema identifier($node_db_staging_schema)
    to role identifier($reader_role);
grant usage on all functions in schema identifier($node_db_staging_schema)
    to role identifier($reader_role);
grant usage on all procedures in schema identifier($node_db_staging_schema)
    to role identifier($reader_role);

-- need for:
--  - new generated specific tables and views
use role securityadmin;

grant usage on future schemas in database identifier($node_db)
    to role identifier($reader_role);
...