![]()
Contents
Cirrus Link Resources
Cirrus Link Website![]()
Contact Us (Sales/Support)![]()
Forum![]()
Cirrus Link Modules Docs for Ignition 7.9.x![]()
Inductive Resources
Ignition User Manual![]()
Knowledge Base Articles![]()
Inductive University![]()
Forum![]()
...
Set up assets related to the staging database and associated assets.These These are:
...
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
-- =========================
-- In this script, we are setting up assets related to the staging database
-- and associated assets. These are:
-- - Database
-- - Staging schema
-- The database & schema will be owned by SYSADMIN
-- REPLACE THE SESSION VARIABLE ACCORDING TO YOUR ENVIRONMENT
-- =========================
set cl_bridge_staging_db = 'CL_BRIDGE_STAGE_DB';
set staging_schema = 'stage_db';
-- >>>>>>>>>>>>>>>>>>>>>> DATABASE >>>>>>>>>>>>>>>>>>>>>>>>>
use role sysadmin;
create database if not exists identifier($cl_bridge_staging_db)
-- DATA_RETENTION_TIME_IN_DAYS = 90
-- MAX_DATA_EXTENSION_TIME_IN_DAYS = 90
comment = 'used for storing messages received from CirrusLink Bridge'
;
-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA >>>>>>>>>>>>>>>>>>>>>>>>>
use database identifier($cl_bridge_staging_db);
create schema if not exists identifier($staging_schema)
with managed access
-- data_retention_time_in_days = 90
-- max_data_extension_time_in_days = 90
comment = 'Used for staging data direct from CirrusLink Bridge';
-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA ASSETS >>>>>>>>>>>>>>>>>>>>>>>>>
use schema identifier($staging_schema);
-- =========================
-- Define tables
-- =========================
create or replace table sparkplug_raw (
msg_id varchar
,msg_topic varchar
,namespace varchar
,group_id varchar
,message_type varchar
,edge_node_id varchar
,device_id varchar
,msg variant
,inserted_at number
)
change_tracking = true
cluster by (message_type ,group_id ,edge_node_id ,device_id)
comment = 'Used for storing json messages from sparkplug bridge/gateway'
; |
...
Set up assets related to the node database which would eventually contain all the device specific views and tables. The following assets These are created:
...
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
-- =========================
-- In this script, we are setting up assets related to the node database
-- ,which would eventually contain all the device specific views and tables.
-- At the very core, the following assets are created:
-- - Node Database
-- - Staging schema
-- The database & schema will be owned by SYSADMIN
-- REPLACE THE SESSION VARIABLE ACCORDING TO YOUR ENVIRONMENT
-- =========================
set staged_sparkplug_raw_table = 'cl_bridge_stage_db.stage_db.sparkplug_raw';
set cl_bridge_node_db = 'cl_bridge_node_db';
set staging_schema = 'stage_db';
-- >>>>>>>>>>>>>>>>>>>>>> DATABASE >>>>>>>>>>>>>>>>>>>>>>>>>
use role sysadmin;
create database if not exists identifier($cl_bridge_node_db)
-- DATA_RETENTION_TIME_IN_DAYS = 90
-- MAX_DATA_EXTENSION_TIME_IN_DAYS = 90
comment = 'used for storing flattened messages processed from the staging database'
;
-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA >>>>>>>>>>>>>>>>>>>>>>>>>
use database identifier($cl_bridge_node_db);
create schema if not exists identifier($staging_schema)
with managed access
-- data_retention_time_in_days = 90
-- max_data_extension_time_in_days = 90
comment = 'used for storing flattened messages processed from the staging database';
-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA ASSETS >>>>>>>>>>>>>>>>>>>>>>>>>
use schema identifier($staging_schema);
-- =========================
-- Define tables
-- =========================
-- NOTE THE 'cl_bridge_stage_db.stage_db.sparkplug_raw' is hardcoded here; as the identifier
-- staged_sparkplug_raw_table replacement does not work.
create or replace view sparkplug_messages_vw
change_tracking = true
comment = 'parses out the core attributes from the message and topic.'
as
select
msg_id
,namespace
,group_id
,message_type
,edge_node_id
,device_id
,parse_json(msg) as message
,message:seq::int as message_sequence
,message:timestamp::number as message_timestamp
,inserted_at
from cl_bridge_stage_db.stage_db.sparkplug_raw
;
-- -- >>>>>>>>>>>>>>>>>>>>>>
create or replace view nbirth_vw
change_tracking = true
comment = 'filtered to nbirth messages. This is a mirror'
as
select
group_id ,edge_node_id
from sparkplug_messages_vw
where message_type = 'NBIRTH'
;
create or replace view node_machine_registry_vw
comment = 'Used to retreive the latest template definitions for a given group and edge_node'
as
with base as (
select
group_id ,edge_node_id
,max_by(message ,message_timestamp) as message
,max(message_timestamp) as latest_message_timestamp
from sparkplug_messages_vw
where message_type = 'NBIRTH'
group by group_id ,edge_node_id
)
select
group_id ,edge_node_id
,f.value as template_definition
,template_definition:name::varchar as machine
,template_definition:reference::varchar as reference
,template_definition:version::varchar as version
,template_definition:timestamp::int as timestamp
from base as b
,lateral flatten (input => b.message:metrics) f
where template_definition:dataType::varchar = 'Template'
;
-- -- >>>>>>>>>>>>>>>>>>>>>>
create or replace view node_birth_death_vw
comment = 'shows the latest node birth & death messages for each device'
as
select
b.* exclude(namespace)
,message_type as nbirth_or_ndeath_raw
,iff((message_type = 'NBIRTH') ,f.value:value ,null)::number as nbirth_bdSeq_raw
,iff((message_type = 'NDEATH') ,f.value:value ,null)::number as ndeath_bdSeq_raw
,inserted_at as nbirth_ndeath_inserted_at_raw
from sparkplug_messages_vw as b
,lateral flatten (input => b.message:metrics) as f
where message_type in ('NBIRTH' ,'NDEATH')
and f.value:name::varchar = 'bdSeq'
;
create or replace view device_records_vw
change_tracking = true
as
select
b.* exclude(namespace)
,null as nbirth_or_ndeath_raw
,null as nbirth_bdSeq_raw
,null as ndeath_bdSeq_raw
,null as nbirth_ndeath_inserted_at_raw
from sparkplug_messages_vw as b
where message_type in ('DBIRTH' ,'DDATA')
;
create or replace stream device_records_stream
on view device_records_vw
show_initial_rows = true
comment = 'used for monitoring latest device messages'
;
create or replace view sparkplug_msgs_nodebirth_contextualized_vw
as
with device_node_unioned as (
select *
from node_birth_death_vw
union all
select * exclude(METADATA$ROW_ID ,METADATA$ACTION ,METADATA$ISUPDATE)
from device_records_stream
)
select
-- group_id ,message_type ,edge_node_id ,device_id
-- ,message ,message_sequence ,inserted_at
* exclude(nbirth_or_ndeath_raw ,nbirth_bdSeq_raw ,ndeath_bdSeq_raw ,nbirth_ndeath_inserted_at_raw)
,nvl(nbirth_or_ndeath_raw
,lag(nbirth_or_ndeath_raw) ignore nulls over (order by inserted_at ,message_sequence)
) as nbirth_or_ndeath
,nvl(nbirth_bdSeq_raw
,lag(nbirth_bdSeq_raw) ignore nulls over (order by inserted_at ,message_sequence)
) as nbirth_bdSeq
,nvl(ndeath_bdSeq_raw
,lag(ndeath_bdSeq_raw) ignore nulls over (order by inserted_at ,message_sequence)
) as ndeath_bdSeq
,nvl(nbirth_ndeath_inserted_at_raw
,lag(nbirth_ndeath_inserted_at_raw) ignore nulls over (order by inserted_at ,message_sequence)
) as nbirth_ndeath_inserted_at
,case true
when (nbirth_or_ndeath = 'NBIRTH') then false
when ( (nbirth_or_ndeath = 'NDEATH') and (nbirth_bdSeq != ndeath_bdSeq) ) then false
when ( (nbirth_or_ndeath = 'NDEATH') and (nbirth_bdSeq = ndeath_bdSeq) ) then true
else true
end as is_last_known_good_reading
,case lower(message_type)
when lower('NBIRTH') then 1
when lower('DBIRTH') then 2
when lower('DDATA') then 3
when lower('DDEATH') then 4
when lower('NDEATH') then 5
else 99
end as message_type_order
,(nbirth_or_ndeath = 'NBIRTH') as is_node_alive
from device_node_unioned
;
create or replace view sparkplug_messages_flattened_vw
as
with base as (
select
-- sparkplugb message level
msg_id ,group_id, edge_node_id ,device_id ,message_type
,message_sequence ,inserted_at
,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq
,nbirth_ndeath_inserted_at ,is_last_known_good_reading
,message_type_order ,is_node_alive
,message_timestamp as root_message_timestamp
-- attributes related to device data (ddata / dbirth)
,f.value:name::varchar as device_name
,f.value:value:reference::varchar as template_reference
,f.value:value:version::varchar as template_version
,f.value:timestamp::number as device_metric_timestamp
,f.value as ddata_msg
-- attributes related to device level metrics
,concat(msg_id ,'^' ,f.index ,'::',d.index) as device_measure_uuid
,d.value:name::varchar as measure_name
,d.value:value as measure_value
,d.value:timestamp::number as measure_timestamp
from sparkplug_msgs_nodebirth_contextualized_vw as b
,lateral flatten(input => b.message:metrics) as f
,lateral flatten(input => f.value:value:metrics) as d
where message_type in ('DBIRTH' ,'DDATA')
and template_reference is not null
)
select
group_id, edge_node_id ,device_id ,message_type
,message_sequence ,inserted_at
,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq
,nbirth_ndeath_inserted_at ,is_last_known_good_reading
,message_type_order ,is_node_alive ,root_message_timestamp
,device_name ,template_reference ,template_version ,device_metric_timestamp ,ddata_msg
,null as is_historical
,device_measure_uuid
,object_agg(distinct measure_name ,measure_value) as measures_info
,measure_timestamp
,to_timestamp(measure_timestamp/1000) as measure_ts
,to_date(measure_ts) as measure_date
,hour(measure_ts) as measure_hour
from base
group by group_id, edge_node_id ,device_id ,message_type
,message_sequence ,inserted_at
,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq
,nbirth_ndeath_inserted_at ,is_last_known_good_reading
,message_type_order ,is_node_alive ,root_message_timestamp
,device_name ,template_reference ,template_version ,device_metric_timestamp ,ddata_msg
,is_historical ,device_measure_uuid
,measure_timestamp
;
create or replace transient table sparkplug_device_messages (
group_id varchar
,edge_node_id varchar
,device_id varchar
,message_type varchar
,message_sequence number
,inserted_at number
,nbirth_or_ndeath varchar
,nbirth_bdseq number
,ndeath_bdseq number
,nbirth_ndeath_inserted_at number
,is_last_known_good_reading boolean
,message_type_order number
,is_node_alive boolean
,root_message_timestamp number
,device_name varchar
,template_reference varchar
,template_version varchar
,device_metric_timestamp number
,ddata_msg variant
,is_historical boolean
,device_measure_uuid varchar
,measures_info variant
,measure_timestamp number
,measure_ts timestamp
,measure_date date
,measure_hour number
)
cluster by ( group_id ,edge_node_id ,device_id
,template_reference ,template_version ,device_name
,measure_date ,measure_hour)
comment = 'materialized version of the sparkplug_messages_flattened_vw for easier downstream pipelines.'
;
-- -- >>>>>>>>>>>>>>>>>>>>>>
-- ================
-- NODE BIRTH related assets
-- ================
create or replace stream nbirth_stream
on view nbirth_vw
show_initial_rows = true
comment = 'stream to monitor for nbirth messages, so that assets are created automatically'
; |
...
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
set cl_bridge_node_db = 'cl_bridge_node_db';
set staging_schema = 'stage_db';
set reader_role_warehouse = 'compute_wh';
use role sysadmin;
use database identifier($cl_bridge_node_db);
use schema identifier($staging_schema);
create or replace dynamic table D_ACTIVE_NBIRTH
lag = '1 day'
warehouse = compute_wh
as
-- Tabularize the active nbirth events for easier error handling during
-- investigations
with base as (
select
group_id ,edge_node_id ,device_id
,nvl(nbirth_bdseq_raw ,ndeath_bdseq_raw ) as bdseq
,message_type
,to_timestamp(message_timestamp/1000) as MEASURE_TS
from NODE_BIRTH_DEATH_VW
), nbirth_ndeath_matched as (
select distinct nb.bdseq
from base as nb
join base as nd
on nb.bdseq = nd.bdseq
where nb.message_type = 'NBIRTH'
and nd.message_type = 'NDEATH'
)
select b.* exclude(measure_ts ,message_type)
,min(measure_ts) as nbirth
from base as b
where b.bdseq not in (select bdseq from nbirth_ndeath_matched)
group by all
;
create or replace dynamic table D_DEVICE_HEARTBEATS
lag = '1 day'
warehouse = compute_wh
as
-- tabularize the dbirth/data messages for each devices. Meant to be used
-- for active investigation on message receipt from devices and error tracking
with base as (
select group_id ,edge_node_id ,device_id ,DEVICE_NAME ,message_type ,MEASURE_TS
,row_number()
over ( partition by group_id ,edge_node_id ,device_id ,DEVICE_NAME ,message_type
order by MEASURE_TS desc
) as row_num
from SPARKPLUG_DEVICE_MESSAGES
), rows_filtered as (
select *
from base
where row_num <= 2
), object_constructed as (
select group_id ,edge_node_id ,device_id ,DEVICE_NAME ,message_type
,first_value(MEASURE_TS)
ignore nulls
over ( partition by group_id ,edge_node_id ,device_id ,DEVICE_NAME ,message_type
order by MEASURE_TS desc
--ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
)
as latest_message_received_at
,NTH_VALUE( MEASURE_TS , 2 )
FROM FIRST
IGNORE NULLS
over ( partition by group_id ,edge_node_id ,device_id ,DEVICE_NAME ,message_type
order by MEASURE_TS desc
--ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
) as prev_latest_message_received_at
,timestampdiff('second' ,prev_latest_message_received_at ,latest_message_received_at) as interval_time
,last_value(MEASURE_TS)
ignore nulls
over ( partition by group_id ,edge_node_id ,device_id ,message_type
order by MEASURE_TS desc
--ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING
)
as oldest_message_received_at
,object_construct(
'latest_message_received_at' ,latest_message_received_at
,'prev_latest_message_received_at' ,prev_latest_message_received_at
,'interval_seconds',interval_time
,'oldest_message_received_at',oldest_message_received_at
) as obj
from rows_filtered
), unioned as (
select group_id ,edge_node_id ,device_id ,DEVICE_NAME
,null as dbirth
,obj as ddata
from object_constructed
where message_type = 'DDATA'
union
select group_id ,edge_node_id ,device_id ,DEVICE_NAME
,obj as dbirth
,null as ddata
from object_constructed
where message_type = 'DBIRTH'
)
select group_id ,edge_node_id ,device_id ,DEVICE_NAME
,first_value(ddata)
ignore nulls
over (partition by group_id ,edge_node_id ,device_id ,DEVICE_NAME
order by ddata)
as ddata
,first_value(dbirth)
ignore nulls
over (partition by group_id ,edge_node_id ,device_id ,DEVICE_NAME
order by dbirth)
as dbirth
from unioned
; |
Setup roles specifically requiring help of privileged roles like SYSADMIN. These are:
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
-- =========================
-- In this script, we are setting up roles specifically requiring help of
-- privlilleged roles like SYSADMIN. These are:
-- - Create a custom role
-- - Assign the custom role to create task and execute task
-- - Create warehouse specifically used for ingestion
-- - Grants
-- REPLACE THE SESSION VARIABLE ACCORDING TO YOUR ENVIRONMENT
-- =========================
set processor_role = 'cl_bridge_process_rl';
set cl_bridge_ingestion_warehouse = 'cl_bridge_ingest_wh';
set staging_db = 'cl_bridge_stage_db';
set staging_db_schema = 'cl_bridge_stage_db.stage_db';
set node_db = 'cl_bridge_node_db';
set node_db_staging_schema = 'cl_bridge_node_db.stage_db';
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> ROLE CREATION >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
use role securityadmin;
create role if not exists identifier($processor_role)
comment = 'role used by cirruslink bridge to ingest and process mqtt/sparkplug data';
grant role identifier($processor_role)
to role sysadmin;
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> WAREHOUSE >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
use role sysadmin;
create warehouse if not exists identifier($cl_bridge_ingestion_warehouse) with
warehouse_type = standard
warehouse_size = xsmall
--max_cluster_count = 5
initially_suspended = true
comment = 'used by cirruslink bridge to ingest'
;
grant usage on warehouse identifier($cl_bridge_ingestion_warehouse)
to role identifier($processor_role);
grant operate on warehouse identifier($cl_bridge_ingestion_warehouse)
to role identifier($processor_role);
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS STAGE DB >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
grant usage on database identifier($staging_db)
to role identifier($processor_role);
grant usage on schema identifier($staging_db_schema)
to role identifier($processor_role);
grant select on all tables in schema identifier($staging_db_schema)
to role identifier($processor_role);
grant insert on all tables in schema identifier($staging_db_schema)
to role identifier($processor_role);
grant select on future tables in schema identifier($staging_db_schema)
to role identifier($processor_role);
grant insert on future tables in schema identifier($staging_db_schema)
to role identifier($processor_role);
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS NODE DB >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
grant usage on database identifier($node_db)
to role identifier($processor_role);
grant usage on schema identifier($node_db_staging_schema)
to role identifier($processor_role);
grant select on all tables in schema identifier($node_db_staging_schema)
to role identifier($processor_role);
grant insert on all tables in schema identifier($node_db_staging_schema)
to role identifier($processor_role);
grant select on all views in schema identifier($node_db_staging_schema)
to role identifier($processor_role);
grant usage on all functions in schema identifier($node_db_staging_schema)
to role identifier($processor_role);
grant usage on all procedures in schema identifier($node_db_staging_schema)
to role identifier($processor_role);
-- need for:
-- - creating edge node specific tables and views
grant create schema on database identifier($node_db)
to role identifier($processor_role); |
Setup roles specifically requiring help of privileged roles like SYSADMIN, SECURITYADMIN, ACCOUNTADMIN. These are:
| Code Block | ||||||
|---|---|---|---|---|---|---|
| ||||||
-- =========================
-- In this script, we are setting up roles specifically requiring help of
-- privlilleged roles like SYSADMIN, SECURITYADMIN, ACCOUNTADMIN. These are:
-- - Create a custom role
-- - Assign the custom role to create task and execute task
-- - Grants
-- REPLACE THE SESSION VARIABLE ACCORDING TO YOUR ENVIRONMENT
-- =========================
set reader_role = 'cl_bridge_reader_rl';
set reader_role_warehouse = 'compute_wh';
set staging_db = 'cl_bridge_stage_db';
set staging_db_schema = 'cl_bridge_stage_db.stage_db';
set node_db = 'cl_bridge_node_db';
set node_db_staging_schema = 'cl_bridge_node_db.stage_db';
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> ROLE CREATION >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
use role securityadmin;
create role if not exists identifier($reader_role)
comment = 'role used by user/process to query and operate on the views and tables managed by cirruslink bridge';
grant role identifier($reader_role)
to role sysadmin;
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS WAREHOUSE >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
use role accountadmin;
grant usage on warehouse identifier($reader_role_warehouse)
to role sysadmin with grant option;
grant modify on warehouse identifier($reader_role_warehouse)
to role sysadmin with grant option;
grant operate on warehouse identifier($reader_role_warehouse)
to role sysadmin with grant option;
use role sysadmin;
grant usage on warehouse identifier($reader_role_warehouse)
to role identifier($reader_role);
grant modify on warehouse identifier($reader_role_warehouse)
to role identifier($reader_role);
grant operate on warehouse identifier($reader_role_warehouse)
to role identifier($reader_role);
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS TASK EXECUTION >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
-- Creating and executing tasks require exculated privileges that can be done
-- only by the accountadmin. Hence we have to switch roles
-- use role accountadmin;
-- grant execute managed task on account to role identifier($reader_role);
-- grant execute task on account to role identifier($reader_role);
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS STAGE DB >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
grant usage on database identifier($staging_db)
to role identifier($reader_role);
grant usage on schema identifier($staging_db_schema)
to role identifier($reader_role);
grant select on all tables in schema identifier($staging_db_schema)
to role identifier($reader_role);
grant select on future tables in schema identifier($staging_db_schema)
to role identifier($reader_role);
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS NODE DB >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
grant usage on database identifier($node_db)
to role identifier($reader_role);
grant usage on schema identifier($node_db_staging_schema)
to role identifier($reader_role);
grant select on all tables in schema identifier($node_db_staging_schema)
to role identifier($reader_role);
grant select on all views in schema identifier($node_db_staging_schema)
to role identifier($reader_role);
grant usage on all functions in schema identifier($node_db_staging_schema)
to role identifier($reader_role);
grant usage on all procedures in schema identifier($node_db_staging_schema)
to role identifier($reader_role);
-- need for:
-- - new generated specific tables and views
use role securityadmin;
grant usage on future schemas in database identifier($node_db)
to role identifier($reader_role); |
...