Set up assets related to the staging database and associated assets. These are:

  • Database


  • Staging schema
Code Block
Title: SQL Script 01
-- =========================
-- In this script, we are setting up assets related to the staging database
-- and associated assets. These are:
--  - Database
--  - Staging schema
-- The database & schema will be owned by SYSADMIN
-- =========================
-- Session variables: name of the staging database and of the schema created inside it.
set cl_bridge_staging_db = 'CL_BRIDGE_STAGE_DB';
set staging_schema = 'stage_db';
-- >>>>>>>>>>>>>>>>>>>>>> DATABASE >>>>>>>>>>>>>>>>>>>>>>>>>
-- SYSADMIN creates (and therefore owns) the database.
use role sysadmin;
-- The statement must be terminated; without the ';' it runs into the
-- following `use database` and the script fails to parse.
create database if not exists identifier($cl_bridge_staging_db)
   comment = 'used for storing messages received from CirrusLink Bridge'
   ;
-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA >>>>>>>>>>>>>>>>>>>>>>>>>
-- Work inside the staging database from here on.
use database identifier($cl_bridge_staging_db);
-- Optional retention settings, left disabled. Add them to the statement
-- below if required:
--   data_retention_time_in_days = 90
--   max_data_extension_time_in_days = 90
create schema if not exists identifier($staging_schema) with managed access
  comment = 'Used for staging data direct from CirrusLink Bridge';
-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA ASSETS >>>>>>>>>>>>>>>>>>>>>>>>>
use schema identifier($staging_schema);
-- =========================
-- Define tables
-- =========================
-- Landing table for the raw messages. The message body is kept in `msg`,
-- next to the routing attributes (namespace, group, message type, edge node,
-- device) that the table is also clustered on.
-- NOTE: `create or replace` rebuilds the table, so re-running this script
-- discards any rows already staged.
create or replace table sparkplug_raw (
    msg_id varchar
    ,msg_topic varchar
    ,namespace varchar
    ,group_id varchar
    ,message_type varchar
    ,edge_node_id varchar
    ,device_id varchar
    ,msg variant
    ,inserted_at number
)
change_tracking = true
cluster by (message_type ,group_id ,edge_node_id ,device_id)
comment = 'Used for storing json messages from sparkplug bridge/gateway'
;


Set up assets related to the node database which would eventually contain all the device specific views and tables. The following assets are created:

  • Node Database


  • Staging schema
Code Block
Title: SQL Script 02
-- =========================
-- In this script, we are setting up assets related to the node database,
-- which will eventually contain all the device-specific views and tables.
-- At the very core, the following assets are created:
--  - Node Database
--  - Staging schema
-- The database & schema will be owned by SYSADMIN
-- =========================
-- Session variables.
-- staged_sparkplug_raw_table holds the fully qualified name of the raw staging
-- table; the view definitions further down hardcode that name instead of
-- using this variable.
set staged_sparkplug_raw_table = 'cl_bridge_stage_db.stage_db.sparkplug_raw';
set cl_bridge_node_db = 'cl_bridge_node_db';
set staging_schema = 'stage_db';
-- >>>>>>>>>>>>>>>>>>>>>> DATABASE >>>>>>>>>>>>>>>>>>>>>>>>>
-- SYSADMIN creates (and therefore owns) the database.
use role sysadmin;
-- The statement must be terminated; without the ';' it runs into the
-- following `use database` and the script fails to parse.
create database if not exists identifier($cl_bridge_node_db)
   comment = 'used for storing flattened messages processed from the staging database'
   ;
-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA >>>>>>>>>>>>>>>>>>>>>>>>>
-- Work inside the node database from here on.
use database identifier($cl_bridge_node_db);
-- Optional retention settings, left disabled. Add them to the statement
-- below if required:
--   data_retention_time_in_days = 90
--   max_data_extension_time_in_days = 90
create schema if not exists identifier($staging_schema) with managed access
  comment = 'used for storing flattened messages processed from the staging database';
-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA ASSETS >>>>>>>>>>>>>>>>>>>>>>>>>
use schema identifier($staging_schema);
-- =========================
-- Define tables
-- =========================
-- NOTE THE 'cl_bridge_stage_db.stage_db.sparkplug_raw' is hardcoded here; as the identifier
-- staged_sparkplug_raw_table replacement does not work.
--
-- Base view over the raw staging table. It passes the routing columns through
-- and parses the message body once, so that every other view in this script
-- can read `message`, `message_sequence` and `message_timestamp` from here.
-- The pass-through columns are the ones the views below reference
-- (msg_id, namespace, group_id, message_type, edge_node_id, device_id,
-- inserted_at).
-- NOTE(review): msg_topic is not projected because nothing in this script
-- reads it; confirm against the vendor's original column list.
create or replace view sparkplug_messages_vw
    change_tracking = true
    comment = 'parses out the core attributes from the message and topic.'
    as
    select
        msg_id
        ,namespace
        ,group_id
        ,message_type
        ,edge_node_id
        ,device_id
        ,parse_json(msg) as message
        -- `message` below is the alias defined on the previous line
        ,message:seq::int as message_sequence
        ,message:timestamp::number as message_timestamp
        ,inserted_at
    from cl_bridge_stage_db.stage_db.sparkplug_raw
    ;
-- --  >>>>>>>>>>>>>>>>>>>>>>
-- One row per NBIRTH message, reduced to the (group_id, edge_node_id) pair.
-- Change tracking is enabled because nbirth_stream is defined on this view.
create or replace view nbirth_vw
    change_tracking = true
    comment = 'filtered to nbirth messages. This is a mirror'
    as
    select
       group_id ,edge_node_id
    from sparkplug_messages_vw
    where message_type = 'NBIRTH'
    ;
-- Template definitions announced by the most recent NBIRTH of every
-- (group_id, edge_node_id): the metrics array of that message is flattened
-- and only entries whose dataType is 'Template' are kept.
create or replace view node_machine_registry_vw
    comment = 'Used to retreive the latest template definitions for a given group and edge_node'
    as
    with base as (
        -- latest NBIRTH message per group / edge node
        select
            group_id ,edge_node_id
            ,max_by(message ,message_timestamp) as message
            ,max(message_timestamp) as latest_message_timestamp
        from sparkplug_messages_vw
        where message_type = 'NBIRTH'
        group by group_id ,edge_node_id
    )
    select
        group_id ,edge_node_id
        ,f.value as template_definition
        -- the lines below read from the `template_definition` alias above
        ,template_definition:name::varchar as machine
        ,template_definition:reference::varchar as reference
        ,template_definition:version::varchar as version
        ,template_definition:timestamp::int as timestamp
    from base as b
        ,lateral flatten (input => b.message:metrics) f
    where template_definition:dataType::varchar = 'Template'
    ;
-- --  >>>>>>>>>>>>>>>>>>>>>>
-- NBIRTH / NDEATH messages with the value of their 'bdSeq' metric pulled out.
-- The four *_raw columns mirror the columns that device_records_vw fills with
-- nulls, so both views have the same column layout and can be unioned.
create or replace view node_birth_death_vw
    comment = 'shows the latest node birth & death messages for each device'
    as
    select
        b.* exclude(namespace)
        ,message_type as nbirth_or_ndeath_raw
        -- bdSeq goes into the birth or the death column depending on the message type
        ,iff((message_type = 'NBIRTH') ,f.value:value ,null)::number as nbirth_bdSeq_raw
        ,iff((message_type = 'NDEATH') ,f.value:value ,null)::number as ndeath_bdSeq_raw
        ,inserted_at as nbirth_ndeath_inserted_at_raw
    from sparkplug_messages_vw as b
        ,lateral flatten (input => b.message:metrics) as f
    where message_type in ('NBIRTH' ,'NDEATH')
     and f.value:name::varchar = 'bdSeq'
    ;
-- DBIRTH / DDATA messages. The four *_raw columns are null placeholders so
-- that the column layout matches node_birth_death_vw. Change tracking is
-- enabled because device_records_stream is defined on this view.
create or replace view device_records_vw
    change_tracking = true
    as
    select
        b.* exclude(namespace)
        ,null as nbirth_or_ndeath_raw
        ,null as nbirth_bdSeq_raw
        ,null as ndeath_bdSeq_raw
        ,null as nbirth_ndeath_inserted_at_raw
    from sparkplug_messages_vw as b
    where message_type in ('DBIRTH' ,'DDATA')
    ;
-- Stream over device_records_vw; show_initial_rows = true makes the rows that
-- already exist in the view visible on the first read of the stream.
create or replace stream device_records_stream
    on view device_records_vw
    show_initial_rows = true
    comment = 'used for monitoring latest device messages'
    ;
-- Puts every device message in the context of the most recent node
-- birth/death event: node and device rows are unioned, then the *_raw
-- node columns are filled forward (ordered by inserted_at, message_sequence)
-- so that each device row carries the latest NBIRTH/NDEATH state seen before it.
create or replace view sparkplug_msgs_nodebirth_contextualized_vw
    as
    with device_node_unioned as (
        select *
        from node_birth_death_vw
        union all
        -- A stream exposes extra METADATA$ columns on top of the view's own
        -- columns; they are dropped so both branches have the same layout.
        select * exclude(metadata$action ,metadata$isupdate ,metadata$row_id)
        from device_records_stream
    )
    select
        -- group_id ,message_type ,edge_node_id ,device_id
        -- ,message ,message_sequence ,inserted_at
        * exclude(nbirth_or_ndeath_raw ,nbirth_bdSeq_raw ,ndeath_bdSeq_raw ,nbirth_ndeath_inserted_at_raw)
        -- Fill-forward: keep the row's own value when present, otherwise take
        -- the last non-null value from an earlier row.
        -- NOTE(review): the opening `nvl(<col>_raw` of each expression was
        -- missing from the text and is reconstructed from the unbalanced
        -- closing parenthesis and the *_raw -> final column naming.
        ,nvl(nbirth_or_ndeath_raw
                ,lag(nbirth_or_ndeath_raw) ignore nulls over (order by inserted_at ,message_sequence)
            ) as nbirth_or_ndeath
        ,nvl(nbirth_bdSeq_raw
                ,lag(nbirth_bdSeq_raw) ignore nulls over (order by inserted_at ,message_sequence)
            ) as nbirth_bdSeq
        ,nvl(ndeath_bdSeq_raw
                ,lag(ndeath_bdSeq_raw) ignore nulls over (order by inserted_at ,message_sequence)
            ) as ndeath_bdSeq
        ,nvl(nbirth_ndeath_inserted_at_raw
                ,lag(nbirth_ndeath_inserted_at_raw) ignore nulls over (order by inserted_at ,message_sequence)
            ) as nbirth_ndeath_inserted_at
        -- `case true when <cond>`: the first condition that evaluates to true wins
        ,case true
            when (nbirth_or_ndeath = 'NBIRTH') then false
            when ( (nbirth_or_ndeath = 'NDEATH') and (nbirth_bdSeq != ndeath_bdSeq) ) then false
            when ( (nbirth_or_ndeath = 'NDEATH') and (nbirth_bdSeq = ndeath_bdSeq) ) then true
            else true
        end as is_last_known_good_reading
        -- fixed ordering of the message types; anything unknown sorts last
        ,case lower(message_type)
                when lower('NBIRTH') then 1
                when lower('DBIRTH') then 2
                when lower('DDATA') then 3
                when lower('DDEATH') then 4
                when lower('NDEATH') then 5
                else 99
            end as message_type_order
        ,(nbirth_or_ndeath = 'NBIRTH') as is_node_alive
    from device_node_unioned
    ;
-- Flattens DBIRTH / DDATA messages two levels deep: `f` iterates over the
-- message metrics (one per device template instance), `d` over the metrics
-- nested inside each of them. The outer query then folds the measure
-- name/value pairs into one object per device_measure_uuid.
-- The output column order follows the sparkplug_device_messages table, which
-- is described as the materialized version of this view.
create or replace view sparkplug_messages_flattened_vw
    as
    with base as (
        select
            -- sparkplugb message level
            msg_id ,group_id, edge_node_id ,device_id ,message_type
            ,message_sequence ,inserted_at
            ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq
            ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
            ,message_type_order ,is_node_alive
            ,message_timestamp as root_message_timestamp
            -- attributes related to device data (ddata / dbirth)
            ,f.value:name::varchar as device_name
            ,f.value:value:reference::varchar as template_reference
            ,f.value:value:version::varchar as template_version
            ,f.value:timestamp::number as device_metric_timestamp
            ,f.value as ddata_msg
            -- attributes related to device level metrics
            ,concat(msg_id ,'^' ,f.index ,'::',d.index) as device_measure_uuid
            ,d.value:name::varchar as measure_name
            ,d.value:value as measure_value
            ,d.value:timestamp::number as measure_timestamp
        from sparkplug_msgs_nodebirth_contextualized_vw as b
            ,lateral flatten(input => b.message:metrics) as f
            ,lateral flatten(input => f.value:value:metrics) as d
        where message_type in ('DBIRTH' ,'DDATA')
            -- only metrics that reference a template are kept
            and template_reference is not null
    )
    select
        group_id, edge_node_id ,device_id ,message_type
        ,message_sequence ,inserted_at
        ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq
        ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
        ,message_type_order ,is_node_alive ,root_message_timestamp
        ,device_name ,template_reference ,template_version ,device_metric_timestamp ,ddata_msg
        ,null as is_historical
        -- grouped on below, so it has to be part of the projection as well
        ,device_measure_uuid
        ,object_agg(distinct measure_name ,measure_value) as measures_info
        -- needed by measure_ts; it must be selected and grouped for the
        -- aggregate query to be valid
        ,measure_timestamp
        -- divided by 1000 before the conversion, i.e. the value is treated as milliseconds
        ,to_timestamp(measure_timestamp/1000) as measure_ts
        ,to_date(measure_ts) as measure_date
        ,hour(measure_ts) as measure_hour
    from base
    group by group_id, edge_node_id ,device_id ,message_type
            ,message_sequence ,inserted_at
            ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq
            ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
            ,message_type_order ,is_node_alive ,root_message_timestamp
            ,device_name ,template_reference ,template_version ,device_metric_timestamp ,ddata_msg
            ,is_historical ,device_measure_uuid
            ,measure_timestamp
    ;
-- Transient table holding a materialized copy of
-- sparkplug_messages_flattened_vw (see the table comment below).
-- NOTE: `create or replace` rebuilds the table, so re-running this script
-- discards any rows already stored in it.
create or replace transient table sparkplug_device_messages (
        group_id varchar
        ,edge_node_id varchar
        ,device_id varchar
        ,message_type varchar
        ,message_sequence number
        ,inserted_at number
        ,nbirth_or_ndeath varchar
        ,nbirth_bdseq number
        ,ndeath_bdseq number
        ,nbirth_ndeath_inserted_at number
        ,is_last_known_good_reading boolean
        ,message_type_order number
        ,is_node_alive boolean
        ,root_message_timestamp number
        ,device_name varchar
        ,template_reference varchar
        ,template_version varchar
        ,device_metric_timestamp number
        ,ddata_msg variant
        ,is_historical boolean
        ,device_measure_uuid varchar
        ,measures_info variant
        ,measure_timestamp number
        ,measure_ts timestamp
        ,measure_date date
        ,measure_hour number
    )
    cluster by ( group_id ,edge_node_id ,device_id
        ,template_reference ,template_version ,device_name
        ,measure_date ,measure_hour)
    comment = 'materialized version of the sparkplug_messages_flattened_vw for easier downstream pipelines.'
    ;
-- --  >>>>>>>>>>>>>>>>>>>>>>
-- ================
--  NODE BIRTH related assets
-- ================
-- Stream over nbirth_vw; show_initial_rows = true makes the NBIRTH rows that
-- already exist in the view visible on the first read of the stream.
create or replace stream nbirth_stream
    on view nbirth_vw
    show_initial_rows = true
    comment = 'stream to monitor for nbirth messages, so that assets are created automatically'
    ;


Code Block
Title: SQL Script 06
-- Session context for the dynamic tables defined below: node database,
-- its staging schema, and the SYSADMIN role.
set staging_schema = 'stage_db';
set cl_bridge_node_db = 'cl_bridge_node_db';
-- Declared for reference; the dynamic table definitions in this script
-- spell out the warehouse name literally.
set reader_role_warehouse = 'compute_wh';
use role sysadmin;
use database identifier($cl_bridge_node_db);
use schema identifier($staging_schema);
-- Dynamic table listing node births that have no matching node death,
-- where a birth and a death match when they carry the same bdseq.
-- `target_lag` is the documented name of the freshness parameter; the
-- original text used the older `lag` spelling.
create or replace dynamic table D_ACTIVE_NBIRTH
    target_lag = '1 day'
    warehouse = compute_wh
    as
    -- Tabularize the active nbirth events for easier error handling during
    -- investigations
    with base as (
        select
            group_id ,edge_node_id ,device_id
                -- required by the filters in nbirth_ndeath_matched and by the
                -- exclude() in the final select
                ,message_type
                -- a row carries either the birth or the death sequence, never both
                ,nvl(nbirth_bdseq_raw ,ndeath_bdseq_raw ) as bdseq
                ,to_timestamp(message_timestamp/1000) as MEASURE_TS
        from NODE_BIRTH_DEATH_VW
    ), nbirth_ndeath_matched as (
        -- bdseq values for which both an NBIRTH and an NDEATH exist
        select distinct nb.bdseq
        from base as nb
            join base as nd
                on nb.bdseq = nd.bdseq
        where nb.message_type = 'NBIRTH'
            and nd.message_type = 'NDEATH'
    )
    select b.* exclude(measure_ts ,message_type)
        ,min(measure_ts) as nbirth
    from base as b
    where b.bdseq not in (select bdseq from nbirth_ndeath_matched)
    group by all
    ;
-- Dynamic table summarising, per device and message type, when messages were
-- last received (latest, previous-latest, interval between the two).
-- NOTE(review): this statement is NOT runnable as shown. Several lines
-- appear to have been lost from the text: the `as` keyword before the query,
-- the window function in front of most `over (...)` clauses, the `from`
-- clause of the `base` CTE, a few closing parentheses, the set operator
-- between the two selects in `unioned`, and the terminating ';'. The gaps are
-- marked below. Restore them from the vendor's original script rather than
-- guessing.
create or replace dynamic table D_DEVICE_HEARTBEATS
    lag = '1 day'
    warehouse = compute_wh
    -- NOTE(review): `as` expected here, before the query.
    -- Tabularize the dbirth/ddata messages for each device. Meant to be used
    -- for active investigation on message receipt from devices and error tracking
    with base as (
        select group_id ,edge_node_id ,device_id ,DEVICE_NAME ,message_type ,MEASURE_TS
                -- NOTE(review): window function missing before `over`; since the
                -- result is filtered with `row_num <= 2` it is presumably a row
                -- numbering function. Confirm.
                over ( partition by group_id ,edge_node_id ,device_id ,DEVICE_NAME ,message_type
                        order by MEASURE_TS desc
                ) as row_num
        -- NOTE(review): `from <source>` missing; the source relation cannot be
        -- determined from this text.
    ), rows_filtered as (
        -- keep the two most recent rows per partition
        select *
        from base
        where row_num <= 2
    ), object_constructed as (
        select group_id ,edge_node_id ,device_id ,DEVICE_NAME ,message_type
                -- NOTE(review): window function call missing before `ignore nulls`.
                ignore nulls
                over ( partition by group_id ,edge_node_id ,device_id ,DEVICE_NAME ,message_type
                        order by MEASURE_TS desc
                        --ROWS BETWEEN 1 PRECEDING  AND 1 FOLLOWING
                        -- NOTE(review): closing ')' of the over clause missing here.
                        as latest_message_received_at
            -- second most recent MEASURE_TS within the partition
            ,NTH_VALUE( MEASURE_TS , 2 )
                FROM FIRST
                IGNORE NULLS
                over ( partition by group_id ,edge_node_id ,device_id ,DEVICE_NAME ,message_type
                        order by MEASURE_TS desc
                        --ROWS BETWEEN 1 PRECEDING  AND 1 FOLLOWING
                        ) as prev_latest_message_received_at
            -- seconds between the previous-latest and the latest message
            ,timestampdiff('second' ,prev_latest_message_received_at ,latest_message_received_at) as interval_time
                -- NOTE(review): window function call missing before `ignore nulls`.
                -- Also note this partition omits DEVICE_NAME, unlike the others.
                ignore nulls
                over ( partition by group_id ,edge_node_id ,device_id ,message_type
                        order by MEASURE_TS desc
                        --ROWS BETWEEN 1 PRECEDING  AND 1 FOLLOWING
                        -- NOTE(review): closing ')' of the over clause missing here.
                        as oldest_message_received_at
                -- NOTE(review): the opening of the key/value object constructor
                -- that produces `obj` is missing, possibly together with
                -- further key/value pairs.
                'latest_message_received_at' ,latest_message_received_at
                ,'prev_latest_message_received_at' ,prev_latest_message_received_at
            ) as obj
        from rows_filtered
    ), unioned as (
        -- DDATA rows: obj goes to the ddata column
        select group_id ,edge_node_id ,device_id ,DEVICE_NAME
            ,null as dbirth
            ,obj as ddata
        from object_constructed
        where message_type = 'DDATA'
        -- NOTE(review): set operator missing between the two selects
        -- (the CTE name suggests a union; confirm).
        -- DBIRTH rows: obj goes to the dbirth column
        select group_id ,edge_node_id ,device_id ,DEVICE_NAME
            ,obj as dbirth
            ,null as ddata
        from object_constructed
        where message_type = 'DBIRTH'
    -- NOTE(review): closing ')' of the `unioned` CTE missing here.
    select group_id ,edge_node_id ,device_id ,DEVICE_NAME
                -- NOTE(review): window function call over ddata missing.
                ignore nulls
                over (partition by group_id ,edge_node_id ,device_id ,DEVICE_NAME
                    order by ddata)
            as ddata
                -- NOTE(review): window function call over dbirth missing.
                ignore nulls
                over (partition by group_id ,edge_node_id ,device_id ,DEVICE_NAME
                    order by dbirth)
            as dbirth
    from unioned
    -- NOTE(review): terminating ';' missing.

SQL Script 07

Set up roles that specifically require the help of privileged roles like SYSADMIN. These are:

  • Create a custom role
  • Assign the custom role to create task and execute task
  • Create warehouse specifically used for ingestion
  • Grants
Code Block
Title: SQL Script 07
-- =========================
-- In this script, we are setting up roles that specifically require the help of
-- privileged roles like SYSADMIN. These are:
--  - Create a custom role
--  - Assign the custom role to create task and execute task
--  - Create warehouse specifically used for ingestion
--  - Grants
-- =========================
-- Names used throughout this script.
-- role and warehouse used for ingestion
set processor_role = 'cl_bridge_process_rl';
set cl_bridge_ingestion_warehouse = 'cl_bridge_ingest_wh';
-- staging database and its schema (fully qualified)
set staging_db = 'cl_bridge_stage_db';
set staging_db_schema = 'cl_bridge_stage_db.stage_db';
-- node database and its schema (fully qualified)
set node_db = 'cl_bridge_node_db';
set node_db_staging_schema = 'cl_bridge_node_db.stage_db';
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> ROLE CREATION >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    -- SECURITYADMIN creates the role and grants it to SYSADMIN.
    use role securityadmin;
    create role if not exists identifier($processor_role) comment = 'role used by cirruslink bridge to ingest and process mqtt/sparkplug data';
    grant role identifier($processor_role) to role sysadmin;
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> WAREHOUSE >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    -- Dedicated XSMALL standard warehouse for ingestion, created in the
    -- suspended state. The processor role may use it and operate it.
    use role sysadmin;
    -- The statement must be terminated; without the ';' it runs into the
    -- following `grant` and the script fails to parse.
    create warehouse if not exists identifier($cl_bridge_ingestion_warehouse) with
        warehouse_type = standard
        warehouse_size = xsmall
        --max_cluster_count = 5
        initially_suspended = true
        comment = 'used by cirruslink bridge to ingest'
        ;
    grant usage on warehouse identifier($cl_bridge_ingestion_warehouse)
        to role identifier($processor_role);
    grant operate on warehouse identifier($cl_bridge_ingestion_warehouse)
        to role identifier($processor_role);
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS STAGE DB >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    -- Staging database: the processor role may enter the database and schema,
    -- and read from / insert into every table, existing or created later.
    grant usage on database identifier($staging_db) to role identifier($processor_role);
    grant usage on schema identifier($staging_db_schema) to role identifier($processor_role);
    -- tables that exist today
    grant select ,insert on all tables in schema identifier($staging_db_schema) to role identifier($processor_role);
    -- tables created from now on
    grant select ,insert on future tables in schema identifier($staging_db_schema) to role identifier($processor_role);
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS NODE DB >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    -- Node database: access to the database and schema, plus the objects that
    -- currently exist in the schema (tables, views, functions, procedures).
    grant usage on database identifier($node_db) to role identifier($processor_role);
    grant usage on schema identifier($node_db_staging_schema) to role identifier($processor_role);
    grant select ,insert on all tables in schema identifier($node_db_staging_schema) to role identifier($processor_role);
    grant select on all views in schema identifier($node_db_staging_schema) to role identifier($processor_role);
    grant usage on all functions in schema identifier($node_db_staging_schema) to role identifier($processor_role);
    grant usage on all procedures in schema identifier($node_db_staging_schema) to role identifier($processor_role);
    -- The processor role creates edge-node-specific tables and views, which
    -- requires permission to create schemas in the node database.
    grant create schema on database identifier($node_db) to role identifier($processor_role);

SQL Script 08

Set up roles that specifically require the help of privileged roles like SYSADMIN, SECURITYADMIN, ACCOUNTADMIN. These are:

  • Create a custom role
  • Assign the custom role to create task and execute task
  • Grants
Code Block
Title: SQL Script 08
-- =========================
-- In this script, we are setting up roles that specifically require the help of
-- privileged roles like SYSADMIN, SECURITYADMIN, ACCOUNTADMIN. These are:
--  - Create a custom role
--  - Assign the custom role to create task and execute task
--  - Grants
-- =========================
-- Names used throughout this script.
-- reader role and the warehouse it will run queries on
set reader_role = 'cl_bridge_reader_rl';
set reader_role_warehouse = 'compute_wh';
-- staging database and its schema (fully qualified)
set staging_db = 'cl_bridge_stage_db';
set staging_db_schema = 'cl_bridge_stage_db.stage_db';
-- node database and its schema (fully qualified)
set node_db = 'cl_bridge_node_db';
set node_db_staging_schema = 'cl_bridge_node_db.stage_db';
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> ROLE CREATION >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    -- SECURITYADMIN creates the role and grants it to SYSADMIN.
    use role securityadmin;
    create role if not exists identifier($reader_role) comment = 'role used by user/process to query and operate on the views and tables managed by cirruslink bridge';
    grant role identifier($reader_role) to role sysadmin;
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS WAREHOUSE >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    -- Step 1: ACCOUNTADMIN gives SYSADMIN the warehouse privileges together
    -- with the right to pass them on.
    use role accountadmin;
    grant usage ,modify ,operate on warehouse identifier($reader_role_warehouse) to role sysadmin with grant option;
    -- Step 2: SYSADMIN passes the same privileges on to the reader role.
    use role sysadmin;
    grant usage ,modify ,operate on warehouse identifier($reader_role_warehouse) to role identifier($reader_role);
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS TASK EXECUTION >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    -- Creating and executing tasks requires escalated privileges that can be
    -- granted only by the accountadmin. Hence we have to switch roles.
    -- use role accountadmin;
    -- grant execute managed task on account to role identifier($reader_role);
    -- grant execute task on account to role identifier($reader_role);
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS STAGE DB >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    -- Staging database: read-only access for the reader role, covering the
    -- tables that exist today and those created later.
    grant usage on database identifier($staging_db) to role identifier($reader_role);
    grant usage on schema identifier($staging_db_schema) to role identifier($reader_role);
    grant select on all tables in schema identifier($staging_db_schema) to role identifier($reader_role);
    grant select on future tables in schema identifier($staging_db_schema) to role identifier($reader_role);
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS NODE DB >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    -- Node database: read access to the objects that currently exist in the
    -- schema (tables, views) and usage of its functions and procedures.
    grant usage on database identifier($node_db) to role identifier($reader_role);
    grant usage on schema identifier($node_db_staging_schema) to role identifier($reader_role);
    grant select on all tables in schema identifier($node_db_staging_schema) to role identifier($reader_role);
    grant select on all views in schema identifier($node_db_staging_schema) to role identifier($reader_role);
    grant usage on all functions in schema identifier($node_db_staging_schema) to role identifier($reader_role);
    grant usage on all procedures in schema identifier($node_db_staging_schema) to role identifier($reader_role);
    -- Schemas generated later (for the node-specific tables and views) must be
    -- reachable too; the future grant is issued as SECURITYADMIN.
    use role securityadmin;
    grant usage on future schemas in database identifier($node_db) to role identifier($reader_role);
