Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • Script 02

    Code Block
    languagesql
    titleSQL Script 02
    collapsetrue
    -- =========================
    -- In this script, we are setting up assets related to the node database
    -- ,which would eventually contain all the device specific views and tables.
    -- At the very core, the following assets are created:
    --  - Node Database
    --  - Staging schema
    
    -- The database & schema will be owned by SYSADMIN
    
    -- REPLACE THE SESSION VARIABLE ACCORDING TO YOUR ENVIRONMENT
    -- =========================
    
    set staged_sparkplug_raw_table = 'cl_bridge_stage_db.stage_db.sparkplug_raw';
    set cl_bridge_node_db = 'cl_bridge_node_db';
    set staging_schema = 'stage_db';
    
    -- >>>>>>>>>>>>>>>>>>>>>> DATABASE >>>>>>>>>>>>>>>>>>>>>>>>>
    
    use role sysadmin;
    
    create database if not exists identifier($cl_bridge_node_db)
       -- DATA_RETENTION_TIME_IN_DAYS = 90
       -- MAX_DATA_EXTENSION_TIME_IN_DAYS = 90
       comment = 'used for storing flattened messages processed from the staging database'
    ;
    
    -- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA >>>>>>>>>>>>>>>>>>>>>>>>>
    
    use database identifier($cl_bridge_node_db);
    
    create schema if not exists identifier($staging_schema)
      with managed access
      -- data_retention_time_in_days = 90
      -- max_data_extension_time_in_days = 90
      comment = 'used for storing flattened messages processed from the staging database';
    
    -- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA ASSETS >>>>>>>>>>>>>>>>>>>>>>>>>
    
    use schema identifier($staging_schema);
    
    -- =========================
    -- Define tables
    -- =========================
    
    -- NOTE THE 'cl_bridge_stage_db.stage_db.sparkplug_raw' is hardcoded here; as the identifier
    -- staged_sparkplug_raw_table replacement does not work.
    
    create or replace view sparkplug_messages_vw
        change_tracking = true
        comment = 'parses out the core attributes from the message and topic.'
        as 
        select 
            msg_id
            ,namespace
            ,group_id
            ,message_type
            ,edge_node_id
            ,device_id
            ,parse_json(msg) as message
            ,message:seq::int as message_sequence
            ,message:timestamp::number as message_timestamp
            ,inserted_at
        from cl_bridge_stage_db.stage_db.sparkplug_raw
        ;
    
    -- --  >>>>>>>>>>>>>>>>>>>>>>
    
    create or replace view nbirth_vw 
        change_tracking = true
        comment = 'filtered to nbirth messages. This is a mirror'
        as 
        select
           group_id ,edge_node_id 
        from sparkplug_messages_vw
        where message_type = 'NBIRTH'
        
        ;
    
    create or replace view node_machine_registry_vw 
        comment = 'Used to retreive the latest template definitions for a given group and edge_node'
        as 
        with base as (
            select
                group_id ,edge_node_id 
                ,max_by(message ,message_timestamp) as message
                ,max(message_timestamp) as latest_message_timestamp
            from sparkplug_messages_vw
            where message_type = 'NBIRTH'
            group by group_id ,edge_node_id
        )
        select 
            group_id ,edge_node_id
            ,f.value as template_definition
            ,template_definition:name::varchar as machine
            ,template_definition:reference::varchar as reference
            ,template_definition:version::varchar as version
            ,template_definition:timestamp::int as timestamp
        from base as b
            ,lateral flatten (input => b.message:metrics) f
        where template_definition:dataType::varchar = 'Template'
        ;
    
    -- --  >>>>>>>>>>>>>>>>>>>>>>
    
    create or replace view node_birth_death_vw
        comment = 'shows the latest node birth & death messages for each device'
        as
        select 
            b.* exclude(namespace)
            ,message_type as nbirth_or_ndeath_raw
            ,iff((message_type = 'NBIRTH') ,f.value:value ,null)::number as nbirth_bdSeq_raw
            ,iff((message_type = 'NDEATH') ,f.value:value ,null)::number as ndeath_bdSeq_raw
            ,inserted_at as nbirth_ndeath_inserted_at_raw
        from sparkplug_messages_vw as b
            ,lateral flatten (input => b.message:metrics) as f
        where message_type in ('NBIRTH' ,'NDEATH')
         and f.value:name::varchar = 'bdSeq'
         ;
    
    create or replace view device_records_vw
        change_tracking = true
        as
        select 
            b.* exclude(namespace)
        	,null as nbirth_or_ndeath_raw
        	,null as nbirth_bdSeq_raw
            ,null as ndeath_bdSeq_raw
            ,null as nbirth_ndeath_inserted_at_raw
        from sparkplug_messages_vw as b
        where message_type in ('DBIRTH' ,'DDATA')
        ;
    
    create or replace stream device_records_stream
        on view device_records_vw
    
        show_initial_rows = true
        comment = 'used for monitoring latest device messages'
        ;
    
    create or replace view sparkplug_msgs_nodebirth_contextualized_vw
        as
        with device_node_unioned as (
            select *
            from node_birth_death_vw
            union all
            select * exclude(METADATA$ROW_ID ,METADATA$ACTION ,METADATA$ISUPDATE)
            from device_records_stream
        )
        select 
            -- group_id ,message_type ,edge_node_id ,device_id 
            -- ,message ,message_sequence ,inserted_at
            * exclude(nbirth_or_ndeath_raw ,nbirth_bdSeq_raw ,ndeath_bdSeq_raw ,nbirth_ndeath_inserted_at_raw)
            ,nvl(nbirth_or_ndeath_raw
                    ,lag(nbirth_or_ndeath_raw) ignore nulls over (order by inserted_at ,message_sequence) 
                ) as nbirth_or_ndeath
    
            ,nvl(nbirth_bdSeq_raw
                    ,lag(nbirth_bdSeq_raw) ignore nulls over (order by inserted_at ,message_sequence) 
                ) as nbirth_bdSeq
    
            ,nvl(ndeath_bdSeq_raw
                    ,lag(ndeath_bdSeq_raw) ignore nulls over (order by inserted_at ,message_sequence) 
                ) as ndeath_bdSeq
    
            ,nvl(nbirth_ndeath_inserted_at_raw
                    ,lag(nbirth_ndeath_inserted_at_raw) ignore nulls over (order by inserted_at ,message_sequence) 
                ) as nbirth_ndeath_inserted_at
    
            ,case true
                when (nbirth_or_ndeath = 'NBIRTH') then false
                when ( (nbirth_or_ndeath = 'NDEATH') and (nbirth_bdSeq != ndeath_bdSeq) ) then false
                when ( (nbirth_or_ndeath = 'NDEATH') and (nbirth_bdSeq = ndeath_bdSeq) ) then true
                else true
            end as is_last_known_good_reading
    
            ,case lower(message_type)
                    when lower('NBIRTH') then 1
                    when lower('DBIRTH') then 2
                    when lower('DDATA') then 3
                    when lower('DDEATH') then 4
                    when lower('NDEATH') then 5
                    else 99
                end as message_type_order
    
            ,(nbirth_or_ndeath = 'NBIRTH') as is_node_alive
    
        from device_node_unioned
        ;
    
    create or replace view sparkplug_messages_flattened_vw
        as
        with base as (
            select 
                -- sparkplugb message level
                msg_id ,group_id, edge_node_id ,device_id ,message_type 
                ,message_sequence ,inserted_at 
                ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq 
                ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
                ,message_type_order ,is_node_alive	
                ,message_timestamp as root_message_timestamp
    
                -- attributes related to device data (ddata / dbirth)
                ,f.value:name::varchar as device_name
                ,f.value:value:reference::varchar as template_reference
                ,f.value:value:version::varchar as template_version
                ,f.value:timestamp::number as device_metric_timestamp
                ,f.value as ddata_msg
    
                -- attributes related to device level metrics
                ,concat(msg_id ,'^' ,f.index ,'::',d.index) as device_measure_uuid
                ,d.value:name::varchar as measure_name
                ,d.value:value as measure_value
                ,d.value:timestamp::number as measure_timestamp
                
            from sparkplug_msgs_nodebirth_contextualized_vw as b
                ,lateral flatten(input => b.message:metrics) as f
                ,lateral flatten(input => f.value:value:metrics) as d
            where message_type in ('DBIRTH' ,'DDATA')
                and template_reference is not null
        )
        select
            group_id, edge_node_id ,device_id ,message_type 
            ,message_sequence ,inserted_at 
            ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq 
            ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
            ,message_type_order ,is_node_alive ,root_message_timestamp
                
            ,device_name ,template_reference ,template_version ,device_metric_timestamp ,ddata_msg
            ,null as is_historical
    
            ,device_measure_uuid
            ,object_agg(distinct measure_name ,measure_value) as measures_info
            ,measure_timestamp
            
            ,to_timestamp(measure_timestamp/1000) as measure_ts 
            ,to_date(measure_ts) as measure_date
            ,hour(measure_ts) as measure_hour
        from base
        group by group_id, edge_node_id ,device_id ,message_type 
                ,message_sequence ,inserted_at 
                ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq 
                ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
                ,message_type_order ,is_node_alive ,root_message_timestamp
    
                ,device_name ,template_reference ,template_version ,device_metric_timestamp ,ddata_msg
                ,is_historical ,device_measure_uuid
                ,measure_timestamp
        
        ;
    
    create or replace transient table sparkplug_device_messages (
            group_id varchar 
            ,edge_node_id varchar
            ,device_id varchar
            ,message_type varchar
            ,message_sequence number
            
            ,inserted_at number
            ,nbirth_or_ndeath varchar
            ,nbirth_bdseq number
            ,ndeath_bdseq number
            ,nbirth_ndeath_inserted_at number
            ,is_last_known_good_reading boolean
            ,message_type_order number
            ,is_node_alive boolean
    
        	,root_message_timestamp number
            ,device_name varchar
            ,template_reference varchar
            ,template_version varchar
            ,device_metric_timestamp number
            ,ddata_msg variant
            ,is_historical boolean
    
            ,device_measure_uuid varchar
            ,measures_info variant
            ,measure_timestamp number
    
            ,measure_ts timestamp
            ,measure_date date
            ,measure_hour number
        )
        cluster by ( group_id ,edge_node_id ,device_id 
            ,template_reference ,template_version ,device_name 
            ,measure_date ,measure_hour)
        comment = 'materialized version of the sparkplug_messages_flattened_vw for easier downstream pipelines.'
        ;
        
    
    -- --  >>>>>>>>>>>>>>>>>>>>>>
    -- ================
    --  NODE BIRTH related assets
    -- ================
    
    create or replace stream nbirth_stream
        on view nbirth_vw
    
        show_initial_rows = true
        comment = 'stream to monitor for nbirth messages, so that assets are created automatically'
        ;


    • Expected Result: Stream NBIRTH_STREAM successfully created.
  • Script 03

    Code Block
    languagesql
    titleSQL Script 03
    collapsetrue
    set cl_bridge_node_db = 'cl_bridge_node_db';
    set staging_schema = 'stage_db';
    
    use role sysadmin;
    use database identifier($cl_bridge_node_db);
    use schema identifier($staging_schema);
    
    CREATE OR REPLACE PROCEDURE synch_device_messages() 
    RETURNS VARIANT NOT NULL
    LANGUAGE JAVASCRIPT
    COMMENT = 'Synch latest device updates and stores in table'
    AS
    $$
    
    // --- MAIN --------------------------------------
    var failure_err_msg = [];
    var return_result_as_json = {};
    var sucess_count = 0;
    var failure_count = 0;
    
    const qry = `
    insert into sparkplug_device_messages
    select *
    from sparkplug_messages_flattened_vw
    ;`
    
    res = [];
    try {
    var rs = snowflake.execute({ sqlText: qry });
    sucess_count = sucess_count + 1;
    
    } catch (err) {
    failure_count = failure_count + 1;
    failure_err_msg.push(` {
    sqlstatement : ‘${qry}’,
    error_code : ‘${err.code}’,
    error_state : ‘${err.state}’,
    error_message : ‘${err.message}’,
    stack_trace : ‘${err.stackTraceTxt}’
    } `);
    }
    
    return_result_as_json['asset_creation'] = res;
    return_result_as_json['Success'] = sucess_count;
    return_result_as_json['Failures'] = failure_count;
    return_result_as_json['Failure_error'] = failure_err_msg;
    return return_result_as_json;
    $$; 
    
    CREATE OR REPLACE FUNCTION GENERATE_TEMPLATE_ASSET_BASE_NAME
    (PARAM_TEMPLATE_NAME varchar ,PARAM_TEMPLATE_VERSION varchar) 
    RETURNS VARCHAR
    LANGUAGE JAVASCRIPT
    COMMENT = 'Used for generating device template name.'
    AS $$
    function normalize_name_str(p_str) {
    return p_str.replace(/[\W_]+/g,"_").trim().toLowerCase();
    }
    
    function get_device_view_base_name(p_machine ,p_version) {
    const v = (p_version != null) ? p_version : "";
    return normalize_name_str(`${p_machine}${v}`);
    }
    
    
    return get_device_view_base_name(PARAM_TEMPLATE_NAME ,PARAM_TEMPLATE_VERSION);
    $$
    ;
    
    
    CREATE OR REPLACE FUNCTION GENERATE_DEVICE_BASE_VIEW_DDL
    ( PARAM_GROUP_ID VARCHAR ,PARAM_EDGE_NODE_ID VARCHAR ,PARAM_TEMPLATE_REFERENCE VARCHAR
    ,PARAM_SOURCE_DB varchar
    ,PARAM_SOURCE_SCHEMA varchar
    ,PARAM_TARGET_SCHEMA VARCHAR
    ,PARAM_TEMPLATE_ASSET_BASE_NAME varchar
    ,PARAM_TEMPLATE_DEFN variant
    ,PARAM_FORCE_RECREATE boolean
    ) 
    RETURNS VARCHAR
    LANGUAGE JAVASCRIPT
    COMMENT = 'Used for generating generic view ddl for device template.'
    AS $$
    
    var stmt_condition = `create view if not exists`;
    if (PARAM_FORCE_RECREATE == true)
    stmt_condition = `create or replace view`;
    
    const sql_stmt = `
    ${stmt_condition} ${PARAM_SOURCE_DB}.${PARAM_TARGET_SCHEMA}.${PARAM_TEMPLATE_ASSET_BASE_NAME}
    as 
    select 
    group_id ,edge_node_id ,device_id ,message_type ,message_sequence ,root_message_timestamp 
    ,inserted_at ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq 
    ,nbirth_ndeath_inserted_at ,is_last_known_good_reading 
    ,message_type_order ,is_node_alive
    
    ,device_name ,template_version ,device_metric_timestamp ,ddata_msg ,is_historical
    
    ,device_measure_uuid ,measures_info ,measure_timestamp 
    
    ,measure_ts ,measure_date ,measure_hour
    
    from ${PARAM_SOURCE_SCHEMA}.sparkplug_device_messages
    where group_id = '${PARAM_GROUP_ID}'
    and edge_node_id = '${PARAM_EDGE_NODE_ID}'
    and template_reference = '${PARAM_TEMPLATE_REFERENCE}'
    ;
    `;
    
    return sql_stmt;
    $$
    ;
    
    
    
    CREATE OR REPLACE FUNCTION GENERATE_DEVICE_VIEW_DDL
    ( PARAM_GROUP_ID VARCHAR ,PARAM_EDGE_NODE_ID VARCHAR ,PARAM_TEMPLATE_REFERENCE VARCHAR
    ,PARAM_SOURCE_DB varchar
    ,PARAM_TARGET_SCHEMA VARCHAR
    ,PARAM_TEMPLATE_ASSET_BASE_NAME varchar
    ,PARAM_TEMPLATE_DEFN variant
    ,PARAM_FORCE_RECREATE boolean
    ) 
    RETURNS VARCHAR
    LANGUAGE JAVASCRIPT
    COMMENT = 'Used for generating generic view ddl for device template.'
    AS $$
    
    function normalize_name_str(p_str) {
    return p_str.replace(/[\W_]+/g,"_").trim().toLowerCase();
    }
    
    function build_column_ddl_defn(p_template_defn) {
    var cols = [];
    const data_type_map = {
    "Int32":"::integer"
    ,"Int64":"::integer"
    ,"Float":"::double"
    ,"Template":"::variant"
    ,"Boolean":"::boolean"
    ,"String":"::varchar"
    };
    
    const m_entries = p_template_defn['value']['metrics']
    for (const [m_key, m_value] of Object.entries(m_entries)) {
    
    const measure_name = m_value['name'];
    const dtype = m_value['dataType'];
    
    const mname_cleansed = normalize_name_str(measure_name);
    // # default string cast, if the datatype is not mapped
    const dtype_converted = data_type_map[dtype] || "::varchar";
    
    const col_defn = `measures_info:"${measure_name}"${dtype_converted} as ${mname_cleansed} `;
    cols.push(col_defn);
    }
    
    const cols_joined = cols.join(',');
    return cols_joined
    }
    
    const vw_name = `${PARAM_TEMPLATE_ASSET_BASE_NAME}_vw`;
    const cols_joined = build_column_ddl_defn(PARAM_TEMPLATE_DEFN)
    
    const sql_stmt = `
    create or replace view ${PARAM_SOURCE_DB}.${PARAM_TARGET_SCHEMA}.${vw_name}
    as 
    select 
    * exclude(ddata_msg ,measures_info ,template_version)
    ,${cols_joined}
    from ${PARAM_SOURCE_DB}.${PARAM_TARGET_SCHEMA}.${PARAM_TEMPLATE_ASSET_BASE_NAME}
    ;
    `;
    
    return sql_stmt;
    $$; 
    
    CREATE OR REPLACE FUNCTION GENERATE_DEVICE_ASOF_VIEW_DDL
    ( PARAM_GROUP_ID VARCHAR ,PARAM_EDGE_NODE_ID VARCHAR ,PARAM_TEMPLATE_REFERENCE VARCHAR
    ,PARAM_SOURCE_DB varchar
    ,PARAM_TARGET_SCHEMA VARCHAR
    ,PARAM_TEMPLATE_ASSET_BASE_NAME varchar
    ,PARAM_TEMPLATE_DEFN variant
    ,PARAM_FORCE_RECREATE boolean
    ) 
    RETURNS VARCHAR
    LANGUAGE JAVASCRIPT
    COMMENT = 'Used for generating device asof view ddl.'
    AS $$
    
    function normalize_name_str(p_str) {
    return p_str.replace(/[\W_]+/g,"_").trim().toLowerCase();
    }
    
    function build_column_ddl_defn(p_template_defn) {
    var cols = []
    
    const data_type_map = {
    "Int32":"::integer"
    ,"Int64":"::integer"
    ,"Float":"::double"
    ,"Template":"::variant"
    ,"Boolean":"::boolean"
    ,"String":"::varchar"
    }
    
    const m_entries = p_template_defn['value']['metrics']
    for (const [m_key, m_value] of Object.entries(m_entries)) {
    
    const measure_name = m_value['name'];
    const dtype = m_value['dataType'];
    
    const mname_cleansed = normalize_name_str(measure_name);
    // # default string cast, if the datatype is not mapped
    const dtype_converted = data_type_map[dtype] || "::varchar";
    
    const col_defn = `nvl(${mname_cleansed}
    ,lag(${mname_cleansed}) ignore nulls over (order by message_type_order ,measure_timestamp ,message_sequence) 
    ) AS ${mname_cleansed}
    `;
    
    cols.push(col_defn);
    }
    
    const cols_joined = cols.join(',');
    return cols_joined
    }
    
    const vw_name = `${PARAM_TEMPLATE_ASSET_BASE_NAME}_vw`;
    const recordasof_vw_name = `${PARAM_TEMPLATE_ASSET_BASE_NAME}_asof_vw`;
    const cols_joined = build_column_ddl_defn(PARAM_TEMPLATE_DEFN)
    
    const sql_stmt = `
    create or replace secure view ${PARAM_SOURCE_DB}.${PARAM_TARGET_SCHEMA}.${recordasof_vw_name}
    as
    select 
    group_id ,edge_node_id ,device_id ,device_name ,message_sequence 
    ,root_message_timestamp ,inserted_at
    
    ,message_type ,message_type_order
    
    ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq 
    ,nbirth_ndeath_inserted_at ,is_last_known_good_reading 
    ,is_node_alive
    ,is_historical
    
    ,device_measure_uuid
    ,measure_timestamp 
    ,measure_ts ,measure_date ,measure_hour
    
    ,${cols_joined}
    from ${PARAM_SOURCE_DB}.${PARAM_TARGET_SCHEMA}.${vw_name}
    order by measure_timestamp ,message_type_order ,message_sequence
    ;
    `;
    
    return sql_stmt;
    $$; 
    
    


    • Expected Result: Function GENERATE_DEVICE_ASOF_VIEW_DDL successfully created.
  • Script 04

    Code Block
    languagesql
    titleSQL Script 04
    collapsetrue


    • Expected Result: Function CREATE_EDGE_NODE_SCHEMA successfully created.
  • Script 05

    Code Block
    languagesql
    titleSQL Script 05
    collapsetrue


    • Expected Result: Function CREATE_ALL_EDGE_NODE_SCHEMAS successfully created.
  • Script 10

    Code Block
    languagesql
    titleSQL Script 10
    collapsetrue


    • Expected Result: Statement executed successfully.
  • Script 11

    Code Block
    languagesql
    titleSQL Script 11
    collapsetrue


    • Expected Result: 

...