Skip to content

Commit 69348ba

Browse files
committed
feat(pdp): db & extras rename "proof set"->"data set", "root"->"piece"
1 parent bea4dce commit 69348ba

File tree

10 files changed

+615
-438
lines changed

10 files changed

+615
-438
lines changed

cmd/curio/tasks/tasks.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,8 @@ var log = logging.Logger("curio/deps")
6363
func WindowPostScheduler(ctx context.Context, fc config.CurioFees, pc config.CurioProvingConfig,
6464
api api.Chain, verif storiface.Verifier, paramck func() (bool, error), sender *message.Sender, chainSched *chainsched.CurioChainSched,
6565
as *multictladdr.MultiAddressSelector, addresses map[dtypes.MinerAddress]bool, db *harmonydb.DB,
66-
stor paths.Store, idx paths.SectorIndex, max int) (*window2.WdPostTask, *window2.WdPostSubmitTask, *window2.WdPostRecoverDeclareTask, error) {
67-
66+
stor paths.Store, idx paths.SectorIndex, max int,
67+
) (*window2.WdPostTask, *window2.WdPostSubmitTask, *window2.WdPostRecoverDeclareTask, error) {
6868
// todo config
6969
ft := window2.NewSimpleFaultTracker(stor, idx, pc.ParallelCheckLimit, pc.SingleCheckTimeout, pc.PartitionCheckTimeout)
7070

@@ -114,14 +114,13 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
114114
var fetchOnce sync.Once
115115
var fetchResult atomic.Pointer[result.Result[bool]]
116116

117-
var asyncParams = func() func() (bool, error) {
117+
asyncParams := func() func() (bool, error) {
118118
fetchOnce.Do(func() {
119119
go func() {
120120
for spt := range dependencies.ProofTypes {
121121

122122
provingSize := uint64(must.One(spt.SectorSize()))
123123
err := fastparamfetch.GetParams(context.TODO(), proofparams.ParametersJSON(), proofparams.SrsJSON(), provingSize)
124-
125124
if err != nil {
126125
log.Errorw("failed to fetch params", "error", err)
127126
fetchResult.Store(&result.Result[bool]{Value: false, Error: err})
@@ -145,7 +144,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
145144
// eth message sender as needed
146145
var senderEth *message.SenderETH
147146
var senderEthOnce sync.Once
148-
var getSenderEth = func() *message.SenderETH {
147+
getSenderEth := func() *message.SenderETH {
149148
senderEthOnce.Do(func() {
150149
ec, err := dependencies.EthClient.Val()
151150
if err != nil {
@@ -170,7 +169,6 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
170169
wdPostTask, wdPoStSubmitTask, derlareRecoverTask, err := WindowPostScheduler(
171170
ctx, cfg.Fees, cfg.Proving, full, verif, asyncParams(), sender, chainSched,
172171
as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks)
173-
174172
if err != nil {
175173
return nil, err
176174
}
@@ -284,7 +282,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps, shutdownChan chan
284282
sdeps.EthSender = es
285283

286284
pdp.NewWatcherCreate(db, must.One(dependencies.EthClient.Val()), chainSched)
287-
pdp.NewWatcherRootAdd(db, must.One(dependencies.EthClient.Val()), chainSched)
285+
pdp.NewWatcherPieceAdd(db, must.One(dependencies.EthClient.Val()), chainSched)
288286

289287
pdpProveTask := pdp.NewProveTask(chainSched, db, must.One(dependencies.EthClient.Val()), dependencies.Chain, es, dependencies.CachedPieceReader)
290288
pdpNextProvingPeriodTask := pdp.NewNextProvingPeriodTask(db, must.One(dependencies.EthClient.Val()), dependencies.Chain, chainSched, es)
@@ -354,7 +352,8 @@ func addSealingTasks(
354352
ctx context.Context, hasAnySealingTask bool, db *harmonydb.DB, full api.Chain, sender *message.Sender,
355353
as *multictladdr.MultiAddressSelector, cfg *config.CurioConfig, slrLazy *lazy.Lazy[*ffi.SealCalls],
356354
asyncParams func() func() (bool, error), si paths.SectorIndex, stor *paths.Remote,
357-
bstore curiochain.CurioBlockstore, machineHostPort string, prover storiface.Prover) ([]harmonytask.TaskInterface, error) {
355+
bstore curiochain.CurioBlockstore, machineHostPort string, prover storiface.Prover,
356+
) ([]harmonytask.TaskInterface, error) {
358357
var activeTasks []harmonytask.TaskInterface
359358
// Sealing / Snap
360359

@@ -490,7 +489,6 @@ func machineDetails(deps *deps.Deps, activeTasks []harmonytask.TaskInterface, ma
490489
ON CONFLICT (machine_id) DO UPDATE SET tasks=$1, layers=$2, startup_time=$3, miners=$4, machine_id=$5, machine_name=$6`,
491490
strings.Join(taskNames, ","), strings.Join(deps.Layers, ","),
492491
time.Now(), strings.Join(miners, ","), machineID, machineName)
493-
494492
if err != nil {
495493
log.Errorf("failed to update machine details: %s", err)
496494
return

cmd/pdptool/main.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -747,7 +747,7 @@ var uploadFileCmd = &cli.Command{
747747
subPieceStr: "",
748748
})
749749
pieceSize := uint64(0)
750-
maxRootSize, err := abi.RegisteredSealProof_StackedDrg64GiBV1_1.SectorSize()
750+
maxPieceSize, err := abi.RegisteredSealProof_StackedDrg64GiBV1_1.SectorSize()
751751
if err != nil {
752752
return fmt.Errorf("failed to get sector size: %v", err)
753753
}
@@ -808,7 +808,7 @@ var uploadFileCmd = &cli.Command{
808808
return fmt.Errorf("failed to write chunk to file: %v", err)
809809
}
810810
}
811-
if pieceSize+paddedPieceSize > uint64(maxRootSize) {
811+
if pieceSize+paddedPieceSize > uint64(maxPieceSize) {
812812
pieceSets = append(pieceSets, pieceSetInfo{
813813
pieces: make([]abi.PieceInfo, 0),
814814
subPieceStr: "",
@@ -1111,7 +1111,7 @@ var getDataSetCmd = &cli.Command{
11111111
PieceCid string `json:"pieceCid"`
11121112
SubPieceCid string `json:"subPieceCid"`
11131113
SubPieceOffset int64 `json:"subPieceOffset"`
1114-
} `json:"roots"`
1114+
} `json:"pieces"`
11151115
}
11161116
err = json.Unmarshal(bodyBytes, &response)
11171117
if err != nil {
@@ -1122,11 +1122,11 @@ var getDataSetCmd = &cli.Command{
11221122
fmt.Printf("Data Set ID: %d\n", response.ID)
11231123
fmt.Printf("Next Challenge Epoch: %d\n", response.NextChallengeEpoch)
11241124
fmt.Printf("Pieces:\n")
1125-
for _, root := range response.Pieces {
1126-
fmt.Printf(" - Root ID: %d\n", root.PieceId)
1127-
fmt.Printf(" Root CID: %s\n", root.PieceCid)
1128-
fmt.Printf(" SubPiece CID: %s\n", root.SubPieceCid)
1129-
fmt.Printf(" SubPiece Offset: %d\n", root.SubPieceOffset)
1125+
for _, piece := range response.Pieces {
1126+
fmt.Printf(" - Piece ID: %d\n", piece.PieceId)
1127+
fmt.Printf(" Piece CID: %s\n", piece.PieceCid)
1128+
fmt.Printf(" SubPiece CID: %s\n", piece.SubPieceCid)
1129+
fmt.Printf(" SubPiece Offset: %d\n", piece.SubPieceOffset)
11301130
fmt.Println()
11311131
}
11321132
} else {
Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
-- PDP terminology rename migration
2+
-- Renames: proofset->data_set, proof_set->data_set, root->piece, subroot->sub_piece, sub_root->sub_piece
3+
-- This migration handles database schema changes for PDP terminology to match the current PDPVerifier
4+
5+
-- Check if migration has already been run (idempotency check)
6+
DO $$
7+
BEGIN
8+
IF EXISTS (SELECT 1 FROM information_schema.tables
9+
WHERE table_schema = current_schema()
10+
AND table_name = 'pdp_data_sets') THEN
11+
RAISE NOTICE 'Migration already applied, skipping...';
12+
RETURN;
13+
END IF;
14+
END
15+
$$;
16+
17+
-- Step 1: Drop old triggers BEFORE renaming tables (using original table names)
18+
DROP TRIGGER IF EXISTS pdp_proofset_root_insert ON pdp_proofset_roots;
19+
DROP TRIGGER IF EXISTS pdp_proofset_root_delete ON pdp_proofset_roots;
20+
DROP TRIGGER IF EXISTS pdp_proofset_root_update ON pdp_proofset_roots;
21+
DROP TRIGGER IF EXISTS pdp_proofset_create_message_status_change ON message_waits_eth;
22+
DROP TRIGGER IF EXISTS pdp_proofset_add_message_status_change ON message_waits_eth;
23+
24+
-- Step 2: Drop old functions
25+
DROP FUNCTION IF EXISTS increment_proofset_refcount();
26+
DROP FUNCTION IF EXISTS decrement_proofset_refcount();
27+
DROP FUNCTION IF EXISTS adjust_proofset_refcount_on_update();
28+
DROP FUNCTION IF EXISTS update_pdp_proofset_creates();
29+
DROP FUNCTION IF EXISTS update_pdp_proofset_roots();
30+
31+
-- Step 3: Rename tables
32+
ALTER TABLE pdp_proof_sets RENAME TO pdp_data_sets;
33+
ALTER TABLE pdp_proofset_creates RENAME TO pdp_data_set_creates;
34+
ALTER TABLE pdp_proofset_roots RENAME TO pdp_data_set_pieces;
35+
ALTER TABLE pdp_proofset_root_adds RENAME TO pdp_data_set_piece_adds;
36+
37+
-- Step 4: Rename columns in pdp_data_sets (formerly pdp_proof_sets)
38+
-- No column renames needed in this table as it uses 'id' not 'proofset_id'
39+
40+
-- Step 5: Rename columns in pdp_data_set_creates (formerly pdp_proofset_creates)
41+
ALTER TABLE pdp_data_set_creates RENAME COLUMN proofset_created TO data_set_created;
42+
43+
-- Step 6: Rename columns in pdp_data_set_pieces (formerly pdp_proofset_roots)
44+
ALTER TABLE pdp_data_set_pieces RENAME COLUMN proofset TO data_set;
45+
ALTER TABLE pdp_data_set_pieces RENAME COLUMN root TO piece;
46+
ALTER TABLE pdp_data_set_pieces RENAME COLUMN root_id TO piece_id;
47+
ALTER TABLE pdp_data_set_pieces RENAME COLUMN subroot TO sub_piece;
48+
ALTER TABLE pdp_data_set_pieces RENAME COLUMN subroot_offset TO sub_piece_offset;
49+
ALTER TABLE pdp_data_set_pieces RENAME COLUMN subroot_size TO sub_piece_size;
50+
51+
-- Step 7: Rename columns in pdp_data_set_piece_adds (formerly pdp_proofset_root_adds)
52+
ALTER TABLE pdp_data_set_piece_adds RENAME COLUMN proofset TO data_set;
53+
ALTER TABLE pdp_data_set_piece_adds RENAME COLUMN root TO piece;
54+
ALTER TABLE pdp_data_set_piece_adds RENAME COLUMN roots_added TO pieces_added;
55+
ALTER TABLE pdp_data_set_piece_adds RENAME COLUMN subroot TO sub_piece;
56+
ALTER TABLE pdp_data_set_piece_adds RENAME COLUMN subroot_offset TO sub_piece_offset;
57+
ALTER TABLE pdp_data_set_piece_adds RENAME COLUMN subroot_size TO sub_piece_size;
58+
59+
-- Step 8: Rename columns in pdp_piecerefs
60+
ALTER TABLE pdp_piecerefs RENAME COLUMN proofset_refcount TO data_set_refcount;
61+
62+
-- Step 9: Rename columns in pdp_prove_tasks
63+
ALTER TABLE pdp_prove_tasks RENAME COLUMN proofset TO data_set;
64+
65+
-- Step 10: Rename constraints (PostgreSQL automatically renames constraints when tables are renamed, but let's be explicit)
66+
-- First, get the actual constraint names after table rename
67+
DO $$
68+
DECLARE
69+
pieces_constraint_name text;
70+
piece_adds_constraint_name text;
71+
BEGIN
72+
-- Find the primary key constraint name for pdp_data_set_pieces
73+
SELECT conname INTO pieces_constraint_name
74+
FROM pg_constraint
75+
WHERE conrelid = 'pdp_data_set_pieces'::regclass
76+
AND contype = 'p';
77+
78+
-- Find the primary key constraint name for pdp_data_set_piece_adds
79+
SELECT conname INTO piece_adds_constraint_name
80+
FROM pg_constraint
81+
WHERE conrelid = 'pdp_data_set_piece_adds'::regclass
82+
AND contype = 'p';
83+
84+
-- Rename constraints if they still have old names
85+
IF pieces_constraint_name = 'pdp_proofset_roots_root_id_unique' THEN
86+
EXECUTE format('ALTER TABLE pdp_data_set_pieces RENAME CONSTRAINT %I TO pdp_data_set_pieces_piece_id_unique', pieces_constraint_name);
87+
END IF;
88+
89+
IF piece_adds_constraint_name = 'pdp_proofset_root_adds_root_id_unique' THEN
90+
EXECUTE format('ALTER TABLE pdp_data_set_piece_adds RENAME CONSTRAINT %I TO pdp_data_set_piece_adds_piece_id_unique', piece_adds_constraint_name);
91+
END IF;
92+
END
93+
$$;
94+
95+
-- Step 11: Create new functions with updated column names
96+
CREATE OR REPLACE FUNCTION increment_data_set_refcount()
97+
RETURNS TRIGGER AS $$
98+
BEGIN
99+
UPDATE pdp_piecerefs
100+
SET data_set_refcount = data_set_refcount + 1
101+
WHERE id = NEW.pdp_pieceref;
102+
RETURN NEW;
103+
END;
104+
$$ LANGUAGE plpgsql;
105+
106+
CREATE OR REPLACE FUNCTION decrement_data_set_refcount()
107+
RETURNS TRIGGER AS $$
108+
BEGIN
109+
UPDATE pdp_piecerefs
110+
SET data_set_refcount = data_set_refcount - 1
111+
WHERE id = OLD.pdp_pieceref;
112+
RETURN OLD;
113+
END;
114+
$$ LANGUAGE plpgsql;
115+
116+
CREATE OR REPLACE FUNCTION adjust_data_set_refcount_on_update()
117+
RETURNS TRIGGER AS $$
118+
BEGIN
119+
IF OLD.pdp_pieceref IS DISTINCT FROM NEW.pdp_pieceref THEN
120+
-- Decrement count for old reference if not null
121+
IF OLD.pdp_pieceref IS NOT NULL THEN
122+
UPDATE pdp_piecerefs
123+
SET data_set_refcount = data_set_refcount - 1
124+
WHERE id = OLD.pdp_pieceref;
125+
END IF;
126+
-- Increment count for new reference if not null
127+
IF NEW.pdp_pieceref IS NOT NULL THEN
128+
UPDATE pdp_piecerefs
129+
SET data_set_refcount = data_set_refcount + 1
130+
WHERE id = NEW.pdp_pieceref;
131+
END IF;
132+
END IF;
133+
RETURN NEW;
134+
END;
135+
$$ LANGUAGE plpgsql;
136+
137+
-- Step 12: Create message status change functions
138+
CREATE OR REPLACE FUNCTION update_pdp_data_set_creates()
139+
RETURNS TRIGGER AS $$
140+
BEGIN
141+
IF OLD.tx_status = 'pending' AND (NEW.tx_status = 'confirmed' OR NEW.tx_status = 'failed') THEN
142+
-- Update the ok field in pdp_data_set_creates if a matching entry exists
143+
UPDATE pdp_data_set_creates
144+
SET ok = CASE
145+
WHEN NEW.tx_status = 'failed' OR NEW.tx_success = FALSE THEN FALSE
146+
WHEN NEW.tx_status = 'confirmed' AND NEW.tx_success = TRUE THEN TRUE
147+
ELSE ok
148+
END
149+
WHERE create_message_hash = NEW.signed_tx_hash AND data_set_created = FALSE;
150+
END IF;
151+
RETURN NEW;
152+
END;
153+
$$ LANGUAGE plpgsql;
154+
155+
CREATE OR REPLACE FUNCTION update_pdp_data_set_piece_adds()
156+
RETURNS TRIGGER AS $$
157+
BEGIN
158+
IF OLD.tx_status = 'pending' AND (NEW.tx_status = 'confirmed' OR NEW.tx_status = 'failed') THEN
159+
-- Update the add_message_ok field in pdp_data_set_piece_adds if a matching entry exists
160+
UPDATE pdp_data_set_piece_adds
161+
SET add_message_ok = CASE
162+
WHEN NEW.tx_status = 'failed' OR NEW.tx_success = FALSE THEN FALSE
163+
WHEN NEW.tx_status = 'confirmed' AND NEW.tx_success = TRUE THEN TRUE
164+
ELSE add_message_ok
165+
END
166+
WHERE add_message_hash = NEW.signed_tx_hash;
167+
END IF;
168+
RETURN NEW;
169+
END;
170+
$$ LANGUAGE plpgsql;
171+
172+
-- Step 13: Create new triggers with updated names
173+
CREATE TRIGGER pdp_data_set_piece_insert
174+
AFTER INSERT ON pdp_data_set_pieces
175+
FOR EACH ROW
176+
WHEN (NEW.pdp_pieceref IS NOT NULL)
177+
EXECUTE FUNCTION increment_data_set_refcount();
178+
179+
CREATE TRIGGER pdp_data_set_piece_delete
180+
AFTER DELETE ON pdp_data_set_pieces
181+
FOR EACH ROW
182+
WHEN (OLD.pdp_pieceref IS NOT NULL)
183+
EXECUTE FUNCTION decrement_data_set_refcount();
184+
185+
CREATE TRIGGER pdp_data_set_piece_update
186+
AFTER UPDATE ON pdp_data_set_pieces
187+
FOR EACH ROW
188+
EXECUTE FUNCTION adjust_data_set_refcount_on_update();
189+
190+
CREATE TRIGGER pdp_data_set_create_message_status_change
191+
AFTER UPDATE OF tx_status, tx_success ON message_waits_eth
192+
FOR EACH ROW
193+
EXECUTE PROCEDURE update_pdp_data_set_creates();
194+
195+
CREATE TRIGGER pdp_data_set_add_message_status_change
196+
AFTER UPDATE OF tx_status, tx_success ON message_waits_eth
197+
FOR EACH ROW
198+
EXECUTE PROCEDURE update_pdp_data_set_piece_adds();
199+
200+
-- Step 14: Update indexes
201+
-- Handle the index that was added in a later migration
202+
DO $$
203+
BEGIN
204+
-- Check if the old index exists and rename it
205+
IF EXISTS (SELECT 1 FROM pg_indexes WHERE indexname = 'idx_pdp_proofset_root_adds_roots_added') THEN
206+
ALTER INDEX idx_pdp_proofset_root_adds_roots_added RENAME TO idx_pdp_data_set_piece_adds_pieces_added;
207+
END IF;
208+
END
209+
$$;
210+
211+
-- Step 15: Update foreign key constraints
212+
-- PostgreSQL automatically updates foreign key references when tables are renamed,
213+
-- but let's add a verification step
214+
DO $$
215+
DECLARE
216+
fk_count integer;
217+
BEGIN
218+
-- Verify that foreign keys were updated correctly
219+
SELECT COUNT(*) INTO fk_count
220+
FROM information_schema.table_constraints tc
221+
JOIN information_schema.constraint_column_usage ccu
222+
ON tc.constraint_name = ccu.constraint_name
223+
AND tc.constraint_schema = ccu.constraint_schema
224+
WHERE tc.constraint_type = 'FOREIGN KEY'
225+
AND tc.table_schema = current_schema()
226+
AND ccu.table_name IN ('pdp_data_sets', 'pdp_data_set_creates',
227+
'pdp_data_set_pieces', 'pdp_data_set_piece_adds');
228+
229+
IF fk_count = 0 THEN
230+
RAISE WARNING 'No foreign key constraints found - this may be expected depending on schema';
231+
END IF;
232+
END
233+
$$;
234+
235+
-- Step 16: Add comments to document the migration
236+
COMMENT ON TABLE pdp_data_sets IS 'Formerly pdp_proof_sets - renamed in Phase 2 external terminology update';
237+
COMMENT ON TABLE pdp_data_set_creates IS 'Formerly pdp_proofset_creates - renamed in Phase 2 external terminology update';
238+
COMMENT ON TABLE pdp_data_set_pieces IS 'Formerly pdp_proofset_roots - renamed in Phase 2 external terminology update';
239+
COMMENT ON TABLE pdp_data_set_piece_adds IS 'Formerly pdp_proofset_root_adds - renamed in Phase 2 external terminology update';
240+
241+
-- Step 17: Final verification
242+
DO $$
243+
DECLARE
244+
table_count integer;
245+
column_count integer;
246+
BEGIN
247+
-- Verify all tables were renamed
248+
SELECT COUNT(*) INTO table_count
249+
FROM information_schema.tables
250+
WHERE table_schema = current_schema()
251+
AND table_name IN ('pdp_data_sets', 'pdp_data_set_creates',
252+
'pdp_data_set_pieces', 'pdp_data_set_piece_adds');
253+
254+
IF table_count != 4 THEN
255+
RAISE EXCEPTION 'Not all tables were renamed successfully';
256+
END IF;
257+
258+
-- Verify critical columns were renamed
259+
SELECT COUNT(*) INTO column_count
260+
FROM information_schema.columns
261+
WHERE table_schema = current_schema()
262+
AND (
263+
(table_name = 'pdp_data_set_creates' AND column_name = 'data_set_created') OR
264+
(table_name = 'pdp_data_set_pieces' AND column_name = 'data_set') OR
265+
(table_name = 'pdp_data_set_pieces' AND column_name = 'piece_id') OR
266+
(table_name = 'pdp_piecerefs' AND column_name = 'data_set_refcount')
267+
);
268+
269+
IF column_count != 4 THEN
270+
RAISE EXCEPTION 'Not all columns were renamed successfully';
271+
END IF;
272+
273+
RAISE NOTICE 'Migration completed successfully!';
274+
END
275+
$$;

0 commit comments

Comments
 (0)