Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions .github/workflows/dbt_run_full_observability.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
name: dbt_run_full_observability
run-name: dbt_run_full_observability

on:
workflow_dispatch:
schedule:
# Runs “At 00:00 on day-of-month 1.” (see https://crontab.guru)
- cron: '0 0 1 * *'

env:
DBT_PROFILES_DIR: ./

ACCOUNT: "${{ vars.ACCOUNT }}"
ROLE: "${{ vars.ROLE }}"
USER: "${{ vars.USER }}"
PASSWORD: "${{ secrets.PASSWORD }}"
REGION: "${{ vars.REGION }}"
DATABASE: "${{ vars.DATABASE }}"
WAREHOUSE: "${{ vars.WAREHOUSE }}"
SCHEMA: "${{ vars.SCHEMA }}"

concurrency:
group: ${{ github.workflow }}

jobs:
run_dbt_jobs:
runs-on: ubuntu-latest
environment:
name: workflow_prod_2xl

steps:
- uses: actions/checkout@v3

- uses: actions/setup-python@v1
with:
python-version: "3.7.x"

- name: install dependencies
run: |
pip3 install dbt-snowflake==${{ vars.DBT_VERSION }} cli_passthrough requests click
dbt deps
- name: Run DBT Jobs
run: |
dbt run --threads 2 --vars '{"OBSERV_FULL_TEST":True}' -m models/silver/_observability

1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ target/
dbt_packages/
logs/
dbt-env/
.user.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false,
tags = ['observability']
) }}

WITH summary_stats AS (

SELECT
MIN(height) AS min_block,
MAX(height) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM
{{ ref('silver__block_log') }}
WHERE
block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())

{% if is_incremental() %}
AND (height >= (SELECT MIN(block_number)
FROM ( SELECT MIN(height) AS block_number
FROM {{ ref('silver__block_log') }}
WHERE block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())

UNION

SELECT MIN(VALUE) - 1 AS block_number
FROM(SELECT blocks_impacted_array
FROM {{ this }} qualify ROW_NUMBER() over (ORDER BY test_timestamp DESC) = 1),
LATERAL FLATTEN(input => blocks_impacted_array)
)
) {% if var('OBSERV_FULL_TEST') %}
OR height >= 0
{% endif %}
)
{% endif %}

),

block_range AS (

SELECT
_id AS block_number
FROM {{ source('crosschain_silver', 'number_sequence') }}
WHERE _id BETWEEN (SELECT min_block FROM summary_stats)
AND (SELECT max_block FROM summary_stats)

),

blocks AS (

SELECT
l.height as block_number,
block_timestamp,
LAG(l.height,1) over (ORDER BY l.height ASC) AS prev_BLOCK_NUMBER
FROM {{ ref("silver__block_log") }} l

INNER JOIN block_range b
ON l.height = b.block_number
AND l.height >= (SELECT MIN(block_number) FROM block_range)

),

block_gen AS (

SELECT
_id AS block_number
FROM {{ source( 'crosschain_silver', 'number_sequence' ) }}
WHERE _id BETWEEN (SELECT MIN(block_number) FROM blocks)
AND (SELECT MAX(block_number) FROM blocks )

)

SELECT
'blocks' AS test_name,
MIN( b.block_number) AS min_block,
MAX(b.block_number) AS max_block,
MIN(b.block_timestamp) AS min_block_timestamp,
MAX(b.block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested,
COUNT(CASE WHEN C.block_number IS NOT NULL THEN A.block_number END) AS blocks_impacted_count,
ARRAY_AGG(CASE WHEN C.block_number IS NOT NULL THEN A.block_number END) within GROUP (ORDER BY A.block_number ) AS blocks_impacted_array,
CURRENT_TIMESTAMP AS test_timestamp
FROM block_gen A

LEFT JOIN blocks b
ON A.block_number = b.block_number

LEFT JOIN blocks C
ON A.block_number > C.prev_block_number
AND A.block_number < C.block_number
AND C.block_number - C.prev_block_number <> 1

WHERE
COALESCE(b.block_number, C.block_number) IS NOT NULL
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
{{ config(
materialized = 'incremental',
unique_key = 'test_timestamp',
full_refresh = false,
tags = ['observability']
) }}

WITH summary_stats AS (

SELECT
MIN(height) AS min_block,
MAX(height) AS max_block,
MIN(block_timestamp) AS min_block_timestamp,
MAX(block_timestamp) AS max_block_timestamp,
COUNT(1) AS blocks_tested
FROM {{ ref('silver__block_log') }}
WHERE block_timestamp <= DATEADD('hour', -12, CURRENT_TIMESTAMP())


{% if is_incremental() %}
AND (block_number >= (SELECT MIN(block_number)
FROM ( SELECT MIN(height) AS block_number
FROM {{ ref('silver__block_log') }}
WHERE block_timestamp BETWEEN DATEADD('hour', -96, CURRENT_TIMESTAMP())
AND DATEADD('hour', -95, CURRENT_TIMESTAMP())

UNION

SELECT MIN(VALUE) - 1 AS block_number
FROM(SELECT blocks_impacted_array
FROM {{ this }} qualify ROW_NUMBER() over (ORDER BY test_timestamp DESC) = 1),
LATERAL FLATTEN(input => blocks_impacted_array)
)
) {% if var('OBSERV_FULL_TEST') %}
OR block_number >= 0
{% endif %}
)
{% endif %}

),

block_range AS (

SELECT
_id AS block_number
FROM {{ source( 'crosschain_silver', 'number_sequence' ) }}
WHERE _id BETWEEN ( SELECT min_block FROM summary_stats)
AND (SELECT max_block FROM summary_stats)

),

broken_blocks AS (

SELECT
DISTINCT height as block_number
FROM {{ ref("silver__block_log") }} b

LEFT JOIN {{ ref("silver__swaps") }} t
ON b.height = t.block_id

JOIN block_range br
ON b.height = br.block_number
AND t.block_id = br.block_number

WHERE t.tx_id IS NULL

),

impacted_blocks AS (

SELECT
COUNT(1) AS blocks_impacted_count,
ARRAY_AGG(block_number) within GROUP (ORDER BY block_number ) AS blocks_impacted_array
FROM broken_blocks

)

SELECT
'transactions' AS test_name,
min_block,
max_block,
min_block_timestamp,
max_block_timestamp,
blocks_tested,
blocks_impacted_count,
blocks_impacted_array,
CURRENT_TIMESTAMP() AS test_timestamp
FROM summary_stats

JOIN impacted_blocks
ON 1 = 1
7 changes: 7 additions & 0 deletions models/sources.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,10 @@ sources:
schema: silver_crosschain
tables:
- name: address_labels
- name: crosschain_silver
database: crosschain
schema: silver
tables:
- name: number_sequence