Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • Script 02

    Code Block
    language: sql
    title: SQL Script 02
    collapse: true
    -- =========================
    -- In this script, we set up the assets related to the node database,
    -- which will eventually contain all the device-specific views and tables.
    -- At the very core, the following assets are created:
    --  - Node Database
    --  - Staging schema
    
    -- The database & schema will be owned by SYSADMIN
    
    -- REPLACE THE SESSION VARIABLE ACCORDING TO YOUR ENVIRONMENT
    -- =========================
    
    -- Session variables for this script, assigned in a single statement.
    -- The names on the left pair up positionally with the values on the right.
    -- cl_bridge_node_db and staging_schema are read further down through identifier($...);
    -- staged_sparkplug_raw_table is set here, but the view definition later in this script
    -- spells the same table name out literally.
    set (
        staged_sparkplug_raw_table
        ,cl_bridge_node_db
        ,staging_schema
    ) = (
        'cl_bridge_stage_db.stage_db.sparkplug_raw'
        ,'cl_bridge_node_db'
        ,'stage_db'
    );
    
    -- >>>>>>>>>>>>>>>>>>>>>> DATABASE >>>>>>>>>>>>>>>>>>>>>>>>>
    
    -- Run as SYSADMIN so that role owns the database created here.
    use role sysadmin;

    -- Create the node database only when it is missing.
    -- Optional retention settings, disabled by default:
    --     data_retention_time_in_days = 90
    --     max_data_extension_time_in_days = 90
    create database if not exists identifier($cl_bridge_node_db)
        comment = 'used for storing flattened messages processed from the staging database';
    
    -- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA >>>>>>>>>>>>>>>>>>>>>>>>>
    
    -- Switch into the node database so the schema is created inside it.
    use database identifier($cl_bridge_node_db);

    -- Managed-access schema, created only when it is missing.
    -- Optional retention settings, disabled by default:
    --     data_retention_time_in_days = 90
    --     max_data_extension_time_in_days = 90
    create schema if not exists identifier($staging_schema)
        with managed access
        comment = 'used for storing flattened messages processed from the staging database';

    -- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA ASSETS >>>>>>>>>>>>>>>>>>>>>>>>>

    -- Every object below is created in the staging schema.
    use schema identifier($staging_schema);
    
    -- =========================
    -- Define tables
    -- =========================
    
    -- NOTE: 'cl_bridge_stage_db.stage_db.sparkplug_raw' is hardcoded in the view below,
    -- because substituting the staged_sparkplug_raw_table session variable does not work here.
    
    -- Base view over the raw staging table. The identifying columns pass through unchanged;
    -- msg is parsed into a variant (message), from which seq and timestamp are extracted.
    create or replace view sparkplug_messages_vw
        change_tracking = true
        comment = 'parses out the core attributes from the message and topic.'
    as
    select
        src.msg_id
        ,src.namespace
        ,src.group_id
        ,src.message_type
        ,src.edge_node_id
        ,src.device_id
        ,parse_json(src.msg)          as message
        -- the next two columns read from the message alias defined just above
        ,message:seq::int             as message_sequence
        ,message:timestamp::number    as message_timestamp
        ,src.inserted_at
    from cl_bridge_stage_db.stage_db.sparkplug_raw as src
    ;
    
    -- --  >>>>>>>>>>>>>>>>>>>>>>
    
    -- One row per NBIRTH message, reduced to the group and edge node it belongs to.
    create or replace view nbirth_vw
        change_tracking = true
        comment = 'filtered to nbirth messages. This is a mirror'
    as
    select
        msgs.group_id
        ,msgs.edge_node_id
    from sparkplug_messages_vw as msgs
    where msgs.message_type = 'NBIRTH'
    ;
    
    -- For each (group_id, edge_node_id), takes the NBIRTH message with the greatest
    -- message_timestamp, flattens its metrics array and keeps the entries whose
    -- dataType is 'Template'.
    create or replace view node_machine_registry_vw
        comment = 'Used to retreive the latest template definitions for a given group and edge_node'
    as
    with latest_nbirth as (
        select
            group_id
            ,edge_node_id
            ,max_by(message ,message_timestamp) as message
            ,max(message_timestamp) as latest_message_timestamp
        from sparkplug_messages_vw
        where message_type = 'NBIRTH'
        group by
            group_id
            ,edge_node_id
    )
    select
        group_id
        ,edge_node_id
        ,metric.value as template_definition
        -- the remaining columns and the filter below read from the template_definition alias
        ,template_definition:name::varchar as machine
        ,template_definition:reference::varchar as reference
        ,template_definition:version::varchar as version
        ,template_definition:timestamp::int as timestamp
    from latest_nbirth as nb
        ,lateral flatten (input => nb.message:metrics) as metric
    where template_definition:dataType::varchar = 'Template'
    ;
    
    -- --  >>>>>>>>>>>>>>>>>>>>>>
    
    -- NBIRTH / NDEATH messages joined to their 'bdSeq' metric. The metric value lands in
    -- nbirth_bdSeq_raw for NBIRTH rows and in ndeath_bdSeq_raw for NDEATH rows; the other
    -- of the two columns is null.
    create or replace view node_birth_death_vw
        comment = 'shows the latest node birth & death messages for each device'
    as
    select
        msgs.* exclude(namespace)
        ,msgs.message_type as nbirth_or_ndeath_raw
        ,cast(case when msgs.message_type = 'NBIRTH' then metric.value:value else null end as number)
            as nbirth_bdSeq_raw
        ,cast(case when msgs.message_type = 'NDEATH' then metric.value:value else null end as number)
            as ndeath_bdSeq_raw
        ,msgs.inserted_at as nbirth_ndeath_inserted_at_raw
    from sparkplug_messages_vw as msgs
        ,lateral flatten (input => msgs.message:metrics) as metric
    where msgs.message_type in ('NBIRTH' ,'NDEATH')
        and metric.value:name::varchar = 'bdSeq'
    ;
    
    -- Device-level messages (DBIRTH / DDATA). The four trailing null columns mirror the
    -- extra columns of node_birth_death_vw so that both can be stacked with union all.
    create or replace view device_records_vw
        change_tracking = true
    as
    select
        msgs.* exclude(namespace)
        ,null as nbirth_or_ndeath_raw
        ,null as nbirth_bdSeq_raw
        ,null as ndeath_bdSeq_raw
        ,null as nbirth_ndeath_inserted_at_raw
    from sparkplug_messages_vw as msgs
    where msgs.message_type in ('DBIRTH' ,'DDATA')
    ;
    
    -- Change stream over device_records_vw. With show_initial_rows enabled, the first
    -- consumption of the stream returns the rows already present in the view.
    create or replace stream device_records_stream
        on view device_records_vw
        show_initial_rows = true
        comment = 'used for monitoring latest device messages';
    
    -- Adds node birth/death context to every node and device message.
    --
    -- Rows from node_birth_death_vw carry their own context in the *_raw columns. Device rows
    -- (read from device_records_stream) carry nulls there and inherit the most recent non-null
    -- value from an earlier row, ordered by inserted_at and then message_sequence.
    --
    -- The look-back is partitioned by (group_id, edge_node_id) so that a device message only
    -- inherits the birth/death state of its own edge node. Without the partition, a message
    -- would pick up the state of whichever edge node reported last.
    create or replace view sparkplug_msgs_nodebirth_contextualized_vw
        as
        with device_node_unioned as (
            -- union all is positional: both inputs expose the same column list
            select *
            from node_birth_death_vw
            union all
            select * exclude(METADATA$ROW_ID ,METADATA$ACTION ,METADATA$ISUPDATE)
            from device_records_stream
        )
        select 
            * exclude(nbirth_or_ndeath_raw ,nbirth_bdSeq_raw ,ndeath_bdSeq_raw ,nbirth_ndeath_inserted_at_raw)

            -- own value when present, otherwise the latest earlier non-null value of the same edge node
            ,nvl(nbirth_or_ndeath_raw
                    ,lag(nbirth_or_ndeath_raw) ignore nulls over (
                        partition by group_id ,edge_node_id
                        order by inserted_at ,message_sequence)
                ) as nbirth_or_ndeath

            ,nvl(nbirth_bdSeq_raw
                    ,lag(nbirth_bdSeq_raw) ignore nulls over (
                        partition by group_id ,edge_node_id
                        order by inserted_at ,message_sequence)
                ) as nbirth_bdSeq

            ,nvl(ndeath_bdSeq_raw
                    ,lag(ndeath_bdSeq_raw) ignore nulls over (
                        partition by group_id ,edge_node_id
                        order by inserted_at ,message_sequence)
                ) as ndeath_bdSeq

            ,nvl(nbirth_ndeath_inserted_at_raw
                    ,lag(nbirth_ndeath_inserted_at_raw) ignore nulls over (
                        partition by group_id ,edge_node_id
                        order by inserted_at ,message_sequence)
                ) as nbirth_ndeath_inserted_at

            -- false while the node context is NBIRTH, or NDEATH with a bdSeq that differs from
            -- the birth bdSeq; true in every other case (including no context at all)
            ,case true
                when (nbirth_or_ndeath = 'NBIRTH') then false
                when ( (nbirth_or_ndeath = 'NDEATH') and (nbirth_bdSeq != ndeath_bdSeq) ) then false
                when ( (nbirth_or_ndeath = 'NDEATH') and (nbirth_bdSeq = ndeath_bdSeq) ) then true
                else true
            end as is_last_known_good_reading

            -- fixed ranking of message types; anything unlisted gets 99
            ,case lower(message_type)
                    when lower('NBIRTH') then 1
                    when lower('DBIRTH') then 2
                    when lower('DDATA') then 3
                    when lower('DDEATH') then 4
                    when lower('NDEATH') then 5
                    else 99
                end as message_type_order

            -- null when no node birth/death context is available for the row
            ,(nbirth_or_ndeath = 'NBIRTH') as is_node_alive

        from device_node_unioned
        ;
    
    -- Flattens DBIRTH / DDATA payloads in two steps: first over the message's metrics (f),
    -- then over the metrics nested inside each metric's value (d). The outer query then
    -- aggregates measure name/value pairs into an object per grouping key and derives
    -- timestamp, date and hour columns from measure_timestamp (divided by 1000).
    create or replace view sparkplug_messages_flattened_vw
    as
    with exploded as (
        select
            -- sparkplugb message level
            msg_id
            ,group_id
            ,edge_node_id
            ,device_id
            ,message_type
            ,message_sequence
            ,inserted_at
            ,nbirth_or_ndeath
            ,nbirth_bdseq
            ,ndeath_bdseq
            ,nbirth_ndeath_inserted_at
            ,is_last_known_good_reading
            ,message_type_order
            ,is_node_alive
            ,message_timestamp as root_message_timestamp

            -- attributes related to device data (ddata / dbirth)
            ,f.value:name::varchar as device_name
            ,f.value:value:reference::varchar as template_reference
            ,f.value:value:version::varchar as template_version
            ,f.value:timestamp::number as device_metric_timestamp
            ,f.value as ddata_msg

            -- attributes related to device level metrics
            ,concat(msg_id ,'^' ,f.index ,'::' ,d.index) as device_measure_uuid
            ,d.value:name::varchar as measure_name
            ,d.value:value as measure_value
            ,d.value:timestamp::number as measure_timestamp

        from sparkplug_msgs_nodebirth_contextualized_vw as ctx
            ,lateral flatten(input => ctx.message:metrics) as f
            ,lateral flatten(input => f.value:value:metrics) as d
        where message_type in ('DBIRTH' ,'DDATA')
            -- filters on the template_reference alias computed in the select list above
            and template_reference is not null
    )
    select
        group_id
        ,edge_node_id
        ,device_id
        ,message_type
        ,message_sequence
        ,inserted_at
        ,nbirth_or_ndeath
        ,nbirth_bdseq
        ,ndeath_bdseq
        ,nbirth_ndeath_inserted_at
        ,is_last_known_good_reading
        ,message_type_order
        ,is_node_alive
        ,root_message_timestamp

        ,device_name
        ,template_reference
        ,template_version
        ,device_metric_timestamp
        ,ddata_msg
        ,null as is_historical

        ,device_measure_uuid
        ,object_agg(distinct measure_name ,measure_value) as measures_info
        ,measure_timestamp

        ,to_timestamp(measure_timestamp/1000) as measure_ts
        ,to_date(measure_ts) as measure_date
        ,hour(measure_ts) as measure_hour
    from exploded
    group by
        group_id
        ,edge_node_id
        ,device_id
        ,message_type
        ,message_sequence
        ,inserted_at
        ,nbirth_or_ndeath
        ,nbirth_bdseq
        ,ndeath_bdseq
        ,nbirth_ndeath_inserted_at
        ,is_last_known_good_reading
        ,message_type_order
        ,is_node_alive
        ,root_message_timestamp

        ,device_name
        ,template_reference
        ,template_version
        ,device_metric_timestamp
        ,ddata_msg
        ,is_historical
        ,device_measure_uuid
        ,measure_timestamp
    ;
    
    -- Target table for materializing sparkplug_messages_flattened_vw. The column order
    -- follows that view's select list.
    -- NOTE: "create or replace" rebuilds the table, so re-running this script discards
    -- any rows already stored in it.
    create or replace transient table sparkplug_device_messages (
        -- message identity
        group_id                        varchar
        ,edge_node_id                   varchar
        ,device_id                      varchar
        ,message_type                   varchar
        ,message_sequence               number

        -- node birth / death context
        ,inserted_at                    number
        ,nbirth_or_ndeath               varchar
        ,nbirth_bdseq                   number
        ,ndeath_bdseq                   number
        ,nbirth_ndeath_inserted_at      number
        ,is_last_known_good_reading     boolean
        ,message_type_order             number
        ,is_node_alive                  boolean

        -- device data
        ,root_message_timestamp         number
        ,device_name                    varchar
        ,template_reference             varchar
        ,template_version               varchar
        ,device_metric_timestamp        number
        ,ddata_msg                      variant
        ,is_historical                  boolean

        -- measures
        ,device_measure_uuid            varchar
        ,measures_info                  variant
        ,measure_timestamp              number

        -- derived from measure_timestamp
        ,measure_ts                     timestamp
        ,measure_date                   date
        ,measure_hour                   number
    )
    cluster by (
        group_id
        ,edge_node_id
        ,device_id
        ,template_reference
        ,template_version
        ,device_name
        ,measure_date
        ,measure_hour
    )
    comment = 'materialized version of the sparkplug_messages_flattened_vw for easier downstream pipelines.'
    ;
        
    
    -- --  >>>>>>>>>>>>>>>>>>>>>>
    -- ================
    --  NODE BIRTH related assets
    -- ================
    
    -- Change stream over nbirth_vw. With show_initial_rows enabled, the first consumption
    -- of the stream returns the NBIRTH rows already present in the view.
    create or replace stream nbirth_stream
        on view nbirth_vw
        show_initial_rows = true
        comment = 'stream to monitor for nbirth messages, so that assets are created automatically';


    • Expected Result: Stream NBIRTH_STREAM successfully created.
  • Script 03
    SQL Script 03  Expand source

    • Expected Result: Function GENERATE_DEVICE_ASOF_VIEW_DDL successfully created.
  • Script 04
    SQL Script 04  Expand source

    • Expected Result: Function CREATE_EDGE_NODE_SCHEMA successfully created.
  • Script 05
    SQL Script 05  Expand source

    • Expected Result: Function CREATE_ALL_EDGE_NODE_SCHEMAS successfully created.
  • Script 10
    SQL Script 10  Expand source

    • Expected Result: Statement executed successfully.
  • Script 11
    SQL Script 11  Expand source

    • Expected Result: Statement executed successfully.

...