...

SQL Script 01

Set up the staging database and its associated assets. These are:

  • Database

...

  • Staging schema
-- =========================
-- In this script, we are setting up assets related to the staging database
-- and associated assets. These are:
--  - Database
--  - Staging schema
 
-- The database & schema will be owned by SYSADMIN
 
-- REPLACE THE SESSION VARIABLE ACCORDING TO YOUR ENVIRONMENT
-- =========================
 
set cl_bridge_staging_db = 'CL_BRIDGE_STAGE_DB';
set staging_schema = 'stage_db';
 
-- >>>>>>>>>>>>>>>>>>>>>> DATABASE >>>>>>>>>>>>>>>>>>>>>>>>>
 
use role sysadmin;
 
create database if not exists identifier($cl_bridge_staging_db)
   -- DATA_RETENTION_TIME_IN_DAYS = 90
   -- MAX_DATA_EXTENSION_TIME_IN_DAYS = 90
   comment = 'used for storing messages received from CirrusLink Bridge'
;
 
-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA >>>>>>>>>>>>>>>>>>>>>>>>>
 
use database identifier($cl_bridge_staging_db);
 
create schema if not exists identifier($staging_schema)
  with managed access
  -- data_retention_time_in_days = 90
  -- max_data_extension_time_in_days = 90
  comment = 'Used for staging data direct from CirrusLink Bridge';
 
-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA ASSETS >>>>>>>>>>>>>>>>>>>>>>>>>
 
use schema identifier($staging_schema);
 
-- =========================
-- Define tables
-- =========================
 
create or replace table sparkplug_raw (
    msg_id varchar
    ,msg_topic varchar
    ,namespace varchar
    ,group_id varchar
    ,message_type varchar
    ,edge_node_id varchar
    ,device_id varchar
    ,msg variant
    ,inserted_at number
)
change_tracking = true
cluster by (message_type ,group_id ,edge_node_id ,device_id)
comment = 'Used for storing json messages from sparkplug bridge/gateway'
;
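 
-- Optional sanity check (illustrative only, not part of the original script): once the
-- CirrusLink Bridge starts publishing, the most recent raw messages can be inspected with:
select msg_id ,msg_topic ,message_type ,group_id ,edge_node_id ,device_id ,inserted_at
from sparkplug_raw
order by inserted_at desc
limit 10;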

...

SQL Script 02

Set up assets related to the node database, which will eventually contain all the device-specific views and tables. The following assets are created:

  • Node Database

...

  • Staging schema
-- =========================
-- In this script, we are setting up assets related to the node database
-- ,which would eventually contain all the device specific views and tables.
-- At the very core, the following assets are created:
--  - Node Database
--  - Staging schema
 
-- The database & schema will be owned by SYSADMIN
 
-- REPLACE THE SESSION VARIABLE ACCORDING TO YOUR ENVIRONMENT
-- =========================
 
set staged_sparkplug_raw_table = 'cl_bridge_stage_db.stage_db.sparkplug_raw';
set cl_bridge_node_db = 'cl_bridge_node_db';
set staging_schema = 'stage_db';
 
-- >>>>>>>>>>>>>>>>>>>>>> DATABASE >>>>>>>>>>>>>>>>>>>>>>>>>
 
use role sysadmin;
 
create database if not exists identifier($cl_bridge_node_db)
   -- DATA_RETENTION_TIME_IN_DAYS = 90
   -- MAX_DATA_EXTENSION_TIME_IN_DAYS = 90
   comment = 'used for storing flattened messages processed from the staging database'
;
 
-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA >>>>>>>>>>>>>>>>>>>>>>>>>
 
use database identifier($cl_bridge_node_db);
 
create schema if not exists identifier($staging_schema)
  with managed access
  -- data_retention_time_in_days = 90
  -- max_data_extension_time_in_days = 90
  comment = 'used for storing flattened messages processed from the staging database';
 
-- >>>>>>>>>>>>>>>>>>>>>> STAGING SCHEMA ASSETS >>>>>>>>>>>>>>>>>>>>>>>>>
 
use schema identifier($staging_schema);
 
-- =========================
-- Define tables
-- =========================
 
-- NOTE: 'cl_bridge_stage_db.stage_db.sparkplug_raw' is hardcoded here, as substituting
-- identifier($staged_sparkplug_raw_table) does not work inside the view definition.
 
create or replace view sparkplug_messages_vw
    change_tracking = true
    comment = 'parses out the core attributes from the message and topic.'
    as
    select
        msg_id
        ,namespace
        ,group_id
        ,message_type
        ,edge_node_id
        ,device_id
        ,parse_json(msg) as message
        ,message:seq::int as message_sequence
        ,message:timestamp::number as message_timestamp
        ,inserted_at
    from cl_bridge_stage_db.stage_db.sparkplug_raw
    ;
 
-- --  >>>>>>>>>>>>>>>>>>>>>>
 
create or replace view nbirth_vw
    change_tracking = true
    comment = 'filtered to nbirth messages. This is a mirror'
    as
    select
       group_id ,edge_node_id
    from sparkplug_messages_vw
    where message_type = 'NBIRTH'
     
    ;
 
create or replace view node_machine_registry_vw
    comment = 'Used to retrieve the latest template definitions for a given group and edge_node'
    as
    with base as (
        select
            group_id ,edge_node_id
            ,max_by(message ,message_timestamp) as message
            ,max(message_timestamp) as latest_message_timestamp
        from sparkplug_messages_vw
        where message_type = 'NBIRTH'
        group by group_id ,edge_node_id
    )
    select
        group_id ,edge_node_id
        ,f.value as template_definition
        ,template_definition:name::varchar as machine
        ,template_definition:reference::varchar as reference
        ,template_definition:version::varchar as version
        ,template_definition:timestamp::int as timestamp
    from base as b
        ,lateral flatten (input => b.message:metrics) f
    where template_definition:dataType::varchar = 'Template'
    ;
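 
-- Illustrative check (not part of the original script): list the latest known machine /
-- template definitions per edge node, as parsed from the NBIRTH payloads above.
select group_id ,edge_node_id ,machine ,reference ,version
from node_machine_registry_vw
order by group_id ,edge_node_id ,machine;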
 
-- --  >>>>>>>>>>>>>>>>>>>>>>
 
create or replace view node_birth_death_vw
    comment = 'shows the latest node birth & death messages for each device'
    as
    select
        b.* exclude(namespace)
        ,message_type as nbirth_or_ndeath_raw
        ,iff((message_type = 'NBIRTH') ,f.value:value ,null)::number as nbirth_bdSeq_raw
        ,iff((message_type = 'NDEATH') ,f.value:value ,null)::number as ndeath_bdSeq_raw
        ,inserted_at as nbirth_ndeath_inserted_at_raw
    from sparkplug_messages_vw as b
        ,lateral flatten (input => b.message:metrics) as f
    where message_type in ('NBIRTH' ,'NDEATH')
     and f.value:name::varchar = 'bdSeq'
     ;
 
create or replace view device_records_vw
    change_tracking = true
    as
    select
        b.* exclude(namespace)
        ,null as nbirth_or_ndeath_raw
        ,null as nbirth_bdSeq_raw
        ,null as ndeath_bdSeq_raw
        ,null as nbirth_ndeath_inserted_at_raw
    from sparkplug_messages_vw as b
    where message_type in ('DBIRTH' ,'DDATA')
    ;
 
create or replace stream device_records_stream
    on view device_records_vw
 
    show_initial_rows = true
    comment = 'used for monitoring latest device messages'
    ;
 
create or replace view sparkplug_msgs_nodebirth_contextualized_vw
    as
    with device_node_unioned as (
        select *
        from node_birth_death_vw
        union all
        select * exclude(METADATA$ROW_ID ,METADATA$ACTION ,METADATA$ISUPDATE)
        from device_records_stream
    )
    select
        -- group_id ,message_type ,edge_node_id ,device_id
        -- ,message ,message_sequence ,inserted_at
        * exclude(nbirth_or_ndeath_raw ,nbirth_bdSeq_raw ,ndeath_bdSeq_raw ,nbirth_ndeath_inserted_at_raw)
        ,nvl(nbirth_or_ndeath_raw
                ,lag(nbirth_or_ndeath_raw) ignore nulls over (order by inserted_at ,message_sequence)
            ) as nbirth_or_ndeath
 
        ,nvl(nbirth_bdSeq_raw
                ,lag(nbirth_bdSeq_raw) ignore nulls over (order by inserted_at ,message_sequence)
            ) as nbirth_bdSeq
 
        ,nvl(ndeath_bdSeq_raw
                ,lag(ndeath_bdSeq_raw) ignore nulls over (order by inserted_at ,message_sequence)
            ) as ndeath_bdSeq
 
        ,nvl(nbirth_ndeath_inserted_at_raw
                ,lag(nbirth_ndeath_inserted_at_raw) ignore nulls over (order by inserted_at ,message_sequence)
            ) as nbirth_ndeath_inserted_at
 
        ,case true
            when (nbirth_or_ndeath = 'NBIRTH') then false
            when ( (nbirth_or_ndeath = 'NDEATH') and (nbirth_bdSeq != ndeath_bdSeq) ) then false
            when ( (nbirth_or_ndeath = 'NDEATH') and (nbirth_bdSeq = ndeath_bdSeq) ) then true
            else true
        end as is_last_known_good_reading
 
        ,case lower(message_type)
                when lower('NBIRTH') then 1
                when lower('DBIRTH') then 2
                when lower('DDATA') then 3
                when lower('DDEATH') then 4
                when lower('NDEATH') then 5
                else 99
            end as message_type_order
 
        ,(nbirth_or_ndeath = 'NBIRTH') as is_node_alive
 
    from device_node_unioned
    ;
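 
-- Illustrative check (not part of the original script): distribution of messages by the
-- is_last_known_good_reading flag. A plain select reads the underlying stream without
-- advancing its offset.
select is_last_known_good_reading ,count(*) as message_count
from sparkplug_msgs_nodebirth_contextualized_vw
group by is_last_known_good_reading;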
 
create or replace view sparkplug_messages_flattened_vw
    as
    with base as (
        select
            -- sparkplugb message level
            msg_id ,group_id, edge_node_id ,device_id ,message_type
            ,message_sequence ,inserted_at
            ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq
            ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
            ,message_type_order ,is_node_alive 
            ,message_timestamp as root_message_timestamp
 
            -- attributes related to device data (ddata / dbirth)
            ,f.value:name::varchar as device_name
            ,f.value:value:reference::varchar as template_reference
            ,f.value:value:version::varchar as template_version
            ,f.value:timestamp::number as device_metric_timestamp
            ,f.value as ddata_msg
 
            -- attributes related to device level metrics
            ,concat(msg_id ,'^' ,f.index ,'::',d.index) as device_measure_uuid
            ,d.value:name::varchar as measure_name
            ,d.value:value as measure_value
            ,d.value:timestamp::number as measure_timestamp
             
        from sparkplug_msgs_nodebirth_contextualized_vw as b
            ,lateral flatten(input => b.message:metrics) as f
            ,lateral flatten(input => f.value:value:metrics) as d
        where message_type in ('DBIRTH' ,'DDATA')
            and template_reference is not null
    )
    select
        group_id, edge_node_id ,device_id ,message_type
        ,message_sequence ,inserted_at
        ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq
        ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
        ,message_type_order ,is_node_alive ,root_message_timestamp
             
        ,device_name ,template_reference ,template_version ,device_metric_timestamp ,ddata_msg
        ,null as is_historical
 
        ,device_measure_uuid
        ,object_agg(distinct measure_name ,measure_value) as measures_info
        ,measure_timestamp
         
        ,to_timestamp(measure_timestamp/1000) as measure_ts
        ,to_date(measure_ts) as measure_date
        ,hour(measure_ts) as measure_hour
    from base
    group by group_id, edge_node_id ,device_id ,message_type
            ,message_sequence ,inserted_at
            ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq
            ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
            ,message_type_order ,is_node_alive ,root_message_timestamp
 
            ,device_name ,template_reference ,template_version ,device_metric_timestamp ,ddata_msg
            ,is_historical ,device_measure_uuid
            ,measure_timestamp
     
    ;
 
create or replace transient table sparkplug_device_messages (
        group_id varchar
        ,edge_node_id varchar
        ,device_id varchar
        ,message_type varchar
        ,message_sequence number
         
        ,inserted_at number
        ,nbirth_or_ndeath varchar
        ,nbirth_bdseq number
        ,ndeath_bdseq number
        ,nbirth_ndeath_inserted_at number
        ,is_last_known_good_reading boolean
        ,message_type_order number
        ,is_node_alive boolean
 
        ,root_message_timestamp number
        ,device_name varchar
        ,template_reference varchar
        ,template_version varchar
        ,device_metric_timestamp number
        ,ddata_msg variant
        ,is_historical boolean
 
        ,device_measure_uuid varchar
        ,measures_info variant
        ,measure_timestamp number
 
        ,measure_ts timestamp
        ,measure_date date
        ,measure_hour number
    )
    cluster by ( group_id ,edge_node_id ,device_id
        ,template_reference ,template_version ,device_name
        ,measure_date ,measure_hour)
    comment = 'materialized version of the sparkplug_messages_flattened_vw for easier downstream pipelines.'
    ;
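 
-- The transient table above mirrors sparkplug_messages_flattened_vw; its refresh is expected
-- to be handled by a task defined in a later script (not shown here). A minimal, hypothetical
-- refresh statement would look like:
insert into sparkplug_device_messages
select
    group_id ,edge_node_id ,device_id ,message_type ,message_sequence
    ,inserted_at ,nbirth_or_ndeath ,nbirth_bdseq ,ndeath_bdseq
    ,nbirth_ndeath_inserted_at ,is_last_known_good_reading
    ,message_type_order ,is_node_alive ,root_message_timestamp
    ,device_name ,template_reference ,template_version ,device_metric_timestamp ,ddata_msg
    ,is_historical ,device_measure_uuid ,measures_info ,measure_timestamp
    ,measure_ts ,measure_date ,measure_hour
from sparkplug_messages_flattened_vw;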
     
 
-- --  >>>>>>>>>>>>>>>>>>>>>>
-- ================
--  NODE BIRTH related assets
-- ================
 
create or replace stream nbirth_stream
    on view nbirth_vw
 
    show_initial_rows = true
    comment = 'stream to monitor for nbirth messages, so that assets are created automatically'
    ;
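 
-- Illustrative check (assumption: a downstream task, defined in a script not shown here,
-- consumes this stream to create the node specific assets): confirm whether the stream
-- currently holds unconsumed NBIRTH rows.
select system$stream_has_data('NBIRTH_STREAM');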

...

SQL Script 06
set cl_bridge_node_db = 'cl_bridge_node_db';
set staging_schema = 'stage_db';
 
set reader_role_warehouse = 'compute_wh';
 
use role sysadmin;
use database identifier($cl_bridge_node_db);
use schema identifier($staging_schema);
 
 
create or replace dynamic table D_ACTIVE_NBIRTH
    lag = '1 day'
    warehouse = compute_wh
    as
    -- Tabularize the active nbirth events for easier error handling during
    -- investigations
    with base as (
        select
            group_id ,edge_node_id ,device_id
                ,nvl(nbirth_bdseq_raw ,ndeath_bdseq_raw ) as bdseq
                ,message_type
                ,to_timestamp(message_timestamp/1000) as MEASURE_TS
        from NODE_BIRTH_DEATH_VW
         
    ), nbirth_ndeath_matched as (
        select distinct nb.bdseq
        from base as nb
            join base as nd
                on nb.bdseq = nd.bdseq
        where nb.message_type = 'NBIRTH'
            and nd.message_type = 'NDEATH'
    )
    select b.* exclude(measure_ts ,message_type)
        ,min(measure_ts) as nbirth
    from base as b
    where b.bdseq not in (select bdseq from nbirth_ndeath_matched)
    group by all
    ;
 
create or replace dynamic table D_DEVICE_HEARTBEATS
    lag = '1 day'
    warehouse = compute_wh
    as
    -- Tabularize the DBIRTH/DDATA messages for each device. Meant to be used
    -- for active investigation of message receipt from devices and for error tracking
    with base as (
        select group_id ,edge_node_id ,device_id ,DEVICE_NAME ,message_type ,MEASURE_TS
            ,row_number()
                over ( partition by group_id ,edge_node_id ,device_id ,DEVICE_NAME ,message_type
                        order by MEASURE_TS desc
                ) as row_num
         
        from SPARKPLUG_DEVICE_MESSAGES
 
    ), rows_filtered as (
        select *
        from base
        where row_num <= 2
    ), object_constructed as (
        select group_id ,edge_node_id ,device_id ,DEVICE_NAME ,message_type
            ,first_value(MEASURE_TS)
                ignore nulls
                over ( partition by group_id ,edge_node_id ,device_id ,DEVICE_NAME ,message_type
                        order by MEASURE_TS desc
                        --ROWS BETWEEN 1 PRECEDING  AND 1 FOLLOWING
                        )
                        as latest_message_received_at
         
            ,NTH_VALUE( MEASURE_TS , 2 )
                FROM FIRST
                IGNORE NULLS
                over ( partition by group_id ,edge_node_id ,device_id ,DEVICE_NAME ,message_type
                        order by MEASURE_TS desc
                        --ROWS BETWEEN 1 PRECEDING  AND 1 FOLLOWING
                        ) as prev_latest_message_received_at
         
            ,timestampdiff('second' ,prev_latest_message_received_at ,latest_message_received_at) as interval_time
         
            ,last_value(MEASURE_TS)
                ignore nulls
                over ( partition by group_id ,edge_node_id ,device_id ,message_type
                        order by MEASURE_TS desc
                        --ROWS BETWEEN 1 PRECEDING  AND 1 FOLLOWING
                        )
                        as oldest_message_received_at
         
            ,object_construct(
                'latest_message_received_at' ,latest_message_received_at
                ,'prev_latest_message_received_at' ,prev_latest_message_received_at
                ,'interval_seconds',interval_time
                ,'oldest_message_received_at',oldest_message_received_at
            ) as obj
        from rows_filtered
 
    ), unioned as (
        select group_id ,edge_node_id ,device_id ,DEVICE_NAME
            ,null as dbirth
            ,obj as ddata
        from object_constructed
        where message_type = 'DDATA'
        union
        select group_id ,edge_node_id ,device_id ,DEVICE_NAME
            ,obj as dbirth
            ,null as ddata
        from object_constructed
        where message_type = 'DBIRTH'
 
    )
    select group_id ,edge_node_id ,device_id ,DEVICE_NAME
        ,first_value(ddata)
                ignore nulls
                over (partition by group_id ,edge_node_id ,device_id ,DEVICE_NAME
                    order by ddata)
            as ddata
        ,first_value(dbirth)
                ignore nulls
                over (partition by group_id ,edge_node_id ,device_id ,DEVICE_NAME
                    order by dbirth)
            as dbirth
    from unioned
    ;
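 
-- Illustrative monitoring query (not part of the original script): devices whose most recent
-- DDATA heartbeat is older than one hour, based on the object constructed above.
select group_id ,edge_node_id ,device_id ,device_name
    ,ddata:latest_message_received_at::timestamp as latest_ddata_at
from D_DEVICE_HEARTBEATS
where ddata:latest_message_received_at::timestamp < dateadd('hour' ,-1 ,current_timestamp());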

SQL Script 07

Set up roles and related assets that specifically require the help of privileged roles like SYSADMIN. These are:

  • Create a custom role
  • Grant the custom role privileges to create and execute tasks
  • Create a warehouse specifically used for ingestion
  • Grants
-- =========================
-- In this script, we are setting up roles that specifically require the help of
-- privileged roles like SYSADMIN. These are:
--  - Create a custom role
--  - Grant the custom role privileges to create and execute tasks
--  - Create warehouse specifically used for ingestion
--  - Grants
 
-- REPLACE THE SESSION VARIABLE ACCORDING TO YOUR ENVIRONMENT
-- =========================
 
set processor_role = 'cl_bridge_process_rl';
set cl_bridge_ingestion_warehouse = 'cl_bridge_ingest_wh';
set staging_db = 'cl_bridge_stage_db';
set staging_db_schema = 'cl_bridge_stage_db.stage_db';
 
set node_db = 'cl_bridge_node_db';
set node_db_staging_schema = 'cl_bridge_node_db.stage_db';
 
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> ROLE CREATION >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    use role securityadmin;
    create role if not exists identifier($processor_role)
        comment = 'role used by cirruslink bridge to ingest and process mqtt/sparkplug data';
 
    grant role identifier($processor_role)
        to role sysadmin;
 
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> WAREHOUSE >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    use role sysadmin;
 
    create warehouse if not exists identifier($cl_bridge_ingestion_warehouse) with
        warehouse_type = standard
        warehouse_size = xsmall
        --max_cluster_count = 5
        initially_suspended = true
        comment = 'used by cirruslink bridge to ingest'
        ;
 
 
    grant usage on warehouse identifier($cl_bridge_ingestion_warehouse)
        to role identifier($processor_role);
    grant operate on warehouse identifier($cl_bridge_ingestion_warehouse)
        to role identifier($processor_role);
 
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS STAGE DB >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    grant usage on database identifier($staging_db)
        to role identifier($processor_role);
 
    grant usage on schema identifier($staging_db_schema)
        to role identifier($processor_role);
 
    grant select on all tables in schema identifier($staging_db_schema)
        to role identifier($processor_role);
 
    grant insert on all tables in schema identifier($staging_db_schema)
        to role identifier($processor_role);
 
    grant select on future tables in schema identifier($staging_db_schema)
        to role identifier($processor_role);
 
    grant insert on future tables in schema identifier($staging_db_schema)
        to role identifier($processor_role);
 
 
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS NODE DB >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
 
    grant usage on database identifier($node_db)
        to role identifier($processor_role);
 
    grant usage on schema identifier($node_db_staging_schema)
        to role identifier($processor_role);
 
    grant select on all tables in schema identifier($node_db_staging_schema)
        to role identifier($processor_role);
 
    grant insert on all tables in schema identifier($node_db_staging_schema)
        to role identifier($processor_role);
 
    grant select on all views in schema identifier($node_db_staging_schema)
        to role identifier($processor_role);
 
    grant usage on all functions in schema identifier($node_db_staging_schema)
        to role identifier($processor_role); 
 
    grant usage on all procedures in schema identifier($node_db_staging_schema)
        to role identifier($processor_role);
 
    -- need for:
    --  - creating edge node specific tables and views
    grant create schema on database identifier($node_db)
        to role identifier($processor_role);
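 
    -- Optional verification (illustrative only): review the privileges the processing role
    -- ended up with after the grants above.
    show grants to role cl_bridge_process_rl;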

SQL Script 08

Set up roles and related assets that specifically require the help of privileged roles like SYSADMIN, SECURITYADMIN, and ACCOUNTADMIN. These are:

  • Create a custom role
  • Grant the custom role privileges to create and execute tasks
  • Grants
-- =========================
-- In this script, we are setting up roles that specifically require the help of
-- privileged roles like SYSADMIN, SECURITYADMIN, ACCOUNTADMIN. These are:
--  - Create a custom role
--  - Grant the custom role privileges to create and execute tasks
--  - Grants
 
-- REPLACE THE SESSION VARIABLE ACCORDING TO YOUR ENVIRONMENT
-- =========================
 
set reader_role = 'cl_bridge_reader_rl';
 
set reader_role_warehouse = 'compute_wh';
 
set staging_db = 'cl_bridge_stage_db';
set staging_db_schema = 'cl_bridge_stage_db.stage_db';
 
set node_db = 'cl_bridge_node_db';
set node_db_staging_schema = 'cl_bridge_node_db.stage_db';
 
 
 
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> ROLE CREATION >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    use role securityadmin;
 
    create role if not exists identifier($reader_role)
        comment = 'role used by user/process to query and operate on the views and tables managed by cirruslink bridge';
 
    grant role identifier($reader_role)
        to role sysadmin;
 
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS WAREHOUSE >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    use role accountadmin;
 
    grant usage on warehouse identifier($reader_role_warehouse)
        to role sysadmin with grant option;
    grant modify on warehouse identifier($reader_role_warehouse)
        to role sysadmin with grant option;
    grant operate on warehouse identifier($reader_role_warehouse)
        to role sysadmin with grant option;
 
    use role sysadmin;
 
    grant usage on warehouse identifier($reader_role_warehouse)
        to role identifier($reader_role);
    grant modify on warehouse identifier($reader_role_warehouse)
        to role identifier($reader_role);
    grant operate on warehouse identifier($reader_role_warehouse)
        to role identifier($reader_role);
 
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS TASK EXECUTION >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
 
    -- Creating and executing tasks requires elevated privileges that can be granted
    -- only by ACCOUNTADMIN; hence we would have to switch roles.
    -- use role accountadmin;
    -- grant execute managed task on account to role identifier($reader_role);
    -- grant execute task on account to role identifier($reader_role);
 
 
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS STAGE DB >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
    grant usage on database identifier($staging_db)
        to role identifier($reader_role);
 
    grant usage on schema identifier($staging_db_schema)
        to role identifier($reader_role);
 
    grant select on all tables in schema identifier($staging_db_schema)
        to role identifier($reader_role);
    grant select on future tables in schema identifier($staging_db_schema)
        to role identifier($reader_role);
 
-- >>>>>>>>>>>>>>>>>>>>>>>>>>> GRANTS NODE DB >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
 
    grant usage on database identifier($node_db)
        to role identifier($reader_role);
 
    grant usage on schema identifier($node_db_staging_schema)
        to role identifier($reader_role);
 
    grant select on all tables in schema identifier($node_db_staging_schema)
        to role identifier($reader_role);
 
    grant select on all views in schema identifier($node_db_staging_schema)
        to role identifier($reader_role);
 
    grant usage on all functions in schema identifier($node_db_staging_schema)
        to role identifier($reader_role); 
 
    grant usage on all procedures in schema identifier($node_db_staging_schema)
        to role identifier($reader_role);
 
    -- needed for:
    --  - accessing newly generated edge node specific schemas (tables and views)
    use role securityadmin;
    grant usage on future schemas in database identifier($node_db)
        to role identifier($reader_role);
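 
    -- Illustrative follow-up (ANALYST_USER is a hypothetical placeholder, not part of the
    -- original script): hand the reader role to a user so they can query the managed views.
    grant role identifier($reader_role) to user ANALYST_USER;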

...