From 70a32755a8cb9ea3eadbf1cc4fe3e33279ccf3b4 Mon Sep 17 00:00:00 2001 From: Marcus Pasell <3690498+rickyrombo@users.noreply.github.com> Date: Fri, 10 Oct 2025 16:08:13 -0700 Subject: [PATCH 1/4] wip: support damm v2 --- api/v1_coin.go | 176 ++-- api/v1_coins.go | 11 + config/config.go | 1 + ddl/functions/calculate_artist_coin_fees.sql | 55 ++ ddl/functions/handle_damm_v2_pool.sql | 21 + ddl/migrations/0169_damm_and_positions.sql | 137 ++++ solana/indexer/backfill.go | 2 +- solana/indexer/damm_v2.go | 761 ++++++++++++++++++ solana/indexer/dbc.go | 208 +++++ solana/indexer/processor.go | 49 +- solana/indexer/solana_indexer.go | 35 +- solana/indexer/subscription.go | 5 +- solana/indexer/unprocessed_transactions.go | 26 +- solana/indexer/utils.go | 16 +- .../spl/programs/meteora_damm_v2/accounts.go | 13 + solana/spl/programs/meteora_damm_v2/client.go | 113 +++ .../programs/meteora_damm_v2/client_test.go | 24 + .../programs/meteora_damm_v2/instruction.go | 9 + solana/spl/programs/meteora_damm_v2/types.go | 131 +++ .../programs/meteora_damm_v2/types_test.go | 46 ++ .../spl/programs/meteora_damm_v2/uint256le.go | 49 ++ solana/spl/programs/meteora_damm_v2/utils.go | 27 + .../programs/meteora_dbc/MigrationDammV2.go | 107 +++ solana/spl/programs/meteora_dbc/client.go | 2 - .../spl/programs/meteora_dbc/instruction.go | 120 +++ 25 files changed, 1972 insertions(+), 172 deletions(-) create mode 100644 ddl/functions/calculate_artist_coin_fees.sql create mode 100644 ddl/functions/handle_damm_v2_pool.sql create mode 100644 ddl/migrations/0169_damm_and_positions.sql create mode 100644 solana/indexer/damm_v2.go create mode 100644 solana/indexer/dbc.go create mode 100644 solana/spl/programs/meteora_damm_v2/accounts.go create mode 100644 solana/spl/programs/meteora_damm_v2/client.go create mode 100644 solana/spl/programs/meteora_damm_v2/client_test.go create mode 100644 solana/spl/programs/meteora_damm_v2/instruction.go create mode 100644 solana/spl/programs/meteora_damm_v2/types.go 
create mode 100644 solana/spl/programs/meteora_damm_v2/types_test.go create mode 100644 solana/spl/programs/meteora_damm_v2/uint256le.go create mode 100644 solana/spl/programs/meteora_damm_v2/utils.go create mode 100644 solana/spl/programs/meteora_dbc/MigrationDammV2.go create mode 100644 solana/spl/programs/meteora_dbc/instruction.go diff --git a/api/v1_coin.go b/api/v1_coin.go index 998d6e9d..f0d1b1db 100644 --- a/api/v1_coin.go +++ b/api/v1_coin.go @@ -5,15 +5,7 @@ import ( "github.com/jackc/pgx/v5" ) -func (app *ApiServer) v1Coin(c *fiber.Ctx) error { - mint := c.Params("mint") - if mint == "" { - return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{ - "error": "mint parameter is required", - }) - } - - sql := ` +const sharedSql = ` SELECT artist_coins.name, artist_coins.mint, @@ -28,46 +20,46 @@ func (app *ApiServer) v1Coin(c *fiber.Ctx) error { artist_coins.link_4, artist_coins.has_discord, artist_coins.created_at, - artist_coins.updated_at as coin_updated_at, - COALESCE(artist_coin_stats.market_cap, 0) as market_cap, - COALESCE(artist_coin_stats.fdv, 0) as fdv, - COALESCE(artist_coin_stats.liquidity, 0) as liquidity, - COALESCE(artist_coin_stats.last_trade_unix_time, 0) as last_trade_unix_time, - COALESCE(artist_coin_stats.last_trade_human_time, '') as last_trade_human_time, - COALESCE(artist_coin_stats.price, 0) as price, - COALESCE(artist_coin_stats.history_24h_price, 0) as history_24h_price, - COALESCE(artist_coin_stats.price_change_24h_percent, 0) as price_change_24h_percent, - COALESCE(artist_coin_stats.unique_wallet_24h, 0) as unique_wallet_24h, - COALESCE(artist_coin_stats.unique_wallet_history_24h, 0) as unique_wallet_history_24h, - COALESCE(artist_coin_stats.unique_wallet_24h_change_percent, 0) as unique_wallet_24h_change_percent, - COALESCE(artist_coin_stats.total_supply, 0) as total_supply, - COALESCE(artist_coin_stats.circulating_supply, 0) as circulating_supply, - COALESCE(artist_coin_stats.holder, 0) as holder, - 
COALESCE(artist_coin_stats.trade_24h, 0) as trade_24h, - COALESCE(artist_coin_stats.trade_history_24h, 0) as trade_history_24h, - COALESCE(artist_coin_stats.trade_24h_change_percent, 0) as trade_24h_change_percent, - COALESCE(artist_coin_stats.sell_24h, 0) as sell_24h, - COALESCE(artist_coin_stats.sell_history_24h, 0) as sell_history_24h, - COALESCE(artist_coin_stats.sell_24h_change_percent, 0) as sell_24h_change_percent, - COALESCE(artist_coin_stats.buy_24h, 0) as buy_24h, - COALESCE(artist_coin_stats.buy_history_24h, 0) as buy_history_24h, - COALESCE(artist_coin_stats.buy_24h_change_percent, 0) as buy_24h_change_percent, - COALESCE(artist_coin_stats.v_24h, 0) as v_24h, - COALESCE(artist_coin_stats.v_24h_usd, 0) as v_24h_usd, - COALESCE(artist_coin_stats.v_history_24h, 0) as v_history_24h, - COALESCE(artist_coin_stats.v_history_24h_usd, 0) as v_history_24h_usd, - COALESCE(artist_coin_stats.v_24h_change_percent, 0) as v_24h_change_percent, - COALESCE(artist_coin_stats.v_buy_24h, 0) as v_buy_24h, - COALESCE(artist_coin_stats.v_buy_24h_usd, 0) as v_buy_24h_usd, - COALESCE(artist_coin_stats.v_buy_history_24h, 0) as v_buy_history_24h, - COALESCE(artist_coin_stats.v_buy_history_24h_usd, 0) as v_buy_history_24h_usd, - COALESCE(artist_coin_stats.v_buy_24h_change_percent, 0) as v_buy_24h_change_percent, - COALESCE(artist_coin_stats.v_sell_24h, 0) as v_sell_24h, - COALESCE(artist_coin_stats.v_sell_24h_usd, 0) as v_sell_24h_usd, - COALESCE(artist_coin_stats.v_sell_history_24h, 0) as v_sell_history_24h, - COALESCE(artist_coin_stats.v_sell_history_24h_usd, 0) as v_sell_history_24h_usd, - COALESCE(artist_coin_stats.v_sell_24h_change_percent, 0) as v_sell_24h_change_percent, - COALESCE(artist_coin_stats.number_markets, 0) as number_markets, + artist_coins.updated_at AS coin_updated_at, + COALESCE(artist_coin_stats.market_cap, 0) AS market_cap, + COALESCE(artist_coin_stats.fdv, 0) AS fdv, + COALESCE(artist_coin_stats.liquidity, 0) AS liquidity, + 
COALESCE(artist_coin_stats.last_trade_unix_time, 0) AS last_trade_unix_time, + COALESCE(artist_coin_stats.last_trade_human_time, '') AS last_trade_human_time, + COALESCE(artist_coin_stats.price, 0) AS price, + COALESCE(artist_coin_stats.history_24h_price, 0) AS history_24h_price, + COALESCE(artist_coin_stats.price_change_24h_percent, 0) AS price_change_24h_percent, + COALESCE(artist_coin_stats.unique_wallet_24h, 0) AS unique_wallet_24h, + COALESCE(artist_coin_stats.unique_wallet_history_24h, 0) AS unique_wallet_history_24h, + COALESCE(artist_coin_stats.unique_wallet_24h_change_percent, 0) AS unique_wallet_24h_change_percent, + COALESCE(artist_coin_stats.total_supply, 0) AS total_supply, + COALESCE(artist_coin_stats.circulating_supply, 0) AS circulating_supply, + COALESCE(artist_coin_stats.holder, 0) AS holder, + COALESCE(artist_coin_stats.trade_24h, 0) AS trade_24h, + COALESCE(artist_coin_stats.trade_history_24h, 0) AS trade_history_24h, + COALESCE(artist_coin_stats.trade_24h_change_percent, 0) AS trade_24h_change_percent, + COALESCE(artist_coin_stats.sell_24h, 0) AS sell_24h, + COALESCE(artist_coin_stats.sell_history_24h, 0) AS sell_history_24h, + COALESCE(artist_coin_stats.sell_24h_change_percent, 0) AS sell_24h_change_percent, + COALESCE(artist_coin_stats.buy_24h, 0) AS buy_24h, + COALESCE(artist_coin_stats.buy_history_24h, 0) AS buy_history_24h, + COALESCE(artist_coin_stats.buy_24h_change_percent, 0) AS buy_24h_change_percent, + COALESCE(artist_coin_stats.v_24h, 0) AS v_24h, + COALESCE(artist_coin_stats.v_24h_usd, 0) AS v_24h_usd, + COALESCE(artist_coin_stats.v_history_24h, 0) AS v_history_24h, + COALESCE(artist_coin_stats.v_history_24h_usd, 0) AS v_history_24h_usd, + COALESCE(artist_coin_stats.v_24h_change_percent, 0) AS v_24h_change_percent, + COALESCE(artist_coin_stats.v_buy_24h, 0) AS v_buy_24h, + COALESCE(artist_coin_stats.v_buy_24h_usd, 0) AS v_buy_24h_usd, + COALESCE(artist_coin_stats.v_buy_history_24h, 0) AS v_buy_history_24h, + 
COALESCE(artist_coin_stats.v_buy_history_24h_usd, 0) AS v_buy_history_24h_usd, + COALESCE(artist_coin_stats.v_buy_24h_change_percent, 0) AS v_buy_24h_change_percent, + COALESCE(artist_coin_stats.v_sell_24h, 0) AS v_sell_24h, + COALESCE(artist_coin_stats.v_sell_24h_usd, 0) AS v_sell_24h_usd, + COALESCE(artist_coin_stats.v_sell_history_24h, 0) AS v_sell_history_24h, + COALESCE(artist_coin_stats.v_sell_history_24h_usd, 0) AS v_sell_history_24h_usd, + COALESCE(artist_coin_stats.v_sell_24h_change_percent, 0) AS v_sell_24h_change_percent, + COALESCE(artist_coin_stats.number_markets, 0) AS number_markets, JSON_BUILD_OBJECT( 'address', COALESCE(artist_coin_pools.address, ''), 'price', COALESCE(artist_coin_pools.price, 0), @@ -78,12 +70,25 @@ func (app *ApiServer) v1Coin(c *fiber.Ctx) error { 'totalTradingQuoteFee', COALESCE(artist_coin_pools.total_trading_quote_fee, 0), 'creatorWalletAddress', COALESCE(artist_coin_pools.creator_wallet_address, '') ) AS dynamic_bonding_curve, - COALESCE(artist_coin_stats.updated_at, artist_coins.created_at) as updated_at + ROW_TO_JSON(calculate_artist_coin_fees(artist_coins.mint)) AS artist_fees, + COALESCE(artist_coin_stats.updated_at, artist_coins.created_at) AS updated_at FROM artist_coins LEFT JOIN artist_coin_stats ON artist_coin_stats.mint = artist_coins.mint LEFT JOIN artist_coin_pools ON artist_coin_pools.base_mint = artist_coins.mint +` + +func (app *ApiServer) v1Coin(c *fiber.Ctx) error { + mint := c.Params("mint") + if mint == "" { + return c.Status(fiber.StatusBadRequest).JSON(fiber.Map{ + "error": "mint parameter is required", + }) + } + + sql := ` + ` + sharedSql + ` WHERE artist_coins.mint = @mint LIMIT 1 ` @@ -114,76 +119,7 @@ func (app *ApiServer) v1CoinByTicker(c *fiber.Ctx) error { } sql := ` - SELECT - artist_coins.name, - artist_coins.mint, - artist_coins.ticker, - artist_coins.decimals, - artist_coins.user_id, - artist_coins.logo_uri, - artist_coins.description, - artist_coins.link_1, - artist_coins.link_2, - 
artist_coins.link_3, - artist_coins.link_4, - artist_coins.has_discord, - artist_coins.created_at, - artist_coins.updated_at as coin_updated_at, - COALESCE(artist_coin_stats.market_cap, 0) as market_cap, - COALESCE(artist_coin_stats.fdv, 0) as fdv, - COALESCE(artist_coin_stats.liquidity, 0) as liquidity, - COALESCE(artist_coin_stats.last_trade_unix_time, 0) as last_trade_unix_time, - COALESCE(artist_coin_stats.last_trade_human_time, '') as last_trade_human_time, - COALESCE(artist_coin_stats.price, 0) as price, - COALESCE(artist_coin_stats.history_24h_price, 0) as history_24h_price, - COALESCE(artist_coin_stats.price_change_24h_percent, 0) as price_change_24h_percent, - COALESCE(artist_coin_stats.unique_wallet_24h, 0) as unique_wallet_24h, - COALESCE(artist_coin_stats.unique_wallet_history_24h, 0) as unique_wallet_history_24h, - COALESCE(artist_coin_stats.unique_wallet_24h_change_percent, 0) as unique_wallet_24h_change_percent, - COALESCE(artist_coin_stats.total_supply, 0) as total_supply, - COALESCE(artist_coin_stats.circulating_supply, 0) as circulating_supply, - COALESCE(artist_coin_stats.holder, 0) as holder, - COALESCE(artist_coin_stats.trade_24h, 0) as trade_24h, - COALESCE(artist_coin_stats.trade_history_24h, 0) as trade_history_24h, - COALESCE(artist_coin_stats.trade_24h_change_percent, 0) as trade_24h_change_percent, - COALESCE(artist_coin_stats.sell_24h, 0) as sell_24h, - COALESCE(artist_coin_stats.sell_history_24h, 0) as sell_history_24h, - COALESCE(artist_coin_stats.sell_24h_change_percent, 0) as sell_24h_change_percent, - COALESCE(artist_coin_stats.buy_24h, 0) as buy_24h, - COALESCE(artist_coin_stats.buy_history_24h, 0) as buy_history_24h, - COALESCE(artist_coin_stats.buy_24h_change_percent, 0) as buy_24h_change_percent, - COALESCE(artist_coin_stats.v_24h, 0) as v_24h, - COALESCE(artist_coin_stats.v_24h_usd, 0) as v_24h_usd, - COALESCE(artist_coin_stats.v_history_24h, 0) as v_history_24h, - COALESCE(artist_coin_stats.v_history_24h_usd, 0) as 
v_history_24h_usd, - COALESCE(artist_coin_stats.v_24h_change_percent, 0) as v_24h_change_percent, - COALESCE(artist_coin_stats.v_buy_24h, 0) as v_buy_24h, - COALESCE(artist_coin_stats.v_buy_24h_usd, 0) as v_buy_24h_usd, - COALESCE(artist_coin_stats.v_buy_history_24h, 0) as v_buy_history_24h, - COALESCE(artist_coin_stats.v_buy_history_24h_usd, 0) as v_buy_history_24h_usd, - COALESCE(artist_coin_stats.v_buy_24h_change_percent, 0) as v_buy_24h_change_percent, - COALESCE(artist_coin_stats.v_sell_24h, 0) as v_sell_24h, - COALESCE(artist_coin_stats.v_sell_24h_usd, 0) as v_sell_24h_usd, - COALESCE(artist_coin_stats.v_sell_history_24h, 0) as v_sell_history_24h, - COALESCE(artist_coin_stats.v_sell_history_24h_usd, 0) as v_sell_history_24h_usd, - COALESCE(artist_coin_stats.v_sell_24h_change_percent, 0) as v_sell_24h_change_percent, - COALESCE(artist_coin_stats.number_markets, 0) as number_markets, - JSON_BUILD_OBJECT( - 'address', COALESCE(artist_coin_pools.address, ''), - 'price', COALESCE(artist_coin_pools.price, 0), - 'priceUSD', COALESCE(artist_coin_pools.price_usd, 0), - 'curveProgress', COALESCE(artist_coin_pools.curve_progress, 0), - 'isMigrated', COALESCE(artist_coin_pools.is_migrated, false), - 'creatorQuoteFee', COALESCE(artist_coin_pools.creator_quote_fee, 0), - 'totalTradingQuoteFee', COALESCE(artist_coin_pools.total_trading_quote_fee, 0), - 'creatorWalletAddress', COALESCE(artist_coin_pools.creator_wallet_address, '') - ) AS dynamic_bonding_curve, - COALESCE(artist_coin_stats.updated_at, artist_coins.created_at) as updated_at - FROM artist_coins - LEFT JOIN artist_coin_stats - ON artist_coin_stats.mint = artist_coins.mint - LEFT JOIN artist_coin_pools - ON artist_coin_pools.base_mint = artist_coins.mint + ` + sharedSql + ` WHERE artist_coins.ticker = @ticker LIMIT 1 ` diff --git a/api/v1_coins.go b/api/v1_coins.go index 763a9d23..dae90d7d 100644 --- a/api/v1_coins.go +++ b/api/v1_coins.go @@ -9,6 +9,15 @@ import ( "github.com/jackc/pgx/v5" ) +type ArtistCoinFees 
struct { + UnclaimedDbcFees float64 `json:"unclaimed_dbc_fees" db:"unclaimed_dbc_fees"` + TotalDbcFees float64 `json:"total_dbc_fees" db:"total_dbc_fees"` + UnclaimedDammV2Fees float64 `json:"unclaimed_damm_v2_fees" db:"unclaimed_damm_v2_fees"` + TotalDammV2Fees float64 `json:"total_damm_v2_fees" db:"total_damm_v2_fees"` + UnclaimedFees float64 `json:"unclaimed_fees" db:"unclaimed_fees"` + TotalFees float64 `json:"total_fees" db:"total_fees"` +} + type ArtistCoin struct { Name string `json:"name"` Ticker string `json:"ticker"` @@ -65,6 +74,7 @@ type ArtistCoin struct { VSell24hChangePercent float64 `json:"vSell24hChangePercent" db:"v_sell_24h_change_percent"` NumberMarkets int `json:"numberMarkets" db:"number_markets"` DynamicBondingCurve *DynamicBondingCurveInsights `json:"dynamicBondingCurve" db:"dynamic_bonding_curve"` + ArtistFees *ArtistCoinFees `json:"artistFees" db:"artist_fees"` UpdatedAt time.Time `json:"updatedAt" db:"updated_at"` } @@ -189,6 +199,7 @@ func (app *ApiServer) v1Coins(c *fiber.Ctx) error { 'totalTradingQuoteFee', COALESCE(artist_coin_pools.total_trading_quote_fee, 0), 'creatorWalletAddress', COALESCE(artist_coin_pools.creator_wallet_address, '') ) AS dynamic_bonding_curve, + ROW_TO_JSON(calculate_artist_coin_fees(artist_coins.mint)) AS artist_fees, COALESCE(artist_coin_stats.updated_at, artist_coins.created_at) as updated_at FROM artist_coins LEFT JOIN artist_coin_stats diff --git a/config/config.go b/config/config.go index 821e0a27..10f24cd0 100644 --- a/config/config.go +++ b/config/config.go @@ -91,6 +91,7 @@ func init() { Cfg.AudiusdChainID = core_config.DevAcdcChainID Cfg.AudiusdEntityManagerAddress = core_config.DevAcdcAddress + Cfg.SolanaIndexerRetryInterval = 10 * time.Second case "stage": fallthrough case "staging": diff --git a/ddl/functions/calculate_artist_coin_fees.sql b/ddl/functions/calculate_artist_coin_fees.sql new file mode 100644 index 00000000..0c5c3dd4 --- /dev/null +++ b/ddl/functions/calculate_artist_coin_fees.sql @@ 
-0,0 +1,55 @@ +BEGIN; +DROP FUNCTION IF EXISTS calculate_artist_coin_fees(TEXT); +CREATE OR REPLACE FUNCTION calculate_artist_coin_fees(artist_coin_mint TEXT) +RETURNS TABLE ( + unclaimed_dbc_fees NUMERIC, + total_dbc_fees NUMERIC, + unclaimed_damm_v2_fees NUMERIC, + total_damm_v2_fees NUMERIC, + unclaimed_fees NUMERIC, + total_fees NUMERIC +) LANGUAGE sql AS $function$ + WITH + damm_fees AS ( + SELECT + pool.token_a_mint AS mint, + ( + pool.fee_b_per_liquidity + * ( + position.unlocked_liquidity + position.vested_liquidity + position.permanent_locked_liquidity + ) + / POWER (2, 128) + + position.fee_b_pending + ) AS total_damm_v2_fees, + ( + (pool.fee_b_per_liquidity - position.fee_b_per_token_checkpoint) + * ( + position.unlocked_liquidity + position.vested_liquidity + position.permanent_locked_liquidity + ) + / POWER (2, 128) + + position.fee_b_pending + ) AS unclaimed_damm_v2_fees + FROM sol_meteora_damm_v2_pools pool + JOIN sol_meteora_dbc_migrations migration ON migration.base_mint = pool.token_a_mint + JOIN sol_meteora_damm_v2_positions position ON position.address = migration.first_position + WHERE pool.token_a_mint = artist_coin_mint + ), + dbc_fees AS ( + SELECT + base_mint AS mint, + total_trading_quote_fee / 2 AS total_dbc_fees, + creator_quote_fee / 2 AS unclaimed_dbc_fees + FROM artist_coin_pools + WHERE base_mint = artist_coin_mint + ) + SELECT + FLOOR(COALESCE(dbc_fees.unclaimed_dbc_fees, 0)) AS unclaimed_dbc_fees, + FLOOR(COALESCE(dbc_fees.total_dbc_fees, 0)) AS total_dbc_fees, + FLOOR(COALESCE(damm_fees.unclaimed_damm_v2_fees, 0)) AS unclaimed_damm_v2_fees, + FLOOR(COALESCE(damm_fees.total_damm_v2_fees, 0)) AS total_damm_v2_fees, + FLOOR(COALESCE(dbc_fees.unclaimed_dbc_fees, 0) + COALESCE(damm_fees.unclaimed_damm_v2_fees, 0)) AS unclaimed_fees, + FLOOR(COALESCE(dbc_fees.total_dbc_fees, 0) + COALESCE(damm_fees.total_damm_v2_fees, 0)) AS total_fees + FROM dbc_fees + FULL OUTER JOIN damm_fees USING (mint); +$function$; +COMMIT; \ No newline at end of 
file diff --git a/ddl/functions/handle_damm_v2_pool.sql b/ddl/functions/handle_damm_v2_pool.sql new file mode 100644 index 00000000..b5515b48 --- /dev/null +++ b/ddl/functions/handle_damm_v2_pool.sql @@ -0,0 +1,21 @@ +CREATE OR REPLACE FUNCTION handle_meteora_dbc_migrations() +RETURNS trigger AS $$ +BEGIN + PERFORM pg_notify('meteora_dbc_migration', json_build_object('operation', TG_OP)::text); + RETURN NEW; + EXCEPTION + WHEN OTHERS THEN + RAISE WARNING 'An error occurred in %: %', TG_NAME, SQLERRM; + RETURN NULL; +END; +$$ LANGUAGE plpgsql; + +DO $$ +BEGIN + CREATE TRIGGER on_meteora_dbc_migrations + AFTER INSERT OR DELETE ON sol_meteora_dbc_migrations + FOR EACH ROW EXECUTE FUNCTION handle_meteora_dbc_migrations(); +EXCEPTION + WHEN others THEN NULL; -- Ignore if trigger already exists +END $$; +COMMENT ON TRIGGER on_meteora_dbc_migrations ON sol_meteora_dbc_migrations IS 'Notifies when a DBC pool migrates to a DAMM V2 pool.' \ No newline at end of file diff --git a/ddl/migrations/0169_damm_and_positions.sql b/ddl/migrations/0169_damm_and_positions.sql new file mode 100644 index 00000000..6bb0ddae --- /dev/null +++ b/ddl/migrations/0169_damm_and_positions.sql @@ -0,0 +1,137 @@ +CREATE TABLE IF NOT EXISTS sol_meteora_dbc_migrations ( + signature TEXT NOT NULL, + instruction_index INT NOT NULL, + slot BIGINT NOT NULL, + dbc_pool TEXT NOT NULL, + migration_metadata TEXT NOT NULL, + config TEXT NOT NULL, + dbc_pool_authority TEXT NOT NULL, + damm_v2_pool TEXT NOT NULL, + first_position_nft_mint TEXT NOT NULL, + first_position_nft_account TEXT NOT NULL, + first_position TEXT NOT NULL, + second_position_nft_mint TEXT NOT NULL, + second_position_nft_account TEXT NOT NULL, + second_position TEXT NOT NULL, + damm_pool_authority TEXT NOT NULL, + base_mint TEXT NOT NULL, + quote_mint TEXT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (signature, instruction_index) +); +CREATE 
INDEX IF NOT EXISTS sol_meteora_dbc_migrations_base_mint_idx ON sol_meteora_dbc_migrations(base_mint); +COMMENT ON TABLE sol_meteora_dbc_migrations IS 'Tracks migrations from DBC pools to DAMM V2 pools.'; +COMMENT ON INDEX sol_meteora_dbc_migrations_base_mint_idx IS 'Used for finding artist positions by base_mint.'; + +CREATE TABLE IF NOT EXISTS sol_meteora_damm_v2_pools ( + address TEXT PRIMARY KEY, + token_a_mint TEXT NOT NULL, + token_b_mint TEXT NOT NULL, + token_a_vault TEXT NOT NULL, + token_b_vault TEXT NOT NULL, + whitelisted_vault TEXT NOT NULL, + partner TEXT NOT NULL, + liquidity NUMERIC NOT NULL, + protocol_a_fee BIGINT NOT NULL, + protocol_b_fee BIGINT NOT NULL, + partner_a_fee BIGINT NOT NULL, + partner_b_fee BIGINT NOT NULL, + sqrt_min_price NUMERIC NOT NULL, + sqrt_max_price NUMERIC NOT NULL, + sqrt_price NUMERIC NOT NULL, + activation_point BIGINT NOT NULL, + activation_type SMALLINT NOT NULL, + pool_status SMALLINT NOT NULL, + token_a_flag SMALLINT NOT NULL, + token_b_flag SMALLINT NOT NULL, + collect_fee_mode SMALLINT NOT NULL, + pool_type SMALLINT NOT NULL, + fee_a_per_liquidity BIGINT NOT NULL, + fee_b_per_liquidity BIGINT NOT NULL, + permanent_lock_liquidity NUMERIC NOT NULL, + creator TEXT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); +COMMENT ON TABLE sol_meteora_damm_v2_pools IS 'Tracks DAMM V2 pool state. 
Join with sol_meteora_damm_v2_pool_metrics, sol_meteora_damm_v2_pool_fees, sol_meteora_damm_v2_pool_base_fees, and sol_meteora_damm_v2_pool_dynamic_fees for full pool state.'; + +CREATE TABLE IF NOT EXISTS sol_meteora_damm_v2_pool_metrics ( + pool TEXT PRIMARY KEY REFERENCES sol_meteora_damm_v2_pools(address) ON DELETE CASCADE, + total_lp_a_fee NUMERIC NOT NULL, + total_lp_b_fee NUMERIC NOT NULL, + total_protocol_a_fee NUMERIC NOT NULL, + total_protocol_b_fee NUMERIC NOT NULL, + total_partner_a_fee NUMERIC NOT NULL, + total_partner_b_fee NUMERIC NOT NULL, + total_position BIGINT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); +COMMENT ON TABLE sol_meteora_damm_v2_pool_metrics IS 'Tracks aggregated metrics for DAMM V2 pools. A slice of the DAMM V2 pool state.'; + +CREATE TABLE IF NOT EXISTS sol_meteora_damm_v2_pool_fees ( + pool TEXT PRIMARY KEY REFERENCES sol_meteora_damm_v2_pools(address) ON DELETE CASCADE, + protocol_fee_percent SMALLINT NOT NULL, + partner_fee_percent SMALLINT NOT NULL, + referral_fee_percent SMALLINT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); +COMMENT ON TABLE sol_meteora_damm_v2_pool_fees IS 'Tracks fee configuration for DAMM V2 pools. A slice of the DAMM V2 pool state.'; + +CREATE TABLE IF NOT EXISTS sol_meteora_damm_v2_pool_base_fees ( + pool TEXT PRIMARY KEY REFERENCES sol_meteora_damm_v2_pools(address) ON DELETE CASCADE, + cliff_fee_numerator BIGINT NOT NULL, + fee_scheduler_mode SMALLINT NOT NULL, + number_of_period SMALLINT NOT NULL, + period_frequency BIGINT NOT NULL, + reduction_factor BIGINT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); +COMMENT ON TABLE sol_meteora_damm_v2_pool_base_fees IS 'Tracks base fee configuration for DAMM V2 pools. 
A slice of the DAMM V2 pool state.'; + +CREATE TABLE IF NOT EXISTS sol_meteora_damm_v2_pool_dynamic_fees ( + pool TEXT PRIMARY KEY REFERENCES sol_meteora_damm_v2_pools(address) ON DELETE CASCADE, + initialized SMALLINT NOT NULL, + max_volatility_accumulator INTEGER NOT NULL, + variable_fee_control INTEGER NOT NULL, + bin_step SMALLINT NOT NULL, + filter_period SMALLINT NOT NULL, + decay_period SMALLINT NOT NULL, + reduction_factor SMALLINT NOT NULL, + last_update_timestamp BIGINT NOT NULL, + bin_step_u128 NUMERIC NOT NULL, + sqrt_price_reference NUMERIC NOT NULL, + volatility_accumulator NUMERIC NOT NULL, + volatility_reference NUMERIC NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); +COMMENT ON TABLE sol_meteora_damm_v2_pool_dynamic_fees IS 'Tracks dynamic fee configuration for DAMM V2 pools. A slice of the DAMM V2 pool state.'; + +CREATE TABLE IF NOT EXISTS sol_meteora_damm_v2_positions ( + address TEXT PRIMARY KEY, + pool TEXT NOT NULL REFERENCES sol_meteora_damm_v2_pools(address) ON DELETE CASCADE, + nft_mint TEXT NOT NULL, + fee_a_per_token_checkpoint BIGINT NOT NULL, + fee_b_per_token_checkpoint BIGINT NOT NULL, + fee_a_pending BIGINT NOT NULL, + fee_b_pending BIGINT NOT NULL, + unlocked_liquidity NUMERIC NOT NULL, + vested_liquidity NUMERIC NOT NULL, + permanent_locked_liquidity NUMERIC NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); +COMMENT ON TABLE sol_meteora_damm_v2_positions IS 'Tracks DAMM V2 positions representing a claim to the liquidity and associated fees in a DAMM V2 pool. 
Join with sol_meteora_damm_v2_position_metrics for full position state.'; + +CREATE TABLE IF NOT EXISTS sol_meteora_damm_v2_position_metrics ( + position TEXT PRIMARY KEY REFERENCES sol_meteora_damm_v2_positions(address) ON DELETE CASCADE, + total_claimed_a_fee BIGINT NOT NULL, + total_claimed_b_fee BIGINT NOT NULL, + created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP +); +COMMENT ON TABLE sol_meteora_damm_v2_position_metrics IS 'Tracks aggregated metrics for DAMM V2 positions. A slice of the DAMM V2 position state.'; \ No newline at end of file diff --git a/solana/indexer/backfill.go b/solana/indexer/backfill.go index fca052dd..0535fd68 100644 --- a/solana/indexer/backfill.go +++ b/solana/indexer/backfill.go @@ -105,7 +105,7 @@ func (s *SolanaIndexer) backfillAddressTransactions(ctx context.Context, address } opts.Before = before - res, err := withRetries(func() ([]*rpc.TransactionSignature, error) { + res, err := withRetriesResult(func() ([]*rpc.TransactionSignature, error) { return s.rpcClient.GetSignaturesForAddressWithOpts(ctx, address, &opts) }, 5, time.Second*1) if err != nil { diff --git a/solana/indexer/damm_v2.go b/solana/indexer/damm_v2.go new file mode 100644 index 00000000..dc381e6e --- /dev/null +++ b/solana/indexer/damm_v2.go @@ -0,0 +1,761 @@ +package indexer + +import ( + "context" + "fmt" + + "api.audius.co/database" + "api.audius.co/solana/spl/programs/meteora_damm_v2" + bin "github.com/gagliardetto/binary" + "github.com/gagliardetto/solana-go" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + pb "github.com/rpcpool/yellowstone-grpc/examples/golang/proto" + "go.uber.org/zap" +) + +type notificationCallback func(ctx context.Context, notification *pgconn.Notification) + +type DammV2Indexer struct { + pool database.DbPool + grpcConfig GrpcConfig + logger *zap.Logger +} + +const MAX_DAMM_V2_POOLS_PER_SUBSCRIPTION = 10000 +const DAMM_V2_POOL_SUBSCRIPTION_KEY = "dammV2Pools" 
+const DBC__MIGRATION_NOTIFICATION_NAME = "meteora_dbc_migration" + +func (d *DammV2Indexer) Start(ctx context.Context) { + // To ensure only one subscription task is running at a time, keep track of + // the last cancel function and call it on the next notification. + var lastCancel context.CancelFunc + + // Ensure all gRPC clients are closed on shutdown + var grpcClients []GrpcClient + defer (func() { + for _, client := range grpcClients { + client.Close() + } + })() + + handleNotif := func(ctx context.Context, notification *pgconn.Notification) { + // Cancel the previous task if it exists + subCtx, cancel := context.WithCancel(ctx) + if lastCancel != nil { + lastCancel() + } + for _, client := range grpcClients { + client.Close() + } + clients, err := subscribeToDammV2Pools(subCtx, d.pool, d.grpcConfig, d.logger) + grpcClients = clients + if err != nil { + d.logger.Error("failed to resubscribe to DAMM V2 pools", zap.Error(err)) + return + } + lastCancel = cancel + } + + // Setup initial subscription + clients, err := subscribeToDammV2Pools(ctx, d.pool, d.grpcConfig, d.logger) + if err != nil { + d.logger.Error("failed to subscribe to DAMM V2 pools", zap.Error(err)) + return + } + grpcClients = clients + + // Watch for new pools to be added + err = watchPgNotification(ctx, d.pool, DBC__MIGRATION_NOTIFICATION_NAME, handleNotif, d.logger) + if err != nil { + d.logger.Error("failed to watch for DAMM V2 pool changes", zap.Error(err)) + return + } + + for { + select { + case <-ctx.Done(): + d.logger.Info("received shutdown signal, stopping DAMM V2 indexer") + return + default: + } + } +} + +func subscribeToDammV2Pools(ctx context.Context, db database.DBTX, grpcConfig GrpcConfig, logger *zap.Logger) ([]GrpcClient, error) { + done := false + page := 0 + pageSize := MAX_DAMM_V2_POOLS_PER_SUBSCRIPTION + total := 0 + grpcClients := make([]GrpcClient, 0) + for !done { + dammV2Pools, err := getWatchedDammV2Pools(ctx, db, pageSize, page*pageSize) + if err != nil { + return 
nil, fmt.Errorf("failed to get watched DAMM V2 pools: %w", err) + } + if len(dammV2Pools) == 0 { + logger.Info("no DAMM V2 pools to subscribe to") + return grpcClients, nil + } + total += len(dammV2Pools) + + logger.Debug("subscribing to DAMM V2 pools....", zap.Int("numPools", len(dammV2Pools))) + subscription := makeDammV2SubscriptionRequest(dammV2Pools) + + handleMessage := func(ctx context.Context, msg *pb.SubscribeUpdate) { + handleDammV2Message(ctx, db, msg, logger) + } + + grpcClient := NewGrpcClient(grpcConfig) + err = grpcClient.Subscribe(ctx, subscription, handleMessage, func(err error) { + logger.Error("error in DAMM V2 subscription", zap.Error(err)) + }) + if err != nil { + return nil, fmt.Errorf("failed to subscribe to DAMM V2 pools: %w", err) + } + grpcClients = append(grpcClients, grpcClient) + + if len(dammV2Pools) < pageSize { + done = true + } + page++ + } + logger.Info("subscribed to DAMM V2 pools", zap.Int("numPools", total)) + return grpcClients, nil +} + +func watchPgNotification(ctx context.Context, pool database.DbPool, notification string, callback notificationCallback, logger *zap.Logger) error { + if logger == nil { + logger = zap.NewNop() + } + + childLogger := logger.With(zap.String("notification", notification)) + + conn, err := pool.Acquire(ctx) + if err != nil { + return fmt.Errorf("failed to acquire database connection: %w", err) + } + + rawConn := conn.Conn() + _, err = rawConn.Exec(ctx, fmt.Sprintf(`LISTEN %s`, notification)) + if err != nil { + return fmt.Errorf("failed to listen for %s changes: %w", notification, err) + } + + go func() { + defer func() { + if rawConn != nil && !rawConn.PgConn().IsClosed() && ctx.Err() != nil { + _, _ = rawConn.Exec(ctx, fmt.Sprintf(`UNLISTEN %s`, notification)) + } + childLogger.Info("received shutdown signal, stopping notification watcher") + conn.Release() + }() + for { + select { + case <-ctx.Done(): + return + default: + } + + notif, err := rawConn.WaitForNotification(ctx) + if err != nil { + 
childLogger.Error("failed waiting for notification", zap.Error(err)) + } + if notif == nil { + childLogger.Warn("received nil notification, continuing to wait for notifications") + continue + } + callback(ctx, notif) + } + }() + return nil +} + +func makeDammV2SubscriptionRequest(dammV2Pools []string) *pb.SubscribeRequest { + commitment := pb.CommitmentLevel_CONFIRMED + subscription := &pb.SubscribeRequest{ + Commitment: &commitment, + } + + // Listen for slot updates for checkpointing + subscription.Slots = make(map[string]*pb.SubscribeRequestFilterSlots) + subscription.Slots["checkpoints"] = &pb.SubscribeRequestFilterSlots{} + + // fromSlot := uint64(372380625) + // subscription.FromSlot = &fromSlot + + subscription.Accounts = make(map[string]*pb.SubscribeRequestFilterAccounts) + + // Listen to all watched pools + accountFilter := pb.SubscribeRequestFilterAccounts{ + Owner: []string{meteora_damm_v2.ProgramID.String()}, + Account: dammV2Pools, + } + subscription.Accounts[DAMM_V2_POOL_SUBSCRIPTION_KEY] = &accountFilter + + // Listen to all positions for each pool + for _, pool := range dammV2Pools { + accountFilter := pb.SubscribeRequestFilterAccounts{ + Owner: []string{meteora_damm_v2.ProgramID.String()}, + Filters: []*pb.SubscribeRequestFilterAccountsFilter{ + { + Filter: &pb.SubscribeRequestFilterAccountsFilter_Memcmp{ + Memcmp: &pb.SubscribeRequestFilterAccountsFilterMemcmp{ + Offset: 8, // Offset of the pool field in the position account (after discriminator) + Data: &pb.SubscribeRequestFilterAccountsFilterMemcmp_Base58{ + Base58: pool, + }, + }, + }, + }, + { + Filter: &pb.SubscribeRequestFilterAccountsFilter_Datasize{ + Datasize: 408, // byte size of a Position account + }, + }, + }, + } + subscription.Accounts[pool] = &accountFilter + } + + return subscription +} + +func handleDammV2Message(ctx context.Context, db database.DBTX, msg *pb.SubscribeUpdate, logger *zap.Logger) { + accUpdate := msg.GetAccount() + if accUpdate != nil { + if msg.Filters[0] == 
DAMM_V2_POOL_SUBSCRIPTION_KEY { + err := processDammV2PoolUpdate(ctx, db, accUpdate) + if err != nil { + logger.Error("failed to process DAMM V2 pool update", zap.Error(err)) + } else { + logger.Debug("processed DAMM V2 pool update", zap.String("account", solana.PublicKeyFromBytes(accUpdate.Account.Pubkey).String())) + } + } else { + err := processDammV2PositionUpdate(ctx, db, accUpdate) + if err != nil { + logger.Error("failed to process DAMM V2 position update", zap.Error(err)) + } else { + logger.Debug("processed DAMM V2 position update", zap.String("account", solana.PublicKeyFromBytes(accUpdate.Account.Pubkey).String())) + } + } + + } +} + +func processDammV2PoolUpdate( + ctx context.Context, + db database.DBTX, + update *pb.SubscribeUpdateAccount, +) error { + account := solana.PublicKeyFromBytes(update.Account.Pubkey) + var pool meteora_damm_v2.Pool + err := bin.NewBorshDecoder(update.Account.Data).Decode(&pool) + if err != nil { + return err + } + err = upsertDammV2Pool(ctx, db, account, &pool) + if err != nil { + return err + } + err = upsertDammV2PoolMetrics(ctx, db, account, &pool.Metrics) + if err != nil { + return err + } + err = upsertDammV2PoolFees(ctx, db, account, &pool.PoolFees) + if err != nil { + return err + } + err = upsertDammV2PoolBaseFee(ctx, db, account, &pool.PoolFees.BaseFee) + if err != nil { + return err + } + err = upsertDammV2PoolDynamicFee(ctx, db, account, &pool.PoolFees.DynamicFee) + if err != nil { + return err + } + return nil +} + +func processDammV2PositionUpdate( + ctx context.Context, + db database.DBTX, + update *pb.SubscribeUpdateAccount, +) error { + account := solana.PublicKeyFromBytes(update.Account.Pubkey) + var position meteora_damm_v2.PositionState + err := bin.NewBorshDecoder(update.Account.Data).Decode(&position) + if err != nil { + return err + } + err = upsertDammV2Position(ctx, db, account, &position) + if err != nil { + return err + } + err = upsertDammV2PositionMetrics(ctx, db, account, &position.Metrics) + if 
err != nil { + return err + } + return nil +} + +func getWatchedDammV2Pools(ctx context.Context, db database.DBTX, limit int, offset int) ([]string, error) { + sql := ` + SELECT damm_v2_pool + FROM sol_meteora_dbc_migrations + LIMIT @limit OFFSET @offset + ;` + rows, err := db.Query(ctx, sql, pgx.NamedArgs{ + "limit": limit, + "offset": offset, + }) + if err != nil { + return nil, err + } + defer rows.Close() + + var pools []string + for rows.Next() { + var address string + if err := rows.Scan(&address); err != nil { + return nil, err + } + pools = append(pools, address) + } + return pools, nil +} + +func upsertDammV2Pool( + ctx context.Context, + db database.DBTX, + account solana.PublicKey, + pool *meteora_damm_v2.Pool, +) error { + sqlPool := ` + INSERT INTO sol_meteora_damm_v2_pools ( + address, + token_a_mint, + token_b_mint, + token_a_vault, + token_b_vault, + whitelisted_vault, + partner, + liquidity, + protocol_a_fee, + protocol_b_fee, + partner_a_fee, + partner_b_fee, + sqrt_min_price, + sqrt_max_price, + sqrt_price, + activation_point, + activation_type, + pool_status, + token_a_flag, + token_b_flag, + collect_fee_mode, + pool_type, + fee_a_per_liquidity, + fee_b_per_liquidity, + permanent_lock_liquidity, + creator, + created_at, + updated_at + ) VALUES ( + @address, + @token_a_mint, + @token_b_mint, + @token_a_vault, + @token_b_vault, + @whitelisted_vault, + @partner, + @liquidity, + @protocol_a_fee, + @protocol_b_fee, + @partner_a_fee, + @partner_b_fee, + @sqrt_min_price, + @sqrt_max_price, + @sqrt_price, + @activation_point, + @activation_type, + @pool_status, + @token_a_flag, + @token_b_flag, + @collect_fee_mode, + @pool_type, + @fee_a_per_liquidity, + @fee_b_per_liquidity, + @permanent_lock_liquidity, + @creator, + NOW(), + NOW() + ) + ON CONFLICT (address) DO UPDATE SET + token_a_mint = EXCLUDED.token_a_mint, + token_b_mint = EXCLUDED.token_b_mint, + token_a_vault = EXCLUDED.token_a_vault, + token_b_vault = EXCLUDED.token_b_vault, + 
whitelisted_vault = EXCLUDED.whitelisted_vault, + partner = EXCLUDED.partner, + liquidity = EXCLUDED.liquidity, + protocol_a_fee = EXCLUDED.protocol_a_fee, + protocol_b_fee = EXCLUDED.protocol_b_fee, + partner_a_fee = EXCLUDED.partner_a_fee, + partner_b_fee = EXCLUDED.partner_b_fee, + sqrt_min_price = EXCLUDED.sqrt_min_price, + sqrt_max_price = EXCLUDED.sqrt_max_price, + sqrt_price = EXCLUDED.sqrt_price, + activation_point = EXCLUDED.activation_point, + activation_type = EXCLUDED.activation_type, + pool_status = EXCLUDED.pool_status, + token_a_flag = EXCLUDED.token_a_flag, + token_b_flag = EXCLUDED.token_b_flag, + collect_fee_mode = EXCLUDED.collect_fee_mode, + pool_type = EXCLUDED.pool_type, + fee_a_per_liquidity = EXCLUDED.fee_a_per_liquidity, + fee_b_per_liquidity = EXCLUDED.fee_b_per_liquidity, + permanent_lock_liquidity = EXCLUDED.permanent_lock_liquidity, + creator = EXCLUDED.creator, + updated_at = NOW() + ` + args := pgx.NamedArgs{ + "address": account.String(), + "token_a_mint": pool.TokenAMint.String(), + "token_b_mint": pool.TokenBMint.String(), + "token_a_vault": pool.TokenAVault.String(), + "token_b_vault": pool.TokenBVault.String(), + "whitelisted_vault": pool.WhitelistedVault.String(), + "partner": pool.Partner.String(), + "liquidity": pool.Liquidity.String(), + "protocol_a_fee": pool.Metrics.TotalProtocolAFee, + "protocol_b_fee": pool.Metrics.TotalProtocolBFee, + "partner_a_fee": pool.Metrics.TotalPartnerAFee, + "partner_b_fee": pool.Metrics.TotalPartnerBFee, + "sqrt_min_price": pool.SqrtMinPrice.BigInt(), + "sqrt_max_price": pool.SqrtMaxPrice.BigInt(), + "sqrt_price": pool.SqrtPrice.BigInt(), + "activation_point": pool.ActivationPoint, + "activation_type": pool.ActivationType, + "pool_status": pool.PoolStatus, + "token_a_flag": pool.TokenAFlag, + "token_b_flag": pool.TokenBFlag, + "collect_fee_mode": pool.CollectFeeMode, + "pool_type": pool.PoolType, + "fee_a_per_liquidity": pool.FeeAPerLiquidity, + "fee_b_per_liquidity": pool.FeeBPerLiquidity, + 
"permanent_lock_liquidity": pool.PermanentLockLiquidity.BigInt(), + "creator": pool.Creator.String(), + } + _, err := db.Exec(ctx, sqlPool, args) + + return err +} + +func upsertDammV2PoolMetrics( + ctx context.Context, + db database.DBTX, + account solana.PublicKey, + metrics *meteora_damm_v2.PoolMetrics, +) error { + sqlMetrics := ` + INSERT INTO sol_meteora_damm_v2_pool_metrics ( + pool, + total_lp_a_fee, + total_lp_b_fee, + total_protocol_a_fee, + total_protocol_b_fee, + total_partner_a_fee, + total_partner_b_fee, + total_position, + created_at, + updated_at + ) VALUES ( + @pool, + @total_lp_a_fee, + @total_lp_b_fee, + @total_protocol_a_fee, + @total_protocol_b_fee, + @total_partner_a_fee, + @total_partner_b_fee, + @total_position, + NOW(), + NOW() + ) + ON CONFLICT (pool) DO UPDATE SET + total_lp_a_fee = EXCLUDED.total_lp_a_fee, + total_lp_b_fee = EXCLUDED.total_lp_b_fee, + total_protocol_a_fee = EXCLUDED.total_protocol_a_fee, + total_protocol_b_fee = EXCLUDED.total_protocol_b_fee, + total_partner_a_fee = EXCLUDED.total_partner_a_fee, + total_partner_b_fee = EXCLUDED.total_partner_b_fee, + total_position = EXCLUDED.total_position, + updated_at = NOW() + ` + + _, err := db.Exec(ctx, sqlMetrics, pgx.NamedArgs{ + "pool": account.String(), + "total_lp_a_fee": metrics.TotalLpAFee, + "total_lp_b_fee": metrics.TotalLpBFee, + "total_protocol_a_fee": metrics.TotalProtocolAFee, + "total_protocol_b_fee": metrics.TotalProtocolBFee, + "total_partner_a_fee": metrics.TotalPartnerAFee, + "total_partner_b_fee": metrics.TotalPartnerBFee, + "total_position": metrics.TotalPosition, + }) + return err +} + +func upsertDammV2PoolFees( + ctx context.Context, + db database.DBTX, + account solana.PublicKey, + fees *meteora_damm_v2.PoolFeesStruct, +) error { + sqlFees := ` + INSERT INTO sol_meteora_damm_v2_pool_fees ( + pool, + partner_fee_percent, + protocol_fee_percent, + referral_fee_percent, + created_at, + updated_at + ) VALUES ( + @pool, + @partner_fee_percent, + 
@protocol_fee_percent, + @referral_fee_percent, + NOW(), + NOW() + ) + ON CONFLICT (pool) DO UPDATE SET + partner_fee_percent = EXCLUDED.partner_fee_percent, + protocol_fee_percent = EXCLUDED.protocol_fee_percent, + referral_fee_percent = EXCLUDED.referral_fee_percent, + updated_at = NOW() + ` + + _, err := db.Exec(ctx, sqlFees, pgx.NamedArgs{ + "pool": account.String(), + "partner_fee_percent": fees.PartnerFeePercent, + "protocol_fee_percent": fees.ProtocolFeePercent, + "referral_fee_percent": fees.ReferralFeePercent, + }) + return err +} + +func upsertDammV2PoolBaseFee( + ctx context.Context, + db database.DBTX, + account solana.PublicKey, + baseFee *meteora_damm_v2.BaseFeeStruct, +) error { + sqlBaseFee := ` + INSERT INTO sol_meteora_damm_v2_pool_base_fees ( + pool, + cliff_fee_numerator, + fee_scheduler_mode, + number_of_period, + period_frequency, + reduction_factor, + created_at, + updated_at + ) VALUES ( + @pool, + @cliff_fee_numerator, + @fee_scheduler_mode, + @number_of_period, + @period_frequency, + @reduction_factor, + NOW(), + NOW() + ) + ON CONFLICT (pool) DO UPDATE SET + cliff_fee_numerator = EXCLUDED.cliff_fee_numerator, + fee_scheduler_mode = EXCLUDED.fee_scheduler_mode, + number_of_period = EXCLUDED.number_of_period, + period_frequency = EXCLUDED.period_frequency, + reduction_factor = EXCLUDED.reduction_factor, + updated_at = NOW() + ` + + _, err := db.Exec(ctx, sqlBaseFee, pgx.NamedArgs{ + "pool": account.String(), + "cliff_fee_numerator": baseFee.CliffFeeNumerator, + "fee_scheduler_mode": baseFee.FeeSchedulerMode, + "number_of_period": baseFee.NumberOfPeriod, + "period_frequency": baseFee.PeriodFrequency, + "reduction_factor": baseFee.ReductionFactor, + }) + return err +} + +func upsertDammV2PoolDynamicFee( + ctx context.Context, + db database.DBTX, + account solana.PublicKey, + dynamicFee *meteora_damm_v2.DynamicFeeStruct, +) error { + sqlDynamicFee := ` + INSERT INTO sol_meteora_damm_v2_pool_dynamic_fees ( + pool, + initialized, + 
max_volatility_accumulator, + variable_fee_control, + bin_step, + filter_period, + decay_period, + reduction_factor, + last_update_timestamp, + bin_step_u128, + sqrt_price_reference, + volatility_accumulator, + volatility_reference, + created_at, + updated_at + ) VALUES ( + @pool, + @initialized, + @max_volatility_accumulator, + @variable_fee_control, + @bin_step, + @filter_period, + @decay_period, + @reduction_factor, + @last_update_timestamp, + @bin_step_u128, + @sqrt_price_reference, + @volatility_accumulator, + @volatility_reference, + NOW(), + NOW() + ) + ON CONFLICT (pool) DO UPDATE SET + initialized = EXCLUDED.initialized, + max_volatility_accumulator = EXCLUDED.max_volatility_accumulator, + variable_fee_control = EXCLUDED.variable_fee_control, + bin_step = EXCLUDED.bin_step, + filter_period = EXCLUDED.filter_period, + decay_period = EXCLUDED.decay_period, + reduction_factor = EXCLUDED.reduction_factor, + last_update_timestamp = EXCLUDED.last_update_timestamp, + bin_step_u128 = EXCLUDED.bin_step_u128, + sqrt_price_reference = EXCLUDED.sqrt_price_reference, + volatility_accumulator = EXCLUDED.volatility_accumulator, + volatility_reference = EXCLUDED.volatility_reference, + updated_at = NOW() + ` + + _, err := db.Exec(ctx, sqlDynamicFee, pgx.NamedArgs{ + "pool": account.String(), + "initialized": dynamicFee.Initialized, + "max_volatility_accumulator": dynamicFee.MaxVolatilityAccumulator, + "variable_fee_control": dynamicFee.VariableFeeControl, + "bin_step": dynamicFee.BinStep, + "filter_period": dynamicFee.FilterPeriod, + "decay_period": dynamicFee.DecayPeriod, + "reduction_factor": dynamicFee.ReductionFactor, + "last_update_timestamp": dynamicFee.LastUpdateTimestamp, + "bin_step_u128": dynamicFee.BinStepU128, + "sqrt_price_reference": dynamicFee.SqrtPriceReference, + "volatility_accumulator": dynamicFee.VolatilityAccumulator, + "volatility_reference": dynamicFee.VolatilityReference, + }) + return err +} + +func upsertDammV2Position( + ctx context.Context, + 
db database.DBTX, + account solana.PublicKey, + position *meteora_damm_v2.PositionState, +) error { + sql := ` + INSERT INTO sol_meteora_damm_v2_positions ( + address, + pool, + nft_mint, + fee_a_per_token_checkpoint, + fee_b_per_token_checkpoint, + fee_a_pending, + fee_b_pending, + unlocked_liquidity, + vested_liquidity, + permanent_locked_liquidity, + updated_at, + created_at + ) VALUES ( + @address, + @pool, + @nft_mint, + @fee_a_per_token_checkpoint, + @fee_b_per_token_checkpoint, + @fee_a_pending, + @fee_b_pending, + @unlocked_liquidity, + @vested_liquidity, + @permanent_locked_liquidity, + NOW(), + NOW() + ) + ON CONFLICT (address) DO UPDATE SET + pool = EXCLUDED.pool, + nft_mint = EXCLUDED.nft_mint, + fee_a_per_token_checkpoint = EXCLUDED.fee_a_per_token_checkpoint, + fee_b_per_token_checkpoint = EXCLUDED.fee_b_per_token_checkpoint, + fee_a_pending = EXCLUDED.fee_a_pending, + fee_b_pending = EXCLUDED.fee_b_pending, + unlocked_liquidity = EXCLUDED.unlocked_liquidity, + vested_liquidity = EXCLUDED.vested_liquidity, + permanent_locked_liquidity = EXCLUDED.permanent_locked_liquidity, + updated_at = NOW() + ` + + _, err := db.Exec(ctx, sql, pgx.NamedArgs{ + "address": account.String(), + "pool": position.Pool.String(), + "nft_mint": position.NftMint.String(), + "fee_a_per_token_checkpoint": position.FeeAPerTokenCheckpoint, + "fee_b_per_token_checkpoint": position.FeeBPerTokenCheckpoint, + "fee_a_pending": position.FeeAPending, + "fee_b_pending": position.FeeBPending, + "unlocked_liquidity": position.UnlockedLiquidity.BigInt(), + "vested_liquidity": position.VestedLiquidity.BigInt(), + "permanent_locked_liquidity": position.PermanentLockedLiquidity.BigInt(), + }) + return err +} + +func upsertDammV2PositionMetrics( + ctx context.Context, + db database.DBTX, + account solana.PublicKey, + metrics *meteora_damm_v2.PositionMetrics, +) error { + sql := ` + INSERT INTO sol_meteora_damm_v2_position_metrics ( + position, + total_claimed_a_fee, + total_claimed_b_fee, + 
created_at, + updated_at + ) VALUES ( + @position, + @total_claimed_a_fee, + @total_claimed_b_fee, + NOW(), + NOW() + ) + ON CONFLICT (position) DO UPDATE SET + total_claimed_a_fee = EXCLUDED.total_claimed_a_fee, + total_claimed_b_fee = EXCLUDED.total_claimed_b_fee, + updated_at = NOW() + ` + + _, err := db.Exec(ctx, sql, pgx.NamedArgs{ + "position": account.String(), + "total_claimed_a_fee": metrics.TotalClaimedAFee, + "total_claimed_b_fee": metrics.TotalClaimedBFee, + }) + return err +} diff --git a/solana/indexer/dbc.go b/solana/indexer/dbc.go new file mode 100644 index 00000000..b1be576e --- /dev/null +++ b/solana/indexer/dbc.go @@ -0,0 +1,208 @@ +package indexer + +import ( + "context" + "fmt" + "strings" + "time" + + "api.audius.co/database" + "api.audius.co/solana/spl/programs/meteora_damm_v2" + "api.audius.co/solana/spl/programs/meteora_dbc" + "github.com/gagliardetto/solana-go" + "github.com/jackc/pgx/v5" + "go.uber.org/zap" +) + +func processDbcInstruction( + ctx context.Context, + db database.DBTX, + rpcClient RpcClient, + slot uint64, + tx *solana.Transaction, + instructionIndex int, + instruction solana.CompiledInstruction, + signature string, + instLogger *zap.Logger, +) error { + accounts, err := instruction.ResolveInstructionAccounts(&tx.Message) + if err != nil { + return fmt.Errorf("error resolving instruction accounts %d: %w", instructionIndex, err) + } + + inst, err := meteora_dbc.DecodeInstruction(accounts, []byte(instruction.Data)) + if err != nil { + // Ignore unknown instruction types. + // Not all DBC instruction types are implemented yet. 
+ // See: solana/spl/programs/meteora_dbc/instruction.go + // See: https://github.com/gagliardetto/binary/blob/v0.8.0/variant.go#L315 + if strings.Contains(err.Error(), "no known type for type") { + return nil // ignore unknown instruction types + } + return fmt.Errorf("error decoding meteora_dbc instruction %d: %w", instructionIndex, err) + } + + switch inst.TypeID { + case meteora_dbc.InstructionImplDef.TypeID(meteora_dbc.Instruction_MigrationDammV2): + { + if migrationInst, ok := inst.Impl.(*meteora_dbc.MigrationDammV2); ok { + err := insertDbcMigration(ctx, db, dbcMigrationRow{ + signature: signature, + instructionIndex: instructionIndex, + slot: slot, + dbcPool: migrationInst.GetVirtualPool().PublicKey.String(), + migrationMetadata: migrationInst.GetMigrationMetadata().PublicKey.String(), + config: migrationInst.GetConfig().PublicKey.String(), + dbcPoolAuthority: migrationInst.GetPoolAuthority().PublicKey.String(), + dammV2Pool: migrationInst.GetPool().PublicKey.String(), + firstPositionNftMint: migrationInst.GetFirstPositionNftMint().PublicKey.String(), + firstPositionNftAccount: migrationInst.GetFirstPositionNftAccount().PublicKey.String(), + firstPosition: migrationInst.GetFirstPosition().PublicKey.String(), + secondPositionNftMint: migrationInst.GetSecondPositionNftMint().PublicKey.String(), + secondPositionNftAccount: migrationInst.GetSecondPositionNftAccount().PublicKey.String(), + secondPosition: migrationInst.GetSecondPosition().PublicKey.String(), + dammPoolAuthority: migrationInst.GetPoolAuthority().PublicKey.String(), + baseMint: migrationInst.GetBaseMint().PublicKey.String(), + quoteMint: migrationInst.GetQuoteMint().PublicKey.String(), + }) + if err != nil { + return fmt.Errorf("failed to insert dbc migration at instruction %d: %w", instructionIndex, err) + } + instLogger.Info("dbc migrationDammV2", + zap.String("mint", migrationInst.GetBaseMint().PublicKey.String()), + zap.String("dbcPool", migrationInst.GetVirtualPool().PublicKey.String()), + 
zap.String("dammV2Pool", migrationInst.GetPool().PublicKey.String()), + ) + + // Also index the pool and positions + + var dammPool meteora_damm_v2.Pool + err = withRetries(func() error { + return rpcClient.GetAccountDataBorshInto(ctx, migrationInst.GetPool().PublicKey, &dammPool) + }, 5, time.Second*1) + if err != nil { + return fmt.Errorf("failed to get damm v2 pool account data after retries: %w", err) + } else { + err = upsertDammV2Pool(ctx, db, migrationInst.GetPool().PublicKey, &dammPool) + if err != nil { + return fmt.Errorf("failed to upsert damm v2 pool: %w", err) + } + } + + var firstPosition meteora_damm_v2.PositionState + err = withRetries(func() error { + return rpcClient.GetAccountDataBorshInto(ctx, migrationInst.GetFirstPosition().PublicKey, &firstPosition) + }, 5, time.Second*1) + if err != nil { + return fmt.Errorf("failed to get first damm v2 position account data: %w", err) + } else { + err = upsertDammV2Position(ctx, db, migrationInst.GetFirstPosition().PublicKey, &firstPosition) + if err != nil { + return fmt.Errorf("failed to upsert first damm v2 position: %w", err) + } + } + + var secondPosition meteora_damm_v2.PositionState + err = withRetries(func() error { + return rpcClient.GetAccountDataBorshInto(ctx, migrationInst.GetSecondPosition().PublicKey, &secondPosition) + }, 5, time.Second*1) + if err != nil { + return fmt.Errorf("failed to get second damm v2 position account data: %w", err) + } else { + err = upsertDammV2Position(ctx, db, migrationInst.GetSecondPosition().PublicKey, &secondPosition) + if err != nil { + return fmt.Errorf("failed to upsert second damm v2 position: %w", err) + } + } + } + } + } + return nil +} + +type dbcMigrationRow struct { + signature string + instructionIndex int + slot uint64 + dbcPool string + migrationMetadata string + config string + dbcPoolAuthority string + dammV2Pool string + firstPositionNftMint string + firstPositionNftAccount string + firstPosition string + secondPositionNftMint string + 
secondPositionNftAccount string + secondPosition string + dammPoolAuthority string + baseMint string + quoteMint string +} + +func insertDbcMigration(ctx context.Context, db database.DBTX, row dbcMigrationRow) error { + sql := ` + INSERT INTO sol_meteora_dbc_migrations ( + signature, + instruction_index, + slot, + dbc_pool, + migration_metadata, + config, + dbc_pool_authority, + damm_v2_pool, + first_position_nft_mint, + first_position_nft_account, + first_position, + second_position_nft_mint, + second_position_nft_account, + second_position, + damm_pool_authority, + base_mint, + quote_mint, + created_at, + updated_at + ) VALUES ( + @signature, + @instructionIndex, + @slot, + @dbcPool, + @migrationMetadata, + @config, + @dbcPoolAuthority, + @dammV2Pool, + @firstPositionNftMint, + @firstPositionNftAccount, + @firstPosition, + @secondPositionNftMint, + @secondPositionNftAccount, + @secondPosition, + @dammPoolAuthority, + @baseMint, + @quoteMint, + NOW(), + NOW() + ) + ON CONFLICT DO NOTHING + ` + _, err := db.Exec(ctx, sql, pgx.NamedArgs{ + "signature": row.signature, + "instructionIndex": row.instructionIndex, + "slot": row.slot, + "dbcPool": row.dbcPool, + "migrationMetadata": row.migrationMetadata, + "config": row.config, + "dbcPoolAuthority": row.dbcPoolAuthority, + "dammV2Pool": row.dammV2Pool, + "firstPositionNftMint": row.firstPositionNftMint, + "firstPositionNftAccount": row.firstPositionNftAccount, + "firstPosition": row.firstPosition, + "secondPositionNftMint": row.secondPositionNftMint, + "secondPositionNftAccount": row.secondPositionNftAccount, + "secondPosition": row.secondPosition, + "dammPoolAuthority": row.dammPoolAuthority, + "baseMint": row.baseMint, + "quoteMint": row.quoteMint, + }) + return err +} diff --git a/solana/indexer/processor.go b/solana/indexer/processor.go index aecd5be8..c59ddfd4 100644 --- a/solana/indexer/processor.go +++ b/solana/indexer/processor.go @@ -8,6 +8,7 @@ import ( "api.audius.co/config" "api.audius.co/database" 
"api.audius.co/solana/spl/programs/claimable_tokens" + "api.audius.co/solana/spl/programs/meteora_dbc" "api.audius.co/solana/spl/programs/payment_router" "api.audius.co/solana/spl/programs/reward_manager" "github.com/gagliardetto/solana-go" @@ -61,32 +62,33 @@ func (p *DefaultProcessor) ProcessSignature(ctx context.Context, slot uint64, tx // Check if the transaction is in the cache if p.transactionCache != nil { - if _, ok := p.transactionCache.Get(txSig); ok { + if res, ok := p.transactionCache.Get(txSig); ok { logger.Debug("cache hit") - // If we hit the cache, it's already been processed - return nil + txRes = res } else { logger.Debug("cache miss") } } - // If the transaction is not in the cache, fetch it from the RPC - res, err := withRetries(func() (*rpc.GetTransactionResult, error) { - return p.rpcClient.GetTransaction( - ctx, - txSig, - &rpc.GetTransactionOpts{ - Commitment: rpc.CommitmentConfirmed, - MaxSupportedTransactionVersion: &rpc.MaxSupportedTransactionVersion0, - }, - ) - }, 5, 1*time.Second) - if err != nil { - return fmt.Errorf("failed to get transaction: %w", err) - } - if p.transactionCache != nil { - p.transactionCache.Set(txSig, res) - txRes = res + if txRes == nil { + // If the transaction is not in the cache, fetch it from the RPC + res, err := withRetriesResult(func() (*rpc.GetTransactionResult, error) { + return p.rpcClient.GetTransaction( + ctx, + txSig, + &rpc.GetTransactionOpts{ + Commitment: rpc.CommitmentConfirmed, + MaxSupportedTransactionVersion: &rpc.MaxSupportedTransactionVersion0, + }, + ) + }, 5, 1*time.Second) + if err != nil { + return fmt.Errorf("failed to get transaction: %w", err) + } + if p.transactionCache != nil { + p.transactionCache.Set(txSig, res) + txRes = res + } } tx, err := txRes.Transaction.GetTransaction() @@ -175,6 +177,13 @@ func (p *DefaultProcessor) ProcessTransaction( return fmt.Errorf("error processing payment_router instruction %d: %w", instructionIndex, err) } } + case meteora_dbc.ProgramID: + { + err 
:= processDbcInstruction(ctx, p.pool, p.rpcClient, slot, tx, instructionIndex, instruction, signature, instLogger) + if err != nil { + return fmt.Errorf("error processing meteora_dbc instruction %d: %w", instructionIndex, err) + } + } } } diff --git a/solana/indexer/solana_indexer.go b/solana/indexer/solana_indexer.go index f2edef52..4a174da6 100644 --- a/solana/indexer/solana_indexer.go +++ b/solana/indexer/solana_indexer.go @@ -3,11 +3,9 @@ package indexer import ( "context" "fmt" - "time" "api.audius.co/config" "api.audius.co/database" - "api.audius.co/jobs" "api.audius.co/logging" "github.com/gagliardetto/solana-go" "github.com/gagliardetto/solana-go/rpc" @@ -43,6 +41,8 @@ type SolanaIndexer struct { pool database.DbPool workerCount int32 + dammV2Indexer *DammV2Indexer + checkpointId string logger *zap.Logger @@ -77,6 +77,16 @@ func New(config config.Config) *SolanaIndexer { MaxReconnectAttempts: 5, }) + dammV2Indexer := &DammV2Indexer{ + pool: pool, + grpcConfig: GrpcConfig{ + Server: config.SolanaConfig.GrpcProvider, + ApiToken: config.SolanaConfig.GrpcToken, + MaxReconnectAttempts: 5, + }, + logger: logger, + } + s := &SolanaIndexer{ rpcClient: rpcClient, grpcClient: grpcClient, @@ -84,6 +94,9 @@ func New(config config.Config) *SolanaIndexer { config: config, pool: pool, workerCount: workerCount, + + dammV2Indexer: dammV2Indexer, + processor: NewDefaultProcessor( rpcClient, pool, @@ -97,15 +110,17 @@ func New(config config.Config) *SolanaIndexer { func (s *SolanaIndexer) Start(ctx context.Context) error { go s.ScheduleRetries(ctx, s.config.SolanaIndexerRetryInterval) - statsJob := jobs.NewCoinStatsJob(s.config, s.pool) - statsCtx := context.WithoutCancel(ctx) - statsJob.ScheduleEvery(statsCtx, 5*time.Minute) - go statsJob.Run(statsCtx) + // statsJob := jobs.NewCoinStatsJob(s.config, s.pool) + // statsCtx := context.WithoutCancel(ctx) + // statsJob.ScheduleEvery(statsCtx, 5*time.Minute) + // go statsJob.Run(statsCtx) + + // dbcJob := 
jobs.NewCoinDBCJob(s.config, s.pool) + // dbcCtx := context.WithoutCancel(ctx) + // dbcJob.ScheduleEvery(dbcCtx, 5*time.Minute) + // go dbcJob.Run(dbcCtx) - dbcJob := jobs.NewCoinDBCJob(s.config, s.pool) - dbcCtx := context.WithoutCancel(ctx) - dbcJob.ScheduleEvery(dbcCtx, 5*time.Minute) - go dbcJob.Run(dbcCtx) + go s.dammV2Indexer.Start(ctx) err := s.Subscribe(ctx) if err != nil { diff --git a/solana/indexer/subscription.go b/solana/indexer/subscription.go index 7a3c2976..50729566 100644 --- a/solana/indexer/subscription.go +++ b/solana/indexer/subscription.go @@ -103,7 +103,7 @@ func (s *SolanaIndexer) Subscribe(ctx context.Context) error { return fmt.Errorf("failed to get last indexed slot: %w", err) } - latestSlot, err := withRetries(func() (uint64, error) { + latestSlot, err := withRetriesResult(func() (uint64, error) { return s.rpcClient.GetSlot(ctx, "confirmed") }, 5, time.Second*2) if err != nil { @@ -221,7 +221,7 @@ func buildSubscriptionRequest(mintAddresses []string, dbcPoolConfigs []string) ( for _, config := range dbcPoolConfigs { dbcFilter := pb.SubscribeRequestFilterAccounts{ - Owner: []string{meteora_dbc.DbcProgramID.String()}, + Owner: []string{meteora_dbc.ProgramID.String()}, Filters: []*pb.SubscribeRequestFilterAccountsFilter{ { Filter: &pb.SubscribeRequestFilterAccountsFilter_Memcmp{ @@ -266,7 +266,6 @@ func (s *SolanaIndexer) handleMessage(ctx context.Context, msg *pb.SubscribeUpda if slotUpdate := msg.GetSlot(); slotUpdate != nil && slotUpdate.Slot > 0 { // only update every 10 slots to reduce db load and write latency if slotUpdate.Slot%10 == 0 { - s.logger.Debug("slot update", zap.Uint64("slot", slotUpdate.Slot)) err := updateCheckpoint(ctx, s.pool, s.checkpointId, slotUpdate.Slot) if err != nil { logger.Error("failed to update slot checkpoint", zap.Error(err)) diff --git a/solana/indexer/unprocessed_transactions.go b/solana/indexer/unprocessed_transactions.go index 7e6b90c3..d44900a7 100644 --- a/solana/indexer/unprocessed_transactions.go 
+++ b/solana/indexer/unprocessed_transactions.go @@ -15,28 +15,24 @@ func (s *SolanaIndexer) ScheduleRetries(ctx context.Context, interval time.Durat ticker := time.NewTicker(interval) defer ticker.Stop() - go func() { - for { - select { - case <-ctx.Done(): - s.logger.Info("context cancelled, stopping retry ticker") - return - case <-ticker.C: - err := s.RetryUnprocessedTransactions(ctx) - if err != nil { - s.logger.Error("failed to retry unprocessed transactions", zap.Error(err)) - } + for { + select { + case <-ctx.Done(): + s.logger.Info("context cancelled, stopping retry ticker") + return + case <-ticker.C: + err := s.RetryUnprocessedTransactions(ctx) + if err != nil { + s.logger.Error("failed to retry unprocessed transactions", zap.Error(err)) } } - }() + } } func (s *SolanaIndexer) RetryUnprocessedTransactions(ctx context.Context) error { limit := 100 offset := 0 - logger := s.logger.With( - zap.String("indexerSource", "retryUnprocessedTransactions"), - ) + logger := s.logger.Named("RetryUnprocessedTransactions") count := 0 start := time.Now() logger.Debug("starting retry of unprocessed transactions...") diff --git a/solana/indexer/utils.go b/solana/indexer/utils.go index f2be0dad..3716f440 100644 --- a/solana/indexer/utils.go +++ b/solana/indexer/utils.go @@ -9,7 +9,21 @@ import ( "github.com/jackc/pgx/v5" ) -func withRetries[T any](f func() (T, error), maxRetries int, interval time.Duration) (T, error) { +func withRetries(f func() error, maxRetries int, interval time.Duration) error { + err := f() + retries := 0 + for err != nil && retries < maxRetries { + time.Sleep(interval) + err = f() + retries++ + } + if err != nil { + return fmt.Errorf("retry failed: %w", err) + } + return nil +} + +func withRetriesResult[T any](f func() (T, error), maxRetries int, interval time.Duration) (T, error) { result, err := f() retries := 0 for err != nil && retries < maxRetries { diff --git a/solana/spl/programs/meteora_damm_v2/accounts.go 
b/solana/spl/programs/meteora_damm_v2/accounts.go new file mode 100644 index 00000000..21c54f0a --- /dev/null +++ b/solana/spl/programs/meteora_damm_v2/accounts.go @@ -0,0 +1,13 @@ +package meteora_damm_v2 + +import "github.com/gagliardetto/solana-go" + +// Derives the position PDA from a position NFT mint +func DerivePositionPDA(positionNft solana.PublicKey) (solana.PublicKey, error) { + seeds := [][]byte{[]byte("position"), positionNft.Bytes()} + address, _, err := solana.FindProgramAddress(seeds, ProgramID) + if err != nil { + return solana.PublicKey{}, err + } + return address, nil +} diff --git a/solana/spl/programs/meteora_damm_v2/client.go b/solana/spl/programs/meteora_damm_v2/client.go new file mode 100644 index 00000000..0775ede5 --- /dev/null +++ b/solana/spl/programs/meteora_damm_v2/client.go @@ -0,0 +1,113 @@ +package meteora_damm_v2 + +import ( + "context" + "math/big" + "sort" + + bin "github.com/gagliardetto/binary" + "github.com/gagliardetto/solana-go" + "github.com/gagliardetto/solana-go/programs/token" + "github.com/gagliardetto/solana-go/rpc" + "go.uber.org/zap" +) + +type RpcClient interface { + GetAccountDataBorshInto(ctx context.Context, account solana.PublicKey, out interface{}) error +} + +type Client struct { + client *rpc.Client + logger *zap.Logger +} + +func NewClient( + client *rpc.Client, + logger *zap.Logger, +) *Client { + return &Client{ + client: client, + logger: logger, + } +} + +// Gets the current Pool state. +func (c *Client) GetPool(ctx context.Context, account solana.PublicKey) (*Pool, error) { + var pool Pool + err := c.client.GetAccountDataBorshInto(ctx, account, &pool) + if err != nil { + return nil, err + } + return &pool, nil +} + +// Gets a position by its address. 
+func (c *Client) GetPosition(ctx context.Context, account solana.PublicKey) (*PositionState, error) { + var position PositionState + err := c.client.GetAccountDataBorshInto(ctx, account, &position) + if err != nil { + return nil, err + } + return &position, nil +} + +// Gets all position NFTs held by a wallet. +func (c *Client) GetPositionNFTs(ctx context.Context, owner solana.PublicKey) ([]solana.PublicKey, error) { + accounts, err := c.client.GetTokenAccountsByOwner(ctx, owner, &rpc.GetTokenAccountsConfig{ + ProgramId: &solana.Token2022ProgramID, + }, &rpc.GetTokenAccountsOpts{}) + if err != nil { + return nil, err + } + + var positionNFTs []solana.PublicKey + for _, acc := range accounts.Value { + data := token.Account{} + bin.NewBorshDecoder(acc.Account.Data.GetBinary()).Decode(&data) + if data.Amount == uint64(1) { + positionNFTs = append(positionNFTs, data.Mint) + } + } + + return positionNFTs, nil +} + +// Gets all the positions by an owner, sorted by total liquidity (descending). 
+func (c *Client) GetPositionsByOwner(ctx context.Context, owner solana.PublicKey) ([]*PositionState, error) { + positionNFTs, err := c.GetPositionNFTs(ctx, owner) + if err != nil { + return nil, err + } + + var positions []*PositionState + for _, nft := range positionNFTs { + pda, err := DerivePositionPDA(nft) + if err != nil { + return nil, err + } + position, err := c.GetPosition(ctx, pda) + if err != nil { + return nil, err + } + positions = append(positions, position) + } + + // Sort positions by total liquidity + sort.Slice(positions, func(i, j int) bool { + vestedLiquidity := (&big.Int{}).SetBytes(positions[i].VestedLiquidity.Bytes()) + permanentLockedLiquidity := (&big.Int{}).SetBytes(positions[i].PermanentLockedLiquidity.Bytes()) + unlockedLiquidity := (&big.Int{}).SetBytes(positions[i].UnlockedLiquidity.Bytes()) + totalLiquidityI := (&big.Int{}).Add(vestedLiquidity, permanentLockedLiquidity) + totalLiquidityI.Add(totalLiquidityI, unlockedLiquidity) + + vestedLiquidity = (&big.Int{}).SetBytes(positions[j].VestedLiquidity.Bytes()) + permanentLockedLiquidity = (&big.Int{}).SetBytes(positions[j].PermanentLockedLiquidity.Bytes()) + unlockedLiquidity = (&big.Int{}).SetBytes(positions[j].UnlockedLiquidity.Bytes()) + totalLiquidityJ := (&big.Int{}).Add(vestedLiquidity, permanentLockedLiquidity) + totalLiquidityJ.Add(totalLiquidityJ, unlockedLiquidity) + + return totalLiquidityJ.Cmp(totalLiquidityI) < 0 + }) + + return positions, nil +} diff --git a/solana/spl/programs/meteora_damm_v2/client_test.go b/solana/spl/programs/meteora_damm_v2/client_test.go new file mode 100644 index 00000000..12f1266a --- /dev/null +++ b/solana/spl/programs/meteora_damm_v2/client_test.go @@ -0,0 +1,24 @@ +package meteora_damm_v2_test + +import ( + "context" + "testing" + + "api.audius.co/solana/spl/programs/meteora_damm_v2" + "github.com/gagliardetto/solana-go" + "github.com/gagliardetto/solana-go/rpc" + "github.com/stretchr/testify/require" + "go.uber.org/zap" +) + +func 
TestActualFetch(t *testing.T) { + ctx := context.Background() + client := meteora_damm_v2.NewClient(rpc.New(rpc.MainNetBeta_RPC), zap.NewNop()) + + owner, err := solana.PublicKeyFromBase58("EF1zneAqA2mwjkD3Lj7sQnMhR2uorGqEHXNtAWfGdCu2") + require.NoError(t, err) + + positions, err := client.GetPositionsByOwner(ctx, owner) + require.NoError(t, err) + require.Greater(t, len(positions), 0) +} diff --git a/solana/spl/programs/meteora_damm_v2/instruction.go b/solana/spl/programs/meteora_damm_v2/instruction.go new file mode 100644 index 00000000..3e6d1d5d --- /dev/null +++ b/solana/spl/programs/meteora_damm_v2/instruction.go @@ -0,0 +1,9 @@ +package meteora_damm_v2 + +import "github.com/gagliardetto/solana-go" + +var ProgramID = solana.MustPublicKeyFromBase58("cpamdpZCGKUy5JxQXB4dcpGPiikHawvSWAd6mEn1sGG") + +func SetProgramID(pubkey solana.PublicKey) { + ProgramID = pubkey +} diff --git a/solana/spl/programs/meteora_damm_v2/types.go b/solana/spl/programs/meteora_damm_v2/types.go new file mode 100644 index 00000000..65cd22ee --- /dev/null +++ b/solana/spl/programs/meteora_damm_v2/types.go @@ -0,0 +1,131 @@ +package meteora_damm_v2 + +import ( + bin "github.com/gagliardetto/binary" + "github.com/gagliardetto/solana-go" +) + +type BaseFeeStruct struct { + CliffFeeNumerator uint64 + FeeSchedulerMode uint8 + Padding0 [5]uint8 + NumberOfPeriod uint16 + PeriodFrequency uint64 + ReductionFactor uint64 + Padding1 uint64 +} + +type DynamicFeeStruct struct { + Initialized uint8 + Padding [7]uint8 + MaxVolatilityAccumulator uint32 + VariableFeeControl uint32 + BinStep uint16 + FilterPeriod uint16 + DecayPeriod uint16 + ReductionFactor uint16 + LastUpdateTimestamp uint64 + BinStepU128 bin.Uint128 + SqrtPriceReference bin.Uint128 + VolatilityAccumulator bin.Uint128 + VolatilityReference bin.Uint128 +} + +type PoolFeesStruct struct { + BaseFee BaseFeeStruct + ProtocolFeePercent uint8 + PartnerFeePercent uint8 + ReferralFeePercent uint8 + Padding0 [5]uint8 + DynamicFee DynamicFeeStruct 
+ Padding1 [2]uint64 +} + +type PoolMetrics struct { + TotalLpAFee bin.Uint128 + TotalLpBFee bin.Uint128 + TotalProtocolAFee uint64 + TotalProtocolBFee uint64 + TotalPartnerAFee uint64 + TotalPartnerBFee uint64 + TotalPosition uint64 + Padding uint64 +} + +type RewardInfo struct { + Initialized uint8 + RewardTokenFlag uint8 + Padding0 [6]uint8 + Padding1 [8]uint8 + Mint solana.PublicKey + Vault solana.PublicKey + Funder solana.PublicKey + RewardDuration uint64 + RewardDurationEnd uint64 + RewardRate bin.Uint128 + RewardPerTokenStored [32]uint8 + LastUpdateTime uint64 + CumulativeSecondsWithEmptyLiquidity uint64 +} + +type Pool struct { + Discriminator [8]uint8 + PoolFees PoolFeesStruct + TokenAMint solana.PublicKey + TokenBMint solana.PublicKey + TokenAVault solana.PublicKey + TokenBVault solana.PublicKey + WhitelistedVault solana.PublicKey + Partner solana.PublicKey + Liquidity bin.Uint128 + Padding bin.Uint128 + ProtocolAFee uint64 + ProtocolBFee uint64 + PartnerAFee uint64 + PartnerBFee uint64 + SqrtMinPrice bin.Uint128 + SqrtMaxPrice bin.Uint128 + SqrtPrice bin.Uint128 + ActivationPoint uint64 + ActivationType uint8 + PoolStatus uint8 + TokenAFlag uint8 + TokenBFlag uint8 + CollectFeeMode uint8 + PoolType uint8 + Padding0 [2]uint8 + FeeAPerLiquidity Uint256LE + FeeBPerLiquidity Uint256LE + PermanentLockLiquidity bin.Uint128 + Metrics PoolMetrics + Creator solana.PublicKey + Padding1 [6]uint64 + RewardInfos [2]RewardInfo +} + +type PositionMetrics struct { + TotalClaimedAFee uint64 + TotalClaimedBFee uint64 +} + +type UserRewardInfo struct { + RewardPerTokenCheckpoint [32]uint8 + RewardPendings uint64 + TotalClaimedRewards uint64 +} + +type PositionState struct { + Discriminator [8]uint8 + Pool solana.PublicKey + NftMint solana.PublicKey + FeeAPerTokenCheckpoint Uint256LE + FeeBPerTokenCheckpoint Uint256LE + FeeAPending uint64 + FeeBPending uint64 + UnlockedLiquidity bin.Uint128 + VestedLiquidity bin.Uint128 + PermanentLockedLiquidity bin.Uint128 + Metrics 
PositionMetrics + RewardInfos [2]UserRewardInfo + Padding [6]bin.Uint128 +} diff --git a/solana/spl/programs/meteora_damm_v2/types_test.go b/solana/spl/programs/meteora_damm_v2/types_test.go new file mode 100644 index 00000000..a08b148c --- /dev/null +++ b/solana/spl/programs/meteora_damm_v2/types_test.go @@ -0,0 +1,46 @@ +package meteora_damm_v2 + +import ( + "encoding/base64" + "testing" + + bin "github.com/gagliardetto/binary" + "github.com/test-go/testify/assert" + "github.com/test-go/testify/require" +) + +func TestDecodingPool(t *testing.T) { + // Example data from mainnet + // Source: https://explorer.solana.com/address/D9iJqMbgQJLFt5PAAiTJTMNsMAMueukzoe1EK2r1g3WH + data := "8ZptBBGxbbyAlpgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAFAAUAAAAAAABAAAAAAAAAGCk3AC8AwAAAQAKAHgAiBPGROhoAAAAAMsQx7q4jQYAAAAAAAAAAAChIqYBNRzVAQAAAAAAAAAA4CICAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACOkzYTTyijBQphnsA7NYukXXDff56Bp/GJdn5GamlMZ7/DPMLnXBSHbMN5KDkE9JB3ZpESJXuzrf82mLYCJJQHm/3HSrkp1wPzAbe6y0uFypnr4Yeci2kPU8TWr9TEmTY/2aZknQDPJED5N2M3ytBL5gl4lD8TdKznaJkMDHT44AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAANpjaB9yhrzIBnGeLCtQogFXJDv8lmixFSC8U4Q+3NsISFBg5M0NQHAAaMJHGBEGAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAFiinY8UAAAAAAAAAAAAAAAAAAAAAAAAAFA7AQABAAAAAAAAAAAAAACbV2lOqRpchLHE/v8AAAAAIiTN1Ql11QEAAAAAAAAAAIhx5mgAAAAAAQAAAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAJE91kBWow0AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAASFBg5M0NQHAAaMJHGBEGAAAAAAAAAAAAAAAAAAAAAAASYim9UgAAAAAAAAAAAAAAAAAAAAAAAABYop2PFAAAAAAAAAAAAAAAAAAAAAAAAAACAAAAAAAAAAAAAAAAAAAA2mNoH3KGvMgGcZ4sK1CiAVckO/yWaLEVILxThD7c2wgAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=" + bytes, err := base64.StdEncoding.DecodeString(data) + require.NoError(t, err) + + var pool Pool + err = bin.NewBorshDecoder(bytes).Decode(&pool) + require.NoError(t, err) + + assert.Equal(t, int(10000000), int(pool.PoolFees.BaseFee.CliffFeeNumerator)) + assert.Equal(t, "bnWKPK7YTUJTe3A3HTGEJrUEoAddRgRjWSwf7MwxMP3", pool.TokenAMint.String()) + assert.Equal(t, "9LzCMqDgTKYz9Drzqnpgee3SGa89up3a247ypMj2xrqM", pool.TokenBMint.String()) + assert.Equal(t, "31500505798829827035928817465053256", pool.Liquidity.String()) + assert.Equal(t, "3838765547535761", pool.FeeBPerLiquidity.String()) + assert.Equal(t, int(1759932808), int(pool.ActivationPoint)) + // assert.Equal(t, "", pool.FeeBPerLiquidity.String()) +} + +func TestDecodingPositionState(t *testing.T) { + // Example data from mainnet + // Source: https://explorer.solana.com/address/5bYLydDXt1K5zroychcbrVbhGRUpheXdq5w41uccazPB + data := "qryP5HpA99C0h5iaMb9or5qzYmaPKH7cBpP1GTyw5pa9SMlEQMuk4oeLsnqCTyioPLOFt664lEHr2woSYFq4Z3N6xFLWwGDSAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAADUszHGm5oNAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAACQoMPLmBiA4ADThI4wIAwAAAAAAAAAAABGmGkQpAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" + bytes, err := base64.StdEncoding.DecodeString(data) + require.NoError(t, err) + + var position PositionState + err = bin.NewBorshDecoder(bytes).Decode(&position) + require.NoError(t, err) + + assert.Equal(t, "D9iJqMbgQJLFt5PAAiTJTMNsMAMueukzoe1EK2r1g3WH", position.Pool.String()) + assert.Equal(t, "A87b7M7UnQCicj6Ui7ktCL9CoN9xnnLbp3bezoDS26uX", position.NftMint.String()) + 
assert.Equal(t, "15750252899414913517964408732526628", position.PermanentLockedLiquidity.String()) } diff --git a/solana/spl/programs/meteora_damm_v2/uint256le.go b/solana/spl/programs/meteora_damm_v2/uint256le.go new file mode 100644 index 00000000..523cb206 --- /dev/null +++ b/solana/spl/programs/meteora_damm_v2/uint256le.go @@ -0,0 +1,49 @@ +package meteora_damm_v2 + +import ( + "database/sql/driver" + "fmt" + "math/big" + + bin "github.com/gagliardetto/binary" +) + +// Struct wrapper for little-endian uint256 (as big.Int). +// Implements Borsh serialization and database Valuer interface +type Uint256LE struct { + big.Int +} + +func (i *Uint256LE) UnmarshalWithDecoder(decoder *bin.Decoder) error { + var b [32]byte + err := decoder.Decode(&b) + if err != nil { + return err + } + i.SetBytes(reverseBytes(b[:])) + return nil +} + +func (i Uint256LE) MarshalWithEncoder(encoder *bin.Encoder) error { + b := i.Bytes() + if len(b) > 32 { + return fmt.Errorf("Uint256LE: integer too large to encode") + } + padded := make([]byte, 32) + copy(padded[32-len(b):], b) + _, err := encoder.Write(reverseBytes(padded)) + return err +} + +func (i Uint256LE) Value() (driver.Value, error) { + return i.String(), nil +} + +// reverseBytes reverses a byte slice to match TypeScript Buffer.reverse() behavior +func reverseBytes(b []byte) []byte { + reversed := make([]byte, len(b)) + for i, j := 0, len(b)-1; i < len(b); i, j = i+1, j-1 { + reversed[i] = b[j] + } + return reversed +} diff --git a/solana/spl/programs/meteora_damm_v2/utils.go b/solana/spl/programs/meteora_damm_v2/utils.go new file mode 100644 index 00000000..771cdb39 --- /dev/null +++ b/solana/spl/programs/meteora_damm_v2/utils.go @@ -0,0 +1,27 @@ +package meteora_damm_v2 + +import ( + "math/big" +) + +const LIQUIDITY_SCALE = 128 + +func GetUnclaimedFees(pool *Pool, position *PositionState) (*big.Int, *big.Int) { + totalPositionLiquidity := big.NewInt(0).Add( + big.NewInt(0).Add(position.UnlockedLiquidity.BigInt(),
position.VestedLiquidity.BigInt()), + position.PermanentLockedLiquidity.BigInt(), + ) + + feeA := big.NewInt(0).Sub(&pool.FeeAPerLiquidity.Int, &position.FeeAPerTokenCheckpoint.Int) + feeB := big.NewInt(0).Sub(&pool.FeeBPerLiquidity.Int, &position.FeeBPerTokenCheckpoint.Int) + + feeA.Mul(feeA, totalPositionLiquidity) + feeB.Mul(feeB, totalPositionLiquidity) + feeA.Rsh(feeA, LIQUIDITY_SCALE) + feeB.Rsh(feeB, LIQUIDITY_SCALE) + + feeA.Add(feeA, big.NewInt(0).SetUint64(position.FeeAPending)) + feeB.Add(feeB, big.NewInt(0).SetUint64(position.FeeBPending)) + + return feeA, feeB +} diff --git a/solana/spl/programs/meteora_dbc/MigrationDammV2.go b/solana/spl/programs/meteora_dbc/MigrationDammV2.go new file mode 100644 index 00000000..5e6c24aa --- /dev/null +++ b/solana/spl/programs/meteora_dbc/MigrationDammV2.go @@ -0,0 +1,107 @@ +package meteora_dbc + +import "github.com/gagliardetto/solana-go" + +type MigrationDammV2 struct { + solana.AccountMetaSlice `bin:"-" borsh_skip:"true"` +} + +func (inst *MigrationDammV2) GetVirtualPool() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(0) +} + +func (inst *MigrationDammV2) GetMigrationMetadata() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(1) +} + +func (inst *MigrationDammV2) GetConfig() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(2) +} + +func (inst *MigrationDammV2) GetPoolAuthority() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(3) +} + +func (inst *MigrationDammV2) GetPool() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(4) +} + +func (inst *MigrationDammV2) GetFirstPositionNftMint() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(5) +} + +func (inst *MigrationDammV2) GetFirstPositionNftAccount() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(6) +} + +func (inst *MigrationDammV2) GetFirstPosition() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(7) +} + +func (inst *MigrationDammV2) GetSecondPositionNftMint() *solana.AccountMeta { + return 
inst.AccountMetaSlice.Get(8) +} + +func (inst *MigrationDammV2) GetSecondPositionNftAccount() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(9) +} + +func (inst *MigrationDammV2) GetSecondPosition() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(10) +} + +func (inst *MigrationDammV2) GetDammPoolAuthority() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(11) +} + +func (inst *MigrationDammV2) GetAmmProgram() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(12) +} + +func (inst *MigrationDammV2) GetBaseMint() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(13) +} + +func (inst *MigrationDammV2) GetQuoteMint() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(14) +} + +func (inst *MigrationDammV2) GetTokenAVault() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(15) +} + +func (inst *MigrationDammV2) GetTokenBVault() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(16) +} + +func (inst *MigrationDammV2) GetBaseVault() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(17) +} + +func (inst *MigrationDammV2) GetQuoteVault() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(18) +} + +func (inst *MigrationDammV2) GetPayer() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(19) +} + +func (inst *MigrationDammV2) GetTokenBaseProgram() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(20) +} + +func (inst *MigrationDammV2) GetTokenQuoteProgram() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(21) +} + +func (inst *MigrationDammV2) GetToken2022Program() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(22) +} + +func (inst *MigrationDammV2) GetDammEventAuthority() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(23) +} + +func (inst *MigrationDammV2) GetSystemProgram() *solana.AccountMeta { + return inst.AccountMetaSlice.Get(24) +} diff --git a/solana/spl/programs/meteora_dbc/client.go b/solana/spl/programs/meteora_dbc/client.go index 4e854c7e..824bbfe6 
100644 --- a/solana/spl/programs/meteora_dbc/client.go +++ b/solana/spl/programs/meteora_dbc/client.go @@ -7,8 +7,6 @@ import ( "go.uber.org/zap" ) -var DbcProgramID = solana.MustPublicKeyFromBase58("dbcij3LWUppWqq96dh6gJWwBifmcGfLSB5D4DuSMaqN") - type RpcClient interface { GetAccountDataBorshInto(ctx context.Context, account solana.PublicKey, out interface{}) error } diff --git a/solana/spl/programs/meteora_dbc/instruction.go b/solana/spl/programs/meteora_dbc/instruction.go new file mode 100644 index 00000000..fe1baa0b --- /dev/null +++ b/solana/spl/programs/meteora_dbc/instruction.go @@ -0,0 +1,120 @@ +package meteora_dbc + +import ( + "bytes" + "fmt" + + "github.com/davecgh/go-spew/spew" + "github.com/gagliardetto/solana-go" + "github.com/gagliardetto/solana-go/text" + "github.com/gagliardetto/treeout" + + bin "github.com/gagliardetto/binary" +) + +const ( + Instruction_MigrationDammV2 = "migration_damm_v2" +) + +var ProgramID = solana.MustPublicKeyFromBase58("dbcij3LWUppWqq96dh6gJWwBifmcGfLSB5D4DuSMaqN") + +type Instruction struct { + bin.BaseVariant +} + +func init() { + solana.RegisterInstructionDecoder(ProgramID, registryDecodeInstruction) +} + +func SetProgramID(pubkey solana.PublicKey) { + ProgramID = pubkey +} + +func DecodeInstruction(accounts []*solana.AccountMeta, data []byte) (*Instruction, error) { + inst := new(Instruction) + if err := bin.NewBorshDecoder(data).Decode(inst); err != nil { + return nil, fmt.Errorf("unable to decode instruction: %w", err) + } + if v, ok := inst.Impl.(solana.AccountsSettable); ok { + err := v.SetAccounts(accounts) + if err != nil { + return nil, fmt.Errorf("unable to set accounts for instruction: %w", err) + } + } + return inst, nil +} + +func registryDecodeInstruction(accounts []*solana.AccountMeta, data []byte) (interface{}, error) { + inst, err := DecodeInstruction(accounts, data) + if err != nil { + return nil, err + } + return inst, nil +} + +var ( + _ solana.Instruction = (*Instruction)(nil) + _ text.TextEncodable 
= (*Instruction)(nil) + _ bin.BinaryUnmarshaler = (*Instruction)(nil) + _ bin.BinaryMarshaler = (*Instruction)(nil) + _ text.EncodableToTree = (*Instruction)(nil) +) + +// ----- solana.Instruction Implementation ----- + +func (inst *Instruction) ProgramID() solana.PublicKey { + return ProgramID +} + +func (inst *Instruction) Accounts() (out []*solana.AccountMeta) { + return inst.Impl.(solana.AccountsGettable).GetAccounts() +} + +func (inst *Instruction) Data() ([]byte, error) { + buf := new(bytes.Buffer) + if err := bin.NewBorshEncoder(buf).Encode(inst); err != nil { + return nil, fmt.Errorf("unable to encode instruction: %w", err) + } + return buf.Bytes(), nil +} + +// ----- text.TextEncodable Implementation ----- + +func (inst *Instruction) TextEncode(encoder *text.Encoder, option *text.Option) error { + return encoder.Encode(inst.Impl, option) +} + +// ----- text.EncodableToTree Implementation ----- + +func (inst *Instruction) EncodeToTree(parent treeout.Branches) { + if enToTree, ok := inst.Impl.(text.EncodableToTree); ok { + enToTree.EncodeToTree(parent) + } else { + parent.Child(spew.Sdump(inst)) + } +} + +// ----- bin.BinaryUnmarshaler Implementation ----- + +var InstructionImplDef = bin.NewVariantDefinition( + bin.AnchorTypeIDEncoding, + []bin.VariantType{ + { + Name: Instruction_MigrationDammV2, Type: (*MigrationDammV2)(nil), + }, + }, +) + +func (inst *Instruction) UnmarshalWithDecoder(decoder *bin.Decoder) error { + return inst.BaseVariant.UnmarshalBinaryVariant(decoder, InstructionImplDef) +} + +// ----- bin.BinaryMarshaler Implementation ----- + +func (inst Instruction) MarshalWithEncoder(encoder *bin.Encoder) error { + err := encoder.WriteBytes(inst.TypeID.Bytes(), false) + if err != nil { + return fmt.Errorf("unable to write variant type: %w", err) + } + return encoder.Encode(inst.Impl) +} From 88681cc2b5ec2cabf98914bad00db3837b22adca Mon Sep 17 00:00:00 2001 From: Marcus Pasell <3690498+rickyrombo@users.noreply.github.com> Date: Mon, 13 Oct 2025 
10:29:42 -0700 Subject: [PATCH 2/4] add slot to remove race conditions, move pgNotify watcher --- ddl/functions/calculate_artist_coin_fees.sql | 3 + ddl/migrations/0169_damm_and_positions.sql | 6 ++ solana/indexer/damm_v2.go | 101 +++++++++---------- solana/indexer/dbc.go | 6 +- solana/indexer/utils.go | 51 ++++++++++ 5 files changed, 108 insertions(+), 59 deletions(-) diff --git a/ddl/functions/calculate_artist_coin_fees.sql b/ddl/functions/calculate_artist_coin_fees.sql index 0c5c3dd4..7cdd1114 100644 --- a/ddl/functions/calculate_artist_coin_fees.sql +++ b/ddl/functions/calculate_artist_coin_fees.sql @@ -11,6 +11,9 @@ RETURNS TABLE ( ) LANGUAGE sql AS $function$ WITH damm_fees AS ( + -- fee = totalLiquidity * feePerTokenStore + -- precision: (totalLiquidity * feePerTokenStore) >> 128 + -- See: https://github.com/MeteoraAg/damm-v2-sdk/blob/70d1af59689039a1dc700dee8f741db48024d02d/src/helpers/utils.ts#L190-L191 SELECT pool.token_a_mint AS mint, ( diff --git a/ddl/migrations/0169_damm_and_positions.sql b/ddl/migrations/0169_damm_and_positions.sql index 6bb0ddae..73f3399e 100644 --- a/ddl/migrations/0169_damm_and_positions.sql +++ b/ddl/migrations/0169_damm_and_positions.sql @@ -58,6 +58,7 @@ COMMENT ON TABLE sol_meteora_damm_v2_pools IS 'Tracks DAMM V2 pool state. 
Join w CREATE TABLE IF NOT EXISTS sol_meteora_damm_v2_pool_metrics ( pool TEXT PRIMARY KEY REFERENCES sol_meteora_damm_v2_pools(address) ON DELETE CASCADE, + slot BIGINT NOT NULL, total_lp_a_fee NUMERIC NOT NULL, total_lp_b_fee NUMERIC NOT NULL, total_protocol_a_fee NUMERIC NOT NULL, @@ -72,6 +73,7 @@ COMMENT ON TABLE sol_meteora_damm_v2_pool_metrics IS 'Tracks aggregated metrics CREATE TABLE IF NOT EXISTS sol_meteora_damm_v2_pool_fees ( pool TEXT PRIMARY KEY REFERENCES sol_meteora_damm_v2_pools(address) ON DELETE CASCADE, + slot BIGINT NOT NULL, protocol_fee_percent SMALLINT NOT NULL, partner_fee_percent SMALLINT NOT NULL, referral_fee_percent SMALLINT NOT NULL, @@ -82,6 +84,7 @@ COMMENT ON TABLE sol_meteora_damm_v2_pool_fees IS 'Tracks fee configuration for CREATE TABLE IF NOT EXISTS sol_meteora_damm_v2_pool_base_fees ( pool TEXT PRIMARY KEY REFERENCES sol_meteora_damm_v2_pools(address) ON DELETE CASCADE, + slot BIGINT NOT NULL, cliff_fee_numerator BIGINT NOT NULL, fee_scheduler_mode SMALLINT NOT NULL, number_of_period SMALLINT NOT NULL, @@ -94,6 +97,7 @@ COMMENT ON TABLE sol_meteora_damm_v2_pool_base_fees IS 'Tracks base fee configur CREATE TABLE IF NOT EXISTS sol_meteora_damm_v2_pool_dynamic_fees ( pool TEXT PRIMARY KEY REFERENCES sol_meteora_damm_v2_pools(address) ON DELETE CASCADE, + slot BIGINT NOT NULL, initialized SMALLINT NOT NULL, max_volatility_accumulator INTEGER NOT NULL, variable_fee_control INTEGER NOT NULL, @@ -113,6 +117,7 @@ COMMENT ON TABLE sol_meteora_damm_v2_pool_dynamic_fees IS 'Tracks dynamic fee co CREATE TABLE IF NOT EXISTS sol_meteora_damm_v2_positions ( address TEXT PRIMARY KEY, + slot BIGINT NOT NULL, pool TEXT NOT NULL REFERENCES sol_meteora_damm_v2_pools(address) ON DELETE CASCADE, nft_mint TEXT NOT NULL, fee_a_per_token_checkpoint BIGINT NOT NULL, @@ -129,6 +134,7 @@ COMMENT ON TABLE sol_meteora_damm_v2_positions IS 'Tracks DAMM V2 positions repr CREATE TABLE IF NOT EXISTS sol_meteora_damm_v2_position_metrics ( position TEXT PRIMARY 
KEY REFERENCES sol_meteora_damm_v2_positions(address) ON DELETE CASCADE, + slot BIGINT NOT NULL, total_claimed_a_fee BIGINT NOT NULL, total_claimed_b_fee BIGINT NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, diff --git a/solana/indexer/damm_v2.go b/solana/indexer/damm_v2.go index dc381e6e..617b5e64 100644 --- a/solana/indexer/damm_v2.go +++ b/solana/indexer/damm_v2.go @@ -14,8 +14,6 @@ import ( "go.uber.org/zap" ) -type notificationCallback func(ctx context.Context, notification *pgconn.Notification) - type DammV2Indexer struct { pool database.DbPool grpcConfig GrpcConfig @@ -124,53 +122,6 @@ func subscribeToDammV2Pools(ctx context.Context, db database.DBTX, grpcConfig Gr return grpcClients, nil } -func watchPgNotification(ctx context.Context, pool database.DbPool, notification string, callback notificationCallback, logger *zap.Logger) error { - if logger == nil { - logger = zap.NewNop() - } - - childLogger := logger.With(zap.String("notification", notification)) - - conn, err := pool.Acquire(ctx) - if err != nil { - return fmt.Errorf("failed to acquire database connection: %w", err) - } - - rawConn := conn.Conn() - _, err = rawConn.Exec(ctx, fmt.Sprintf(`LISTEN %s`, notification)) - if err != nil { - return fmt.Errorf("failed to listen for %s changes: %w", notification, err) - } - - go func() { - defer func() { - if rawConn != nil && !rawConn.PgConn().IsClosed() && ctx.Err() != nil { - _, _ = rawConn.Exec(ctx, fmt.Sprintf(`UNLISTEN %s`, notification)) - } - childLogger.Info("received shutdown signal, stopping notification watcher") - conn.Release() - }() - for { - select { - case <-ctx.Done(): - return - default: - } - - notif, err := rawConn.WaitForNotification(ctx) - if err != nil { - childLogger.Error("failed waiting for notification", zap.Error(err)) - } - if notif == nil { - childLogger.Warn("received nil notification, continuing to wait for notifications") - continue - } - callback(ctx, notif) - } - }() - return nil -} - func 
makeDammV2SubscriptionRequest(dammV2Pools []string) *pb.SubscribeRequest { commitment := pb.CommitmentLevel_CONFIRMED subscription := &pb.SubscribeRequest{ @@ -254,23 +205,23 @@ func processDammV2PoolUpdate( if err != nil { return err } - err = upsertDammV2Pool(ctx, db, account, &pool) + err = upsertDammV2Pool(ctx, db, update.Slot, account, &pool) if err != nil { return err } - err = upsertDammV2PoolMetrics(ctx, db, account, &pool.Metrics) + err = upsertDammV2PoolMetrics(ctx, db, update.Slot, account, &pool.Metrics) if err != nil { return err } - err = upsertDammV2PoolFees(ctx, db, account, &pool.PoolFees) + err = upsertDammV2PoolFees(ctx, db, update.Slot, account, &pool.PoolFees) if err != nil { return err } - err = upsertDammV2PoolBaseFee(ctx, db, account, &pool.PoolFees.BaseFee) + err = upsertDammV2PoolBaseFee(ctx, db, update.Slot, account, &pool.PoolFees.BaseFee) if err != nil { return err } - err = upsertDammV2PoolDynamicFee(ctx, db, account, &pool.PoolFees.DynamicFee) + err = upsertDammV2PoolDynamicFee(ctx, db, update.Slot, account, &pool.PoolFees.DynamicFee) if err != nil { return err } @@ -288,11 +239,11 @@ func processDammV2PositionUpdate( if err != nil { return err } - err = upsertDammV2Position(ctx, db, account, &position) + err = upsertDammV2Position(ctx, db, update.Slot, account, &position) if err != nil { return err } - err = upsertDammV2PositionMetrics(ctx, db, account, &position.Metrics) + err = upsertDammV2PositionMetrics(ctx, db, update.Slot, account, &position.Metrics) if err != nil { return err } @@ -328,12 +279,14 @@ func getWatchedDammV2Pools(ctx context.Context, db database.DBTX, limit int, off func upsertDammV2Pool( ctx context.Context, db database.DBTX, + slot uint64, account solana.PublicKey, pool *meteora_damm_v2.Pool, ) error { sqlPool := ` INSERT INTO sol_meteora_damm_v2_pools ( address, + slot, token_a_mint, token_b_mint, token_a_vault, @@ -363,6 +316,7 @@ func upsertDammV2Pool( updated_at ) VALUES ( @address, + @slot, @token_a_mint, 
@token_b_mint, @token_a_vault, @@ -392,6 +346,7 @@ func upsertDammV2Pool( NOW() ) ON CONFLICT (address) DO UPDATE SET + slot = EXCLUDED.slot, token_a_mint = EXCLUDED.token_a_mint, token_b_mint = EXCLUDED.token_b_mint, token_a_vault = EXCLUDED.token_a_vault, @@ -418,9 +373,11 @@ func upsertDammV2Pool( permanent_lock_liquidity = EXCLUDED.permanent_lock_liquidity, creator = EXCLUDED.creator, updated_at = NOW() + WHERE EXCLUDED.slot > sol_meteora_damm_v2_pools.slot ` args := pgx.NamedArgs{ "address": account.String(), + "slot": slot, "token_a_mint": pool.TokenAMint.String(), "token_b_mint": pool.TokenBMint.String(), "token_a_vault": pool.TokenAVault.String(), @@ -455,12 +412,14 @@ func upsertDammV2Pool( func upsertDammV2PoolMetrics( ctx context.Context, db database.DBTX, + slot uint64, account solana.PublicKey, metrics *meteora_damm_v2.PoolMetrics, ) error { sqlMetrics := ` INSERT INTO sol_meteora_damm_v2_pool_metrics ( pool, + slot, total_lp_a_fee, total_lp_b_fee, total_protocol_a_fee, @@ -472,6 +431,7 @@ func upsertDammV2PoolMetrics( updated_at ) VALUES ( @pool, + @slot, @total_lp_a_fee, @total_lp_b_fee, @total_protocol_a_fee, @@ -483,6 +443,7 @@ func upsertDammV2PoolMetrics( NOW() ) ON CONFLICT (pool) DO UPDATE SET + slot = EXCLUDED.slot, total_lp_a_fee = EXCLUDED.total_lp_a_fee, total_lp_b_fee = EXCLUDED.total_lp_b_fee, total_protocol_a_fee = EXCLUDED.total_protocol_a_fee, @@ -491,10 +452,12 @@ func upsertDammV2PoolMetrics( total_partner_b_fee = EXCLUDED.total_partner_b_fee, total_position = EXCLUDED.total_position, updated_at = NOW() + WHERE EXCLUDED.slot > sol_meteora_damm_v2_pool_metrics.slot ` _, err := db.Exec(ctx, sqlMetrics, pgx.NamedArgs{ "pool": account.String(), + "slot": slot, "total_lp_a_fee": metrics.TotalLpAFee, "total_lp_b_fee": metrics.TotalLpBFee, "total_protocol_a_fee": metrics.TotalProtocolAFee, @@ -509,12 +472,14 @@ func upsertDammV2PoolMetrics( func upsertDammV2PoolFees( ctx context.Context, db database.DBTX, + slot uint64, account 
solana.PublicKey, fees *meteora_damm_v2.PoolFeesStruct, ) error { sqlFees := ` INSERT INTO sol_meteora_damm_v2_pool_fees ( pool, + slot, partner_fee_percent, protocol_fee_percent, referral_fee_percent, @@ -522,6 +487,7 @@ func upsertDammV2PoolFees( updated_at ) VALUES ( @pool, + @slot, @partner_fee_percent, @protocol_fee_percent, @referral_fee_percent, @@ -529,10 +495,12 @@ func upsertDammV2PoolFees( NOW() ) ON CONFLICT (pool) DO UPDATE SET + slot = EXCLUDED.slot, partner_fee_percent = EXCLUDED.partner_fee_percent, protocol_fee_percent = EXCLUDED.protocol_fee_percent, referral_fee_percent = EXCLUDED.referral_fee_percent, updated_at = NOW() + WHERE EXCLUDED.slot > sol_meteora_damm_v2_pool_fees.slot ` _, err := db.Exec(ctx, sqlFees, pgx.NamedArgs{ @@ -547,12 +515,14 @@ func upsertDammV2PoolFees( func upsertDammV2PoolBaseFee( ctx context.Context, db database.DBTX, + slot uint64, account solana.PublicKey, baseFee *meteora_damm_v2.BaseFeeStruct, ) error { sqlBaseFee := ` INSERT INTO sol_meteora_damm_v2_pool_base_fees ( pool, + slot, cliff_fee_numerator, fee_scheduler_mode, number_of_period, @@ -562,6 +532,7 @@ func upsertDammV2PoolBaseFee( updated_at ) VALUES ( @pool, + @slot, @cliff_fee_numerator, @fee_scheduler_mode, @number_of_period, @@ -571,16 +542,19 @@ func upsertDammV2PoolBaseFee( NOW() ) ON CONFLICT (pool) DO UPDATE SET + slot = EXCLUDED.slot, cliff_fee_numerator = EXCLUDED.cliff_fee_numerator, fee_scheduler_mode = EXCLUDED.fee_scheduler_mode, number_of_period = EXCLUDED.number_of_period, period_frequency = EXCLUDED.period_frequency, reduction_factor = EXCLUDED.reduction_factor, updated_at = NOW() + WHERE EXCLUDED.slot > sol_meteora_damm_v2_pool_base_fees.slot ` _, err := db.Exec(ctx, sqlBaseFee, pgx.NamedArgs{ "pool": account.String(), + "slot": slot, "cliff_fee_numerator": baseFee.CliffFeeNumerator, "fee_scheduler_mode": baseFee.FeeSchedulerMode, "number_of_period": baseFee.NumberOfPeriod, @@ -593,12 +567,14 @@ func upsertDammV2PoolBaseFee( func 
upsertDammV2PoolDynamicFee( ctx context.Context, db database.DBTX, + slot uint64, account solana.PublicKey, dynamicFee *meteora_damm_v2.DynamicFeeStruct, ) error { sqlDynamicFee := ` INSERT INTO sol_meteora_damm_v2_pool_dynamic_fees ( pool, + slot, initialized, max_volatility_accumulator, variable_fee_control, @@ -615,6 +591,7 @@ func upsertDammV2PoolDynamicFee( updated_at ) VALUES ( @pool, + @slot, @initialized, @max_volatility_accumulator, @variable_fee_control, @@ -631,6 +608,7 @@ func upsertDammV2PoolDynamicFee( NOW() ) ON CONFLICT (pool) DO UPDATE SET + slot = EXCLUDED.slot, initialized = EXCLUDED.initialized, max_volatility_accumulator = EXCLUDED.max_volatility_accumulator, variable_fee_control = EXCLUDED.variable_fee_control, @@ -644,6 +622,7 @@ func upsertDammV2PoolDynamicFee( volatility_accumulator = EXCLUDED.volatility_accumulator, volatility_reference = EXCLUDED.volatility_reference, updated_at = NOW() + WHERE EXCLUDED.slot > sol_meteora_damm_v2_pool_dynamic_fees.slot ` _, err := db.Exec(ctx, sqlDynamicFee, pgx.NamedArgs{ @@ -667,12 +646,14 @@ func upsertDammV2PoolDynamicFee( func upsertDammV2Position( ctx context.Context, db database.DBTX, + slot uint64, account solana.PublicKey, position *meteora_damm_v2.PositionState, ) error { sql := ` INSERT INTO sol_meteora_damm_v2_positions ( address, + slot, pool, nft_mint, fee_a_per_token_checkpoint, @@ -686,6 +667,7 @@ func upsertDammV2Position( created_at ) VALUES ( @address, + @slot, @pool, @nft_mint, @fee_a_per_token_checkpoint, @@ -699,6 +681,7 @@ func upsertDammV2Position( NOW() ) ON CONFLICT (address) DO UPDATE SET + slot = EXCLUDED.slot, pool = EXCLUDED.pool, nft_mint = EXCLUDED.nft_mint, fee_a_per_token_checkpoint = EXCLUDED.fee_a_per_token_checkpoint, @@ -709,6 +692,7 @@ func upsertDammV2Position( vested_liquidity = EXCLUDED.vested_liquidity, permanent_locked_liquidity = EXCLUDED.permanent_locked_liquidity, updated_at = NOW() + WHERE EXCLUDED.slot > sol_meteora_damm_v2_positions.slot ` _, err := 
db.Exec(ctx, sql, pgx.NamedArgs{ @@ -729,27 +713,32 @@ func upsertDammV2Position( func upsertDammV2PositionMetrics( ctx context.Context, db database.DBTX, + slot uint64, account solana.PublicKey, metrics *meteora_damm_v2.PositionMetrics, ) error { sql := ` INSERT INTO sol_meteora_damm_v2_position_metrics ( position, + slot, total_claimed_a_fee, total_claimed_b_fee, created_at, updated_at ) VALUES ( @position, + @slot, @total_claimed_a_fee, @total_claimed_b_fee, NOW(), NOW() ) ON CONFLICT (position) DO UPDATE SET + slot = EXCLUDED.slot, total_claimed_a_fee = EXCLUDED.total_claimed_a_fee, total_claimed_b_fee = EXCLUDED.total_claimed_b_fee, updated_at = NOW() + WHERE EXCLUDED.slot > sol_meteora_damm_v2_position_metrics.slot ` _, err := db.Exec(ctx, sql, pgx.NamedArgs{ diff --git a/solana/indexer/dbc.go b/solana/indexer/dbc.go index b1be576e..4460344b 100644 --- a/solana/indexer/dbc.go +++ b/solana/indexer/dbc.go @@ -83,7 +83,7 @@ func processDbcInstruction( if err != nil { return fmt.Errorf("failed to get damm v2 pool account data after retries: %w", err) } else { - err = upsertDammV2Pool(ctx, db, migrationInst.GetPool().PublicKey, &dammPool) + err = upsertDammV2Pool(ctx, db, slot, migrationInst.GetPool().PublicKey, &dammPool) if err != nil { return fmt.Errorf("failed to upsert damm v2 pool: %w", err) } @@ -96,7 +96,7 @@ func processDbcInstruction( if err != nil { return fmt.Errorf("failed to get first damm v2 position account data: %w", err) } else { - err = upsertDammV2Position(ctx, db, migrationInst.GetFirstPosition().PublicKey, &firstPosition) + err = upsertDammV2Position(ctx, db, slot, migrationInst.GetFirstPosition().PublicKey, &firstPosition) if err != nil { return fmt.Errorf("failed to upsert first damm v2 position: %w", err) } @@ -109,7 +109,7 @@ func processDbcInstruction( if err != nil { return fmt.Errorf("failed to get second damm v2 position account data: %w", err) } else { - err = upsertDammV2Position(ctx, db, migrationInst.GetSecondPosition().PublicKey, 
&secondPosition) + err = upsertDammV2Position(ctx, db, slot, migrationInst.GetSecondPosition().PublicKey, &secondPosition) if err != nil { return fmt.Errorf("failed to upsert second damm v2 position: %w", err) } diff --git a/solana/indexer/utils.go b/solana/indexer/utils.go index 3716f440..625536f3 100644 --- a/solana/indexer/utils.go +++ b/solana/indexer/utils.go @@ -7,6 +7,8 @@ import ( "api.audius.co/database" "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" + "go.uber.org/zap" ) func withRetries(f func() error, maxRetries int, interval time.Duration) error { @@ -59,3 +61,52 @@ func getArtistCoins(ctx context.Context, db database.DBTX, forceRefresh bool) ([ mintsCache = mintAddresses return mintAddresses, nil } + +type notificationCallback func(ctx context.Context, notification *pgconn.Notification) + +func watchPgNotification(ctx context.Context, pool database.DbPool, notification string, callback notificationCallback, logger *zap.Logger) error { + if logger == nil { + logger = zap.NewNop() + } + + childLogger := logger.With(zap.String("notification", notification)) + + conn, err := pool.Acquire(ctx) + if err != nil { + return fmt.Errorf("failed to acquire database connection: %w", err) + } + + rawConn := conn.Conn() + _, err = rawConn.Exec(ctx, fmt.Sprintf(`LISTEN %s`, notification)) + if err != nil { + return fmt.Errorf("failed to listen for %s changes: %w", notification, err) + } + + go func() { + defer func() { + if rawConn != nil && !rawConn.PgConn().IsClosed() && ctx.Err() != nil { + _, _ = rawConn.Exec(ctx, fmt.Sprintf(`UNLISTEN %s`, notification)) + } + childLogger.Info("received shutdown signal, stopping notification watcher") + conn.Release() + }() + for { + select { + case <-ctx.Done(): + return + default: + } + + notif, err := rawConn.WaitForNotification(ctx) + if err != nil { + childLogger.Error("failed waiting for notification", zap.Error(err)) + } + if notif == nil { + childLogger.Warn("received nil notification, continuing to wait 
for notifications") + continue + } + callback(ctx, notif) + } + }() + return nil +} From 8add5704d2c1db087fe3d0667417aea30109c7af Mon Sep 17 00:00:00 2001 From: Marcus Pasell <3690498+rickyrombo@users.noreply.github.com> Date: Mon, 13 Oct 2025 10:31:48 -0700 Subject: [PATCH 3/4] typo --- solana/indexer/damm_v2.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/solana/indexer/damm_v2.go b/solana/indexer/damm_v2.go index 617b5e64..8da7e7de 100644 --- a/solana/indexer/damm_v2.go +++ b/solana/indexer/damm_v2.go @@ -22,7 +22,7 @@ type DammV2Indexer struct { const MAX_DAMM_V2_POOLS_PER_SUBSCRIPTION = 10000 const DAMM_V2_POOL_SUBSCRIPTION_KEY = "dammV2Pools" -const DBC__MIGRATION_NOTIFICATION_NAME = "meteora_dbc_migration" +const DBC_MIGRATION_NOTIFICATION_NAME = "meteora_dbc_migration" func (d *DammV2Indexer) Start(ctx context.Context) { // To ensure only one subscription task is running at a time, keep track of @@ -64,7 +64,7 @@ func (d *DammV2Indexer) Start(ctx context.Context) { grpcClients = clients // Watch for new pools to be added - err = watchPgNotification(ctx, d.pool, DBC__MIGRATION_NOTIFICATION_NAME, handleNotif, d.logger) + err = watchPgNotification(ctx, d.pool, DBC_MIGRATION_NOTIFICATION_NAME, handleNotif, d.logger) if err != nil { d.logger.Error("failed to watch for DAMM V2 pool changes", zap.Error(err)) return From b3b792faaf4704218ed44bd4a3de60c49d0c90fa Mon Sep 17 00:00:00 2001 From: Marcus Pasell <3690498+rickyrombo@users.noreply.github.com> Date: Mon, 13 Oct 2025 12:25:14 -0700 Subject: [PATCH 4/4] remove unused --- .../spl/programs/meteora_damm_v2/accounts.go | 13 -- solana/spl/programs/meteora_damm_v2/client.go | 113 ------------------ .../programs/meteora_damm_v2/client_test.go | 24 ---- solana/spl/programs/meteora_damm_v2/utils.go | 27 ----- 4 files changed, 177 deletions(-) delete mode 100644 solana/spl/programs/meteora_damm_v2/accounts.go delete mode 100644 solana/spl/programs/meteora_damm_v2/client.go delete mode 100644 
solana/spl/programs/meteora_damm_v2/client_test.go delete mode 100644 solana/spl/programs/meteora_damm_v2/utils.go diff --git a/solana/spl/programs/meteora_damm_v2/accounts.go b/solana/spl/programs/meteora_damm_v2/accounts.go deleted file mode 100644 index 21c54f0a..00000000 --- a/solana/spl/programs/meteora_damm_v2/accounts.go +++ /dev/null @@ -1,13 +0,0 @@ -package meteora_damm_v2 - -import "github.com/gagliardetto/solana-go" - -// Derives the position PDA from a position NFT mint -func DerivePositionPDA(positionNft solana.PublicKey) (solana.PublicKey, error) { - seeds := [][]byte{[]byte("position"), positionNft.Bytes()} - address, _, err := solana.FindProgramAddress(seeds, ProgramID) - if err != nil { - return solana.PublicKey{}, err - } - return address, nil -} diff --git a/solana/spl/programs/meteora_damm_v2/client.go b/solana/spl/programs/meteora_damm_v2/client.go deleted file mode 100644 index 0775ede5..00000000 --- a/solana/spl/programs/meteora_damm_v2/client.go +++ /dev/null @@ -1,113 +0,0 @@ -package meteora_damm_v2 - -import ( - "context" - "math/big" - "sort" - - bin "github.com/gagliardetto/binary" - "github.com/gagliardetto/solana-go" - "github.com/gagliardetto/solana-go/programs/token" - "github.com/gagliardetto/solana-go/rpc" - "go.uber.org/zap" -) - -type RpcClient interface { - GetAccountDataBorshInto(ctx context.Context, account solana.PublicKey, out interface{}) error -} - -type Client struct { - client *rpc.Client - logger *zap.Logger -} - -func NewClient( - client *rpc.Client, - logger *zap.Logger, -) *Client { - return &Client{ - client: client, - logger: logger, - } -} - -// Gets the current Pool state. -func (c *Client) GetPool(ctx context.Context, account solana.PublicKey) (*Pool, error) { - var pool Pool - err := c.client.GetAccountDataBorshInto(ctx, account, &pool) - if err != nil { - return nil, err - } - return &pool, nil -} - -// Gets a position by its address. 
-func (c *Client) GetPosition(ctx context.Context, account solana.PublicKey) (*PositionState, error) { - var position PositionState - err := c.client.GetAccountDataBorshInto(ctx, account, &position) - if err != nil { - return nil, err - } - return &position, nil -} - -// Gets all position NFTs held by a wallet. -func (c *Client) GetPositionNFTs(ctx context.Context, owner solana.PublicKey) ([]solana.PublicKey, error) { - accounts, err := c.client.GetTokenAccountsByOwner(ctx, owner, &rpc.GetTokenAccountsConfig{ - ProgramId: &solana.Token2022ProgramID, - }, &rpc.GetTokenAccountsOpts{}) - if err != nil { - return nil, err - } - - var positionNFTs []solana.PublicKey - for _, acc := range accounts.Value { - data := token.Account{} - bin.NewBorshDecoder(acc.Account.Data.GetBinary()).Decode(&data) - if data.Amount == uint64(1) { - positionNFTs = append(positionNFTs, data.Mint) - } - } - - return positionNFTs, nil -} - -// Gets all the positions by an owner, sorted by total liquidity (descending). 
-func (c *Client) GetPositionsByOwner(ctx context.Context, owner solana.PublicKey) ([]*PositionState, error) { - positionNFTs, err := c.GetPositionNFTs(ctx, owner) - if err != nil { - return nil, err - } - - var positions []*PositionState - for _, nft := range positionNFTs { - pda, err := DerivePositionPDA(nft) - if err != nil { - return nil, err - } - position, err := c.GetPosition(ctx, pda) - if err != nil { - return nil, err - } - positions = append(positions, position) - } - - // Sort positions by total liquidity - sort.Slice(positions, func(i, j int) bool { - vestedLiquidity := (&big.Int{}).SetBytes(positions[i].VestedLiquidity.Bytes()) - permanentLockedLiquidity := (&big.Int{}).SetBytes(positions[i].PermanentLockedLiquidity.Bytes()) - unlockedLiquidity := (&big.Int{}).SetBytes(positions[i].UnlockedLiquidity.Bytes()) - totalLiquidityI := (&big.Int{}).Add(vestedLiquidity, permanentLockedLiquidity) - totalLiquidityI.Add(totalLiquidityI, unlockedLiquidity) - - vestedLiquidity = (&big.Int{}).SetBytes(positions[j].VestedLiquidity.Bytes()) - permanentLockedLiquidity = (&big.Int{}).SetBytes(positions[j].PermanentLockedLiquidity.Bytes()) - unlockedLiquidity = (&big.Int{}).SetBytes(positions[j].UnlockedLiquidity.Bytes()) - totalLiquidityJ := (&big.Int{}).Add(vestedLiquidity, permanentLockedLiquidity) - totalLiquidityJ.Add(totalLiquidityJ, unlockedLiquidity) - - return totalLiquidityJ.Cmp(totalLiquidityI) < 0 - }) - - return positions, nil -} diff --git a/solana/spl/programs/meteora_damm_v2/client_test.go b/solana/spl/programs/meteora_damm_v2/client_test.go deleted file mode 100644 index 12f1266a..00000000 --- a/solana/spl/programs/meteora_damm_v2/client_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package meteora_damm_v2_test - -import ( - "context" - "testing" - - "api.audius.co/solana/spl/programs/meteora_damm_v2" - "github.com/gagliardetto/solana-go" - "github.com/gagliardetto/solana-go/rpc" - "github.com/stretchr/testify/require" - "go.uber.org/zap" -) - -func 
TestActualFetch(t *testing.T) { - ctx := context.Background() - client := meteora_damm_v2.NewClient(rpc.New(rpc.MainNetBeta_RPC), zap.NewNop()) - - owner, err := solana.PublicKeyFromBase58("EF1zneAqA2mwjkD3Lj7sQnMhR2uorGqEHXNtAWfGdCu2") - require.NoError(t, err) - - positions, err := client.GetPositionsByOwner(ctx, owner) - require.NoError(t, err) - require.Greater(t, len(positions), 0) -} diff --git a/solana/spl/programs/meteora_damm_v2/utils.go b/solana/spl/programs/meteora_damm_v2/utils.go deleted file mode 100644 index 771cdb39..00000000 --- a/solana/spl/programs/meteora_damm_v2/utils.go +++ /dev/null @@ -1,27 +0,0 @@ -package meteora_damm_v2 - -import ( - "math/big" -) - -const LIQUIDITY_SCALE = 128 - -func GetUnclaimedFees(pool *Pool, position *PositionState) (*big.Int, *big.Int) { - totalPositionLiquidity := big.NewInt(0).Add( - big.NewInt(0).Add(position.UnlockedLiquidity.BigInt(), position.VestedLiquidity.BigInt()), - position.PermanentLockedLiquidity.BigInt(), - ) - - feeA := big.NewInt(0).Sub(&pool.FeeAPerLiquidity.Int, &position.FeeAPerTokenCheckpoint.Int) - feeB := big.NewInt(0).Sub(&pool.FeeBPerLiquidity.Int, &position.FeeBPerTokenCheckpoint.Int) - - feeA.Mul(feeA, totalPositionLiquidity) - feeB.Mul(feeB, totalPositionLiquidity) - feeA.Rsh(feeA, LIQUIDITY_SCALE) - feeB.Rsh(feeB, LIQUIDITY_SCALE) - - feeA.Add(feeA, big.NewInt(0).SetUint64(position.FeeAPending)) - feeB.Add(feeB, big.NewInt(0).SetUint64(position.FeeBPending)) - - return feeA, feeB -}