optimize(stablecoins): read latest snapshot for prior_balances instead of full-history self-scan #9727

Draft
a-monteiro wants to merge 1 commit into main from andre/stablecoins-balances-prior-snapshot

a-monteiro (Member) commented Jun 7, 2026

The stablecoins *_core_balances / *_extended_balances incremental models compute prior_balances (each address/token's last known balance before the incremental window) by scanning the model's entire history ({{ this }} where not incremental_predicate('day')) and aggregating with max_by(...). Because prior_balances is referenced twice in changed_balances (a join and a union), Trino inlines it into two full-history self-scans.

On stablecoins_tron_core_balances that is two scans of ~71.3B rows / ~4.3 TB each — the single most expensive query on the spellbook-daily cluster: ~108 CPU-hours and ~9.1 TB scanned per run (~214 CPU-hours/day), spilling 41 GB and peaking ~195 GiB per task.

These tables are daily forward-filled snapshots (one row per active address/token/day), so every still-active (non-zero) balance is already present on the most recent pre-window day partition. Reading that single partition is equivalent to max_by(...) over all history: the addresses it drops are exactly those whose balance has gone to zero, and they contribute nothing to the output. In the join they default to 0 via the existing coalesce, and their forward-filled zero rows are removed by the final filter (balance_raw > 0, or balance_raw = 0 only on the day of the last update).
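The equivalence can be illustrated on a toy table. This is a minimal sketch, not the macro itself: the inline rows, the labels a1 to a3, and the plain integer balances are assumptions made for illustration (the real tables use uint256 and many more columns).

```sql
-- Toy check: prior via max_by over history vs. prior via the latest pre-window day.
-- All rows are made up for illustration.
with history as (
  select * from (values
    ('a1', 'usdt', date '2026-05-31', 100),
    ('a1', 'usdt', date '2026-06-01', 100),
    ('a1', 'usdt', date '2026-06-02', 150),  -- still active on the latest day
    ('a2', 'usdt', date '2026-05-31', 40),
    ('a2', 'usdt', date '2026-06-01', 0),    -- zeroed on 06-01, no row afterwards
    ('a3', 'usdt', date '2026-06-02', 7)     -- first seen on the latest day
  ) as t (address, token_address, day, balance_raw)
),
prior_old as (
  -- old shape: aggregate over every pre-window day
  select address, token_address, max_by(balance_raw, day) as balance_raw
  from history
  where day < date '2026-06-03'
  group by 1, 2
),
prior_new as (
  -- new shape: read only the latest pre-window day
  select address, token_address, balance_raw
  from history
  where day = (select max(day) from history where day < date '2026-06-03')
)
select o.address, o.token_address, o.balance_raw as old_prior, n.balance_raw as new_prior
from prior_old o
left join prior_new n on o.address = n.address and o.token_address = n.token_address
order by o.address;
```

In this toy data a1 and a3 get the same prior from both definitions, while a2 appears only in prior_old and its prior there is 0.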

Verified read-only on prod against real tron and bnb data: the rewritten output is byte-identical to the current logic (current EXCEPT rewrite and rewrite EXCEPT current both return 0 rows over a representative window). Measured on tron: physical input 9,110 GB → 7.1 GB, compute CPU ~390,000 s → ~1,955 s, peak memory ~1 TB → 16 GB, spill 41 GB → 0.

The change is in the two shared macros, so it applies to the generic EVM balances macro (~57 chains × {core, extended}) and the tron variant.

github-actions bot added the WIP (work in progress) and dbt: daily (covers the Daily dbt subproject) labels Jun 7, 2026
a-monteiro (Member, Author) commented

Equivalence + performance proof (verified fresh on prod, read-only)

Claim: replacing prior_balances — a max_by(...) over the model's entire history, which is inlined and scanned twice — with a read of the single latest pre-window day partition produces byte-identical model output at a fraction of the cost.

Why it's safe (the forward-fill invariant)

core/extended_balances is a daily forward-filled snapshot: one row per active (address, token) per day. Therefore:

  1. For any (address, token) present on the latest pre-window day, max_by(balance_raw, day) over history returns exactly that latest day's balance — which is what the new prior selects. (definitional: the latest day is the max day.)
  2. The only keys the new prior drops are those present in history but absent from the latest day. On real data, every one of them has balance_raw = 0 (Lemma below).
  3. A zero-balance prior row (whose last_updated precedes the window) forward-fills to zero-balance output rows with last_updated::date < day, which the model's final filter balance_raw > 0 OR (= 0 AND last_updated::date = day) removes. (so dropping them cannot change the output.)

Steps 1 and 3 are logic you can read in the macro; step 2 is the empirical Lemma. Together they imply identical output — which the two end-to-end EXCEPT checks then confirm directly.
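Step 3 can be checked in isolation on a few hand-written rows. The sketch below assumes made-up candidate rows and integer balances; only the predicate mirrors the final filter quoted in step 3.

```sql
-- Toy check of the final filter: a zero balance survives only on the day it was last updated.
-- Rows are made up; in the model these would be forward-filled output candidates.
with candidates as (
  select * from (values
    ('a1', date '2026-06-03', 150, timestamp '2026-06-02 09:00:00'),  -- non-zero, carried forward
    ('a2', date '2026-06-03', 0,   timestamp '2026-06-01 12:00:00'),  -- stale zero from a prior row
    ('a4', date '2026-06-03', 0,   timestamp '2026-06-03 08:00:00')   -- zeroed inside the window
  ) as t (address, day, balance_raw, last_updated)
)
select address, day, balance_raw, last_updated
from candidates
where balance_raw > 0
   or (balance_raw = 0 and cast(last_updated as date) = day);
```

Only a1 and a4 pass the filter. The stale zero for a2 is dropped, which is why it makes no difference whether the prior carried that row in the first place.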

Proofs — all read-only, real prod data, run with SET TIME ZONE 'UTC'

| check | what it measures | result | query_id | cluster |
| --- | --- | --- | --- | --- |
| Lemma | of all (address, token) the OLD full-history prior keeps but the NEW latest-day prior drops, how many have a nonzero balance | 7,162,564 dropped / 0 nonzero | 20260607_160219_04205_fhyfg | spellbook-tokens |
| E1 (tron) | full pipeline: out_old EXCEPT out_new and out_new EXCEPT out_old | 0 / 0 | 20260607_194716_01824_m4mfb | spellbook-solana |
| E2 (bnb) | full pipeline: out_old EXCEPT out_new and out_new EXCEPT out_old | 0 / 0 | 20260607_195231_01943_m4mfb | spellbook-solana |

Reproduction caveat (matters): the transfers filter block_time >= timestamp '2026-06-03' is timezone-sensitive. Production runs in UTC, so the proof queries must begin with SET TIME ZONE 'UTC'; (included below). Without it, a session in a +01:00 zone pulls an extra hour of transfers into the window and a lead() tie yields a spurious ~6.8k diff. The Lemma uses a 35-day OLD lookback (exhaustive over recent zeroing); E1/E2 use a 14-day OLD lookback so the full reconstruction runs end-to-end in a few minutes.
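The timezone sensitivity can be reproduced without touching any table. This sketch assumes block_time is a timestamp with time zone, so the plain literal timestamp '2026-06-03' is interpreted in the session zone. The cast is written out explicitly here to make that step visible.

```sql
-- The window bound is a plain timestamp; converting it to an instant uses the session time zone.
SET TIME ZONE 'UTC';
select
  cast(timestamp '2026-06-03' as timestamp with time zone) as window_start,
  -- a transfer at 23:30 UTC on 06-02 is before the UTC bound
  timestamp '2026-06-02 23:30:00 UTC'
    >= cast(timestamp '2026-06-03' as timestamp with time zone) as in_window;

SET TIME ZONE '+01:00';
select
  cast(timestamp '2026-06-03' as timestamp with time zone) as window_start,
  -- the bound is now 2026-06-03 00:00 +01:00, i.e. 2026-06-02 23:00 UTC
  timestamp '2026-06-02 23:30:00 UTC'
    >= cast(timestamp '2026-06-03' as timestamp with time zone) as in_window;
```

In the UTC session in_window is false, and in the +01:00 session it is true. That is the extra hour of transfers described in the caveat.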

Performance — tron core_balances

| axis | OLD (prod MERGE …085103…hgcax) | NEW (rewritten body, checksum SELECT …094418…ee6bj) | Δ |
| --- | --- | --- | --- |
| physical input | 9,110 GB | 7.1 GB | ~1,280× |
| compute CPU | ~391,000 s (108.6 CPU-hr) | ~1,957 s | ~200× |
| peak memory | ~1 TB | 15.7 GB | ~65× |
| spill | 41 GB | 0 | |
| wall | ~56 min | ~13 s | |

NEW is the rewritten model body wrapped in count(*) + checksum() over all 10 output columns (3 warm runs, identical checksums, 230,247,346 rows). OLD is the production MERGE — its read side dominates; the ~460M-row write is negligible next to the 9.1 TB scan the rewrite eliminates.
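The exact checksum query is not shown in this comment. As a rough sketch of the idea, count(*) plus Trino's order-insensitive checksum() aggregate can fingerprint a result set. The three toy columns and both inline row sets below are assumptions for illustration, as is the choice of one checksum per column.

```sql
-- Sketch: count(*) plus per-column checksum() as a cheap fingerprint of a result set.
-- Both inputs hold the same made-up rows in a different order.
with out_a as (
  select * from (values
    ('a1', date '2026-06-03', 150),
    ('a3', date '2026-06-03', 7)
  ) as t (address, day, balance_raw)
),
out_b as (
  select * from (values
    ('a3', date '2026-06-03', 7),
    ('a1', date '2026-06-03', 150)
  ) as t (address, day, balance_raw)
)
select 'out_a' as side, count(*) as row_count,
  checksum(address) as address_ck, checksum(day) as day_ck, checksum(balance_raw) as balance_ck
from out_a
union all
select 'out_b' as side, count(*) as row_count,
  checksum(address) as address_ck, checksum(day) as day_ck, checksum(balance_raw) as balance_ck
from out_b;
```

Because checksum() ignores row order, both sides produce the same fingerprint here. Per-column checksums do not detect values being paired differently across rows, so they suit repeatability checks across runs, while the EXCEPT queries remain the actual equivalence test.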

Runnable SQL — tron end-to-end EXCEPT (E1)
SET TIME ZONE 'UTC';
-- Equivalence (tron): OLD prior_balances (max_by over history) vs NEW (single latest pre-window day).
-- Identical pipeline otherwise. window_start=06-03, prior day=06-02, output days=[06-03,06-06).
-- OLD bounded to last 14 pre-window days (day >= 2026-05-20) for a tractable end-to-end run;
-- the exhaustive zero-balance guarantee comes from the 35-day Lemma (see above).
with
transfers_in as (
  select t.blockchain, t.block_date as day, t.block_time, t."to" as address, t.to_varchar as address_varchar,
    t.token_address, t.contract_address, t.amount_raw as inflow, uint256 '0' as outflow
  from delta_prod.stablecoins_tron.core_transfers t
  where t."from" != t."to" and t.block_time >= timestamp '2026-06-03'
),
transfers_out as (
  select t.blockchain, t.block_date as day, t.block_time, t."from" as address, t.from_varchar as address_varchar,
    t.token_address, t.contract_address, uint256 '0' as inflow, t.amount_raw as outflow
  from delta_prod.stablecoins_tron.core_transfers t
  where t."from" != t."to" and t.block_time >= timestamp '2026-06-03'
),
all_flows as (select * from transfers_in union all select * from transfers_out),
daily_aggregated as (
  select blockchain, day, max(block_time) as last_updated, address, max(address_varchar) as address_varchar,
    token_address, contract_address, sum(inflow) as daily_inflow, sum(outflow) as daily_outflow
  from all_flows group by 1,2,4,6,7
),
days as (
  select cast(timestamp as date) as day from delta_prod.utils.days
  where cast(timestamp as date) >= DATE '2026-06-03' and cast(timestamp as date) < DATE '2026-06-06'
),
prior_old as (
  select blockchain, address, address_varchar, token_address, contract_address,
    max(day) as day, max_by(last_updated, day) as last_updated, max_by(balance_raw, day) as balance_raw
  from hive.stablecoins_tron.core_balances
  where day < DATE '2026-06-03' and day >= DATE '2026-05-20'
  group by 1,2,3,4,5
),
prior_new as (
  select blockchain, address, address_varchar, token_address, contract_address, day, last_updated, balance_raw
  from hive.stablecoins_tron.core_balances
  where day = (select max(day) from hive.stablecoins_tron.core_balances where day < DATE '2026-06-03')
),
changed_old as (
  select blockchain, day, last_updated, address, address_varchar, token_address, contract_address, balance_raw,
    lead(cast(day as timestamp)) over (partition by address, token_address order by day) as next_update_day
  from (
    select d.blockchain, d.day, d.last_updated, d.address, d.address_varchar, d.token_address, d.contract_address,
      cast(greatest(0e0, least(1.0e77, coalesce(cast(p.balance_raw as double),0e0) +
        sum(cast(d.daily_inflow as double) - cast(d.daily_outflow as double)) over (
          partition by d.address, d.token_address order by d.day rows between unbounded preceding and current row))) as uint256) as balance_raw
    from daily_aggregated d left join prior_old p on d.address=p.address and d.token_address=p.token_address
    union all
    select p.blockchain, p.day, p.last_updated, p.address, p.address_varchar, p.token_address, p.contract_address, p.balance_raw
    from prior_old p
  )
),
changed_new as (
  select blockchain, day, last_updated, address, address_varchar, token_address, contract_address, balance_raw,
    lead(cast(day as timestamp)) over (partition by address, token_address order by day) as next_update_day
  from (
    select d.blockchain, d.day, d.last_updated, d.address, d.address_varchar, d.token_address, d.contract_address,
      cast(greatest(0e0, least(1.0e77, coalesce(cast(p.balance_raw as double),0e0) +
        sum(cast(d.daily_inflow as double) - cast(d.daily_outflow as double)) over (
          partition by d.address, d.token_address order by d.day rows between unbounded preceding and current row))) as uint256) as balance_raw
    from daily_aggregated d left join prior_new p on d.address=p.address and d.token_address=p.token_address
    union all
    select p.blockchain, p.day, p.last_updated, p.address, p.address_varchar, p.token_address, p.contract_address, p.balance_raw
    from prior_new p
  )
),
out_old as (
  select b.blockchain, d.day, b.address, b.address_varchar, b.token_address, b.contract_address,
    'trc20' as token_standard, cast(null as uint256) as token_id, b.balance_raw, b.last_updated
  from days d left join changed_old b on d.day >= b.day and (b.next_update_day is null or cast(d.day as timestamp) < b.next_update_day)
  where (b.balance_raw > uint256 '0' or (b.balance_raw = uint256 '0' and cast(b.last_updated as date) = d.day))
    and b.address != 0x0000000000000000000000000000000000000000 and d.day >= DATE '2026-06-03'
),
out_new as (
  select b.blockchain, d.day, b.address, b.address_varchar, b.token_address, b.contract_address,
    'trc20' as token_standard, cast(null as uint256) as token_id, b.balance_raw, b.last_updated
  from days d left join changed_new b on d.day >= b.day and (b.next_update_day is null or cast(d.day as timestamp) < b.next_update_day)
  where (b.balance_raw > uint256 '0' or (b.balance_raw = uint256 '0' and cast(b.last_updated as date) = d.day))
    and b.address != 0x0000000000000000000000000000000000000000 and d.day >= DATE '2026-06-03'
)
select 'old_minus_new' as dir, count(*) as cnt from (select * from out_old except select * from out_new)
union all
select 'new_minus_old' as dir, count(*) as cnt from (select * from out_new except select * from out_old);
Runnable SQL — Lemma (zero-balance invariant)
SET TIME ZONE 'UTC';
-- Lemma (tron): every (address, token) that the OLD prior (max_by over history, bounded here to 35
-- pre-window days) keeps but the NEW prior (latest pre-window day only) drops has balance_raw = 0, so it
-- cannot affect the final output (which keeps balance_raw > 0, or a zero balance only on the day it
-- was last updated). This is the forward-fill invariant.
with
prior_old as (
  select address, token_address, max_by(balance_raw, day) as balance_raw
  from hive.stablecoins_tron.core_balances
  where day < DATE '2026-06-03' and day >= DATE '2026-04-28'
  group by 1, 2
),
prior_new as (
  select distinct address, token_address
  from hive.stablecoins_tron.core_balances
  where day = (select max(day) from hive.stablecoins_tron.core_balances where day < DATE '2026-06-03')
)
select
  count(*) as dropped_by_new,
  count_if(o.balance_raw > uint256 '0') as dropped_with_nonzero_balance
from prior_old o
left join prior_new n on o.address = n.address and o.token_address = n.token_address
where n.address is null;

E2 (bnb) is identical to E1 except that it reads from stablecoins_bnb, uses erc20 as the token_standard, and omits the address_varchar and contract_address columns.

@tomfutago
Contributor

lgtm, but I would add a temporary data restriction (say, the last 7-14 days) to allow CI to complete and run the regression test.
