Contents
Cirrus Link Resources
Cirrus Link Website
Contact Us (Sales/Support)
Forum
Cirrus Link Modules Docs for Ignition 7.9.x
Inductive Resources
Ignition User Manual
Knowledge Base Articles
Inductive University
Forum
...
Script 02
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
-- =========================
-- In this script, we are setting up assets related to the node database
-- ,which would eventually contain all the device specific views and tables.
-- At the very core, the following assets are created:
-- - Node Database
-- - Staging schema
-- The database & schema will be owned by SYSADMIN
-- REPLACE THE SESSION VARIABLE ACCORDING TO YOUR ENVIRONMENT
-- =========================
set staged_sparkplug_raw_table = 'cl_bridge_stage_db.stage_db.sparkplug_raw';
set cl_bridge_node_db = 'cl_bridge_node_db';
set staging_schema = 'stage_db';

-- >>>>>>>>>>>>>>>>>>>>>> DATABASE >>>>>>>>>>>>>>>>>>>>>>>>>
use role sysadmin;

create database if not exists identifier($cl_bridge_node_db)
    -- DATA_RETENTION_TIME_IN_DAYS = 90
    -- MAX_DATA_EXTENSION_TIME_IN_DAYS = 90
    comment = 'used for storing flattened messages processed from the staging database'
;

-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA >>>>>>>>>>>>>>>>>>>>>>>>>
use database identifier($cl_bridge_node_db);

create schema if not exists identifier($staging_schema)
    with managed access
    -- data_retention_time_in_days = 90
    -- max_data_extension_time_in_days = 90
    comment = 'used for storing flattened messages processed from the staging database';

-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA ASSETS >>>>>>>>>>>>>>>>>>>>>>>>>
use schema identifier($staging_schema);

-- =========================
-- Define tables
-- =========================
-- NOTE THE 'cl_bridge_stage_db.stage_db.sparkplug_raw' is hardcoded here; as the identifier
-- staged_sparkplug_raw_table replacement does not work.

-- Parses the raw message text into a variant and exposes its seq / timestamp attributes.
create or replace view sparkplug_messages_vw
    change_tracking = true
    comment = 'parses out the core attributes from the message and topic.'
as
select
    msg_id
    ,namespace
    ,group_id
    ,message_type
    ,edge_node_id
    ,device_id
    ,parse_json(msg) as message
    ,message:seq::int as message_sequence
    ,message:timestamp::number as message_timestamp
    ,inserted_at
from cl_bridge_stage_db.stage_db.sparkplug_raw
;

--
-- >>>>>>>>>>>>>>>>>>>>>>

-- Group / edge node pairs of every NBIRTH message; source of nbirth_stream below.
create or replace view nbirth_vw
    change_tracking = true
    comment = 'filtered to nbirth messages. This is a mirror'
as
select
    group_id
    ,edge_node_id
from sparkplug_messages_vw
where message_type = 'NBIRTH'
;

-- One row per 'Template' metric found in the most recent NBIRTH message
-- (by message_timestamp) of each group / edge node.
create or replace view node_machine_registry_vw
    comment = 'Used to retreive the latest template definitions for a given group and edge_node'
as
with base as (
    select
        group_id
        ,edge_node_id
        ,max_by(message ,message_timestamp) as message
        ,max(message_timestamp) as latest_message_timestamp
    from sparkplug_messages_vw
    where message_type = 'NBIRTH'
    group by group_id ,edge_node_id
)
select
    group_id
    ,edge_node_id
    ,f.value as template_definition
    ,template_definition:name::varchar as machine
    ,template_definition:reference::varchar as reference
    ,template_definition:version::varchar as version
    ,template_definition:timestamp::int as timestamp
from base as b
    ,lateral flatten (input => b.message:metrics) f
where template_definition:dataType::varchar = 'Template'
;

--
-- >>>>>>>>>>>>>>>>>>>>>>

-- NBIRTH / NDEATH messages reduced to their 'bdSeq' metric; the *_raw columns
-- line up with the null placeholders of device_records_vw so both can be unioned.
create or replace view node_birth_death_vw
    comment = 'shows the latest node birth & death messages for each device'
as
select
    b.* exclude(namespace)
    ,message_type as nbirth_or_ndeath_raw
    ,iff((message_type = 'NBIRTH') ,f.value:value ,null)::number as nbirth_bdSeq_raw
    ,iff((message_type = 'NDEATH') ,f.value:value ,null)::number as ndeath_bdSeq_raw
    ,inserted_at as nbirth_ndeath_inserted_at_raw
from sparkplug_messages_vw as b
    ,lateral flatten (input => b.message:metrics) as f
where message_type in ('NBIRTH' ,'NDEATH')
    and f.value:name::varchar = 'bdSeq'
;

-- DBIRTH / DDATA messages with null placeholders for the node birth/death columns.
create or replace view device_records_vw
    change_tracking = true
as
select
    b.* exclude(namespace)
    ,null as nbirth_or_ndeath_raw
    ,null as nbirth_bdSeq_raw
    ,null as ndeath_bdSeq_raw
    ,null as nbirth_ndeath_inserted_at_raw
from sparkplug_messages_vw as b
where message_type in ('DBIRTH' ,'DDATA')
;

create or replace stream device_records_stream
    on view device_records_vw
    show_initial_rows = true
    comment = 'used for monitoring latest device messages'
;

-- Unions node birth/death rows with the pending device rows of the stream and
-- carries the last non-null node birth/death attributes forward onto each row.
-- NOTE(review): the lag() windows below have no PARTITION BY, so values are carried
-- across all group_id / edge_node_id combinations - confirm this is intended.
create or replace view sparkplug_msgs_nodebirth_contextualized_vw
as
with device_node_unioned as (
    select *
    from node_birth_death_vw
    union all
    select * exclude(METADATA$ROW_ID ,METADATA$ACTION ,METADATA$ISUPDATE)
    from device_records_stream
)
select
    -- group_id ,message_type ,edge_node_id ,device_id
    -- ,message ,message_sequence ,inserted_at
    * exclude(nbirth_or_ndeath_raw ,nbirth_bdSeq_raw ,ndeath_bdSeq_raw ,nbirth_ndeath_inserted_at_raw)
    ,nvl(nbirth_or_ndeath_raw
        ,lag(nbirth_or_ndeath_raw) ignore nulls over (order by inserted_at ,message_sequence)
    ) as nbirth_or_ndeath
    ,nvl(nbirth_bdSeq_raw
        ,lag(nbirth_bdSeq_raw) ignore nulls over (order by inserted_at ,message_sequence)
    ) as nbirth_bdSeq
    ,nvl(ndeath_bdSeq_raw
        ,lag(ndeath_bdSeq_raw) ignore nulls over (order by inserted_at ,message_sequence)
    ) as ndeath_bdSeq
    ,nvl(nbirth_ndeath_inserted_at_raw
        ,lag(nbirth_ndeath_inserted_at_raw) ignore nulls over (order by inserted_at ,message_sequence)
    ) as nbirth_ndeath_inserted_at
    ,case true
        when (nbirth_or_ndeath = 'NBIRTH') then false
        when ( (nbirth_or_ndeath = 'NDEATH') and (nbirth_bdSeq != ndeath_bdSeq) ) then false
        when ( (nbirth_or_ndeath = 'NDEATH') and (nbirth_bdSeq = ndeath_bdSeq) ) then true
        else true
    end as is_last_known_good_reading
    ,case lower(message_type)
        when lower('NBIRTH') then 1
        when lower('DBIRTH') then 2
        when lower('DDATA') then 3
        when lower('DDEATH') then 4
        when lower('NDEATH') then 5
        else 99
    end as message_type_order
    ,(nbirth_or_ndeath = 'NBIRTH') as is_node_alive
from device_node_unioned
;

-- Flattens each DBIRTH / DDATA message down to its template metrics (f) and the
-- measures nested inside them (d), then aggregates the measures of a metric
-- into a single measures_info object.
create or replace view sparkplug_messages_flattened_vw
as
with base as (
    select
        -- sparkplugb message level
        msg_id
        ,group_id, edge_node_id ,device_id
        ,message_type ,message_sequence ,inserted_at
        ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq
        ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
        ,message_type_order ,is_node_alive
        ,message_timestamp as root_message_timestamp
        -- attributes related to device data (ddata / dbirth)
        ,f.value:name::varchar as device_name
        ,f.value:value:reference::varchar as template_reference
        ,f.value:value:version::varchar as template_version
        ,f.value:timestamp::number as device_metric_timestamp
        ,f.value as ddata_msg
        -- attributes related to device level metrics
        ,concat(msg_id ,'^' ,f.index ,'::',d.index) as device_measure_uuid
        ,d.value:name::varchar as measure_name
        ,d.value:value as measure_value
        ,d.value:timestamp::number as measure_timestamp
    from sparkplug_msgs_nodebirth_contextualized_vw as b
        ,lateral flatten(input => b.message:metrics) as f
        ,lateral flatten(input => f.value:value:metrics) as d
    where message_type in ('DBIRTH' ,'DDATA')
        and template_reference is not null
)
select
    group_id, edge_node_id ,device_id
    ,message_type ,message_sequence ,inserted_at
    ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq
    ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
    ,message_type_order ,is_node_alive
    ,root_message_timestamp
    ,device_name
    ,template_reference
    ,template_version
    ,device_metric_timestamp
    ,ddata_msg
    ,null as is_historical
    ,device_measure_uuid
    ,object_agg(distinct measure_name ,measure_value) as measures_info
    ,measure_timestamp
    -- measure_timestamp is divided by 1000 before conversion to a timestamp
    ,to_timestamp(measure_timestamp/1000) as measure_ts
    ,to_date(measure_ts) as measure_date
    ,hour(measure_ts) as measure_hour
from base
group by
    group_id, edge_node_id ,device_id
    ,message_type ,message_sequence ,inserted_at
    ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq
    ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
    ,message_type_order ,is_node_alive
    ,root_message_timestamp
    ,device_name
    ,template_reference
    ,template_version
    ,device_metric_timestamp
    ,ddata_msg
    ,is_historical
    ,device_measure_uuid
    ,measure_timestamp
;

-- Column order mirrors the select list of sparkplug_messages_flattened_vw.
create or replace transient table sparkplug_device_messages (
    group_id varchar
    ,edge_node_id varchar
    ,device_id varchar
    ,message_type varchar
    ,message_sequence number
    ,inserted_at number
    ,nbirth_or_ndeath varchar
    ,nbirth_bdseq number
    ,ndeath_bdseq number
    ,nbirth_ndeath_inserted_at number
    ,is_last_known_good_reading boolean
    ,message_type_order number
    ,is_node_alive boolean
    ,root_message_timestamp number
    ,device_name varchar
    ,template_reference varchar
    ,template_version varchar
    ,device_metric_timestamp number
    ,ddata_msg variant
    ,is_historical boolean
    ,device_measure_uuid varchar
    ,measures_info variant
    ,measure_timestamp number
    ,measure_ts timestamp
    ,measure_date date
    ,measure_hour number
)
cluster by (
    group_id ,edge_node_id ,device_id
    ,template_reference ,template_version ,device_name
    ,measure_date ,measure_hour)
comment = 'materialized version of the sparkplug_messages_flattened_vw for easier downstream pipelines.'
;

--
-- >>>>>>>>>>>>>>>>>>>>>>
-- ================
-- NODE BIRTH related assets
-- ================
create or replace stream nbirth_stream
    on view nbirth_vw
    show_initial_rows = true
    comment = 'stream to monitor for nbirth messages, so that assets are created automatically'
;
Script 03
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
-- Session variables naming the node database and its staging schema;
-- change the values to match the target environment.
SET cl_bridge_node_db = 'cl_bridge_node_db';
SET staging_schema = 'stage_db';

-- Select the role, database and schema used by the statements that follow.
USE ROLE sysadmin;
USE DATABASE IDENTIFIER($cl_bridge_node_db);
USE SCHEMA IDENTIFIER($staging_schema);
CREATE OR REPLACE PROCEDURE synch_device_messages()
RETURNS VARIANT NOT NULL
LANGUAGE JAVASCRIPT
COMMENT = 'Synch latest device updates and stores in table'
AS
$$
    // Appends every row currently returned by sparkplug_messages_flattened_vw
    // to the sparkplug_device_messages table.
    //
    // Returns a JSON object with the keys:
    //   asset_creation : always an empty array in this procedure
    //   Success        : 1 when the insert ran without raising, otherwise 0
    //   Failures       : 1 when the insert raised, otherwise 0
    //   Failure_error  : array with one description per caught error

    // --- MAIN --------------------------------------
    var failure_err_msg = [];
    var return_result_as_json = {};
    var sucess_count = 0;
    var failure_count = 0;
    var res = [];

    // Columns are listed explicitly so the insert does not depend on the
    // positional order of the view and the table staying aligned.
    const column_list = `
            group_id ,edge_node_id ,device_id
            ,message_type ,message_sequence ,inserted_at
            ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq
            ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
            ,message_type_order ,is_node_alive
            ,root_message_timestamp
            ,device_name ,template_reference ,template_version
            ,device_metric_timestamp ,ddata_msg ,is_historical
            ,device_measure_uuid ,measures_info ,measure_timestamp
            ,measure_ts ,measure_date ,measure_hour`;

    const qry = `
        insert into sparkplug_device_messages (${column_list}
        )
        select ${column_list}
        from sparkplug_messages_flattened_vw
        ;`;

    try {
        snowflake.execute({ sqlText: qry });
        sucess_count = sucess_count + 1;
    } catch (err) {
        // The error is reported through the return value instead of being rethrown.
        failure_count = failure_count + 1;
        failure_err_msg.push(` {
            sqlstatement : '${qry}',
            error_code : '${err.code}',
            error_state : '${err.state}',
            error_message : '${err.message}',
            stack_trace : '${err.stackTraceTxt}'
            } `);
    }

    return_result_as_json['asset_creation'] = res;
    return_result_as_json['Success'] = sucess_count;
    return_result_as_json['Failures'] = failure_count;
    return_result_as_json['Failure_error'] = failure_err_msg;
    return return_result_as_json;
$$;
CREATE OR REPLACE FUNCTION GENERATE_TEMPLATE_ASSET_BASE_NAME
    (PARAM_TEMPLATE_NAME varchar ,PARAM_TEMPLATE_VERSION varchar)
RETURNS VARCHAR
LANGUAGE JAVASCRIPT
COMMENT = 'Used for generating device template name.'
AS $$
    // Concatenates the template name with its version (an absent version
    // contributes nothing), replaces every run of non-word characters or
    // underscores with a single "_" and lower-cases the result.
    const version_suffix = (PARAM_TEMPLATE_VERSION != null) ? PARAM_TEMPLATE_VERSION : "";
    const raw_name = `${PARAM_TEMPLATE_NAME}${version_suffix}`;
    const underscored = raw_name.replace(/[\W_]+/g, "_");
    return underscored.trim().toLowerCase();
$$
;
CREATE OR REPLACE FUNCTION GENERATE_DEVICE_BASE_VIEW_DDL
    ( PARAM_GROUP_ID VARCHAR ,PARAM_EDGE_NODE_ID VARCHAR ,PARAM_TEMPLATE_REFERENCE VARCHAR
    ,PARAM_SOURCE_DB varchar
    ,PARAM_SOURCE_SCHEMA varchar
    ,PARAM_TARGET_SCHEMA VARCHAR
    ,PARAM_TEMPLATE_ASSET_BASE_NAME varchar
    ,PARAM_TEMPLATE_DEFN variant
    ,PARAM_FORCE_RECREATE boolean
    )
RETURNS VARCHAR
LANGUAGE JAVASCRIPT
COMMENT = 'Used for generating generic view ddl for device template.'
AS $$
    // Returns the DDL text of a view named
    // <PARAM_SOURCE_DB>.<PARAM_TARGET_SCHEMA>.<PARAM_TEMPLATE_ASSET_BASE_NAME>
    // that selects the rows of sparkplug_device_messages matching the given
    // group id, edge node id and template reference.
    //
    // PARAM_FORCE_RECREATE : true  -> "create or replace view"
    //                        else  -> "create view if not exists"
    // PARAM_TEMPLATE_DEFN is accepted but not read by this function.

    // Escapes a value for use inside a single-quoted SQL string literal, so a
    // quote or backslash in an identifier value cannot terminate the literal.
    function sql_literal(p_value) {
        return String(p_value).replace(/\\/g, "\\\\").replace(/'/g, "''");
    }

    var stmt_condition = `create view if not exists`;
    if (PARAM_FORCE_RECREATE == true)
        stmt_condition = `create or replace view`;

    // The source table is qualified with the database as well, matching the
    // fully qualified name used for the view itself.
    const sql_stmt = `
        ${stmt_condition} ${PARAM_SOURCE_DB}.${PARAM_TARGET_SCHEMA}.${PARAM_TEMPLATE_ASSET_BASE_NAME}
        as
        select
            group_id ,edge_node_id ,device_id ,message_type ,message_sequence ,root_message_timestamp
            ,inserted_at ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq
            ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
            ,message_type_order ,is_node_alive
            ,device_name ,template_version ,device_metric_timestamp ,ddata_msg ,is_historical
            ,device_measure_uuid ,measures_info ,measure_timestamp
            ,measure_ts ,measure_date ,measure_hour
        from ${PARAM_SOURCE_DB}.${PARAM_SOURCE_SCHEMA}.sparkplug_device_messages
        where group_id = '${sql_literal(PARAM_GROUP_ID)}'
            and edge_node_id = '${sql_literal(PARAM_EDGE_NODE_ID)}'
            and template_reference = '${sql_literal(PARAM_TEMPLATE_REFERENCE)}'
        ;
    `;
    return sql_stmt;
$$
;
CREATE OR REPLACE FUNCTION GENERATE_DEVICE_VIEW_DDL
    ( PARAM_GROUP_ID VARCHAR ,PARAM_EDGE_NODE_ID VARCHAR ,PARAM_TEMPLATE_REFERENCE VARCHAR
    ,PARAM_SOURCE_DB varchar
    ,PARAM_TARGET_SCHEMA VARCHAR
    ,PARAM_TEMPLATE_ASSET_BASE_NAME varchar
    ,PARAM_TEMPLATE_DEFN variant
    ,PARAM_FORCE_RECREATE boolean
    )
RETURNS VARCHAR
LANGUAGE JAVASCRIPT
COMMENT = 'Used for generating generic view ddl for device template.'
AS $$
    // Returns the DDL text of the view <base name>_vw, which selects from the
    // base view <PARAM_SOURCE_DB>.<PARAM_TARGET_SCHEMA>.<PARAM_TEMPLATE_ASSET_BASE_NAME>
    // and adds one typed column per metric listed under value.metrics of
    // PARAM_TEMPLATE_DEFN, each read from the measures_info column.
    //
    // The statement is always "create or replace view"; PARAM_FORCE_RECREATE and
    // the group / edge node / template reference parameters are not read here.

    // Replaces runs of non-word characters or underscores with "_" and lower-cases.
    function normalize_name_str(p_str) {
        return p_str.replace(/[\W_]+/g,"_").trim().toLowerCase();
    }

    // Builds the comma separated "measures_info:<name>::<type> as <column>" list.
    // Returns an empty string when the template definition carries no metrics.
    function build_column_ddl_defn(p_template_defn) {
        const data_type_map = {
            "Int32":"::integer"
            ,"Int64":"::integer"
            ,"Float":"::double"
            ,"Template":"::variant"
            ,"Boolean":"::boolean"
            ,"String":"::varchar"
        };

        // A missing definition, value or metrics entry is treated as "no metrics"
        // instead of raising a TypeError.
        const template_value = (p_template_defn || {})['value'] || {};
        const m_entries = template_value['metrics'] || [];

        var cols = [];
        for (const m_value of Object.values(m_entries)) {
            // Entries without a name cannot produce a column; skip them.
            if (m_value == null || m_value['name'] == null)
                continue;

            const measure_name = m_value['name'];
            const dtype = m_value['dataType'];
            const mname_cleansed = normalize_name_str(measure_name);

            // # default string cast, if the datatype is not mapped
            const dtype_converted = data_type_map[dtype] || "::varchar";

            const col_defn = `measures_info:"${measure_name}"${dtype_converted} as ${mname_cleansed} `;
            cols.push(col_defn);
        }
        return cols.join(',');
    }

    const vw_name = `${PARAM_TEMPLATE_ASSET_BASE_NAME}_vw`;
    const cols_joined = build_column_ddl_defn(PARAM_TEMPLATE_DEFN);

    // Only emit the separating comma when there are metric columns; otherwise
    // the select list would end in a dangling comma and the DDL would not parse.
    const metric_cols = (cols_joined.length > 0) ? `,${cols_joined}` : '';

    const sql_stmt = `
        create or replace view ${PARAM_SOURCE_DB}.${PARAM_TARGET_SCHEMA}.${vw_name}
        as
        select
            * exclude(ddata_msg ,measures_info ,template_version)
            ${metric_cols}
        from ${PARAM_SOURCE_DB}.${PARAM_TARGET_SCHEMA}.${PARAM_TEMPLATE_ASSET_BASE_NAME}
        ;
    `;
    return sql_stmt;
$$;
CREATE OR REPLACE FUNCTION GENERATE_DEVICE_ASOF_VIEW_DDL
    ( PARAM_GROUP_ID VARCHAR ,PARAM_EDGE_NODE_ID VARCHAR ,PARAM_TEMPLATE_REFERENCE VARCHAR
    ,PARAM_SOURCE_DB varchar
    ,PARAM_TARGET_SCHEMA VARCHAR
    ,PARAM_TEMPLATE_ASSET_BASE_NAME varchar
    ,PARAM_TEMPLATE_DEFN variant
    ,PARAM_FORCE_RECREATE boolean
    )
RETURNS VARCHAR
LANGUAGE JAVASCRIPT
COMMENT = 'Used for generating device asof view ddl.'
AS $$
    // Returns the DDL text of the secure view <base name>_asof_vw, which reads
    // from <base name>_vw and, for every metric listed under value.metrics of
    // PARAM_TEMPLATE_DEFN, replaces a null column value with the closest earlier
    // non-null value in (message_type_order, measure_timestamp, message_sequence) order.
    //
    // The statement is always "create or replace secure view"; PARAM_FORCE_RECREATE
    // and the group / edge node / template reference parameters are not read here.

    // Replaces runs of non-word characters or underscores with "_" and lower-cases.
    function normalize_name_str(p_str) {
        return p_str.replace(/[\W_]+/g,"_").trim().toLowerCase();
    }

    // Builds the comma separated list of "nvl(col, lag(col) ...) AS col" expressions.
    // Returns an empty string when the template definition carries no metrics.
    function build_column_ddl_defn(p_template_defn) {
        // A missing definition, value or metrics entry is treated as "no metrics"
        // instead of raising a TypeError.
        const template_value = (p_template_defn || {})['value'] || {};
        const m_entries = template_value['metrics'] || [];

        var cols = [];
        for (const m_value of Object.values(m_entries)) {
            // Entries without a name cannot produce a column; skip them.
            if (m_value == null || m_value['name'] == null)
                continue;

            const mname_cleansed = normalize_name_str(m_value['name']);

            // NOTE(review): the window has no PARTITION BY, so a value may be
            // carried over from a row of a different device_id - confirm intended.
            const col_defn = `nvl(${mname_cleansed}
                                ,lag(${mname_cleansed}) ignore nulls over (order by message_type_order ,measure_timestamp ,message_sequence)
                            ) AS ${mname_cleansed}
            `;
            cols.push(col_defn);
        }
        return cols.join(',');
    }

    const vw_name = `${PARAM_TEMPLATE_ASSET_BASE_NAME}_vw`;
    const recordasof_vw_name = `${PARAM_TEMPLATE_ASSET_BASE_NAME}_asof_vw`;
    const cols_joined = build_column_ddl_defn(PARAM_TEMPLATE_DEFN);

    // Only emit the separating comma when there are metric columns; otherwise
    // the select list would end in a dangling comma and the DDL would not parse.
    const metric_cols = (cols_joined.length > 0) ? `,${cols_joined}` : '';

    const sql_stmt = `
        create or replace secure view ${PARAM_SOURCE_DB}.${PARAM_TARGET_SCHEMA}.${recordasof_vw_name}
        as
        select
            group_id ,edge_node_id ,device_id ,device_name ,message_sequence
            ,root_message_timestamp ,inserted_at
            ,message_type ,message_type_order
            ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq
            ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
            ,is_node_alive
            ,is_historical
            ,device_measure_uuid
            ,measure_timestamp
            ,measure_ts ,measure_date ,measure_hour
            ${metric_cols}
        from ${PARAM_SOURCE_DB}.${PARAM_TARGET_SCHEMA}.${vw_name}
        order by measure_timestamp ,message_type_order ,message_sequence
        ;
    `;
    return sql_stmt;
$$;
Script 04
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
-- Names of the node database and staging schema, held in session variables;
-- edit the values for the environment being deployed to.
SET cl_bridge_node_db = 'cl_bridge_node_db';
SET staging_schema = 'stage_db';

-- Make the role, database and schema current for the statements that follow.
USE ROLE sysadmin;
USE DATABASE IDENTIFIER($cl_bridge_node_db);
USE SCHEMA IDENTIFIER($staging_schema);
CREATE OR REPLACE FUNCTION NORMALIZE_ASSET_NAME(P_STR VARCHAR)
RETURNS VARCHAR
LANGUAGE JAVASCRIPT
COMMENT = 'Used for creating asset names without spaces/special characters.'
AS $$
    // Every run of non-word characters or underscores becomes a single "_";
    // the result is then trimmed and lower-cased.
    const underscored = P_STR.replace(/[\W_]+/g, "_");
    const trimmed = underscored.trim();
    return trimmed.toLowerCase();
$$
;
CREATE OR REPLACE FUNCTION EDGENODE_SCHEMA_NAME(PARAM_SCHEMA_PREFIX VARCHAR ,PARAM_GROUP_ID VARCHAR ,PARAM_EDGE_NODE_ID VARCHAR)
RETURNS VARCHAR
LANGUAGE JAVASCRIPT
COMMENT = 'Used for creating asset names for edgenode schema.'
AS $$
    // Joins prefix, group id and edge node id with "_", then replaces every run
    // of non-word characters or underscores with a single "_" and lower-cases.
    const name_parts = [PARAM_SCHEMA_PREFIX, PARAM_GROUP_ID, PARAM_EDGE_NODE_ID];
    const raw_name = `${name_parts[0]}_${name_parts[1]}_${name_parts[2]}`;
    return raw_name.replace(/[\W_]+/g, "_").trim().toLowerCase();
$$
;
CREATE OR REPLACE FUNCTION GENERATE_EDGENODE_SCHEMA_DDL
    (PARAM_SCHEMA_PREFIX VARCHAR ,PARAM_GROUP_ID VARCHAR
    ,PARAM_EDGE_NODE_ID VARCHAR ,PARAM_FORCE_RECREATE boolean)
RETURNS VARCHAR
LANGUAGE JAVASCRIPT
COMMENT = 'Used for generating edgenode schema ddl.'
AS $$
    // Returns the statement that creates the schema of an edge node. The schema
    // name is "<prefix>_<group id>_<edge node id>" with every run of non-word
    // characters or underscores replaced by "_" and the whole name lower-cased.
    const raw_name = `${PARAM_SCHEMA_PREFIX}_${PARAM_GROUP_ID}_${PARAM_EDGE_NODE_ID}`;
    const schema_name = raw_name.replace(/[\W_]+/g, "_").trim().toLowerCase();

    // A forced recreate replaces an existing schema; otherwise it is left alone.
    if (PARAM_FORCE_RECREATE == true) {
        return `create or replace schema ${schema_name}; `;
    }
    return `create schema if not exists ${schema_name}; `;
$$
;
CREATE OR REPLACE PROCEDURE create_edge_node_schema(PARAM_SCHEMA_PREFIX VARCHAR ,PARAM_GROUP_ID VARCHAR ,PARAM_EDGE_NODE_ID VARCHAR ,PARAM_FORCE_RECREATE boolean)
RETURNS VARIANT NOT NULL
LANGUAGE JAVASCRIPT
COMMENT = 'Creates edge node specific schema, supposed to be invoked as part of NBIRTH messsage'
AS
$$
    // For the given group / edge node, reads every template row from
    // node_machine_registry_vw, creates the edge node schema once, and then
    // creates the base view, the typed view and the as-of view of each template
    // by executing the DDL produced by the generate_* functions.
    //
    // Returns a JSON object with the keys:
    //   asset_creation : "<schema>.<machine>" for every template fully processed
    //   Success        : number of templates fully processed
    //   Failures       : 1 when an error stopped processing, otherwise 0
    //   Failure_error  : array describing the failing statement and error
    //
    // Processing stops at the first error; templates after it are not attempted.

    // Builds the query that yields, per template, the schema name and all DDL
    // statements. The three string values are passed as bind variables
    // (:1 prefix, :2 group id, :3 edge node id) instead of being spliced into
    // the text; the boolean is emitted as a literal true/false.
    function get_sql_stmt(p_force_recreate) {
        const force_literal = (p_force_recreate == true) ? 'true' : 'false';
        const sql_stmt = `select
                :1 as schema_prefix
                ,${force_literal} as force_recreate
                ,edgenode_schema_name(schema_prefix ,group_id ,edge_node_id) as edgenode_schema_name
                ,generate_edgenode_schema_ddl(schema_prefix, group_id ,edge_node_id ,force_recreate) as edgenode_schema_ddl
                ,machine
                ,generate_template_asset_base_name(machine ,version) as machine_table_base_name
                ,template_definition
                ,generate_device_base_view_ddl
                    (group_id ,edge_node_id ,machine
                    ,current_database() ,current_schema() ,edgenode_schema_name
                    ,machine_table_base_name ,template_definition
                    ,force_recreate) as device_base_view_ddl
                ,generate_device_view_ddl
                    (group_id ,edge_node_id ,machine
                    ,current_database() ,edgenode_schema_name
                    ,machine_table_base_name ,template_definition
                    ,force_recreate) as device_view_ddl
                ,generate_device_asof_view_ddl
                    (group_id ,edge_node_id ,machine
                    ,current_database() ,edgenode_schema_name
                    ,machine_table_base_name ,template_definition
                    ,force_recreate) as device_asof_view_ddl
            from node_machine_registry_vw
            where group_id = :2
                and edge_node_id = :3
        ;`
        return sql_stmt;
    }

    // --- MAIN --------------------------------------
    var failure_err_msg = [];
    var return_result_as_json = {};
    var sucess_count = 0;
    var failure_count = 0;

    const qry = get_sql_stmt(PARAM_FORCE_RECREATE);
    // Values are converted with String() so the bound text equals what string
    // interpolation would have produced for the same input.
    const qry_binds = [String(PARAM_SCHEMA_PREFIX), String(PARAM_GROUP_ID), String(PARAM_EDGE_NODE_ID)];

    // Tracks the statement being executed so the error record names the right one.
    var sql_stmt = qry;
    var schema_created = false;
    var res = [];

    try {
        var rs = snowflake.execute({ sqlText: qry, binds: qry_binds });
        while (rs.next()) {
            const machine = rs.getColumnValue('MACHINE');
            const edgenode_schema_name = rs.getColumnValue('EDGENODE_SCHEMA_NAME');
            const edgenode_schema_ddl = rs.getColumnValue('EDGENODE_SCHEMA_DDL');
            const device_base_view_ddl = rs.getColumnValue('DEVICE_BASE_VIEW_DDL');
            const device_view_ddl = rs.getColumnValue('DEVICE_VIEW_DDL');
            const device_asof_view_ddl = rs.getColumnValue('DEVICE_ASOF_VIEW_DDL');

            // The schema DDL is identical on every row, so it is run only once.
            if (schema_created == false) {
                sql_stmt = edgenode_schema_ddl;
                snowflake.execute({ sqlText: edgenode_schema_ddl });

                //blind setting is not good,
                //TODO check if the schema is indeed created
                schema_created = true;
            }

            // Order matters: the typed view reads the base view and the
            // as-of view reads the typed view.
            sql_stmt = device_base_view_ddl;
            snowflake.execute({ sqlText: device_base_view_ddl });

            sql_stmt = device_view_ddl;
            snowflake.execute({ sqlText: device_view_ddl });

            sql_stmt = device_asof_view_ddl;
            snowflake.execute({ sqlText: device_asof_view_ddl });

            sucess_count = sucess_count + 1;
            res.push(edgenode_schema_name + '.' + machine);
        }
    } catch (err) {
        // The error is reported through the return value instead of being rethrown.
        failure_count = failure_count + 1;
        failure_err_msg.push(` {
            sqlstatement : '${sql_stmt}',
            error_code : '${err.code}',
            error_state : '${err.state}',
            error_message : '${err.message}',
            stack_trace : '${err.stackTraceTxt}'
            } `);
    }

    return_result_as_json['asset_creation'] = res;
    return_result_as_json['Success'] = sucess_count;
    return_result_as_json['Failures'] = failure_count;
    return_result_as_json['Failure_error'] = failure_err_msg;
    return return_result_as_json;
$$;
Script 05
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
-- Node database and staging schema names as session variables;
-- replace the values with those of the target environment.
SET cl_bridge_node_db = 'cl_bridge_node_db';
SET staging_schema = 'stage_db';

-- Set the active role, database and schema for the statements that follow.
USE ROLE sysadmin;
USE DATABASE IDENTIFIER($cl_bridge_node_db);
USE SCHEMA IDENTIFIER($staging_schema);
CREATE OR REPLACE PROCEDURE create_all_edge_node_schemas(PARAM_SCHEMA_PREFIX VARCHAR ,PARAM_FORCE_RECREATE boolean)
RETURNS VARIANT NOT NULL
LANGUAGE JAVASCRIPT
COMMENT = 'Creates edge node specific schemas, supposed to be invoked as part of NBIRTH messsage'
AS
$$
    // Calls create_edge_node_schema once for every distinct group / edge node
    // pair found in node_machine_registry_vw.
    //
    // Returns a JSON object with the keys:
    //   asset_creation : one { execution: <result of create_edge_node_schema> }
    //                    entry per edge node that was processed
    //   Success        : 1 when the whole loop finished without raising, otherwise 0
    //   Failures       : 1 when an error stopped the loop, otherwise 0
    //   Failure_error  : array describing the failing statement and error
    //
    // Errors that create_edge_node_schema reports inside its own result are
    // not counted here; they are visible in the corresponding execution entry.

    // --- MAIN --------------------------------------
    var failure_err_msg = [];
    var return_result_as_json = {};
    var sucess_count = 0;
    var failure_count = 0;

    const qry = `
        select distinct group_id ,edge_node_id
        from node_machine_registry_vw
        where edge_node_id is not null
    ;`;

    // The boolean is emitted as a literal; the string arguments are bound.
    const force_literal = (PARAM_FORCE_RECREATE == true) ? 'true' : 'false';
    const call_stmt = `call create_edge_node_schema(:1 ,:2 ,:3 ,${force_literal});`;

    // Tracks the statement being executed so the error record names the right one.
    var sql_stmt = qry;
    var res = [];

    try {
        var rs = snowflake.execute({ sqlText: qry });
        while (rs.next()) {
            const group_id = rs.getColumnValue('GROUP_ID');
            const edge_node_id = rs.getColumnValue('EDGE_NODE_ID');

            sql_stmt = call_stmt;
            const call_rs = snowflake.execute({
                sqlText: call_stmt
                ,binds: [String(PARAM_SCHEMA_PREFIX), String(group_id), String(edge_node_id)]
            });

            // Store the value returned by the called procedure rather than the
            // result set object itself.
            var schema_out = {};
            schema_out['execution'] = call_rs.next() ? call_rs.getColumnValue(1) : null;
            res.push(schema_out);
        }
        sucess_count = sucess_count + 1;
    } catch (err) {
        // The error is reported through the return value instead of being rethrown.
        failure_count = failure_count + 1;
        failure_err_msg.push(` {
            sqlstatement : '${sql_stmt}',
            error_code : '${err.code}',
            error_state : '${err.state}',
            error_message : '${err.message}',
            stack_trace : '${err.stackTraceTxt}'
            } `);
    }

    return_result_as_json['asset_creation'] = res;
    return_result_as_json['Success'] = sucess_count;
    return_result_as_json['Failures'] = failure_count;
    return_result_as_json['Failure_error'] = failure_err_msg;
    return return_result_as_json;
$$;
Script 04
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
Script 05
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
Script 10
Code Block | ||||||
---|---|---|---|---|---|---|
|
Script 10
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
Script 11
Code Block | ||||||
---|---|---|---|---|---|---|
| ||||||
...