...

Now repeat the process for each of the following scripts, in order. Each time, fully replace the previous script with the new one, click the 'Run All' button, and make sure no errors are displayed in the Results window before moving on. A few optional verification queries are shown after the script list.

  • Script 02
    SQL Script 02  Expand source

    Code Block
    languagesql
    titleSQL Script 02
    -- =========================
    -- In this script, we are setting up assets related to the node database,
    -- which will eventually contain all the device-specific views and tables.
    -- At the very core, the following assets are created:
    --  - Node Database
    --  - Staging schema
    
    -- The database & schema will be owned by SYSADMIN
    
    -- REPLACE THE SESSION VARIABLE ACCORDING TO YOUR ENVIRONMENT
    -- =========================
    
    set staged_sparkplug_raw_table = 'cl_bridge_stage_db.stage_db.sparkplug_raw';
    set cl_bridge_node_db = 'cl_bridge_node_db';
    set staging_schema = 'stage_db';
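    -- Optional: uncomment to confirm the session variables resolve as expected
    -- select $staged_sparkplug_raw_table ,$cl_bridge_node_db ,$staging_schema;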
    
    -- >>>>>>>>>>>>>>>>>>>>>> DATABASE >>>>>>>>>>>>>>>>>>>>>>>>>
    
    use role sysadmin;
    
    create database if not exists identifier($cl_bridge_node_db)
       -- DATA_RETENTION_TIME_IN_DAYS = 90
       -- MAX_DATA_EXTENSION_TIME_IN_DAYS = 90
       comment = 'used for storing flattened messages processed from the staging database'
    ;
    
    -- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA >>>>>>>>>>>>>>>>>>>>>>>>>
    
    use database identifier($cl_bridge_node_db);
    
    create schema if not exists identifier($staging_schema)
      with managed access
      -- data_retention_time_in_days = 90
      -- max_data_extension_time_in_days = 90
      comment = 'used for storing flattened messages processed from the staging database';
    
    -- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA ASSETS >>>>>>>>>>>>>>>>>>>>>>>>>
    
    use schema identifier($staging_schema);
    
    -- =========================
    -- Define tables
    -- =========================
    
    -- NOTE: 'cl_bridge_stage_db.stage_db.sparkplug_raw' is hardcoded here, because substituting the
    -- staged_sparkplug_raw_table session variable via identifier() does not work inside the view definition.
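    -- In ad-hoc queries outside a view definition the variable substitution does work, e.g.:
    -- select count(*) from identifier($staged_sparkplug_raw_table);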
    
    create or replace view sparkplug_messages_vw
        change_tracking = true
        comment = 'parses out the core attributes from the message and topic.'
        as 
        select 
            msg_id
            ,namespace
            ,group_id
            ,message_type
            ,edge_node_id
            ,device_id
            ,parse_json(msg) as message
            ,message:seq::int as message_sequence
            ,message:timestamp::number as message_timestamp
            ,inserted_at
        from cl_bridge_stage_db.stage_db.sparkplug_raw
        ;
    
    -- --  >>>>>>>>>>>>>>>>>>>>>>
    
    create or replace view nbirth_vw 
        change_tracking = true
        comment = 'filtered to NBIRTH messages; a mirror of sparkplug_messages_vw restricted to group_id and edge_node_id'
        as 
        select
           group_id ,edge_node_id 
        from sparkplug_messages_vw
        where message_type = 'NBIRTH'
        
        ;
    
    create or replace view node_machine_registry_vw 
        comment = 'Used to retrieve the latest template definitions for a given group and edge_node'
        as 
        with base as (
            select
                group_id ,edge_node_id 
                ,max_by(message ,message_timestamp) as message
                ,max(message_timestamp) as latest_message_timestamp
            from sparkplug_messages_vw
            where message_type = 'NBIRTH'
            group by group_id ,edge_node_id
        )
        select 
            group_id ,edge_node_id
            ,f.value as template_definition
            ,template_definition:name::varchar as machine
            ,template_definition:reference::varchar as reference
            ,template_definition:version::varchar as version
            ,template_definition:timestamp::int as timestamp
        from base as b
            ,lateral flatten (input => b.message:metrics) f
        where template_definition:dataType::varchar = 'Template'
        ;
    
    -- --  >>>>>>>>>>>>>>>>>>>>>>
    
    create or replace view node_birth_death_vw
        comment = 'shows the latest node birth & death messages for each device'
        as
        select 
            b.* exclude(namespace)
            ,message_type as nbirth_or_ndeath_raw
            ,iff((message_type = 'NBIRTH') ,f.value:value ,null)::number as nbirth_bdSeq_raw
            ,iff((message_type = 'NDEATH') ,f.value:value ,null)::number as ndeath_bdSeq_raw
            ,inserted_at as nbirth_ndeath_inserted_at_raw
        from sparkplug_messages_vw as b
            ,lateral flatten (input => b.message:metrics) as f
        where message_type in ('NBIRTH' ,'NDEATH')
         and f.value:name::varchar = 'bdSeq'
         ;
    
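    -- device_records_vw mirrors the DBIRTH / DDATA device messages and pads the node
    -- birth/death columns with nulls so it can be unioned with node_birth_death_vw below.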
    create or replace view device_records_vw
        change_tracking = true
        as
        select 
            b.* exclude(namespace)
            ,null as nbirth_or_ndeath_raw
            ,null as nbirth_bdSeq_raw
            ,null as ndeath_bdSeq_raw
            ,null as nbirth_ndeath_inserted_at_raw
        from sparkplug_messages_vw as b
        where message_type in ('DBIRTH' ,'DDATA')
        ;
    
    create or replace stream device_records_stream
        on view device_records_vw
    
        show_initial_rows = true
        comment = 'used for monitoring latest device messages'
        ;
    
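    -- Unions the node birth/death records with the device records stream and forward-fills
    -- the node context (NBIRTH/NDEATH and their bdSeq values) onto every device message via
    -- lag(...) ignore nulls, so each reading can be flagged as last-known-good / node-alive.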
    create or replace view sparkplug_msgs_nodebirth_contextualized_vw
        as
        with device_node_unioned as (
            select *
            from node_birth_death_vw
            union all
            select * exclude(METADATA$ROW_ID ,METADATA$ACTION ,METADATA$ISUPDATE)
            from device_records_stream
        )
        select 
            -- group_id ,message_type ,edge_node_id ,device_id 
            -- ,message ,message_sequence ,inserted_at
            * exclude(nbirth_or_ndeath_raw ,nbirth_bdSeq_raw ,ndeath_bdSeq_raw ,nbirth_ndeath_inserted_at_raw)
            ,nvl(nbirth_or_ndeath_raw
                    ,lag(nbirth_or_ndeath_raw) ignore nulls over (order by inserted_at ,message_sequence) 
                ) as nbirth_or_ndeath
    
            ,nvl(nbirth_bdSeq_raw
                    ,lag(nbirth_bdSeq_raw) ignore nulls over (order by inserted_at ,message_sequence) 
                ) as nbirth_bdSeq
    
            ,nvl(ndeath_bdSeq_raw
                    ,lag(ndeath_bdSeq_raw) ignore nulls over (order by inserted_at ,message_sequence) 
                ) as ndeath_bdSeq
    
            ,nvl(nbirth_ndeath_inserted_at_raw
                    ,lag(nbirth_ndeath_inserted_at_raw) ignore nulls over (order by inserted_at ,message_sequence) 
                ) as nbirth_ndeath_inserted_at
    
            ,case true
                when (nbirth_or_ndeath = 'NBIRTH') then false
                when ( (nbirth_or_ndeath = 'NDEATH') and (nbirth_bdSeq != ndeath_bdSeq) ) then false
                when ( (nbirth_or_ndeath = 'NDEATH') and (nbirth_bdSeq = ndeath_bdSeq) ) then true
                else true
            end as is_last_known_good_reading
    
            ,case lower(message_type)
                    when lower('NBIRTH') then 1
                    when lower('DBIRTH') then 2
                    when lower('DDATA') then 3
                    when lower('DDEATH') then 4
                    when lower('NDEATH') then 5
                    else 99
                end as message_type_order
    
            ,(nbirth_or_ndeath = 'NBIRTH') as is_node_alive
    
        from device_node_unioned
        ;
    
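    -- Flattens the metrics arrays of DBIRTH / DDATA messages into individual device measures
    -- and aggregates them into a single measures_info object per device_measure_uuid.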
    create or replace view sparkplug_messages_flattened_vw
        as
        with base as (
            select 
                -- sparkplugb message level
                msg_id ,group_id, edge_node_id ,device_id ,message_type 
                ,message_sequence ,inserted_at 
                ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq 
                ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
                ,message_type_order ,is_node_alive	
                ,message_timestamp as root_message_timestamp
    
                -- attributes related to device data (ddata / dbirth)
                ,f.value:name::varchar as device_name
                ,f.value:value:reference::varchar as template_reference
                ,f.value:value:version::varchar as template_version
                ,f.value:timestamp::number as device_metric_timestamp
                ,f.value as ddata_msg
    
                -- attributes related to device level metrics
                ,concat(msg_id ,'^' ,f.index ,'::',d.index) as device_measure_uuid
                ,d.value:name::varchar as measure_name
                ,d.value:value as measure_value
                ,d.value:timestamp::number as measure_timestamp
                
            from sparkplug_msgs_nodebirth_contextualized_vw as b
                ,lateral flatten(input => b.message:metrics) as f
                ,lateral flatten(input => f.value:value:metrics) as d
            where message_type in ('DBIRTH' ,'DDATA')
                and template_reference is not null
        )
        select
            group_id, edge_node_id ,device_id ,message_type 
            ,message_sequence ,inserted_at 
            ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq 
            ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
            ,message_type_order ,is_node_alive ,root_message_timestamp
                
            ,device_name ,template_reference ,template_version ,device_metric_timestamp ,ddata_msg
            ,null as is_historical
    
            ,device_measure_uuid
            ,object_agg(distinct measure_name ,measure_value) as measures_info
            ,measure_timestamp
            
            ,to_timestamp(measure_timestamp/1000) as measure_ts 
            ,to_date(measure_ts) as measure_date
            ,hour(measure_ts) as measure_hour
        from base
        group by group_id, edge_node_id ,device_id ,message_type 
                ,message_sequence ,inserted_at 
                ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq 
                ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
                ,message_type_order ,is_node_alive ,root_message_timestamp
    
                ,device_name ,template_reference ,template_version ,device_metric_timestamp ,ddata_msg
                ,is_historical ,device_measure_uuid
                ,measure_timestamp
        
        ;
    
    create or replace transient table sparkplug_device_messages (
            group_id varchar 
            ,edge_node_id varchar
            ,device_id varchar
            ,message_type varchar
            ,message_sequence number
            
            ,inserted_at number
            ,nbirth_or_ndeath varchar
            ,nbirth_bdseq number
            ,ndeath_bdseq number
            ,nbirth_ndeath_inserted_at number
            ,is_last_known_good_reading boolean
            ,message_type_order number
            ,is_node_alive boolean
    
            ,root_message_timestamp number
            ,device_name varchar
            ,template_reference varchar
            ,template_version varchar
            ,device_metric_timestamp number
            ,ddata_msg variant
            ,is_historical boolean
    
            ,device_measure_uuid varchar
            ,measures_info variant
            ,measure_timestamp number
    
            ,measure_ts timestamp
            ,measure_date date
            ,measure_hour number
        )
        cluster by ( group_id ,edge_node_id ,device_id 
            ,template_reference ,template_version ,device_name 
            ,measure_date ,measure_hour)
        comment = 'materialized version of the sparkplug_messages_flattened_vw for easier downstream pipelines.'
        ;
        
    
    -- --  >>>>>>>>>>>>>>>>>>>>>>
    -- ================
    --  NODE BIRTH related assets
    -- ================
    
    create or replace stream nbirth_stream
        on view nbirth_vw
    
        show_initial_rows = true
        comment = 'stream to monitor for nbirth messages, so that assets are created automatically'
        ;
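
    -- Optional sanity checks (uncomment to run): the current schema should now list the views,
    -- the two streams and the sparkplug_device_messages table created above.
    -- show views;
    -- show streams;
    -- show tables like 'sparkplug_device_messages';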


    • Expected Result: Stream NBIRTH_STREAM successfully created.
  • Script 03
    SQL Script 03  Expand source

    • Expected Result: Function GENERATE_DEVICE_ASOF_VIEW_DDL successfully created.
  • Script 04
    SQL Script 04  Expand source

    • Expected Result: Function CREATE_EDGE_NODE_SCHEMA successfully created.
  • Script 05
    SQL Script 05  Expand source

    • Expected Result: Function CREATE_ALL_EDGE_NODE_SCHEMAS successfully created.
  • Script 10
    SQL Script 10  Expand source

    • Expected Result: Statement executed successfully.
  • Script 11
    SQL Script 11  Expand source

    • Expected Result: Statement executed successfully.
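
To double-check the expected results beyond the Results window, the queries below are a minimal verification sketch: the object names are taken from the Expected Result notes above, and the IN ACCOUNT scope avoids assuming which database or schema Scripts 03-05 create their objects in. If those objects are created as stored procedures rather than UDFs, use SHOW PROCEDURES instead of SHOW USER FUNCTIONS.

    show streams like 'nbirth_stream' in account;                          -- Script 02
    show user functions like 'generate_device_asof_view_ddl' in account;   -- Script 03
    show user functions like 'create_edge_node_schema' in account;         -- Script 04
    show user functions like 'create_all_edge_node_schemas' in account;    -- Script 05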

...