Contents
Cirrus Link Resources
Cirrus Link Website
Contact Us (Sales/Support)
Forum
Cirrus Link Modules Docs for Ignition 7.9.x
Inductive Resources
Ignition User Manual
Knowledge Base Articles
Inductive University
Forum
...
Script 02
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
-- =========================
-- In this script, we are setting up assets related to the node database,
-- which would eventually contain all the device specific views and tables.
-- At the very core, the following assets are created:
--  - Node Database
--  - Staging schema
-- The database & schema will be owned by SYSADMIN
-- REPLACE THE SESSION VARIABLE ACCORDING TO YOUR ENVIRONMENT
-- =========================
-- NOTE: staged_sparkplug_raw_table is kept for reference only; the view
-- sparkplug_messages_vw below hardcodes the same table name.
set staged_sparkplug_raw_table = 'cl_bridge_stage_db.stage_db.sparkplug_raw';
set cl_bridge_node_db = 'cl_bridge_node_db';
set staging_schema = 'stage_db';

-- >>>>>>>>>>>>>>>>>>>>>> DATABASE >>>>>>>>>>>>>>>>>>>>>>>>>
use role sysadmin;

create database if not exists identifier($cl_bridge_node_db)
    -- DATA_RETENTION_TIME_IN_DAYS = 90
    -- MAX_DATA_EXTENSION_TIME_IN_DAYS = 90
    comment = 'used for storing flattened messages processed from the staging database'
;

-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA >>>>>>>>>>>>>>>>>>>>>>>>>
use database identifier($cl_bridge_node_db);

create schema if not exists identifier($staging_schema)
    with managed access
    -- data_retention_time_in_days = 90
    -- max_data_extension_time_in_days = 90
    comment = 'used for storing flattened messages processed from the staging database';

-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA ASSETS >>>>>>>>>>>>>>>>>>>>>>>>>
use schema identifier($staging_schema);

-- =========================
-- Define tables
-- =========================

-- NOTE THE 'cl_bridge_stage_db.stage_db.sparkplug_raw' is hardcoded here; as the identifier
--      staged_sparkplug_raw_table replacement does not work.
-- Parses the raw msg column as JSON and exposes the message sequence and
-- timestamp next to the topic attributes of each staged row.
create or replace view sparkplug_messages_vw
    change_tracking = true
    comment = 'parses out the core attributes from the message and topic.'
    as
    select
        uuid as msg_id
        ,namespace
        ,group_id
        ,msg_type as message_type
        ,edge_node_id
        ,device_id
        ,parse_json(msg) as message
        ,message:seq::int as message_sequence
        ,message:timestamp::number as message_timestamp
        ,inserted_at
    from cl_bridge_stage_db.stage_db.sparkplug_raw
;

-- -- >>>>>>>>>>>>>>>>>>>>>>
-- Group / edge node pairs of every NBIRTH message; source of nbirth_stream.
create or replace view nbirth_vw
    change_tracking = true
    comment = 'filtered to nbirth messages. This is a mirror'
    as
    select
        group_id
        ,edge_node_id
    from sparkplug_messages_vw
    where message_type = 'NBIRTH'
;

-- One row per 'Template' metric found in the NBIRTH message with the highest
-- message_timestamp for each group / edge node.
create or replace view node_machine_registry_vw
    comment = 'Used to retreive the latest template definitions for a given group and edge_node'
    as
    with base as (
        select
            group_id
            ,edge_node_id
            ,max_by(message ,message_timestamp) as message
            ,max(message_timestamp) as latest_message_timestamp
        from sparkplug_messages_vw
        where message_type = 'NBIRTH'
        group by
            group_id
            ,edge_node_id
    )
    select
        group_id
        ,edge_node_id
        ,f.value as template_definition
        ,template_definition:name::varchar as machine
        ,template_definition:reference::varchar as reference
        ,template_definition:version::varchar as version
        ,template_definition:timestamp::int as timestamp
    from base as b
        ,lateral flatten (input => b.message:metrics) f
    where template_definition:dataType::varchar = 'Template'
;

-- -- >>>>>>>>>>>>>>>>>>>>>>
-- NBIRTH / NDEATH messages with the value of their 'bdSeq' metric split into
-- a birth column and a death column (the other one is null).
create or replace view node_birth_death_vw
    comment = 'shows the latest node birth & death messages for each device'
    as
    select
        b.* exclude(namespace)
        ,message_type as nbirth_or_ndeath_raw
        ,iff((message_type = 'NBIRTH') ,f.value:value ,null)::number as nbirth_bdSeq_raw
        ,iff((message_type = 'NDEATH') ,f.value:value ,null)::number as ndeath_bdSeq_raw
        ,inserted_at as nbirth_ndeath_inserted_at_raw
    from sparkplug_messages_vw as b
        ,lateral flatten (input => b.message:metrics) as f
    where message_type in ('NBIRTH' ,'NDEATH')
        and f.value:name::varchar = 'bdSeq'
;

-- DBIRTH / DDATA messages, padded with null node birth/death columns so the
-- rows line up with node_birth_death_vw in the union further below.
create or replace view device_records_vw
    change_tracking = true
    as
    select
        b.* exclude(namespace)
        ,null as nbirth_or_ndeath_raw
        ,null as nbirth_bdSeq_raw
        ,null as ndeath_bdSeq_raw
        ,null as nbirth_ndeath_inserted_at_raw
    from sparkplug_messages_vw as b
    where message_type in ('DBIRTH' ,'DDATA')
;

create or replace stream device_records_stream
    on view device_records_vw
    show_initial_rows = true
    comment = 'used for monitoring latest device messages'
;

-- Unions node birth/death rows with the device rows read from the stream and
-- carries the most recent non-null node birth/death attributes forward onto
-- every row, ordered by inserted_at and message_sequence.
-- NOTE(review): the lag() windows have no partition by, so values are carried
-- across all groups / edge nodes in the union - confirm this is intended.
create or replace view sparkplug_msgs_nodebirth_contextualized_vw
    as
    with device_node_unioned as (
        select *
        from node_birth_death_vw
        union all
        select * exclude(METADATA$ROW_ID ,METADATA$ACTION ,METADATA$ISUPDATE)
        from device_records_stream
    )
    select
        -- group_id ,message_type ,edge_node_id ,device_id
        -- ,message ,message_sequence ,inserted_at
        * exclude(nbirth_or_ndeath_raw ,nbirth_bdSeq_raw ,ndeath_bdSeq_raw ,nbirth_ndeath_inserted_at_raw)
        ,nvl(nbirth_or_ndeath_raw
            ,lag(nbirth_or_ndeath_raw) ignore nulls over (order by inserted_at ,message_sequence)
        ) as nbirth_or_ndeath
        ,nvl(nbirth_bdSeq_raw
            ,lag(nbirth_bdSeq_raw) ignore nulls over (order by inserted_at ,message_sequence)
        ) as nbirth_bdSeq
        ,nvl(ndeath_bdSeq_raw
            ,lag(ndeath_bdSeq_raw) ignore nulls over (order by inserted_at ,message_sequence)
        ) as ndeath_bdSeq
        ,nvl(nbirth_ndeath_inserted_at_raw
            ,lag(nbirth_ndeath_inserted_at_raw) ignore nulls over (order by inserted_at ,message_sequence)
        ) as nbirth_ndeath_inserted_at
        ,case true
            when (nbirth_or_ndeath = 'NBIRTH') then false
            when ( (nbirth_or_ndeath = 'NDEATH') and (nbirth_bdSeq != ndeath_bdSeq) ) then false
            when ( (nbirth_or_ndeath = 'NDEATH') and (nbirth_bdSeq = ndeath_bdSeq) ) then true
            else true
        end as is_last_known_good_reading
        ,case lower(message_type)
            when lower('NBIRTH') then 1
            when lower('DBIRTH') then 2
            when lower('DDATA') then 3
            when lower('DDEATH') then 4
            when lower('NDEATH') then 5
            else 99
        end as message_type_order
        ,(nbirth_or_ndeath = 'NBIRTH') as is_node_alive
    from device_node_unioned
;

-- Flattens the metrics of DBIRTH / DDATA messages two levels deep (message
-- metrics, then the metrics nested in each metric's value) and aggregates the
-- nested name / value pairs into one measures_info object per group-by key.
create or replace view sparkplug_messages_flattened_vw
    as
    with base as (
        select
            -- sparkplugb message level
            msg_id
            ,group_id
            ,edge_node_id
            ,device_id
            ,message_type
            ,message_sequence
            ,inserted_at
            ,nbirth_or_ndeath
            ,nbirth_bdseq
            ,ndeath_bdseq
            ,nbirth_ndeath_inserted_at
            ,is_last_known_good_reading
            ,message_type_order
            ,is_node_alive
            ,message_timestamp as root_message_timestamp

            -- attributes related to device data (ddata / dbirth)
            ,f.value:name::varchar as device_name
            ,f.value:value:reference::varchar as template_reference
            ,f.value:value:version::varchar as template_version
            ,f.value:timestamp::number as device_metric_timestamp
            ,f.value as ddata_msg

            -- attributes related to device level metrics
            ,concat(msg_id ,'^' ,f.index ,'::' ,d.index) as device_measure_uuid
            ,d.value:name::varchar as measure_name
            ,d.value:value as measure_value
            ,d.value:timestamp::number as measure_timestamp
        from sparkplug_msgs_nodebirth_contextualized_vw as b
            ,lateral flatten(input => b.message:metrics) as f
            ,lateral flatten(input => f.value:value:metrics) as d
        where message_type in ('DBIRTH' ,'DDATA')
            and template_reference is not null
    )
    select
        group_id
        ,edge_node_id
        ,device_id
        ,message_type
        ,message_sequence
        ,inserted_at
        ,nbirth_or_ndeath
        ,nbirth_bdseq
        ,ndeath_bdseq
        ,nbirth_ndeath_inserted_at
        ,is_last_known_good_reading
        ,message_type_order
        ,is_node_alive
        ,root_message_timestamp
        ,device_name
        ,template_reference
        ,template_version
        ,device_metric_timestamp
        ,ddata_msg
        ,null as is_historical
        ,device_measure_uuid
        ,object_agg(distinct measure_name ,measure_value) as measures_info
        ,measure_timestamp
        -- measure_timestamp is divided by 1000 before conversion to a timestamp
        ,to_timestamp(measure_timestamp/1000) as measure_ts
        ,to_date(measure_ts) as measure_date
        ,hour(measure_ts) as measure_hour
    from base
    group by
        group_id
        ,edge_node_id
        ,device_id
        ,message_type
        ,message_sequence
        ,inserted_at
        ,nbirth_or_ndeath
        ,nbirth_bdseq
        ,ndeath_bdseq
        ,nbirth_ndeath_inserted_at
        ,is_last_known_good_reading
        ,message_type_order
        ,is_node_alive
        ,root_message_timestamp
        ,device_name
        ,template_reference
        ,template_version
        ,device_metric_timestamp
        ,ddata_msg
        ,is_historical
        ,device_measure_uuid
        ,measure_timestamp
;

-- Column order mirrors the select list of sparkplug_messages_flattened_vw.
create or replace transient table sparkplug_device_messages (
    group_id varchar
    ,edge_node_id varchar
    ,device_id varchar
    ,message_type varchar
    ,message_sequence number
    ,inserted_at number
    ,nbirth_or_ndeath varchar
    ,nbirth_bdseq number
    ,ndeath_bdseq number
    ,nbirth_ndeath_inserted_at number
    ,is_last_known_good_reading boolean
    ,message_type_order number
    ,is_node_alive boolean
    ,root_message_timestamp number
    ,device_name varchar
    ,template_reference varchar
    ,template_version varchar
    ,device_metric_timestamp number
    ,ddata_msg variant
    ,is_historical boolean
    ,device_measure_uuid varchar
    ,measures_info variant
    ,measure_timestamp number
    ,measure_ts timestamp
    ,measure_date date
    ,measure_hour number
)
cluster by (
    group_id
    ,edge_node_id
    ,device_id
    ,template_reference
    ,template_version
    ,device_name
    ,measure_date
    ,measure_hour)
comment = 'materialized version of the sparkplug_messages_flattened_vw for easier downstream pipelines.'
;

-- -- >>>>>>>>>>>>>>>>>>>>>>
-- ================
--  NODE BIRTH related assets
-- ================
create or replace stream nbirth_stream
    on view nbirth_vw
    show_initial_rows = true
    comment = 'stream to monitor for nbirth messages, so that assets are created automatically'
;
Script 03
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
set cl_bridge_node_db = 'cl_bridge_node_db';
set staging_schema = 'stage_db';

use role sysadmin;
use database identifier($cl_bridge_node_db);
use schema identifier($staging_schema);

-- Inserts every row currently returned by sparkplug_messages_flattened_vw
-- into sparkplug_device_messages.
-- Returns a VARIANT object with the keys asset_creation, Success, Failures
-- and Failure_error; a failed insert is reported there instead of raised.
CREATE OR REPLACE PROCEDURE synch_device_messages()
    RETURNS VARIANT NOT NULL
    LANGUAGE JAVASCRIPT
    COMMENT = 'Synch latest device updates and stores in table'
    AS
$$
    // --- MAIN --------------------------------------
    var failure_err_msg = [];
    var return_result_as_json = {};
    var sucess_count = 0;
    var failure_count = 0;

    const qry = `
        insert into sparkplug_device_messages
        select *
        from sparkplug_messages_flattened_vw
        ;`

    // Declared locally; nothing is ever pushed to it in this procedure.
    var res = [];
    try {
        snowflake.execute({ sqlText: qry });
        sucess_count = sucess_count + 1;
    } catch (err) {
        failure_count = failure_count + 1;
        failure_err_msg.push(` {
            sqlstatement : ‘${qry}’,
            error_code : ‘${err.code}’,
            error_state : ‘${err.state}’,
            error_message : ‘${err.message}’,
            stack_trace : ‘${err.stackTraceTxt}’
            } `);
    }

    return_result_as_json['asset_creation'] = res;
    return_result_as_json['Success'] = sucess_count;
    return_result_as_json['Failures'] = failure_count;
    return_result_as_json['Failure_error'] = failure_err_msg;
    return return_result_as_json;
$$;

-- Concatenates template name and version (version omitted when null), then
-- replaces every run of non-word characters / underscores with a single '_'
-- and lower-cases the result.
CREATE OR REPLACE FUNCTION GENERATE_TEMPLATE_ASSET_BASE_NAME
    (PARAM_TEMPLATE_NAME varchar ,PARAM_TEMPLATE_VERSION varchar)
    RETURNS VARCHAR
    LANGUAGE JAVASCRIPT
    COMMENT = 'Used for generating device template name.'
    AS
$$
    function normalize_name_str(p_str) {
        return p_str.replace(/[\W_]+/g,"_").trim().toLowerCase();
    }

    function get_device_view_base_name(p_machine ,p_version) {
        const v = (p_version != null) ? p_version : "";
        return normalize_name_str(`${p_machine}${v}`);
    }

    return get_device_view_base_name(PARAM_TEMPLATE_NAME ,PARAM_TEMPLATE_VERSION);
$$
;

-- Returns the DDL of the base view
--   <PARAM_SOURCE_DB>.<PARAM_TARGET_SCHEMA>.<PARAM_TEMPLATE_ASSET_BASE_NAME>
-- which filters <PARAM_SOURCE_SCHEMA>.sparkplug_device_messages down to one
-- group id, edge node id and template reference.
-- PARAM_FORCE_RECREATE = true emits 'create or replace view', otherwise
-- 'create view if not exists'. PARAM_TEMPLATE_DEFN is not used here.
CREATE OR REPLACE FUNCTION GENERATE_DEVICE_BASE_VIEW_DDL
    ( PARAM_GROUP_ID VARCHAR
    ,PARAM_EDGE_NODE_ID VARCHAR
    ,PARAM_TEMPLATE_REFERENCE VARCHAR
    ,PARAM_SOURCE_DB varchar
    ,PARAM_SOURCE_SCHEMA varchar
    ,PARAM_TARGET_SCHEMA VARCHAR
    ,PARAM_TEMPLATE_ASSET_BASE_NAME varchar
    ,PARAM_TEMPLATE_DEFN variant
    ,PARAM_FORCE_RECREATE boolean
    )
    RETURNS VARCHAR
    LANGUAGE JAVASCRIPT
    COMMENT = 'Used for generating generic view ddl for device template.'
    AS
$$
    // Escapes a value for use inside a single-quoted SQL string constant:
    // backslashes and single quotes would otherwise end or corrupt the
    // constant in the generated DDL.
    function sql_escape(p_val) {
        return String(p_val).replace(/\\/g, "\\\\").replace(/'/g, "''");
    }

    var stmt_condition = `create view if not exists`;
    if (PARAM_FORCE_RECREATE == true)
        stmt_condition = `create or replace view`;

    const sql_stmt = `
        ${stmt_condition} ${PARAM_SOURCE_DB}.${PARAM_TARGET_SCHEMA}.${PARAM_TEMPLATE_ASSET_BASE_NAME}
        as
        select
            group_id ,edge_node_id ,device_id ,message_type ,message_sequence ,root_message_timestamp
            ,inserted_at ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq
            ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
            ,message_type_order ,is_node_alive
            ,device_name ,template_version ,device_metric_timestamp ,ddata_msg ,is_historical
            ,device_measure_uuid ,measures_info ,measure_timestamp
            ,measure_ts ,measure_date ,measure_hour
        from ${PARAM_SOURCE_SCHEMA}.sparkplug_device_messages
        where group_id = '${sql_escape(PARAM_GROUP_ID)}'
            and edge_node_id = '${sql_escape(PARAM_EDGE_NODE_ID)}'
            and template_reference = '${sql_escape(PARAM_TEMPLATE_REFERENCE)}'
        ;
    `;

    return sql_stmt;
$$
;

-- Returns the DDL of the typed device view <base name>_vw, which selects from
-- the base view and adds one column per entry of
-- PARAM_TEMPLATE_DEFN.value.metrics, extracted from measures_info and cast
-- according to the metric's dataType (unmapped types are cast to varchar).
-- The statement is always 'create or replace view'; PARAM_FORCE_RECREATE,
-- PARAM_GROUP_ID, PARAM_EDGE_NODE_ID and PARAM_TEMPLATE_REFERENCE are unused.
CREATE OR REPLACE FUNCTION GENERATE_DEVICE_VIEW_DDL
    ( PARAM_GROUP_ID VARCHAR
    ,PARAM_EDGE_NODE_ID VARCHAR
    ,PARAM_TEMPLATE_REFERENCE VARCHAR
    ,PARAM_SOURCE_DB varchar
    ,PARAM_TARGET_SCHEMA VARCHAR
    ,PARAM_TEMPLATE_ASSET_BASE_NAME varchar
    ,PARAM_TEMPLATE_DEFN variant
    ,PARAM_FORCE_RECREATE boolean
    )
    RETURNS VARCHAR
    LANGUAGE JAVASCRIPT
    COMMENT = 'Used for generating generic view ddl for device template.'
    AS
$$
    function normalize_name_str(p_str) {
        return p_str.replace(/[\W_]+/g,"_").trim().toLowerCase();
    }

    // Builds the comma separated list of typed measure columns.
    function build_column_ddl_defn(p_template_defn) {
        var cols = [];
        const data_type_map = {
            "Int32":"::integer"
            ,"Int64":"::integer"
            ,"Float":"::double"
            ,"Template":"::variant"
            ,"Boolean":"::boolean"
            ,"String":"::varchar"
        };

        const m_entries = p_template_defn['value']['metrics']
        for (const m_value of Object.values(m_entries)) {
            const measure_name = m_value['name'];
            const dtype = m_value['dataType'];
            const mname_cleansed = normalize_name_str(measure_name);

            // # default string cast, if the datatype is not mapped
            const dtype_converted = data_type_map[dtype] || "::varchar";

            const col_defn = `measures_info:"${measure_name}"${dtype_converted} as ${mname_cleansed} `;
            cols.push(col_defn);
        }

        const cols_joined = cols.join(',');
        return cols_joined
    }

    const vw_name = `${PARAM_TEMPLATE_ASSET_BASE_NAME}_vw`;
    const cols_joined = build_column_ddl_defn(PARAM_TEMPLATE_DEFN)

    const sql_stmt = `
        create or replace view ${PARAM_SOURCE_DB}.${PARAM_TARGET_SCHEMA}.${vw_name}
        as
        select
            * exclude(ddata_msg ,measures_info ,template_version)
            ,${cols_joined}
        from ${PARAM_SOURCE_DB}.${PARAM_TARGET_SCHEMA}.${PARAM_TEMPLATE_ASSET_BASE_NAME}
        ;
    `;

    return sql_stmt;
$$;

-- Returns the DDL of the secure view <base name>_asof_vw, which selects from
-- <base name>_vw and, for each metric in PARAM_TEMPLATE_DEFN.value.metrics,
-- replaces a null measure with the previous non-null value in
-- (message_type_order, measure_timestamp, message_sequence) order.
-- The statement is always 'create or replace secure view'.
CREATE OR REPLACE FUNCTION GENERATE_DEVICE_ASOF_VIEW_DDL
    ( PARAM_GROUP_ID VARCHAR
    ,PARAM_EDGE_NODE_ID VARCHAR
    ,PARAM_TEMPLATE_REFERENCE VARCHAR
    ,PARAM_SOURCE_DB varchar
    ,PARAM_TARGET_SCHEMA VARCHAR
    ,PARAM_TEMPLATE_ASSET_BASE_NAME varchar
    ,PARAM_TEMPLATE_DEFN variant
    ,PARAM_FORCE_RECREATE boolean
    )
    RETURNS VARCHAR
    LANGUAGE JAVASCRIPT
    COMMENT = 'Used for generating device asof view ddl.'
    AS
$$
    function normalize_name_str(p_str) {
        return p_str.replace(/[\W_]+/g,"_").trim().toLowerCase();
    }

    // Builds the comma separated list of "carry forward" measure columns.
    // Only the normalized metric name is needed; the columns are already
    // typed by the <base name>_vw view this view reads from.
    function build_column_ddl_defn(p_template_defn) {
        var cols = []

        const m_entries = p_template_defn['value']['metrics']
        for (const m_value of Object.values(m_entries)) {
            const measure_name = m_value['name'];
            const mname_cleansed = normalize_name_str(measure_name);

            const col_defn = `nvl(${mname_cleansed}
                ,lag(${mname_cleansed}) ignore nulls over (order by message_type_order ,measure_timestamp ,message_sequence)
                ) AS ${mname_cleansed} `;
            cols.push(col_defn);
        }

        const cols_joined = cols.join(',');
        return cols_joined
    }

    const vw_name = `${PARAM_TEMPLATE_ASSET_BASE_NAME}_vw`;
    const recordasof_vw_name = `${PARAM_TEMPLATE_ASSET_BASE_NAME}_asof_vw`;
    const cols_joined = build_column_ddl_defn(PARAM_TEMPLATE_DEFN)

    const sql_stmt = `
        create or replace secure view ${PARAM_SOURCE_DB}.${PARAM_TARGET_SCHEMA}.${recordasof_vw_name}
        as
        select
            group_id ,edge_node_id ,device_id ,device_name
            ,message_sequence ,root_message_timestamp ,inserted_at
            ,message_type ,message_type_order
            ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq
            ,nbirth_ndeath_inserted_at ,is_last_known_good_reading ,is_node_alive
            ,is_historical
            ,device_measure_uuid
            ,measure_timestamp ,measure_ts ,measure_date ,measure_hour
            ,${cols_joined}
        from ${PARAM_SOURCE_DB}.${PARAM_TARGET_SCHEMA}.${vw_name}
        order by measure_timestamp ,message_type_order ,message_sequence
        ;
    `;

    return sql_stmt;
$$;
Script 04
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
set cl_bridge_node_db = 'cl_bridge_node_db';
set staging_schema = 'stage_db';

use role sysadmin;
use database identifier($cl_bridge_node_db);
use schema identifier($staging_schema);

-- Replaces every run of non-word characters / underscores in P_STR with a
-- single '_' and lower-cases the result.
CREATE OR REPLACE FUNCTION NORMALIZE_ASSET_NAME(P_STR VARCHAR)
    RETURNS VARCHAR
    LANGUAGE JAVASCRIPT
    COMMENT = 'Used for creating asset names without spaces/special characters.'
    AS
$$
    return P_STR.replace(/[\W_]+/g,"_").trim().toLowerCase();
$$
;

-- Returns the normalized form of '<prefix>_<group id>_<edge node id>'.
CREATE OR REPLACE FUNCTION EDGENODE_SCHEMA_NAME(PARAM_SCHEMA_PREFIX VARCHAR ,PARAM_GROUP_ID VARCHAR ,PARAM_EDGE_NODE_ID VARCHAR)
    RETURNS VARCHAR
    LANGUAGE JAVASCRIPT
    COMMENT = 'Used for creating asset names for edgenode schema.'
    AS
$$
    function normalize_name_str(p_str) {
        return p_str.replace(/[\W_]+/g,"_").trim().toLowerCase();
    }

    const schema_name = `${PARAM_SCHEMA_PREFIX}_${PARAM_GROUP_ID}_${PARAM_EDGE_NODE_ID}`;
    return normalize_name_str(schema_name);
$$
;

-- Returns 'create schema if not exists <name>;' for the edge node schema, or
-- 'create or replace schema <name>;' when PARAM_FORCE_RECREATE is true.
-- <name> is built the same way as in EDGENODE_SCHEMA_NAME.
CREATE OR REPLACE FUNCTION GENERATE_EDGENODE_SCHEMA_DDL
    (PARAM_SCHEMA_PREFIX VARCHAR ,PARAM_GROUP_ID VARCHAR ,PARAM_EDGE_NODE_ID VARCHAR ,PARAM_FORCE_RECREATE boolean)
    RETURNS VARCHAR
    LANGUAGE JAVASCRIPT
    COMMENT = 'Used for generating edgenode schema ddl.'
    AS
$$
    function normalize_name_str(p_str) {
        return p_str.replace(/[\W_]+/g,"_").trim().toLowerCase();
    }

    // Returns the normalized schema name for the edgenode
    function get_edgenode_schema_name(p_schema_prefix ,p_group_id ,p_edge_node_id) {
        const schema_name = `${p_schema_prefix}_${p_group_id}_${p_edge_node_id}`;
        return normalize_name_str(schema_name);
    }

    const schema_name = get_edgenode_schema_name(PARAM_SCHEMA_PREFIX ,PARAM_GROUP_ID ,PARAM_EDGE_NODE_ID);

    var sql_stmt = `create schema if not exists ${schema_name}; `
    if(PARAM_FORCE_RECREATE == true) {
        sql_stmt = `create or replace schema ${schema_name}; `
    }

    return sql_stmt;
$$
;

-- For one group / edge node, reads its template definitions from
-- node_machine_registry_vw, creates the edge node schema once, and then
-- creates the base view, typed view and as-of view for every template row.
-- Returns a VARIANT object with the keys:
--   asset_creation : '<schema>.<machine>' for every template processed
--   Success        : number of templates processed
--   Failures       : 1 when a statement failed (processing stops there)
--   Failure_error  : description of the failing statement and error
CREATE OR REPLACE PROCEDURE create_edge_node_schema(PARAM_SCHEMA_PREFIX VARCHAR ,PARAM_GROUP_ID VARCHAR ,PARAM_EDGE_NODE_ID VARCHAR ,PARAM_FORCE_RECREATE boolean)
    RETURNS VARIANT NOT NULL
    LANGUAGE JAVASCRIPT
    COMMENT = 'Creates edge node specific schema, supposed to be invoked as part of NBIRTH messsage'
    AS
$$
    // Renders a value as a single-quoted SQL string constant; backslashes and
    // single quotes are escaped so the value cannot terminate the constant.
    function sql_literal(p_val) {
        return "'" + String(p_val).replace(/\\/g, "\\\\").replace(/'/g, "''") + "'";
    }

    // Builds the query that returns, per template of the edge node, the
    // schema DDL and the three view DDL statements to execute.
    function get_sql_stmt(p_schema_prefix ,p_group_id ,p_edge_node_id ,p_force_recreate) {
        // Rendered as a plain true/false literal; anything other than a
        // JavaScript true (including a NULL argument) is treated as false.
        const force_recreate_literal = (p_force_recreate === true);

        const sql_stmt = `select
            ${sql_literal(p_schema_prefix)} as schema_prefix
            ,${force_recreate_literal} as force_recreate
            ,edgenode_schema_name(schema_prefix ,group_id ,edge_node_id) as edgenode_schema_name
            ,generate_edgenode_schema_ddl(schema_prefix, group_id ,edge_node_id ,force_recreate) as edgenode_schema_ddl

            ,machine
            ,generate_template_asset_base_name(machine ,version) as machine_table_base_name
            ,template_definition

            ,generate_device_base_view_ddl
                (group_id ,edge_node_id ,machine ,current_database() ,current_schema()
                ,edgenode_schema_name ,machine_table_base_name ,template_definition
                ,force_recreate) as device_base_view_ddl

            ,generate_device_view_ddl
                (group_id ,edge_node_id ,machine ,current_database()
                ,edgenode_schema_name ,machine_table_base_name ,template_definition
                ,force_recreate) as device_view_ddl

            ,generate_device_asof_view_ddl
                (group_id ,edge_node_id ,machine ,current_database()
                ,edgenode_schema_name ,machine_table_base_name ,template_definition
                ,force_recreate) as device_asof_view_ddl

        from node_machine_registry_vw
        where group_id = ${sql_literal(p_group_id)}
            and edge_node_id = ${sql_literal(p_edge_node_id)}
        ;`

        return sql_stmt;
    }

    // --- MAIN --------------------------------------
    var failure_err_msg = [];
    var return_result_as_json = {};
    var sucess_count = 0;
    var failure_count = 0;

    const qry = get_sql_stmt(PARAM_SCHEMA_PREFIX ,PARAM_GROUP_ID ,PARAM_EDGE_NODE_ID ,PARAM_FORCE_RECREATE);

    // Tracks the statement being executed so a failure reports the right one.
    var sql_stmt = qry;
    var schema_created = false;
    var res = [];
    try {
        var rs = snowflake.execute({ sqlText: qry });
        while (rs.next()) {
            const machine = rs.getColumnValue('MACHINE');
            const edgenode_schema_name = rs.getColumnValue('EDGENODE_SCHEMA_NAME');
            const edgenode_schema_ddl = rs.getColumnValue('EDGENODE_SCHEMA_DDL');
            const device_base_view_ddl = rs.getColumnValue('DEVICE_BASE_VIEW_DDL');
            const device_view_ddl = rs.getColumnValue('DEVICE_VIEW_DDL');
            const device_asof_view_ddl = rs.getColumnValue('DEVICE_ASOF_VIEW_DDL');

            // The schema DDL is the same on every row; run it only once.
            if(schema_created == false) {
                sql_stmt = edgenode_schema_ddl;
                snowflake.execute({ sqlText: edgenode_schema_ddl });

                //blind setting is not good,
                //TODO check if the schema is indeed created
                schema_created = true;
            }

            // Order matters: the typed view reads the base view and the
            // as-of view reads the typed view.
            sql_stmt = device_base_view_ddl;
            snowflake.execute({ sqlText: device_base_view_ddl });

            sql_stmt = device_view_ddl;
            snowflake.execute({ sqlText: device_view_ddl });

            sql_stmt = device_asof_view_ddl;
            snowflake.execute({ sqlText: device_asof_view_ddl });

            sucess_count = sucess_count + 1;
            res.push(edgenode_schema_name + '.' + machine);
        }
    } catch (err) {
        failure_count = failure_count + 1;
        failure_err_msg.push(` {
            sqlstatement : ‘${sql_stmt}’,
            error_code : ‘${err.code}’,
            error_state : ‘${err.state}’,
            error_message : ‘${err.message}’,
            stack_trace : ‘${err.stackTraceTxt}’
            } `);
    }

    return_result_as_json['asset_creation'] = res;
    return_result_as_json['Success'] = sucess_count;
    return_result_as_json['Failures'] = failure_count;
    return_result_as_json['Failure_error'] = failure_err_msg;
    return return_result_as_json;
$$;
Script 05
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
set cl_bridge_node_db = 'cl_bridge_node_db';
set staging_schema = 'stage_db';

use role sysadmin;
use database identifier($cl_bridge_node_db);
use schema identifier($staging_schema);

-- Calls create_edge_node_schema once for every distinct group / edge node
-- found in node_machine_registry_vw.
-- Returns a VARIANT object with the keys:
--   asset_creation : one {execution: <value returned by the nested call>}
--                    entry per edge node processed
--   Success        : 1 when the whole loop completed without an exception
--   Failures       : 1 when a statement raised (processing stops there)
--   Failure_error  : description of the failing statement and error
CREATE OR REPLACE PROCEDURE create_all_edge_node_schemas(PARAM_SCHEMA_PREFIX VARCHAR ,PARAM_FORCE_RECREATE boolean)
    RETURNS VARIANT NOT NULL
    LANGUAGE JAVASCRIPT
    COMMENT = 'Creates edge node specific schemas, supposed to be invoked as part of NBIRTH messsage'
    AS
$$
    // Renders a value as a single-quoted SQL string constant; backslashes and
    // single quotes are escaped so the value cannot terminate the constant.
    function sql_literal(p_val) {
        return "'" + String(p_val).replace(/\\/g, "\\\\").replace(/'/g, "''") + "'";
    }

    // --- MAIN --------------------------------------
    var failure_err_msg = [];
    var return_result_as_json = {};
    var sucess_count = 0;
    var failure_count = 0;

    const qry = `
        select distinct group_id ,edge_node_id ,current_schema() as current_schema
        from node_machine_registry_vw
        where edge_node_id is not null
        ;`
    //node_machine_registry_vw
    //nbirth_stream

    // Rendered as a plain true/false literal; anything other than a
    // JavaScript true (including a NULL argument) is treated as false.
    const force_recreate_literal = (PARAM_FORCE_RECREATE === true);

    // Tracks the statement being executed so a failure reports the right one.
    var sql_stmt = qry;
    var res = [];
    try {
        var rs = snowflake.execute({ sqlText: qry });
        while (rs.next()) {
            const group_id = rs.getColumnValue('GROUP_ID');
            const edge_node_id = rs.getColumnValue('EDGE_NODE_ID');

            sql_stmt = `call create_edge_node_schema(${sql_literal(PARAM_SCHEMA_PREFIX)} ,${sql_literal(group_id)} ,${sql_literal(edge_node_id)} ,${force_recreate_literal});`;
            const call_rs = snowflake.execute({ sqlText: sql_stmt });

            // CALL yields a single row whose first column holds the value
            // returned by the procedure; keep that instead of the ResultSet.
            var schema_out = {}
            schema_out['execution'] = call_rs.next() ? call_rs.getColumnValue(1) : null;
            res.push(schema_out);
        }
        sucess_count = sucess_count + 1;
    } catch (err) {
        failure_count = failure_count + 1;
        failure_err_msg.push(` {
            sqlstatement : ‘${sql_stmt}’,
            error_code : ‘${err.code}’,
            error_state : ‘${err.state}’,
            error_message : ‘${err.message}’,
            stack_trace : ‘${err.stackTraceTxt}’
            } `);
    }

    return_result_as_json['asset_creation'] = res;
    return_result_as_json['Success'] = sucess_count;
    return_result_as_json['Failures'] = failure_count;
    return_result_as_json['Failure_error'] = failure_err_msg;
    return return_result_as_json;
$$;
Script 10
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
-- =========================
-- In this script, we are setting up roles specifically requiring help of
-- privileged roles like SYSADMIN. These are:
--  - Create a custom role
--  - Assign the custom role to create task and execute task
--  - Create warehouse specifically used for ingestion
--  - Grants
-- REPLACE THE SESSION VARIABLE ACCORDING TO YOUR ENVIRONMENT
-- =========================
set processor_role = 'cl_bridge_process_rl';
set cl_bridge_ingestion_warehouse = 'cl_bridge_ingest_wh';
set staging_db = 'cl_bridge_stage_db';
set staging_db_schema = 'cl_bridge_stage_db.stage_db';
set node_db = 'cl_bridge_node_db';
set node_db_staging_schema = 'cl_bridge_node_db.stage_db';

-- >>>>>>>>>>>>>>>>>>>>>>>>>>> ROLE CREATION >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
use role securityadmin;

create role if not exists identifier($processor_role)
    comment = 'role used by cirruslink bridge to ingest and process mqtt/sparkplug data';

grant role identifier($processor_role)
    to role sysadmin;

-- >>>>>>>>>>>>>>>>>>>>>>>>>>> WAREHOUSE >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
use role sysadmin;

create warehouse if not exists identifier($cl_bridge_ingestion_warehouse) with
    warehouse_type = standard
    warehouse_size = xsmall
    --max_cluster_count = 5
    initially_suspended = true
    comment = 'used by cirruslink bridge to ingest'
;

grant usage on warehouse identifier($cl_bridge_ingestion_warehouse)
    to role identifier($processor_role);
grant operate on warehouse identifier($cl_bridge_ingestion_warehouse)
    to role identifier($processor_role);

-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS STAGE DB >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
grant usage on database identifier($staging_db)
    to role identifier($processor_role);
grant usage on schema identifier($staging_db_schema)
    to role identifier($processor_role);
grant select on all tables in schema identifier($staging_db_schema)
    to role identifier($processor_role);
grant insert on all tables in schema identifier($staging_db_schema)
    to role identifier($processor_role);
grant select on future tables in schema identifier($staging_db_schema)
    to role identifier($processor_role);
grant insert on future tables in schema identifier($staging_db_schema)
    to role identifier($processor_role);

-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS NODE DB >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
grant usage on database identifier($node_db)
    to role identifier($processor_role);
grant usage on schema identifier($node_db_staging_schema)
    to role identifier($processor_role);

-- The table grants below use an unqualified table name, so pin the session
-- to the node database staging schema instead of relying on whatever
-- database / schema the session happened to have selected.
use schema identifier($node_db_staging_schema);

grant select on table sparkplug_device_messages
    to role identifier($processor_role);
grant insert on table sparkplug_device_messages
    to role identifier($processor_role);

grant select on all views in schema identifier($node_db_staging_schema)
    to role identifier($processor_role);
grant usage on all functions in schema identifier($node_db_staging_schema)
    to role identifier($processor_role);
grant usage on all procedures in schema identifier($node_db_staging_schema)
    to role identifier($processor_role);

-- need for:
--  - creating edge node specific tables and views
grant create schema on database identifier($node_db)
    to role identifier($processor_role);
Script 11
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
...