...

  • Script 02

    Code Block
    languagesql
    titleSQL Script 02
    collapsetrue
    -- =========================
    -- In this script, we set up the assets related to the node database,
    -- which will eventually contain all of the device-specific views and tables.
    -- At the very core, the following assets are created:
    --  - Node Database
    --  - Staging schema
    
    -- The database & schema will be owned by SYSADMIN
    
    -- REPLACE THE SESSION VARIABLE ACCORDING TO YOUR ENVIRONMENT
    -- =========================
    
    set staged_sparkplug_raw_table = 'cl_bridge_stage_db.stage_db.sparkplug_raw';
    set cl_bridge_node_db = 'cl_bridge_node_db';
    set staging_schema = 'stage_db';
    
    -- >>>>>>>>>>>>>>>>>>>>>> DATABASE >>>>>>>>>>>>>>>>>>>>>>>>>
    
    use role sysadmin;
    
    create database if not exists identifier($cl_bridge_node_db)
       -- DATA_RETENTION_TIME_IN_DAYS = 90
       -- MAX_DATA_EXTENSION_TIME_IN_DAYS = 90
       comment = 'used for storing flattened messages processed from the staging database'
    ;
    
    -- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA >>>>>>>>>>>>>>>>>>>>>>>>>
    
    use database identifier($cl_bridge_node_db);
    
    create schema if not exists identifier($staging_schema)
      with managed access
      -- data_retention_time_in_days = 90
      -- max_data_extension_time_in_days = 90
      comment = 'used for storing flattened messages processed from the staging database';
    
    -- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA ASSETS >>>>>>>>>>>>>>>>>>>>>>>>>
    
    use schema identifier($staging_schema);
    
    -- =========================
    -- Define tables
    -- =========================
    
    -- NOTE: 'cl_bridge_stage_db.stage_db.sparkplug_raw' is hardcoded here, because the
    -- identifier($staged_sparkplug_raw_table) substitution does not work inside a view definition.
    
    create or replace view sparkplug_messages_vw
        change_tracking = true
        comment = 'parses out the core attributes from the message and topic.'
        as 
        select 
            uuid as msg_id
            ,namespace
            ,group_id
            ,msg_type as message_type
            ,edge_node_id
            ,device_id
            ,parse_json(msg) as message
            ,message:seq::int as message_sequence
            ,message:timestamp::number as message_timestamp
            ,inserted_at
        from cl_bridge_stage_db.stage_db.sparkplug_raw
        ;
    
    -- --  >>>>>>>>>>>>>>>>>>>>>>
    
    create or replace view nbirth_vw 
        change_tracking = true
        comment = 'filtered to NBIRTH messages; a mirror of sparkplug_messages_vw restricted to node births.'
        as 
        select
           group_id ,edge_node_id 
        from sparkplug_messages_vw
        where message_type = 'NBIRTH'
        ;
    
    create or replace view node_machine_registry_vw 
        comment = 'Used to retrieve the latest template definitions for a given group and edge_node'
        as 
        with base as (
            select
                group_id ,edge_node_id 
                ,max_by(message ,message_timestamp) as message
                ,max(message_timestamp) as latest_message_timestamp
            from sparkplug_messages_vw
            where message_type = 'NBIRTH'
            group by group_id ,edge_node_id
        )
        select 
            group_id ,edge_node_id
            ,f.value as template_definition
            ,template_definition:name::varchar as machine
            ,template_definition:reference::varchar as reference
            ,template_definition:version::varchar as version
            ,template_definition:timestamp::int as timestamp
        from base as b
            ,lateral flatten (input => b.message:metrics) f
        where template_definition:dataType::varchar = 'Template'
        ;
    
    -- --  >>>>>>>>>>>>>>>>>>>>>>
    
    create or replace view node_birth_death_vw
        comment = 'shows the latest node birth & death messages for each device'
        as
        select 
            b.* exclude(namespace)
            ,message_type as nbirth_or_ndeath_raw
            ,iff((message_type = 'NBIRTH') ,f.value:value ,null)::number as nbirth_bdSeq_raw
            ,iff((message_type = 'NDEATH') ,f.value:value ,null)::number as ndeath_bdSeq_raw
            ,inserted_at as nbirth_ndeath_inserted_at_raw
        from sparkplug_messages_vw as b
            ,lateral flatten (input => b.message:metrics) as f
        where message_type in ('NBIRTH' ,'NDEATH')
         and f.value:name::varchar = 'bdSeq'
         ;
    
    create or replace view device_records_vw
        change_tracking = true
        as
        select 
            b.* exclude(namespace)
            ,null as nbirth_or_ndeath_raw
            ,null as nbirth_bdSeq_raw
            ,null as ndeath_bdSeq_raw
            ,null as nbirth_ndeath_inserted_at_raw
        from sparkplug_messages_vw as b
        where message_type in ('DBIRTH' ,'DDATA')
        ;
    
    create or replace stream device_records_stream
        on view device_records_vw
        show_initial_rows = true
        comment = 'used for monitoring latest device messages'
        ;
    
    create or replace view sparkplug_msgs_nodebirth_contextualized_vw
        as
        with device_node_unioned as (
            select *
            from node_birth_death_vw
            union all
            select * exclude(METADATA$ROW_ID ,METADATA$ACTION ,METADATA$ISUPDATE)
            from device_records_stream
        )
        select 
            -- group_id ,message_type ,edge_node_id ,device_id 
            -- ,message ,message_sequence ,inserted_at
            * exclude(nbirth_or_ndeath_raw ,nbirth_bdSeq_raw ,ndeath_bdSeq_raw ,nbirth_ndeath_inserted_at_raw)
            ,nvl(nbirth_or_ndeath_raw
                    ,lag(nbirth_or_ndeath_raw) ignore nulls over (order by inserted_at ,message_sequence) 
                ) as nbirth_or_ndeath
    
            ,nvl(nbirth_bdSeq_raw
                    ,lag(nbirth_bdSeq_raw) ignore nulls over (order by inserted_at ,message_sequence) 
                ) as nbirth_bdSeq
    
            ,nvl(ndeath_bdSeq_raw
                    ,lag(ndeath_bdSeq_raw) ignore nulls over (order by inserted_at ,message_sequence) 
                ) as ndeath_bdSeq
    
            ,nvl(nbirth_ndeath_inserted_at_raw
                    ,lag(nbirth_ndeath_inserted_at_raw) ignore nulls over (order by inserted_at ,message_sequence) 
                ) as nbirth_ndeath_inserted_at
    
            ,case true
                when (nbirth_or_ndeath = 'NBIRTH') then false
                when ( (nbirth_or_ndeath = 'NDEATH') and (nbirth_bdSeq != ndeath_bdSeq) ) then false
                when ( (nbirth_or_ndeath = 'NDEATH') and (nbirth_bdSeq = ndeath_bdSeq) ) then true
                else true
            end as is_last_known_good_reading
    
            ,case lower(message_type)
                    when lower('NBIRTH') then 1
                    when lower('DBIRTH') then 2
                    when lower('DDATA') then 3
                    when lower('DDEATH') then 4
                    when lower('NDEATH') then 5
                    else 99
                end as message_type_order
    
            ,(nbirth_or_ndeath = 'NBIRTH') as is_node_alive
    
        from device_node_unioned
        ;
    
    create or replace view sparkplug_messages_flattened_vw
        as
        with base as (
            select 
                -- sparkplugb message level
                msg_id ,group_id, edge_node_id ,device_id ,message_type 
                ,message_sequence ,inserted_at 
                ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq 
                ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
                ,message_type_order ,is_node_alive	
                ,message_timestamp as root_message_timestamp
    
                -- attributes related to device data (ddata / dbirth)
                ,f.value:name::varchar as device_name
                ,f.value:value:reference::varchar as template_reference
                ,f.value:value:version::varchar as template_version
                ,f.value:timestamp::number as device_metric_timestamp
                ,f.value as ddata_msg
    
                -- attributes related to device level metrics
                ,concat(msg_id ,'^' ,f.index ,'::',d.index) as device_measure_uuid
                ,d.value:name::varchar as measure_name
                ,d.value:value as measure_value
                ,d.value:timestamp::number as measure_timestamp
                
            from sparkplug_msgs_nodebirth_contextualized_vw as b
                ,lateral flatten(input => b.message:metrics) as f
                ,lateral flatten(input => f.value:value:metrics) as d
            where message_type in ('DBIRTH' ,'DDATA')
                and template_reference is not null
        )
        select
            group_id, edge_node_id ,device_id ,message_type 
            ,message_sequence ,inserted_at 
            ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq 
            ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
            ,message_type_order ,is_node_alive ,root_message_timestamp
                
            ,device_name ,template_reference ,template_version ,device_metric_timestamp ,ddata_msg
            ,null as is_historical
    
            ,device_measure_uuid
            ,object_agg(distinct measure_name ,measure_value) as measures_info
            ,measure_timestamp
            
            ,to_timestamp(measure_timestamp/1000) as measure_ts 
            ,to_date(measure_ts) as measure_date
            ,hour(measure_ts) as measure_hour
        from base
        group by group_id, edge_node_id ,device_id ,message_type 
                ,message_sequence ,inserted_at 
                ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq 
                ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
                ,message_type_order ,is_node_alive ,root_message_timestamp
    
                ,device_name ,template_reference ,template_version ,device_metric_timestamp ,ddata_msg
                ,is_historical ,device_measure_uuid
                ,measure_timestamp
        
        ;
    
    create or replace transient table sparkplug_device_messages (
            group_id varchar 
            ,edge_node_id varchar
            ,device_id varchar
            ,message_type varchar
            ,message_sequence number
            
            ,inserted_at number
            ,nbirth_or_ndeath varchar
            ,nbirth_bdseq number
            ,ndeath_bdseq number
            ,nbirth_ndeath_inserted_at number
            ,is_last_known_good_reading boolean
            ,message_type_order number
            ,is_node_alive boolean
    
            ,root_message_timestamp number
            ,device_name varchar
            ,template_reference varchar
            ,template_version varchar
            ,device_metric_timestamp number
            ,ddata_msg variant
            ,is_historical boolean
    
            ,device_measure_uuid varchar
            ,measures_info variant
            ,measure_timestamp number
    
            ,measure_ts timestamp
            ,measure_date date
            ,measure_hour number
        )
        cluster by ( group_id ,edge_node_id ,device_id 
            ,template_reference ,template_version ,device_name 
            ,measure_date ,measure_hour)
        comment = 'materialized version of the sparkplug_messages_flattened_vw for easier downstream pipelines.'
        ;
        
    
    -- --  >>>>>>>>>>>>>>>>>>>>>>
    -- ================
    --  NODE BIRTH related assets
    -- ================
    
    create or replace stream nbirth_stream
        on view nbirth_vw
        show_initial_rows = true
        comment = 'stream to monitor for nbirth messages, so that assets are created automatically'
        ;


    • Expected Result: Stream NBIRTH_STREAM successfully created.
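
    • Verification (example): a minimal sanity check, assuming the session variables above were left at their defaults; adjust the database and schema names for your environment.

    Code Block
    languagesql
    titleVerify Script 02 assets (example)
    collapsetrue
    use role sysadmin;
    use schema cl_bridge_node_db.stage_db;
    
    -- the stream created at the end of the script should be listed
    show streams like 'nbirth_stream';
    
    -- the parsing and flattening views should compile; they return rows once data has been staged
    select * from sparkplug_messages_vw limit 10;
    select * from sparkplug_messages_flattened_vw limit 10;
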
  • Script 03

    Code Block
    languagesql
    titleSQL Script 03
    collapsetrue
    set cl_bridge_node_db = 'cl_bridge_node_db';
    set staging_schema = 'stage_db';
    
    use role sysadmin;
    use database identifier($cl_bridge_node_db);
    use schema identifier($staging_schema);
    
    CREATE OR REPLACE PROCEDURE synch_device_messages() 
        RETURNS VARIANT NOT NULL
        LANGUAGE JAVASCRIPT
    COMMENT = 'Syncs the latest device updates and stores them in the table'
        AS
        $$
    
        // --- MAIN --------------------------------------
        var failure_err_msg = [];
        var return_result_as_json = {};
        var sucess_count = 0;
        var failure_count = 0;
            
        const qry = `
        insert into sparkplug_device_messages
        select *
        from sparkplug_messages_flattened_vw
        ;`
        
    var res = [];
        try {
            var rs = snowflake.execute({ sqlText: qry });
            sucess_count = sucess_count + 1;
    
        } catch (err) {
            failure_count = failure_count + 1;
        failure_err_msg.push(` {
        sqlstatement : '${qry}',
            error_code : '${err.code}',
            error_state : '${err.state}',
            error_message : '${err.message}',
            stack_trace : '${err.stackTraceTxt}'
            } `);
        }
        
        return_result_as_json['asset_creation'] = res;
        return_result_as_json['Success'] = sucess_count;
        return_result_as_json['Failures'] = failure_count;
        return_result_as_json['Failure_error'] = failure_err_msg;
        return return_result_as_json;
        $$; 
    
    CREATE OR REPLACE FUNCTION GENERATE_TEMPLATE_ASSET_BASE_NAME
        (PARAM_TEMPLATE_NAME varchar ,PARAM_TEMPLATE_VERSION varchar) 
      RETURNS VARCHAR
      LANGUAGE JAVASCRIPT
      COMMENT = 'Used for generating device template name.'
      AS $$
        function normalize_name_str(p_str) {
            return p_str.replace(/[\W_]+/g,"_").trim().toLowerCase();
        }
    
        function get_device_view_base_name(p_machine ,p_version) {
            const v = (p_version != null) ? p_version : "";
            return normalize_name_str(`${p_machine}${v}`);
        }
    
    
      return get_device_view_base_name(PARAM_TEMPLATE_NAME ,PARAM_TEMPLATE_VERSION);
      $$
      ;
    
    
    CREATE OR REPLACE FUNCTION GENERATE_DEVICE_BASE_VIEW_DDL
        (   PARAM_GROUP_ID VARCHAR ,PARAM_EDGE_NODE_ID VARCHAR ,PARAM_TEMPLATE_REFERENCE VARCHAR
            ,PARAM_SOURCE_DB varchar
            ,PARAM_SOURCE_SCHEMA varchar
            ,PARAM_TARGET_SCHEMA VARCHAR
            ,PARAM_TEMPLATE_ASSET_BASE_NAME varchar
            ,PARAM_TEMPLATE_DEFN variant
            ,PARAM_FORCE_RECREATE boolean
            ) 
      RETURNS VARCHAR
      LANGUAGE JAVASCRIPT
      COMMENT = 'Used for generating generic view ddl for device template.'
      AS $$
    
        var stmt_condition = `create view if not exists`;
        if (PARAM_FORCE_RECREATE == true)
            stmt_condition = `create or replace view`;
            
        const sql_stmt = `
            ${stmt_condition} ${PARAM_SOURCE_DB}.${PARAM_TARGET_SCHEMA}.${PARAM_TEMPLATE_ASSET_BASE_NAME}
            as 
            select 
                group_id ,edge_node_id ,device_id ,message_type ,message_sequence ,root_message_timestamp 
                ,inserted_at ,nbirth_or_ndeath ,nbirth_bdseq  ,ndeath_bdseq 
                ,nbirth_ndeath_inserted_at  ,is_last_known_good_reading  
                ,message_type_order ,is_node_alive
    
                ,device_name ,template_version ,device_metric_timestamp ,ddata_msg ,is_historical
    
                ,device_measure_uuid ,measures_info ,measure_timestamp 
    
                ,measure_ts ,measure_date ,measure_hour
    
            from ${PARAM_SOURCE_SCHEMA}.sparkplug_device_messages
            where group_id = '${PARAM_GROUP_ID}'
                and edge_node_id = '${PARAM_EDGE_NODE_ID}'
                and template_reference = '${PARAM_TEMPLATE_REFERENCE}'
            ;
        `;
    
        return sql_stmt;
      $$
      ;
    
    
    
    CREATE OR REPLACE FUNCTION GENERATE_DEVICE_VIEW_DDL
        (   PARAM_GROUP_ID VARCHAR ,PARAM_EDGE_NODE_ID VARCHAR ,PARAM_TEMPLATE_REFERENCE VARCHAR
            ,PARAM_SOURCE_DB varchar
            ,PARAM_TARGET_SCHEMA VARCHAR
            ,PARAM_TEMPLATE_ASSET_BASE_NAME varchar
            ,PARAM_TEMPLATE_DEFN variant
            ,PARAM_FORCE_RECREATE boolean
            ) 
      RETURNS VARCHAR
      LANGUAGE JAVASCRIPT
      COMMENT = 'Used for generating generic view ddl for device template.'
      AS $$
    
        function normalize_name_str(p_str) {
            return p_str.replace(/[\W_]+/g,"_").trim().toLowerCase();
        }
    
        function build_column_ddl_defn(p_template_defn) {
            var cols = [];
            const data_type_map = {
                "Int32":"::integer"
                ,"Int64":"::integer"
                ,"Float":"::double"
                ,"Template":"::variant"
                ,"Boolean":"::boolean"
                ,"String":"::varchar"
                };
    
            const m_entries = p_template_defn['value']['metrics']
            for (const [m_key, m_value] of Object.entries(m_entries)) {
                
                const measure_name = m_value['name'];
                const dtype = m_value['dataType'];
                
                const mname_cleansed = normalize_name_str(measure_name);
                //     # default string cast, if the datatype is not mapped
                const dtype_converted = data_type_map[dtype] || "::varchar";
                
                const col_defn = `measures_info:"${measure_name}"${dtype_converted} as ${mname_cleansed} `;
                cols.push(col_defn);
            }
            
            const cols_joined = cols.join(',');
            return cols_joined
        }
    
        const vw_name = `${PARAM_TEMPLATE_ASSET_BASE_NAME}_vw`;
        const cols_joined = build_column_ddl_defn(PARAM_TEMPLATE_DEFN)
    
        const sql_stmt = `
            create or replace view ${PARAM_SOURCE_DB}.${PARAM_TARGET_SCHEMA}.${vw_name}
            as 
            select 
                * exclude(ddata_msg  ,measures_info ,template_version)
                ,${cols_joined}
            from ${PARAM_SOURCE_DB}.${PARAM_TARGET_SCHEMA}.${PARAM_TEMPLATE_ASSET_BASE_NAME}
            ;
        `;
    
        return sql_stmt;
    $$; 
    
    CREATE OR REPLACE FUNCTION GENERATE_DEVICE_ASOF_VIEW_DDL
        (   PARAM_GROUP_ID VARCHAR ,PARAM_EDGE_NODE_ID VARCHAR ,PARAM_TEMPLATE_REFERENCE VARCHAR
            ,PARAM_SOURCE_DB varchar
            ,PARAM_TARGET_SCHEMA VARCHAR
            ,PARAM_TEMPLATE_ASSET_BASE_NAME varchar
            ,PARAM_TEMPLATE_DEFN variant
            ,PARAM_FORCE_RECREATE boolean
            ) 
      RETURNS VARCHAR
      LANGUAGE JAVASCRIPT
      COMMENT = 'Used for generating device asof view ddl.'
      AS $$
    
        function normalize_name_str(p_str) {
            return p_str.replace(/[\W_]+/g,"_").trim().toLowerCase();
        }
    
        function build_column_ddl_defn(p_template_defn) {
            var cols = []
    
            const data_type_map = {
                    "Int32":"::integer"
                    ,"Int64":"::integer"
                    ,"Float":"::double"
                    ,"Template":"::variant"
                    ,"Boolean":"::boolean"
                    ,"String":"::varchar"
                    }
    
            const m_entries = p_template_defn['value']['metrics']
            for (const [m_key, m_value] of Object.entries(m_entries)) {
                
                const measure_name = m_value['name'];
                const dtype = m_value['dataType'];
                
                const mname_cleansed = normalize_name_str(measure_name);
                //     # default string cast, if the datatype is not mapped
                const dtype_converted = data_type_map[dtype] || "::varchar";
                
                const col_defn = `nvl(${mname_cleansed}
                        ,lag(${mname_cleansed}) ignore nulls over (order by message_type_order ,measure_timestamp ,message_sequence) 
                    ) AS ${mname_cleansed}
                `;
                
                cols.push(col_defn);
            }
            
            const cols_joined = cols.join(',');
            return cols_joined
        }
    
        const vw_name = `${PARAM_TEMPLATE_ASSET_BASE_NAME}_vw`;
        const recordasof_vw_name = `${PARAM_TEMPLATE_ASSET_BASE_NAME}_asof_vw`;
        const cols_joined = build_column_ddl_defn(PARAM_TEMPLATE_DEFN)
    
        const sql_stmt = `
            create or replace secure view ${PARAM_SOURCE_DB}.${PARAM_TARGET_SCHEMA}.${recordasof_vw_name}
            as
            select 
                    group_id ,edge_node_id ,device_id ,device_name ,message_sequence 
                    ,root_message_timestamp ,inserted_at
    
                    ,message_type ,message_type_order
    
                    ,nbirth_or_ndeath ,nbirth_bdseq  ,ndeath_bdseq 
                    ,nbirth_ndeath_inserted_at  ,is_last_known_good_reading 
                    ,is_node_alive
                    ,is_historical
    
                    ,device_measure_uuid
                    ,measure_timestamp 
                    ,measure_ts ,measure_date ,measure_hour
    
                    ,${cols_joined}
                from ${PARAM_SOURCE_DB}.${PARAM_TARGET_SCHEMA}.${vw_name}
                order by measure_timestamp ,message_type_order ,message_sequence
                ;
        `;
    
        return sql_stmt;
    $$; 


    • Expected Result: Function GENERATE_DEVICE_ASOF_VIEW_DDL successfully created.
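
    • Usage (example): the helper functions can be exercised directly before they are wired into the automation; 'Chamber' and 'v1.0' below are hypothetical template name and version values, not taken from the scripts.

    Code Block
    languagesql
    titleCall Script 03 helpers (example)
    collapsetrue
    -- returns a lowercased, underscore-normalized asset base name for the template
    select generate_template_asset_base_name('Chamber' ,'v1.0');
    
    -- copies the current contents of sparkplug_messages_flattened_vw into the transient table
    call synch_device_messages();
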
  • Script 04

    Code Block
    languagesql
    titleSQL Script 04
    collapsetrue
    set cl_bridge_node_db = 'cl_bridge_node_db';
    set staging_schema = 'stage_db';
    
    use role sysadmin;
    use database identifier($cl_bridge_node_db);
    use schema identifier($staging_schema);
    
    CREATE OR REPLACE FUNCTION NORMALIZE_ASSET_NAME(P_STR VARCHAR)
      RETURNS VARCHAR
      LANGUAGE JAVASCRIPT
      COMMENT = 'Used for creating asset names without spaces/special characters.'
      AS $$
      return P_STR.replace(/[\W_]+/g,"_").trim().toLowerCase();
      $$
      ;
    
    CREATE OR REPLACE FUNCTION EDGENODE_SCHEMA_NAME(PARAM_SCHEMA_PREFIX VARCHAR ,PARAM_GROUP_ID VARCHAR ,PARAM_EDGE_NODE_ID VARCHAR)
      RETURNS VARCHAR
      LANGUAGE JAVASCRIPT
      COMMENT = 'Used for creating asset names for edgenode schema.'
      AS $$
    
        function normalize_name_str(p_str) {
            return p_str.replace(/[\W_]+/g,"_").trim().toLowerCase();
        }
        const schema_name = `${PARAM_SCHEMA_PREFIX}_${PARAM_GROUP_ID}_${PARAM_EDGE_NODE_ID}`;
        return normalize_name_str(schema_name);
      $$
      ;
    
    CREATE OR REPLACE FUNCTION GENERATE_EDGENODE_SCHEMA_DDL
        (PARAM_SCHEMA_PREFIX VARCHAR ,PARAM_GROUP_ID VARCHAR 
            ,PARAM_EDGE_NODE_ID VARCHAR ,PARAM_FORCE_RECREATE boolean)
      RETURNS VARCHAR
      LANGUAGE JAVASCRIPT
      COMMENT = 'Used for generating edgenode schema ddl.'
      AS $$
    
        function normalize_name_str(p_str) {
            return p_str.replace(/[\W_]+/g,"_").trim().toLowerCase();
        }
    
        // Returns the normalized schema name for the edgenode
        function get_edgenode_schema_name(p_schema_prefix ,p_group_id ,p_edge_node_id) {
            const schema_name = `${p_schema_prefix}_${p_group_id}_${p_edge_node_id}`;
            return normalize_name_str(schema_name);
        } 
    
        const schema_name = get_edgenode_schema_name(PARAM_SCHEMA_PREFIX  ,PARAM_GROUP_ID  ,PARAM_EDGE_NODE_ID);
        var sql_stmt = `create schema if not exists ${schema_name}; `
        if(PARAM_FORCE_RECREATE == true) {
            sql_stmt = `create or replace schema ${schema_name}; `
        }
        return sql_stmt;
      $$
      ;
    
    
    CREATE OR REPLACE PROCEDURE create_edge_node_schema(PARAM_SCHEMA_PREFIX VARCHAR ,PARAM_GROUP_ID VARCHAR ,PARAM_EDGE_NODE_ID VARCHAR ,PARAM_FORCE_RECREATE boolean) 
    RETURNS VARIANT NOT NULL
    LANGUAGE JAVASCRIPT
    COMMENT = 'Creates an edge-node-specific schema; intended to be invoked as part of NBIRTH message processing'
    AS
    $$
    
        function get_sql_stmt(p_schema_prefix ,p_group_id ,p_edge_node_id ,p_force_recreate) {
            const sql_stmt = `select 
    	        '${p_schema_prefix}' as schema_prefix
                ,iff(${p_force_recreate} = 1 ,true ,false) as force_recreate
                ,edgenode_schema_name(schema_prefix ,group_id ,edge_node_id) as edgenode_schema_name
    
                ,generate_edgenode_schema_ddl(schema_prefix, group_id ,edge_node_id ,force_recreate) as edgenode_schema_ddl
                ,machine 
                ,generate_template_asset_base_name(machine ,version) as machine_table_base_name
                ,template_definition 
                
                ,generate_device_base_view_ddl
                    (group_id ,edge_node_id  ,machine  
                        ,current_database() ,current_schema() ,edgenode_schema_name
                        ,machine_table_base_name ,template_definition
                        ,force_recreate) as device_base_view_ddl 
                
                ,generate_device_view_ddl
                    (group_id ,edge_node_id  ,machine  
                        ,current_database() ,edgenode_schema_name
                        ,machine_table_base_name ,template_definition
                        ,force_recreate) as device_view_ddl
    
                ,generate_device_asof_view_ddl
                    (group_id ,edge_node_id  ,machine  
                        ,current_database() ,edgenode_schema_name
                        ,machine_table_base_name ,template_definition
                        ,force_recreate) as device_asof_view_ddl
            
                from node_machine_registry_vw
                where group_id = '${p_group_id}' 
                    and edge_node_id = '${p_edge_node_id}'
                ;`
    
            return sql_stmt;
        }
    
        // --- MAIN --------------------------------------
        var failure_err_msg = [];
        var return_result_as_json = {};
        var sucess_count = 0;
        var failure_count = 0;
            
        const qry = get_sql_stmt(PARAM_SCHEMA_PREFIX ,PARAM_GROUP_ID ,PARAM_EDGE_NODE_ID ,PARAM_FORCE_RECREATE);
        var sql_stmt = qry;
        var schema_created = false;
    var res = [];
        try {
            var rs = snowflake.execute({ sqlText: qry });
            while (rs.next()) {
                machine  = rs.getColumnValue('MACHINE');
                edgenode_schema_name = rs.getColumnValue('EDGENODE_SCHEMA_NAME');
                edgenode_schema_ddl = rs.getColumnValue('EDGENODE_SCHEMA_DDL');
                device_base_view_ddl = rs.getColumnValue('DEVICE_BASE_VIEW_DDL');
                device_view_ddl = rs.getColumnValue('DEVICE_VIEW_DDL');
                device_asof_view_ddl = rs.getColumnValue('DEVICE_ASOF_VIEW_DDL');
    
                if(schema_created == false) {
                    sql_stmt = edgenode_schema_ddl;
                    snowflake.execute({ sqlText: edgenode_schema_ddl });
                    //blind setting is not good, 
                    //TODO check if the schema is indeed created 
                    schema_created = true;
                }
    
                sql_stmt = device_base_view_ddl;
                snowflake.execute({ sqlText: device_base_view_ddl });
    
                sql_stmt = device_view_ddl;
                snowflake.execute({ sqlText: device_view_ddl });
    
                sql_stmt = device_asof_view_ddl;
                snowflake.execute({ sqlText: device_asof_view_ddl });
                
                sucess_count = sucess_count + 1;
                res.push(edgenode_schema_name + '.' + machine);
            }
    
        } catch (err) {
            failure_count = failure_count + 1;
        failure_err_msg.push(` {
            sqlstatement : '${sql_stmt}',
            error_code : '${err.code}',
            error_state : '${err.state}',
            error_message : '${err.message}',
            stack_trace : '${err.stackTraceTxt}'
            } `);
        }
    
        return_result_as_json['asset_creation'] = res;
    return_result_as_json['Success'] = sucess_count;
    return_result_as_json['Failures'] = failure_count;
    return_result_as_json['Failure_error'] = failure_err_msg;
        
        return return_result_as_json;
        $$;


    • Expected Result: Procedure CREATE_EDGE_NODE_SCHEMA successfully created.
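
    • Usage (example): once an NBIRTH message with template definitions is visible in node_machine_registry_vw, the procedure can be invoked for a single edge node; 'edgenode', 'group1' and 'node1' below are hypothetical values for the schema prefix, group_id and edge_node_id.

    Code Block
    languagesql
    titleCall CREATE_EDGE_NODE_SCHEMA (example)
    collapsetrue
    -- create the per-edge-node schema and its device views if they do not exist yet
    call create_edge_node_schema('edgenode' ,'group1' ,'node1' ,false);
    
    -- pass true as the last argument to force-recreate the schema and all generated views
    call create_edge_node_schema('edgenode' ,'group1' ,'node1' ,true);
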
  • Script 05

    Code Block
    languagesql
    titleSQL Script 05
    collapsetrue
    set cl_bridge_node_db = 'cl_bridge_node_db';
    set staging_schema = 'stage_db';
    
    use role sysadmin;
    use database identifier($cl_bridge_node_db);
    use schema identifier($staging_schema);
    
    
    CREATE OR REPLACE PROCEDURE create_all_edge_node_schemas(PARAM_SCHEMA_PREFIX VARCHAR ,PARAM_FORCE_RECREATE boolean) 
    RETURNS VARIANT NOT NULL
    LANGUAGE JAVASCRIPT
    COMMENT = 'Creates edge-node-specific schemas; intended to be invoked as part of NBIRTH message processing'
    AS
    $$
    
    // --- MAIN --------------------------------------
    var failure_err_msg = [];
    var return_result_as_json = {};
    var sucess_count = 0;
    var failure_count = 0;
        
    const qry = `
    select distinct group_id ,edge_node_id 
        ,current_schema() as current_schema
    from node_machine_registry_vw
    where edge_node_id is not null
    ;`
    //node_machine_registry_vw
    //nbirth_stream
    
    var current_schema = 'public';
    var res = [];
    try {
        var rs = snowflake.execute({ sqlText: qry });
        while (rs.next()) {
            group_id = rs.getColumnValue('GROUP_ID');
            edge_node_id = rs.getColumnValue('EDGE_NODE_ID');
            current_schema = rs.getColumnValue('CURRENT_SCHEMA');
    
            var schema_out = {}
            
            schema_out['execution'] = snowflake.execute({ 
                        sqlText: `call create_edge_node_schema('${PARAM_SCHEMA_PREFIX}' ,'${group_id}' ,'${edge_node_id}' ,${PARAM_FORCE_RECREATE});`
                    });
            
            res.push(schema_out);
         }
        sucess_count = sucess_count + 1;
    
    } catch (err) {
        failure_count = failure_count + 1;
        failure_err_msg.push(` {
        sqlstatement : '${qry}',
            error_code : '${err.code}',
            error_state : '${err.state}',
            error_message : '${err.message}',
            stack_trace : '${err.stackTraceTxt}'
            } `);
    }
       
    return_result_as_json['asset_creation'] = res;
    return_result_as_json['Success'] = sucess_count;
    return_result_as_json['Failures'] = failure_count;
    return_result_as_json['Failure_error'] = failure_err_msg;
    return return_result_as_json;
    $$; 


    • Expected Result: Procedure CREATE_ALL_EDGE_NODE_SCHEMAS successfully created.
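
    • Usage (example): the wrapper loops over every distinct group_id / edge_node_id pair in node_machine_registry_vw and calls create_edge_node_schema for each; 'edgenode' below is a hypothetical schema prefix.

    Code Block
    languagesql
    titleCall CREATE_ALL_EDGE_NODE_SCHEMAS (example)
    collapsetrue
    -- build (or keep) schemas and views for every registered edge node
    call create_all_edge_node_schemas('edgenode' ,false);
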
  • Script 10

    Code Block
    languagesql
    titleSQL Script 10
    collapsetrue
    -- =========================
    -- In this script, we set up the assets that require privileged roles
    -- such as SECURITYADMIN and SYSADMIN. These are:
    --  - Create a custom role
    --  - Grant the custom role privileges to create and execute tasks
    --  - Create a warehouse dedicated to ingestion
    --  - Grants on the staging and node databases
    
    -- REPLACE THE SESSION VARIABLE ACCORDING TO YOUR ENVIRONMENT
    -- =========================
    
    set processor_role = 'cl_bridge_process_rl';
    set cl_bridge_ingestion_warehouse = 'cl_bridge_ingest_wh';
    set staging_db = 'cl_bridge_stage_db';
    set staging_db_schema = 'cl_bridge_stage_db.stage_db';
    
    set node_db = 'cl_bridge_node_db';
    set node_db_staging_schema = 'cl_bridge_node_db.stage_db';
    
    -- >>>>>>>>>>>>>>>>>>>>>>>>>>> ROLE CREATION >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
        use role securityadmin;
        create role if not exists identifier($processor_role)
            comment = 'role used by cirruslink bridge to ingest and process mqtt/sparkplug data';
    
        grant role identifier($processor_role)
            to role sysadmin;
    
    -- >>>>>>>>>>>>>>>>>>>>>>>>>>> WAREHOUSE >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
        use role sysadmin;
    
        create warehouse if not exists identifier($cl_bridge_ingestion_warehouse) with
            warehouse_type = standard
            warehouse_size = xsmall
            --max_cluster_count = 5
            initially_suspended = true
            comment = 'used by cirruslink bridge to ingest'
            ;
    
    
        grant usage on warehouse identifier($cl_bridge_ingestion_warehouse) 
            to role identifier($processor_role);
        grant operate on warehouse identifier($cl_bridge_ingestion_warehouse) 
            to role identifier($processor_role);
    
    -- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS STAGE DB >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
        grant usage on database identifier($staging_db)
            to role identifier($processor_role);
    
        grant usage on schema identifier($staging_db_schema)
            to role identifier($processor_role);
    
        grant select on all tables in schema identifier($staging_db_schema)
            to role identifier($processor_role);
    
        grant insert on all tables in schema identifier($staging_db_schema)
            to role identifier($processor_role);
    
        grant select on future tables in schema identifier($staging_db_schema)
            to role identifier($processor_role);
    
        grant insert on future tables in schema identifier($staging_db_schema)
            to role identifier($processor_role);
    
    
    -- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS NODE DB >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    
        grant usage on database identifier($node_db)
            to role identifier($processor_role);
    
        grant usage on schema identifier($node_db_staging_schema)
            to role identifier($processor_role);
    
        grant select on table sparkplug_device_messages
            to role identifier($processor_role);
    
        grant insert on table sparkplug_device_messages
            to role identifier($processor_role);
    
        grant select on all views in schema identifier($node_db_staging_schema)
            to role identifier($processor_role);
    
        grant usage on all functions in schema identifier($node_db_staging_schema)
            to role identifier($processor_role);  
    
        grant usage on all procedures in schema identifier($node_db_staging_schema)
            to role identifier($processor_role);
    
        -- need for:
        --  - creating edge node specific tables and views
        grant create schema on database identifier($node_db)
            to role identifier($processor_role);


    • Expected Result: Statement executed successfully.
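
    • Verification (example): a minimal check of the new role and its grants; the literal role name matches the $processor_role variable set at the top of the script.

    Code Block
    languagesql
    titleVerify Script 10 grants (example)
    collapsetrue
    use role securityadmin;
    
    -- should list usage/operate on the warehouse plus the database, schema, table and view grants above
    show grants to role cl_bridge_process_rl;
    
    -- should show the role granted to sysadmin
    show grants of role cl_bridge_process_rl;
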
  • Script 11

    Code Block
    languagesql
    titleSQL Script 11
    collapsetrue


    • Expected Result: 

...