From 3a9f22e9761f0a1bd2992a3d9081ab4ab57fb29b Mon Sep 17 00:00:00 2001
From: LexLuthr
Date: Wed, 7 Aug 2024 23:24:45 +0400
Subject: [PATCH 01/15] basic mk12 scaffolding

---
 cmd/curio/guidedsetup/shared_cbor_gen.go      |   1 +
 deps/config/doc_gen.go                        |   1 -
 go.mod                                        |   2 +-
 .../sql/20240730-market-migration.sql         | 205 ++++++++
 lib/indexing/indexstore/create.cql            |   9 +
 lib/indexing/indexstore/indexstore.go         | 359 ++++++++++++++
 lib/libp2p/libp2p.go                          | 207 ++++++++
 lib/paths/mocks/index.go                      |   4 +-
 lib/paths/mocks/pf.go                         |   3 +-
 lib/paths/mocks/store.go                      |   6 +-
 market/mk12/libp2pimpl/libp2pimpl.go          | 449 ++++++++++++++++++
 market/mk12/types.go                          |   2 +
 market/storageingest/deal_ingest_seal.go      |   2 +-
 tasks/gc/pipeline_meta_gc.go                  |   3 +
 tasks/indexing/task_indexing.go               |   5 +-
 tasks/seal/task_movestorage.go                |   2 +-
 tasks/snap/task_movestorage.go                |   2 +-
 17 files changed, 1247 insertions(+), 15 deletions(-)
 create mode 100644 harmony/harmonydb/sql/20240730-market-migration.sql
 create mode 100644 lib/indexing/indexstore/create.cql
 create mode 100644 lib/indexing/indexstore/indexstore.go
 create mode 100644 lib/libp2p/libp2p.go
 create mode 100644 market/mk12/libp2pimpl/libp2pimpl.go

diff --git a/cmd/curio/guidedsetup/shared_cbor_gen.go b/cmd/curio/guidedsetup/shared_cbor_gen.go
index c88b9e31f..1f2effd07 100644
--- a/cmd/curio/guidedsetup/shared_cbor_gen.go
+++ b/cmd/curio/guidedsetup/shared_cbor_gen.go
@@ -10,6 +10,7 @@ import (
 	storiface "github.com/filecoin-project/curio/lib/storiface"
 	sector "github.com/filecoin-project/curio/lib/types/sector"
+	abi "github.com/filecoin-project/go-state-types/abi"
 	cid "github.com/ipfs/go-cid"
 	cbg "github.com/whyrusleeping/cbor-gen"
diff --git a/deps/config/doc_gen.go b/deps/config/doc_gen.go
index 904de0000..664bc799e 100644
--- a/deps/config/doc_gen.go
+++ b/deps/config/doc_gen.go
@@ -712,7 +712,6 @@ also be bounded by resources available on the machine.`,
 	{
 		Name: "EnableDealMarket",
 		Type: "bool",
-
 		Comment: `EnableDealMarket enabled the deal market on the node.
This would also enable libp2p on the node, if configured.`,
 	},
 	{
diff --git a/go.mod b/go.mod
index 4434ba23e..9b4381f72 100644
--- a/go.mod
+++ b/go.mod
@@ -85,7 +85,6 @@ require (
 	go.opencensus.io v0.24.0
 	go.uber.org/multierr v1.11.0
 	go.uber.org/zap v1.27.0
-	golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
 	golang.org/x/net v0.26.0
 	golang.org/x/sync v0.7.0
 	golang.org/x/sys v0.23.0
@@ -320,6 +319,7 @@ require (
 	go.uber.org/mock v0.4.0 // indirect
 	go4.org v0.0.0-20230225012048-214862532bf5 // indirect
 	golang.org/x/crypto v0.25.0 // indirect
+	golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
 	golang.org/x/mod v0.17.0 // indirect
 	golang.org/x/term v0.22.0 // indirect
 	golang.org/x/time v0.5.0 // indirect
diff --git a/harmony/harmonydb/sql/20240730-market-migration.sql b/harmony/harmonydb/sql/20240730-market-migration.sql
new file mode 100644
index 000000000..d936f9bde
--- /dev/null
+++ b/harmony/harmonydb/sql/20240730-market-migration.sql
@@ -0,0 +1,205 @@
+CREATE TABLE market_mk12_deals (
+    uuid TEXT NOT NULL,
+    created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP AT TIME ZONE 'UTC',
+    signed_proposal_cid TEXT NOT NULL,
+    proposal_signature BYTEA NOT NULL,
+    proposal jsonb NOT NULL,
+    piece_cid TEXT NOT NULL,
+    piece_size BIGINT NOT NULL,
+    offline BOOLEAN NOT NULL,
+    verified BOOLEAN NOT NULL,
+    sp_id BIGINT NOT NULL,
+    start_epoch BIGINT NOT NULL,
+    end_epoch BIGINT NOT NULL,
+    client_peer_id TEXT NOT NULL,
+    chain_deal_id BIGINT DEFAULT NULL,
+    publish_cid TEXT DEFAULT NULL,
+    length BIGINT DEFAULT NULL,
+    fast_retrieval BOOLEAN NOT NULL,
+    announce_to_ipni BOOLEAN NOT NULL,
+    url TEXT DEFAULT NULL,
+    url_headers jsonb NOT NULL DEFAULT '{}',
+    error TEXT DEFAULT NULL,
+
+    primary key (uuid, sp_id, piece_cid, signed_proposal_cid),
+    unique (uuid),
+    unique (signed_proposal_cid)
+);
+
+CREATE TABLE market_legacy_deals (
+    signed_proposal_cid TEXT,
+    proposal_signature BYTEA,
+    proposal jsonb,
+    piece_cid TEXT,
+    piece_size BIGINT,
+    offline BOOLEAN,
+    verified BOOLEAN,
+    sp_id BIGINT,
+    start_epoch BIGINT,
+    end_epoch BIGINT,
+    publish_cid TEXT,
+    fast_retrieval BOOLEAN,
+    chain_deal_id BIGINT,
+    created_at TIMESTAMPTZ,
+    sector_num BIGINT,
+
+    primary key (sp_id, piece_cid, signed_proposal_cid)
+);
+
+CREATE TABLE market_piece_metadata (
+    piece_cid TEXT NOT NULL PRIMARY KEY,
+    version INT NOT NULL DEFAULT 2,
+    created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP AT TIME ZONE 'UTC',
+    indexed BOOLEAN NOT NULL DEFAULT FALSE,
+    indexed_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP AT TIME ZONE 'UTC',
+
+    constraint market_piece_meta_identity_key
+        unique (piece_cid)
+);
+
+CREATE TABLE market_piece_deal (
+    id TEXT NOT NULL,
+    piece_cid TEXT NOT NULL,
+    boost_deal BOOLEAN NOT NULL,
+    legacy_deal BOOLEAN NOT NULL DEFAULT FALSE,
+    chain_deal_id BIGINT NOT NULL DEFAULT 0,
+    sp_id BIGINT NOT NULL,
+    sector_num BIGINT NOT NULL,
+    piece_offset BIGINT NOT NULL,
+    piece_length BIGINT NOT NULL,
+    raw_size BIGINT NOT NULL,
+
+    primary key (sp_id, piece_cid, id),
+    constraint market_piece_deal_identity_key
+        unique (sp_id, id)
+);
+
+-- Note: parameters with defaults must come last in PostgreSQL, so the
+-- defaulted _legacy_deal and _chain_deal_id are placed at the end.
+CREATE OR REPLACE FUNCTION process_piece_deal(
+    _id TEXT,
+    _piece_cid TEXT,
+    _boost_deal BOOLEAN,
+    _sp_id BIGINT,
+    _sector_num BIGINT,
+    _piece_offset BIGINT,
+    _piece_length BIGINT,
+    _raw_size BIGINT,
+    _legacy_deal BOOLEAN DEFAULT FALSE,
+    _chain_deal_id BIGINT DEFAULT 0
+)
+RETURNS VOID AS $$
+BEGIN
+    -- Update or insert into market_piece_metadata
+INSERT INTO market_piece_metadata (piece_cid, indexed, indexed_at)
+VALUES
(_piece_cid, TRUE, CURRENT_TIMESTAMP AT TIME ZONE 'UTC') + ON CONFLICT (piece_cid) DO UPDATE + SET indexed = TRUE, + indexed_at = CURRENT_TIMESTAMP AT TIME ZONE 'UTC'; + +-- Insert into market_piece_deal +INSERT INTO market_piece_deal ( + id, piece_cid, boost_deal, legacy_deal, chain_deal_id, + sp_id, sector_num, piece_offset, piece_length, raw_size +) +VALUES ( + _id, _piece_cid, _boost_deal, _legacy_deal, _chain_deal_id, + _sp_id, _sector_num, _piece_offset, _piece_length, _raw_size + ) + ON CONFLICT (sp_id, piece_cid, id) DO NOTHING; +END; +$$ LANGUAGE plpgsql; + +CREATE TABLE market_mk12_storage_ask ( + price BIGINT NOT NULL, + verified_price BIGINT NOT NULL, + min_size BIGINT NOT NULL, + max_size BIGINT NOT NULL, + sp_id BIGINT NOT NULL, + created_at BIGINT NOT NULL, + expiry BIGINT NOT NULL, + sequence BIGINT NOT NULL, + unique (sp_id) +); + +CREATE TABLE market_mk12_deal_pipeline ( + uuid TEXT NOT NULL, + sp_id BIGINT NOT NULL, + started BOOLEAN DEFAULT FALSE, + piece_cid TEXT NOT NULL, + piece_size BOOLEAN NOT NULL, + offline BOOLEAN NOT NULL, + downloaded BOOLEAN DEFAULT FALSE, + url TEXT DEFAULT NULL, + headers jsonb NOT NULL DEFAULT '{}', + file_size BIGINT DEFAULT NULL, + commp_task_id BIGINT DEFAULT NULL, + after_commp BOOLEAN DEFAULT FALSE, + find_task_id BIGINT DEFAULT NULL, + after_find BOOLEAN DEFAULT FALSE, + psd_task_id BIGINT DEFAULT NULL, + after_psd BOOLEAN DEFAULT FALSE, + psd_wait_time TIMESTAMPTZ, + find_deal_task_id BIGINT DEFAULT NULL, + after_find_deal BOOLEAN DEFAULT FALSE, + sector BIGINT, + sector_offset BIGINT, + sealed BOOLEAN DEFAULT FALSE, + indexed BOOLEAN DEFAULT FALSE, + + constraint market_mk12_deal_pipeline_identity_key unique (uuid) +); + +CREATE TABLE market_offline_urls ( + piece_cid TEXT NOT NULL, + url TEXT NOT NULL, + headers jsonb NOT NULL DEFAULT '{}', + raw_size BIGINT NOT NULL, + unique (piece_cid) +); + +CREATE TABLE market_indexing_tasks ( + id TEXT NOT NULL, + sp_id BIGINT NOT NULL, + sector_number BIGINT NOT NULL, + reg_seal_proof INT NOT NULL, + piece_offset BIGINT NOT NULL, + piece_size BIGINT NOT NULL, + raw_size BIGINT NOT NULL, + piece_cid TEXT NOT NULL, + created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP AT TIME ZONE 'UTC', + task_id BIGINT DEFAULT NULL, + + constraint market_indexing_tasks_identity_key + unique (id, sp_id, sector_number, piece_offset, piece_size, piece_cid, reg_seal_proof) +) + +CREATE TABLE libp2p_keys ( + sp_id BIGINT NOT NULL, + priv_key BYTEA NOT NULL, + listen_address TEXT NOT NULL, + announce_address TEXT NOT NULL, + no_announce_address TEXT NOT NULL, +); + +CREATE TABLE direct_deals ( + id TEXT, + created_at TIMESTAMPTZ, + piece_cid TEXT, + piece_size BIGINT, + cleanup_data BOOLEAN, + client_address TEXT, + provider_address TEXT, + allocation_id BIGINT, + start_epoch BIGINT, + end_epoch BIGINT, + inbound_file_path TEXT, + inbound_file_size BIGINT, + sector_id BIGINT, + offset BIGINT, + length BIGINT, + announce_to_ipni BOOLEAN, + keep_unsealed_copy BOOLEAN +); + + + + diff --git a/lib/indexing/indexstore/create.cql b/lib/indexing/indexstore/create.cql new file mode 100644 index 000000000..21157bb83 --- /dev/null +++ b/lib/indexing/indexstore/create.cql @@ -0,0 +1,9 @@ +CREATE TABLE IF NOT EXISTS PayloadToPiece ( + PieceCid BLOB, + PayloadMultihash BLOB, + BlockOffset BIGINT, + BlockSize BIGINT, + PRIMARY KEY (PayloadMultihash, PieceCid) +); + +CREATE INDEX piece_to_payload ON PayloadToPiece(PieceCid, PayloadMultihash); \ No newline at end of file diff --git a/lib/indexing/indexstore/indexstore.go 
b/lib/indexing/indexstore/indexstore.go new file mode 100644 index 000000000..5af4a5ce0 --- /dev/null +++ b/lib/indexing/indexstore/indexstore.go @@ -0,0 +1,359 @@ +package indexstore + +import ( + "context" + _ "embed" + "errors" + "fmt" + "strings" + "time" + + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + "github.com/multiformats/go-multihash" + "github.com/yugabyte/gocql" + "golang.org/x/sync/errgroup" + "golang.org/x/xerrors" +) + +//go:embed create.cql +var createCQL string + +var log = logging.Logger("cassandra") + +type settings struct { + // Number of records per insert batch + InsertBatchSize int + // Number of concurrent inserts to split AddIndex/DeleteIndex calls to + InsertConcurrency int +} + +type IndexStore struct { + settings settings + cluster *gocql.ClusterConfig + session *gocql.Session + ctx context.Context +} + +type OffsetSize struct { + // Offset is the offset into the CAR file of the section, where a section + // is
+ Offset uint64 `json:"offset"` + // Size is the size of the block data (not the whole section) + Size uint64 `json:"size"` +} + +type Record struct { + Cid cid.Cid `json:"cid"` + OffsetSize `json:"offsetsize"` +} + +// Probability of a collision in two 20 byte hashes (birthday problem): +// 2^(20*8/2) = 1.4 x 10^24 +const multihashLimitBytes = 20 + +// trimMultihash trims the multihash to the last multihashLimitBytes bytes +func trimMultihash(mh multihash.Multihash) []byte { + var idx int + if len(mh) > multihashLimitBytes { + idx = len(mh) - multihashLimitBytes + } + return mh[idx:] +} + +var ErrNotFound = errors.New("not found") + +func normalizeMultihashError(m multihash.Multihash, err error) error { + if err == nil { + return nil + } + if isNotFoundErr(err) { + return fmt.Errorf("multihash %s: %w", m, ErrNotFound) + } + return err +} + +func isNotFoundErr(err error) bool { + if err == nil { + return false + } + + if errors.Is(err, gocql.ErrNotFound) { + return true + } + + // Unfortunately it seems like the Cassandra driver doesn't always return + // a specific not found error type, so we need to rely on string parsing + return strings.Contains(strings.ToLower(err.Error()), "not found") +} + +func NewIndexStore(hosts []string) (*IndexStore, error) { + if len(hosts) == 0 { + return nil, xerrors.Errorf("no hosts provided for cassandra") + } + + cluster := gocql.NewCluster(hosts...) + cluster.Timeout = time.Minute + cluster.Keyspace = "curio" + + store := &IndexStore{ + cluster: cluster, + } + + return store, store.Start(context.Background()) +} + +func (i *IndexStore) Start(ctx context.Context) error { + log.Info("Starting cassandra DB") + // Create cassandra keyspace + session, err := i.cluster.CreateSession() + if err != nil { + return xerrors.Errorf("creating cassandra session: %w", err) + + } + query := `CREATE KEYSPACE IF NOT EXISTS ` + i.cluster.Keyspace + + ` WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }` + err = session.Query(query).WithContext(ctx).Exec() + if err != nil { + return xerrors.Errorf("creating cassandra keyspace: %w", err) + } + + lines := strings.Split(createCQL, ";") + for _, line := range lines { + line = strings.Trim(line, "\n \t") + if line == "" { + continue + } + log.Debug(line) + err := session.Query(line).WithContext(ctx).Exec() + if err != nil { + return xerrors.Errorf("creating tables: executing\n%s\n%w", line, err) + } + } + + i.session = session + i.ctx = ctx + + log.Infow("Successfully started Cassandra DB") + return nil +} + +// AddIndex adds multihash -> piece cid mappings, along with offset / size information for the piece. +// It takes a context, the piece cid, and a slice of Record structs as arguments. +// It returns an error if any error occurs during the execution. 
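+//
+// A minimal usage sketch (the host address and record values here are
+// hypothetical; in practice records are built by walking the deal's CAR data):
+//
+//	store, err := NewIndexStore([]string{"127.0.0.1"})
+//	if err != nil {
+//		return err
+//	}
+//	recs := []Record{{Cid: blockCid, OffsetSize: OffsetSize{Offset: 0, Size: 1024}}}
+//	err = store.AddIndex(ctx, pieceCid, recs)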
+func (i *IndexStore) AddIndex(ctx context.Context, pieceCid cid.Cid, records []Record) error { + + Qry := `INSERT INTO PayloadToPiece (PieceCid, PayloadMultihash, BlockOffset, BlockSize) VALUES (?, ?, ?, ?)` + pieceCidBytes := pieceCid.Bytes() + + batchSize := len(records) / i.settings.InsertConcurrency // split the slice into go-routine batches + + if batchSize == 0 { + batchSize = len(records) + } + + log.Debugw("addIndex call", "BatchSize", batchSize, "Total Records", len(records)) + + var eg errgroup.Group + + // Batch to allow multiple goroutines concurrency to do batch inserts + // These batches will be further batched based on InsertBatchSize in the + // goroutines for each BatchInsert operation + for start := 0; start < len(records); start += batchSize { + start := start + end := start + batchSize + if end >= len(records) { + end = len(records) + } + + eg.Go(func() error { + var batch *gocql.Batch + recsb := records[start:end] + // Running a loop on all instead of creating batches require less memory + for allIdx, rec := range recsb { + if batch == nil { + batch = i.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx) + batch.Entries = make([]gocql.BatchEntry, 0, i.settings.InsertBatchSize) + } + + batch.Entries = append(batch.Entries, gocql.BatchEntry{ + Stmt: Qry, + Args: []any{pieceCidBytes, rec.Cid.Hash(), rec.Offset, rec.Size}, + Idempotent: true, + }) + + if allIdx == len(recsb)-1 || len(batch.Entries) == i.settings.InsertBatchSize { + err := func() error { + defer func(start time.Time) { + log.Debugw("addIndex Batch Insert", "took", time.Since(start), "entries", len(batch.Entries)) + }(time.Now()) + + err := i.session.ExecuteBatch(batch) + if err != nil { + return fmt.Errorf("executing batch insert for piece %s: %w", pieceCid, err) + } + return nil + }() + if err != nil { + return err + } + batch = nil + } + } + return nil + }) + } + + err := eg.Wait() + if err != nil { + return err + } + + return nil +} + +// RemoveIndexes removes all multihash -> piece cid mappings, and all +// offset / size information for the piece. +func (i *IndexStore) RemoveIndexes(ctx context.Context, pieceCid cid.Cid) error { + Qry := `DELETE FROM PayloadToPiece WHERE PayloadMultihash = ? 
AND PieceCid = ?` + pieceCidBytes := pieceCid.Bytes() + + // Get multihashes for piece + recs, err := i.GetIndex(ctx, pieceCid) + if err != nil { + return fmt.Errorf("removing indexes for piece %s: getting recs: %w", pieceCid, err) + } + + // Batch delete with concurrency + var eg errgroup.Group + for j := 0; j < i.settings.InsertConcurrency; j++ { + eg.Go(func() error { + var batch *gocql.Batch + var num int + select { + case <-ctx.Done(): + return ctx.Err() + case rec, ok := <-recs: + if !ok { + // Finished adding all the queued items, exit the thread + err := i.session.ExecuteBatch(batch) + if err != nil { + return fmt.Errorf("executing batch delete for piece %s: %w", pieceCid, err) + } + return nil + } + + if batch == nil { + batch = i.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx) + batch.Entries = make([]gocql.BatchEntry, 0, i.settings.InsertBatchSize) + } + + batch.Entries = append(batch.Entries, gocql.BatchEntry{ + Stmt: Qry, + Args: []any{pieceCidBytes, rec.Cid.Hash(), rec.Offset, rec.Size}, + Idempotent: true, + }) + + num++ + + if num == i.settings.InsertBatchSize { + err := i.session.ExecuteBatch(batch) + if err != nil { + return fmt.Errorf("executing batch delete for piece %s: %w", pieceCid, err) + } + } + + } + return nil + }) + } + err = eg.Wait() + if err != nil { + return err + } + + return nil +} + +type IndexRecord struct { + Record + Error error `json:"error,omitempty"` +} + +// GetIndex retrieves the multihashes and offset/size information for a given piece CID. +// It returns a channel of `IndexRecord` structs, which include the CID and the offset/size information. +// If no multihashes are found for the piece, it returns an error. +func (i *IndexStore) GetIndex(ctx context.Context, pieceCid cid.Cid) (<-chan IndexRecord, error) { + qry := `SELECT PayloadMultihash, BlockOffset, BlockSize FROM PayloadToPiece WHERE PieceCid = ?` + iter := i.session.Query(qry, pieceCid.Bytes()).WithContext(ctx).Iter() + + records := make(chan IndexRecord) + + parseRecord := func(payload []byte, off, s uint64) { + _, pmh, err := multihash.MHFromBytes(payload) + if err != nil { + records <- IndexRecord{Error: err} + return + } + + records <- IndexRecord{ + Record: Record{ + Cid: cid.NewCidV1(cid.Raw, pmh), + OffsetSize: OffsetSize{ + Offset: off, + Size: s, + }, + }, + } + } + + var payloadMHBz []byte + var offset, size uint64 + + // Try to find first record. 
If not found then we don't have index
+	// for this piece
+	found := iter.Scan(&payloadMHBz, &offset, &size)
+	if !found {
+		return nil, fmt.Errorf("no multihashes found for piece %s", pieceCid.String())
+	}
+
+	go func() {
+		defer close(records)
+
+		// Send the first record from inside the goroutine: the records
+		// channel is unbuffered, so sending before the caller starts
+		// reading would block forever.
+		parseRecord(payloadMHBz, offset, size)
+
+		for iter.Scan(&payloadMHBz, &offset, &size) {
+			parseRecord(payloadMHBz, offset, size)
+		}
+
+		if err := iter.Close(); err != nil {
+			err = fmt.Errorf("getting piece index for piece %s: %w", pieceCid, err)
+			records <- IndexRecord{Error: err}
+		}
+	}()
+
+	return records, nil
+}
+
+// PiecesContainingMultihash gets all pieces that contain a multihash (used when retrieving by payload CID)
+func (i *IndexStore) PiecesContainingMultihash(ctx context.Context, m multihash.Multihash) ([]cid.Cid, error) {
+	var pcids []cid.Cid
+	var bz []byte
+	qry := `SELECT PieceCid FROM PayloadToPiece WHERE PayloadMultihash = ?`
+	iter := i.session.Query(qry, trimMultihash(m)).WithContext(ctx).Iter()
+	for iter.Scan(&bz) {
+		pcid, err := cid.Parse(bz)
+		if err != nil {
+			return nil, fmt.Errorf("parsing piece cid: %w", err)
+		}
+		pcids = append(pcids, pcid)
+	}
+	if err := iter.Close(); err != nil {
+		return nil, fmt.Errorf("getting pieces containing multihash %s: %w", m, err)
+	}
+
+	// No pieces found for multihash, return a "not found" error
+	if len(pcids) == 0 {
+		return nil, normalizeMultihashError(m, ErrNotFound)
+	}
+	return pcids, nil
+}
diff --git a/lib/libp2p/libp2p.go b/lib/libp2p/libp2p.go
new file mode 100644
index 000000000..78c5688c1
--- /dev/null
+++ b/lib/libp2p/libp2p.go
@@ -0,0 +1,207 @@
+package libp2p
+
+import (
+	"context"
+	"fmt"
+	"strings"
+
+	logging "github.com/ipfs/go-log/v2"
+	"github.com/libp2p/go-libp2p"
+	"github.com/libp2p/go-libp2p/core/crypto"
+	"github.com/libp2p/go-libp2p/core/host"
+	"github.com/libp2p/go-libp2p/core/metrics"
+	"github.com/libp2p/go-libp2p/core/peer"
+	basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
+	"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
+	"github.com/multiformats/go-multiaddr"
+	mamask "github.com/whyrusleeping/multiaddr-filter"
+	"golang.org/x/xerrors"
+
+	"github.com/filecoin-project/go-address"
+
+	"github.com/filecoin-project/curio/build"
+	"github.com/filecoin-project/curio/harmony/harmonydb"
+)
+
+var log = logging.Logger("curio-libp2p")
+
+func NewLibp2pHost(ctx context.Context, db *harmonydb.DB, miners []string) ([]host.Host, error) {
+	cfg, err := getCfg(ctx, db, miners)
+	if err != nil {
+		return nil, err
+	}
+
+	var ret []host.Host
+
+	for miner, c := range cfg {
+		pstore, err := pstoremem.NewPeerstore()
+		if err != nil {
+			return nil, fmt.Errorf("creating peer store: %w", err)
+		}
+
+		pubK := c.priv.GetPublic()
+		id, err := peer.IDFromPublicKey(pubK)
+		if err != nil {
+			return nil, fmt.Errorf("getting peer ID: %w", err)
+		}
+
+		err = pstore.AddPrivKey(id, c.priv)
+		if err != nil {
+			return nil, fmt.Errorf("adding private key to peerstore: %w", err)
+		}
+		err = pstore.AddPubKey(id, pubK)
+		if err != nil {
+			return nil, fmt.Errorf("adding public key to peerstore: %w", err)
+		}
+
+		addrFactory, err := MakeAddrsFactory(c.AnnounceAddr, c.NoAnnounceAddr)
+		if err != nil {
+			return nil, fmt.Errorf("creating address factory: %w", err)
+		}
+
+		opts := []libp2p.Option{
+			libp2p.DefaultTransports,
+			libp2p.ListenAddrs(c.ListenAddr...),
+			libp2p.AddrsFactory(addrFactory),
+			libp2p.Peerstore(pstore),
+			libp2p.UserAgent("curio-" + build.UserVersion()),
+			libp2p.Ping(true),
+			libp2p.DisableRelay(),
+			libp2p.EnableNATService(),
+
libp2p.BandwidthReporter(metrics.NewBandwidthCounter()), + } + + h, err := libp2p.New(opts...) + if err != nil { + return nil, xerrors.Errorf("creating libp2p host: %w", err) + } + + // Start listening + err = h.Network().Listen(c.ListenAddr...) + if err != nil { + return nil, xerrors.Errorf("failed to listen on addresses: %w", err) + } + + log.Infof("Libp2p started listening for miner %s", miner.String()) + ret = append(ret, h) + } + + return ret, err + +} + +type libp2pCfg struct { + priv crypto.PrivKey + ListenAddr []multiaddr.Multiaddr + AnnounceAddr []multiaddr.Multiaddr + NoAnnounceAddr []multiaddr.Multiaddr +} + +func getCfg(ctx context.Context, db *harmonydb.DB, miners []string) (map[address.Address]*libp2pCfg, error) { + mm := make(map[int64]address.Address) + var ms []int64 + + for _, miner := range miners { + maddr, err := address.NewFromString(miner) + if err != nil { + return nil, err + } + mid, err := address.IDFromAddress(maddr) + if err != nil { + return nil, err + } + mm[int64(mid)] = maddr + ms = append(ms, int64(mid)) + } + + var cfgs []struct { + SpID int64 `db:"sp_id"` + Key []byte `db:"priv_key"` + ListenAddr string `db:"listen_address"` + AnnounceAddr string `db:"announce_address"` + NoAnnounceAddr string `db:"no_announce_address"` + } + err := db.Select(ctx, &cfgs, `SELECT sp_id, priv_key, listen_address, announce_address FROM libp2p_keys WHERE sp_id = ANY($1)`, ms) + if err != nil { + return nil, xerrors.Errorf("getting libp2p details from DB: %w", err) + } + + if len(cfgs) != len(miners) { + return nil, fmt.Errorf("mismatched number of miners and libp2p configurations") + } + + ret := make(map[address.Address]*libp2pCfg) + + for _, cfg := range cfgs { + p, err := crypto.UnmarshalPrivateKey(cfg.Key) + if err != nil { + return nil, xerrors.Errorf("unmarshaling private key: %w", err) + } + + ret[mm[cfg.SpID]] = &libp2pCfg{ + priv: p, + } + + la := strings.Split(cfg.ListenAddr, ",") + for _, l := range la { + listenAddr, err := multiaddr.NewMultiaddr(l) + if err != nil { + return nil, xerrors.Errorf("parsing listen address: %w", err) + } + ret[mm[cfg.SpID]].ListenAddr = append(ret[mm[cfg.SpID]].ListenAddr, listenAddr) + } + + aa := strings.Split(cfg.AnnounceAddr, ",") + for _, a := range aa { + announceAddr, err := multiaddr.NewMultiaddr(a) + if err != nil { + return nil, xerrors.Errorf("parsing announce address: %w", err) + } + ret[mm[cfg.SpID]].AnnounceAddr = append(ret[mm[cfg.SpID]].AnnounceAddr, announceAddr) + } + + naa := strings.Split(cfg.NoAnnounceAddr, ",") + for _, na := range naa { + noAnnounceAddr, err := multiaddr.NewMultiaddr(na) + if err != nil { + return nil, xerrors.Errorf("parsing no announce address: %w", err) + } + ret[mm[cfg.SpID]].NoAnnounceAddr = append(ret[mm[cfg.SpID]].NoAnnounceAddr, noAnnounceAddr) + } + } + + return ret, nil +} + +func MakeAddrsFactory(announceAddrs, noAnnounce []multiaddr.Multiaddr) (basichost.AddrsFactory, error) { + filters := multiaddr.NewFilters() + noAnnAddrs := map[string]bool{} + for _, addr := range noAnnounce { + f, err := mamask.NewMask(addr.String()) + if err == nil { + filters.AddFilter(*f, multiaddr.ActionDeny) + continue + } + noAnnAddrs[string(addr.Bytes())] = true + } + + return func(allAddrs []multiaddr.Multiaddr) []multiaddr.Multiaddr { + var addrs []multiaddr.Multiaddr + if len(announceAddrs) > 0 { + addrs = announceAddrs + } else { + addrs = allAddrs + } + + var out []multiaddr.Multiaddr + for _, maddr := range addrs { + // check for exact matches + ok := noAnnAddrs[string(maddr.Bytes())] + // check for 
/ipcidr matches + if !ok && !filters.AddrBlocked(maddr) { + out = append(out, maddr) + } + } + return out + }, nil +} diff --git a/lib/paths/mocks/index.go b/lib/paths/mocks/index.go index 082d22908..347051491 100644 --- a/lib/paths/mocks/index.go +++ b/lib/paths/mocks/index.go @@ -8,14 +8,14 @@ import ( context "context" reflect "reflect" - gomock "github.com/golang/mock/gomock" - abi "github.com/filecoin-project/go-state-types/abi" paths "github.com/filecoin-project/curio/lib/paths" storiface "github.com/filecoin-project/curio/lib/storiface" fsutil "github.com/filecoin-project/lotus/storage/sealer/fsutil" + + gomock "github.com/golang/mock/gomock" ) // MockSectorIndex is a mock of SectorIndex interface. diff --git a/lib/paths/mocks/pf.go b/lib/paths/mocks/pf.go index 072e47b2e..cb0bb126e 100644 --- a/lib/paths/mocks/pf.go +++ b/lib/paths/mocks/pf.go @@ -8,12 +8,11 @@ import ( io "io" reflect "reflect" - gomock "github.com/golang/mock/gomock" - abi "github.com/filecoin-project/go-state-types/abi" partialfile "github.com/filecoin-project/curio/lib/partialfile" storiface "github.com/filecoin-project/curio/lib/storiface" + gomock "github.com/golang/mock/gomock" ) // MockPartialFileHandler is a mock of PartialFileHandler interface. diff --git a/lib/paths/mocks/store.go b/lib/paths/mocks/store.go index e7c3c184c..b44366c8d 100644 --- a/lib/paths/mocks/store.go +++ b/lib/paths/mocks/store.go @@ -8,14 +8,14 @@ import ( context "context" reflect "reflect" - gomock "github.com/golang/mock/gomock" - cid "github.com/ipfs/go-cid" - abi "github.com/filecoin-project/go-state-types/abi" storiface "github.com/filecoin-project/curio/lib/storiface" fsutil "github.com/filecoin-project/lotus/storage/sealer/fsutil" + + gomock "github.com/golang/mock/gomock" + cid "github.com/ipfs/go-cid" ) // MockStore is a mock of Store interface. 
diff --git a/market/mk12/libp2pimpl/libp2pimpl.go b/market/mk12/libp2pimpl/libp2pimpl.go
new file mode 100644
index 000000000..7a1d56d5c
--- /dev/null
+++ b/market/mk12/libp2pimpl/libp2pimpl.go
@@ -0,0 +1,449 @@
+package libp2pimpl
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"runtime/debug"
+	"time"
+
+	"github.com/google/uuid"
+	"github.com/ipfs/go-cid"
+	logging "github.com/ipfs/go-log/v2"
+	"github.com/libp2p/go-libp2p/core/host"
+	"github.com/libp2p/go-libp2p/core/network"
+	"go.uber.org/zap"
+	"golang.org/x/xerrors"
+
+	"github.com/filecoin-project/go-address"
+	cborutil "github.com/filecoin-project/go-cbor-util"
+	"github.com/filecoin-project/go-state-types/abi"
+	"github.com/filecoin-project/go-state-types/builtin/v9/market"
+
+	"github.com/filecoin-project/curio/harmony/harmonydb"
+	"github.com/filecoin-project/curio/market/mk12"
+	"github.com/filecoin-project/curio/market/mk12/legacytypes"
+
+	"github.com/filecoin-project/lotus/chain/types"
+)
+
+var log = logging.Logger("mk12-net")
+var propLog = logging.Logger("mk12-prop")
+
+const DealProtocolv120ID = "/fil/storage/mk/1.2.0"
+const DealProtocolv121ID = "/fil/storage/mk/1.2.1"
+const DealStatusV12ProtocolID = "/fil/storage/status/1.2.0"
+
+// The time limit to read a message from the client when the client opens a stream
+const providerReadDeadline = 10 * time.Second
+
+// The time limit to write a response to the client
+const providerWriteDeadline = 10 * time.Second
+
+// SafeHandle wraps a stream handler so that a panic in the handler is
+// recovered and logged instead of crashing the node. The recover must run
+// inside the returned handler; deferring it in SafeHandle itself would only
+// cover the wrapping call, not the handler invocations.
+func SafeHandle(h network.StreamHandler) network.StreamHandler {
+	return func(s network.Stream) {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Errorw("panic occurred", "panic", r, "stack", string(debug.Stack()))
+			}
+		}()
+
+		h(s)
+	}
+}
+
+// DealProvider listens for incoming deal proposals over libp2p
+type DealProvider struct {
+	ctx  context.Context
+	host host.Host
+	prov *mk12.MK12
+	api  mk12libp2pAPI
+	db   *harmonydb.DB
+}
+
+type mk12libp2pAPI interface {
+	StateAccountKey(context.Context, address.Address, types.TipSetKey) (address.Address, error)
+}
+
+func NewDealProvider(h host.Host, db *harmonydb.DB, prov *mk12.MK12, api mk12libp2pAPI) *DealProvider {
+	p := &DealProvider{
+		host: h,
+		prov: prov,
+		api:  api,
+		db:   db,
+	}
+	return p
+}
+
+func (p *DealProvider) Start(ctx context.Context) {
+	p.ctx = ctx
+
+	// Note that the handling for deal protocol v1.2.0 and v1.2.1 is the same.
+	// Deal protocol v1.2.1 has a couple of new fields: SkipIPNIAnnounce and
+	// RemoveUnsealedCopy.
+	// If a client that supports deal protocol v1.2.0 sends a request to a
+	// boostd server that supports deal protocol v1.2.1, the DealParams struct
+	// will be missing these new fields.
+ // When the DealParams struct is unmarshalled the missing fields will be + // set to false, which maintains the previous behaviour: + // - SkipIPNIAnnounce=false: announce deal to IPNI + // - RemoveUnsealedCopy=false: keep unsealed copy of deal data + p.host.SetStreamHandler(DealProtocolv121ID, SafeHandle(p.handleNewDealStream)) + p.host.SetStreamHandler(DealProtocolv120ID, SafeHandle(p.handleNewDealStream)) + p.host.SetStreamHandler(DealStatusV12ProtocolID, SafeHandle(p.handleNewDealStatusStream)) + + // Handle Query Ask + p.host.SetStreamHandler(legacytypes.AskProtocolID, SafeHandle(p.handleNewAskStream)) + + // Wait for context cancellation + + <-p.ctx.Done() + p.host.RemoveStreamHandler(DealProtocolv121ID) + p.host.RemoveStreamHandler(DealProtocolv120ID) + p.host.RemoveStreamHandler(DealStatusV12ProtocolID) + p.host.RemoveStreamHandler(legacytypes.AskProtocolID) +} + +// Called when the client opens a libp2p stream with a new deal proposal +func (p *DealProvider) handleNewDealStream(s network.Stream) { + start := time.Now() + reqLogUuid := uuid.New() + reqLog := log.With("reqlog-uuid", reqLogUuid.String(), "client-peer", s.Conn().RemotePeer()) + reqLog.Debugw("new deal proposal request") + + defer func() { + err := s.Close() + if err != nil { + reqLog.Infow("closing stream", "err", err) + } + reqLog.Debugw("handled deal proposal request", "duration", time.Since(start).String()) + }() + + // Set a deadline on reading from the stream so it doesn't hang + _ = s.SetReadDeadline(time.Now().Add(providerReadDeadline)) + + // Read the deal proposal from the stream + var proposal mk12.DealParams + err := proposal.UnmarshalCBOR(s) + _ = s.SetReadDeadline(time.Time{}) // Clear read deadline so conn doesn't get closed + if err != nil { + reqLog.Warnw("reading storage deal proposal from stream", "err", err) + return + } + + reqLog = reqLog.With("id", proposal.DealUUID) + reqLog.Infow("received deal proposal") + + // Start executing the deal. + // Note: This method just waits for the deal to be accepted, it doesn't + // wait for deal execution to complete. 
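+	// Acceptance itself should be quick; once accepted, the deal progresses
+	// through the pipeline asynchronously and only the accept/reject result
+	// is written back on this stream.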
+ startExec := time.Now() + res, err := p.prov.ExecuteDeal(context.Background(), &proposal, s.Conn().RemotePeer()) + reqLog.Debugw("processed deal proposal accept") + if err != nil { + reqLog.Warnw("deal proposal failed", "err", err, "reason", res.Reason) + } + + // Log the response + propLog.Infow("send deal proposal response", + "id", proposal.DealUUID, + "accepted", res.Accepted, + "msg", res.Reason, + "peer id", s.Conn().RemotePeer(), + "client address", proposal.ClientDealProposal.Proposal.Client, + "provider address", proposal.ClientDealProposal.Proposal.Provider, + "piece cid", proposal.ClientDealProposal.Proposal.PieceCID.String(), + "piece size", proposal.ClientDealProposal.Proposal.PieceSize, + "verified", proposal.ClientDealProposal.Proposal.VerifiedDeal, + "label", proposal.ClientDealProposal.Proposal.Label, + "start epoch", proposal.ClientDealProposal.Proposal.StartEpoch, + "end epoch", proposal.ClientDealProposal.Proposal.EndEpoch, + "price per epoch", proposal.ClientDealProposal.Proposal.StoragePricePerEpoch, + "duration", time.Since(startExec).String(), + ) + + // Set a deadline on writing to the stream so it doesn't hang + _ = s.SetWriteDeadline(time.Now().Add(providerWriteDeadline)) + defer s.SetWriteDeadline(time.Time{}) // nolint + + // Write the response to the client + err = cborutil.WriteCborRPC(s, &mk12.DealResponse{Accepted: res.Accepted, Message: res.Reason}) + if err != nil { + reqLog.Warnw("writing deal response", "err", err) + } +} + +func (p *DealProvider) handleNewDealStatusStream(s network.Stream) { + start := time.Now() + reqLogUuid := uuid.New() + reqLog := log.With("reqlog-uuid", reqLogUuid.String(), "client-peer", s.Conn().RemotePeer()) + reqLog.Debugw("new deal status request") + + defer func() { + err := s.Close() + if err != nil { + reqLog.Infow("closing stream", "err", err) + } + reqLog.Debugw("handled deal status request", "duration", time.Since(start).String()) + }() + + // Read the deal status request from the stream + _ = s.SetReadDeadline(time.Now().Add(providerReadDeadline)) + var req mk12.DealStatusRequest + err := req.UnmarshalCBOR(s) + _ = s.SetReadDeadline(time.Time{}) // Clear read deadline so conn doesn't get closed + if err != nil { + reqLog.Warnw("reading deal status request from stream", "err", err) + return + } + reqLog = reqLog.With("id", req.DealUUID) + reqLog.Debugw("received deal status request") + + resp := p.getDealStatus(req, reqLog) + reqLog.Debugw("processed deal status request") + + // Set a deadline on writing to the stream so it doesn't hang + _ = s.SetWriteDeadline(time.Now().Add(providerWriteDeadline)) + defer s.SetWriteDeadline(time.Time{}) // nolint + + if err := cborutil.WriteCborRPC(s, &resp); err != nil { + reqLog.Errorw("failed to write deal status response", "err", err) + } +} + +func (p *DealProvider) getDealStatus(req mk12.DealStatusRequest, reqLog *zap.SugaredLogger) mk12.DealStatusResponse { + errResp := func(err string) mk12.DealStatusResponse { + return mk12.DealStatusResponse{DealUUID: req.DealUUID, Error: err} + } + + var pdeals []struct { + AfterPSD bool `db:"after_psd"` + Sealed bool `db:"sealed"` + Indexed bool `db:"indexed"` + } + + err := p.db.Select(p.ctx, &pdeals, `SELECT + after_psd, + sealed, + indexed + FROM + market_mk12_deal_pipeline + WHERE + uuid = $1;`, req.DealUUID) + + if err != nil { + return errResp(fmt.Sprintf("failed to query the db for deal status: %s", err)) + } + + if len(pdeals) > 1 { + return errResp("found multiple entries for the same UUID, inform the storage provider") + } + + 
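// Three cases follow: the deal is in the pipeline and past PSD, the deal
+	// is in the pipeline but before PSD, and the deal has already left the
+	// pipeline entirely (fully sealed).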
+	// If deal is still in pipeline
+	if len(pdeals) == 1 {
+		pdeal := pdeals[0]
+		// If PSD is done
+		if pdeal.AfterPSD {
+			st, err := p.getSealedDealStatus(p.ctx, req.DealUUID.String(), true)
+			if err != nil {
+				reqLog.Errorw("failed to get sealed deal status", "err", err)
+				return errResp("failed to get sealed deal status")
+			}
+			ret := mk12.DealStatusResponse{
+				DealUUID: req.DealUUID,
+				DealStatus: &mk12.DealStatus{
+					Error:             st.Error,
+					Status:            "Sealing",
+					SealingStatus:     "Sealed",
+					Proposal:          st.Proposal,
+					SignedProposalCid: st.SignedProposalCID,
+					PublishCid:        &st.PublishCID,
+					ChainDealID:       st.ChainDealID,
+				},
+				IsOffline:      st.Offline,
+				TransferSize:   1,
+				NBytesReceived: 1,
+			}
+			if pdeal.Sealed {
+				ret.DealStatus.Status = "Sealed"
+			}
+			if pdeal.Indexed {
+				ret.DealStatus.Status = "Sealed and Indexed"
+			}
+			return ret
+		}
+		// Anything before PSD is processing
+		st, err := p.getSealedDealStatus(p.ctx, req.DealUUID.String(), false)
+		if err != nil {
+			reqLog.Errorw("failed to get sealed deal status", "err", err)
+			return errResp("failed to get sealed deal status")
+		}
+		return mk12.DealStatusResponse{
+			DealUUID: req.DealUUID,
+			DealStatus: &mk12.DealStatus{
+				Error:             st.Error,
+				Status:            "Processing",
+				SealingStatus:     "Not assigned to sector",
+				Proposal:          st.Proposal,
+				SignedProposalCid: st.SignedProposalCID,
+				PublishCid:        &st.PublishCID,
+				ChainDealID:       st.ChainDealID,
+			},
+			IsOffline:      st.Offline,
+			TransferSize:   1,
+			NBytesReceived: 1,
+		}
+	}
+
+	// If deal is not in deal pipeline
+	st, err := p.getSealedDealStatus(p.ctx, req.DealUUID.String(), true)
+	if err != nil {
+		reqLog.Errorw("failed to get sealed deal status", "err", err)
+		return errResp("failed to get sealed deal status")
+	}
+
+	return mk12.DealStatusResponse{
+		DealUUID: req.DealUUID,
+		DealStatus: &mk12.DealStatus{
+			Error:             st.Error,
+			Status:            "Sealed",
+			SealingStatus:     "Sealed and Indexed",
+			Proposal:          st.Proposal,
+			SignedProposalCid: st.SignedProposalCID,
+			PublishCid:        &st.PublishCID,
+			ChainDealID:       st.ChainDealID,
+		},
+		IsOffline:      st.Offline,
+		TransferSize:   1,
+		NBytesReceived: 1,
+	}
+}
+
+type dealInfo struct {
+	Offline           bool
+	Error             string
+	Proposal          market.DealProposal
+	SignedProposalCID cid.Cid
+	ChainDealID       abi.DealID
+	PublishCID        cid.Cid
+}
+
+func (p *DealProvider) getSealedDealStatus(ctx context.Context, id string, onChain bool) (dealInfo, error) {
+	var dealInfos []struct {
+		Offline           bool            `db:"offline"`
+		Error             string          `db:"error"`
+		Proposal          json.RawMessage `db:"proposal"`
+		SignedProposalCID string          `db:"signed_proposal_cid"`
+	}
+	err := p.db.Select(ctx, &dealInfos, `SELECT
+    offline,
+    error,
+    proposal,
+    signed_proposal_cid
+FROM
+    market_mk12_deals
+WHERE
+    uuid = $1;`, id)
+
+	if err != nil {
+		return dealInfo{}, xerrors.Errorf("failed to get deal details from DB: %w", err)
+	}
+
+	if len(dealInfos) != 1 {
+		return dealInfo{}, xerrors.Errorf("expected 1 row but got %d", len(dealInfos))
+	}
+
+	di := dealInfos[0]
+
+	var prop market.DealProposal
+	err = json.Unmarshal(di.Proposal, &prop)
+	if err != nil {
+		return dealInfo{}, xerrors.Errorf("failed to unmarshal deal proposal: %w", err)
+	}
+
+	spc, err := cid.Parse(di.SignedProposalCID)
+	if err != nil {
+		return dealInfo{}, xerrors.Errorf("failed to parse signed proposal CID: %w", err)
+	}
+
+	ret := dealInfo{
+		Offline:           di.Offline,
+		Error:             di.Error,
+		Proposal:          prop,
+		SignedProposalCID: spc,
+		ChainDealID:       abi.DealID(0),
+		PublishCID:        cid.Undef,
+	}
+
+	if !onChain {
+		return ret, nil
+	}
+
+	var cInfos []struct {
+		ChainDealID int64  `db:"chain_deal_id"`
+		PublishCID  string `db:"publish_cid"`
+	}
+	err = p.db.Select(ctx, &cInfos, `SELECT
+    chain_deal_id,
+    publish_cid
+FROM
+    market_mk12_deals
+WHERE
+    uuid = $1;`, id)
+
+	if err != nil {
+		return dealInfo{}, xerrors.Errorf("failed to get deal details from DB: %w", err)
+	}
+
+	if len(cInfos) != 1 {
+		return dealInfo{}, xerrors.Errorf("expected 1 row but got %d", len(cInfos))
+	}
+
+	ci := cInfos[0]
+
+	pc, err := cid.Parse(ci.PublishCID)
+	if err != nil {
+		return dealInfo{}, xerrors.Errorf("failed to parse publish CID: %w", err)
+	}
+
+	ret.PublishCID = pc
+	ret.ChainDealID = abi.DealID(ci.ChainDealID)
+
+	return ret, nil
+}
+
+func (p *DealProvider) handleNewAskStream(s network.Stream) {
+	start := time.Now()
+	reqLog := log.With("client-peer", s.Conn().RemotePeer())
+	reqLog.Debugw("new queryAsk request")
+
+	defer func() {
+		err := s.Close()
+		if err != nil {
+			reqLog.Infow("closing stream", "err", err)
+		}
+		reqLog.Debugw("handled queryAsk request", "duration", time.Since(start).String())
+	}()
+
+	// Read the ask request from the stream
+	_ = s.SetReadDeadline(time.Now().Add(providerReadDeadline))
+	var req legacytypes.AskRequest
+	err := req.UnmarshalCBOR(s)
+	_ = s.SetReadDeadline(time.Time{}) // Clear read deadline so conn doesn't get closed
+	if err != nil {
+		reqLog.Warnw("reading queryAsk request from stream", "err", err)
+		return
+	}
+
+	var resp legacytypes.AskResponse
+
+	resp.Ask, err = p.prov.GetAsk(p.ctx, req.Miner)
+	if err != nil {
+		reqLog.Warnw("failed to get ask from storage provider", "err", err)
+	}
+
+	// Set a deadline on writing to the stream so it doesn't hang
+	_ = s.SetWriteDeadline(time.Now().Add(providerWriteDeadline))
+	defer s.SetWriteDeadline(time.Time{}) // nolint
+
+	if err := cborutil.WriteCborRPC(s, &resp); err != nil {
+		reqLog.Errorw("failed to write queryAsk response", "err", err)
+	}
+}
diff --git a/market/mk12/types.go b/market/mk12/types.go
index 3070b75f3..57245f703 100644
--- a/market/mk12/types.go
+++ b/market/mk12/types.go
@@ -266,6 +266,8 @@ type ProviderDealState struct {
 	SectorID abi.SectorNumber
 	Offset   abi.PaddedPieceSize
+	Length   abi.PaddedPieceSize
+
 	// set if there's an error
 	Err string
diff --git a/market/storageingest/deal_ingest_seal.go b/market/storageingest/deal_ingest_seal.go
index 7a54c712e..99bc7e766 100644
--- a/market/storageingest/deal_ingest_seal.go
+++ b/market/storageingest/deal_ingest_seal.go
@@ -9,7 +9,6 @@ import (
 	"net/url"
 	"time"
-	logging "github.com/ipfs/go-log/v2"
 	"golang.org/x/xerrors"
 	"github.com/filecoin-project/go-address"
@@ -19,6 +18,7 @@ import (
 	verifregtypes "github.com/filecoin-project/go-state-types/builtin/v9/verifreg"
 	"github.com/filecoin-project/go-state-types/dline"
 	"github.com/filecoin-project/go-state-types/network"
+	logging "github.com/ipfs/go-log/v2"
 	"github.com/filecoin-project/curio/build"
 	"github.com/filecoin-project/curio/deps/config"
diff --git a/tasks/gc/pipeline_meta_gc.go b/tasks/gc/pipeline_meta_gc.go
index 42fa80540..157b27a7f 100644
--- a/tasks/gc/pipeline_meta_gc.go
+++ b/tasks/gc/pipeline_meta_gc.go
@@ -31,6 +31,9 @@ func (s *PipelineGC) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
 	if err := s.cleanupUpgrade(); err != nil {
 		return false, xerrors.Errorf("cleanupUpgrade: %w", err)
 	}
+	if err := s.cleanupMK12DealPipeline(); err != nil {
+		return false, xerrors.Errorf("cleanupMK12DealPipeline: %w", err)
+	}
diff --git
a/tasks/indexing/task_indexing.go b/tasks/indexing/task_indexing.go index 872cec8ea..835b4e90f 100644 --- a/tasks/indexing/task_indexing.go +++ b/tasks/indexing/task_indexing.go @@ -102,8 +102,8 @@ func (i *IndexingTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do return false, xerrors.Errorf("checking if piece is already indexed: %w", err) } - // Return early if already indexed or should not be indexed - if indexed || !task.ShouldIndex { + // Return early if already indexed + if indexed { err = i.recordCompletion(ctx, task, taskID, false) if err != nil { return false, err @@ -277,7 +277,6 @@ func (i *IndexingTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.T } func (i *IndexingTask) TypeDetails() harmonytask.TaskTypeDetails { - return harmonytask.TaskTypeDetails{ Name: "Indexing", Cost: resources.Resources{ diff --git a/tasks/seal/task_movestorage.go b/tasks/seal/task_movestorage.go index d64da0e04..f8e2f32b7 100644 --- a/tasks/seal/task_movestorage.go +++ b/tasks/seal/task_movestorage.go @@ -14,7 +14,7 @@ import ( "github.com/filecoin-project/curio/harmony/taskhelp" ffi2 "github.com/filecoin-project/curio/lib/ffi" "github.com/filecoin-project/curio/lib/paths" - storiface "github.com/filecoin-project/curio/lib/storiface" + "github.com/filecoin-project/curio/lib/storiface" ) type MoveStorageTask struct { diff --git a/tasks/snap/task_movestorage.go b/tasks/snap/task_movestorage.go index a4f941a97..d85171559 100644 --- a/tasks/snap/task_movestorage.go +++ b/tasks/snap/task_movestorage.go @@ -16,7 +16,7 @@ import ( "github.com/filecoin-project/curio/lib/ffi" "github.com/filecoin-project/curio/lib/passcall" "github.com/filecoin-project/curio/lib/paths" - storiface "github.com/filecoin-project/curio/lib/storiface" + "github.com/filecoin-project/curio/lib/storiface" "github.com/filecoin-project/curio/tasks/seal" ) From 9ade420828d3a2f6ca95a93ff27f3ae1da028c1c Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Thu, 8 Aug 2024 15:32:48 +0400 Subject: [PATCH 02/15] fix gen test --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 9b4381f72..4434ba23e 100644 --- a/go.mod +++ b/go.mod @@ -85,6 +85,7 @@ require ( go.opencensus.io v0.24.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 + golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 golang.org/x/net v0.26.0 golang.org/x/sync v0.7.0 golang.org/x/sys v0.23.0 @@ -319,7 +320,6 @@ require ( go.uber.org/mock v0.4.0 // indirect go4.org v0.0.0-20230225012048-214862532bf5 // indirect golang.org/x/crypto v0.25.0 // indirect - golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/mod v0.17.0 // indirect golang.org/x/term v0.22.0 // indirect golang.org/x/time v0.5.0 // indirect From a180b8c5215e6d69f341c99c15a4a474d3813dbb Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Tue, 13 Aug 2024 19:13:41 +0400 Subject: [PATCH 03/15] poller redesign --- .circleci/config.yml | 7 + deps/config/doc_gen.go | 8 + .../default-curio-configuration.md | 7 + documentation/en/curio-cli/curio.md | 20 ++ go.sum | 1 + harmony/harmonydb/sql/20240228-piece-park.sql | 4 + .../sql/20240730-market-migration.sql | 104 ++++++++- lib/dealdata/urlpiecereader.go | 30 ++- lib/indexing/indexstore/create.cql | 6 +- lib/indexing/indexstore/indexstore.go | 198 ++++++----------- lib/indexing/indexstore/indexstore_test.go | 125 +++++++++++ lib/libp2p/libp2p.go | 207 ------------------ market/mk12/libp2pimpl/libp2pimpl.go | 2 +- market/storageingest/deal_ingest_seal.go | 2 +- 
market/storageingest/deal_ingest_snap.go | 6 +- tasks/indexing/task_indexing.go | 3 + tasks/piece/task_park_piece.go | 50 ++++- tasks/storage-market/task_psd.go | 2 +- 18 files changed, 419 insertions(+), 363 deletions(-) create mode 100644 lib/indexing/indexstore/indexstore_test.go delete mode 100644 lib/libp2p/libp2p.go diff --git a/.circleci/config.yml b/.circleci/config.yml index b4dadc809..16235ffd3 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -283,3 +283,10 @@ workflows: suite: test-all get-params: true resource_class: 2xlarge + - test: + name: test-idxStore + requires: + - build + suite: idxStore + target: "./lib/indexing/indexstore/indexstore_test.go" + get-params: true diff --git a/deps/config/doc_gen.go b/deps/config/doc_gen.go index 664bc799e..2d374eee3 100644 --- a/deps/config/doc_gen.go +++ b/deps/config/doc_gen.go @@ -847,6 +847,14 @@ This will be used to fail the deals which cannot be sealed on time.`, Comment: `SkipCommP can be used to skip doing a commP check before PublishDealMessage is sent on chain Warning: If this check is skipped and there is a commP mismatch, all deals in the +sector will need to be sent again`, + }, + { + Name: "SkipCommP", + Type: "bool", + + Comment: `SkipCommP can be used to skip doing a commP check before PublishDealMessage is sent on chain +Warning: If this check is skipped and there is a commP mismatch, all deals in the sector will need to be sent again`, }, }, diff --git a/documentation/en/configuration/default-curio-configuration.md b/documentation/en/configuration/default-curio-configuration.md index 503b74f37..8d78d5d79 100644 --- a/documentation/en/configuration/default-curio-configuration.md +++ b/documentation/en/configuration/default-curio-configuration.md @@ -493,6 +493,13 @@ description: The default curio configuration # type: []string #NoAnnounceAddresses = [] + # SkipCommP can be used to skip doing a commP check before PublishDealMessage is sent on chain + # Warning: If this check is skipped and there is a commP mismatch, all deals in the + # sector will need to be sent again + # + # type: bool + #SkipCommP = false + [Ingest] # Maximum number of sectors that can be queued waiting for deals to start processing. diff --git a/documentation/en/curio-cli/curio.md b/documentation/en/curio-cli/curio.md index f46bbf73c..48932e662 100644 --- a/documentation/en/curio-cli/curio.md +++ b/documentation/en/curio-cli/curio.md @@ -639,9 +639,15 @@ USAGE: curio market command [command options] [arguments...] 
COMMANDS: +<<<<<<< HEAD seal start sealing a deal sector early add-url Add URL to fetch data for offline deals help, h Shows a list of commands or help for one command +======= + seal start sealing a deal sector early + import-data Import data for offline deal + help, h Shows a list of commands or help for one command +>>>>>>> 48e953d (poller redesign) OPTIONS: --help, -h show help @@ -661,6 +667,7 @@ OPTIONS: --help, -h show help ``` +<<<<<<< HEAD ### curio market add-url ``` NAME: @@ -674,6 +681,19 @@ OPTIONS: --header HEADER, -H HEADER [ --header HEADER, -H HEADER ] Custom HEADER to include in the HTTP request --url URL, -u URL URL to send the request to --help, -h show help +======= +### curio market import-data +``` +NAME: + curio market import-data - Import data for offline deal + +USAGE: + curio market import-data [command options] + +OPTIONS: + --actor value Specify actor address to start sealing sectors for + --help, -h show help +>>>>>>> 48e953d (poller redesign) ``` ## curio fetch-params diff --git a/go.sum b/go.sum index 96454a26c..231a7dc55 100644 --- a/go.sum +++ b/go.sum @@ -191,6 +191,7 @@ github.com/cpuguy83/go-md2man/v2 v2.0.4 h1:wfIWP927BUkWJb2NmU/kNDYIBTh/ziUX91+lV github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/crackcomm/go-gitignore v0.0.0-20231225121904-e25f5bc08668 h1:ZFUue+PNxmHlu7pYv+IYMtqlaO/0VwaGEqKepZf9JpA= github.com/crackcomm/go-gitignore v0.0.0-20231225121904-e25f5bc08668/go.mod h1:p1d6YEZWvFzEh4KLyvBcVSnrfNDDvK2zfK/4x2v/4pE= +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0= github.com/cskr/pubsub v1.0.2/go.mod h1:/8MzYXk/NJAz782G8RPkFzXTZVu63VotefPnR9TIRis= github.com/daaku/go.zipexe v1.0.2 h1:Zg55YLYTr7M9wjKn8SY/WcpuuEi+kR2u4E8RhvpyXmk= diff --git a/harmony/harmonydb/sql/20240228-piece-park.sql b/harmony/harmonydb/sql/20240228-piece-park.sql index efd529da7..053145211 100644 --- a/harmony/harmonydb/sql/20240228-piece-park.sql +++ b/harmony/harmonydb/sql/20240228-piece-park.sql @@ -34,7 +34,11 @@ create table parked_piece_refs ( data_headers jsonb not null default '{}', -- host Added in 202240730-market-migrations.sql +<<<<<<< HEAD -- host text, +======= + host text, +>>>>>>> 48e953d (poller redesign) foreign key (piece_id) references parked_pieces(id) on delete cascade ); diff --git a/harmony/harmonydb/sql/20240730-market-migration.sql b/harmony/harmonydb/sql/20240730-market-migration.sql index d936f9bde..be06f70cf 100644 --- a/harmony/harmonydb/sql/20240730-market-migration.sql +++ b/harmony/harmonydb/sql/20240730-market-migration.sql @@ -1,24 +1,39 @@ +-- Table for Mk12 or Boost deals CREATE TABLE market_mk12_deals ( uuid TEXT NOT NULL, +<<<<<<< HEAD created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP AT TIME ZONE 'UTC', +======= + sp_id BIGINT NOT NULL, + + created_at TIMESTAMPTZ NOT NULL DEFAULT TIMEZONE('UTC', NOW()), + +>>>>>>> 48e953d (poller redesign) signed_proposal_cid TEXT NOT NULL, proposal_signature BYTEA NOT NULL, proposal jsonb NOT NULL, - piece_cid TEXT NOT NULL, - piece_size BIGINT NOT NULL, + offline BOOLEAN NOT NULL, verified BOOLEAN NOT NULL, - sp_id BIGINT NOT NULL, + start_epoch BIGINT NOT NULL, end_epoch BIGINT NOT NULL, + client_peer_id TEXT NOT NULL, + chain_deal_id BIGINT DEFAULT NULL, publish_cid TEXT DEFAULT NULL, + + piece_cid TEXT NOT NULL, + piece_size BIGINT NOT NULL, length BIGINT DEFAULT NULL, + fast_retrieval BOOLEAN NOT NULL, announce_to_ipni 
BOOLEAN NOT NULL, + url TEXT DEFAULT NULL, url_headers jsonb NOT NULL DEFAULT '{}', + error TEXT DEFAULT NULL, primary key (uuid, sp_id, piece_cid, signed_proposal_cid), @@ -26,6 +41,8 @@ CREATE TABLE market_mk12_deals ( unique (signed_proposal_cid) ); +-- Table for old lotus market deals. This is just for deal +-- which are still alive. It should not be used for any processing CREATE TABLE market_legacy_deals ( signed_proposal_cid TEXT, proposal_signature BYTEA, @@ -46,6 +63,7 @@ CREATE TABLE market_legacy_deals ( primary key (sp_id, piece_cid, signed_proposal_cid) ); +-- This table is used for storing piece metadata (piece indexing) CREATE TABLE market_piece_metadata ( piece_cid TEXT NOT NULL PRIMARY KEY, version INT NOT NULL DEFAULT 2, @@ -57,14 +75,19 @@ CREATE TABLE market_piece_metadata ( unique (piece_cid) ); +-- This table binds the piece metadata to specific deals (piece indexing) CREATE TABLE market_piece_deal ( - id TEXT NOT NULL, + id TEXT NOT NULL, -- (UUID for new deals, PropCID for old) piece_cid TEXT NOT NULL, + boost_deal BOOLEAN NOT NULL, legacy_deal BOOLEAN NOT NULL DEFAULT FALSE, + chain_deal_id BIGINT NOT NULL DEFAULT 0, + sp_id BIGINT NOT NULL, sector_num BIGINT NOT NULL, + piece_offset BIGINT NOT NULL, piece_length BIGINT NOT NULL, raw_size BIGINT NOT NULL, @@ -74,6 +97,7 @@ CREATE TABLE market_piece_deal ( unique (sp_id, id) ); +-- This function is used to insert piece metadata and piece deal (piece indexing) CREATE OR REPLACE FUNCTION process_piece_deal( _id TEXT, _piece_cid TEXT, @@ -108,64 +132,95 @@ VALUES ( END; $$ LANGUAGE plpgsql; +-- Storage Ask for ask protocol CREATE TABLE market_mk12_storage_ask ( + sp_id BIGINT NOT NULL, + price BIGINT NOT NULL, verified_price BIGINT NOT NULL, + min_size BIGINT NOT NULL, max_size BIGINT NOT NULL, - sp_id BIGINT NOT NULL, + created_at BIGINT NOT NULL, expiry BIGINT NOT NULL, + sequence BIGINT NOT NULL, unique (sp_id) ); +-- Used for processing Mk12 deals CREATE TABLE market_mk12_deal_pipeline ( uuid TEXT NOT NULL, sp_id BIGINT NOT NULL, + started BOOLEAN DEFAULT FALSE, + piece_cid TEXT NOT NULL, +<<<<<<< HEAD piece_size BOOLEAN NOT NULL, +======= + piece_size BIGINT NOT NULL, + file_size BIGINT DEFAULT NULL, -- raw piece size + +>>>>>>> 48e953d (poller redesign) offline BOOLEAN NOT NULL, - downloaded BOOLEAN DEFAULT FALSE, + url TEXT DEFAULT NULL, headers jsonb NOT NULL DEFAULT '{}', - file_size BIGINT DEFAULT NULL, + commp_task_id BIGINT DEFAULT NULL, after_commp BOOLEAN DEFAULT FALSE, - find_task_id BIGINT DEFAULT NULL, - after_find BOOLEAN DEFAULT FALSE, + psd_task_id BIGINT DEFAULT NULL, after_psd BOOLEAN DEFAULT FALSE, + psd_wait_time TIMESTAMPTZ, + find_deal_task_id BIGINT DEFAULT NULL, after_find_deal BOOLEAN DEFAULT FALSE, + sector BIGINT, sector_offset BIGINT, + sealed BOOLEAN DEFAULT FALSE, indexed BOOLEAN DEFAULT FALSE, constraint market_mk12_deal_pipeline_identity_key unique (uuid) ); +-- This table can be used to track remote piece for offline deals +-- The entries must be created by users CREATE TABLE market_offline_urls ( piece_cid TEXT NOT NULL, + url TEXT NOT NULL, headers jsonb NOT NULL DEFAULT '{}', + raw_size BIGINT NOT NULL, + unique (piece_cid) ); +-- indexing tracker is separate from CREATE TABLE market_indexing_tasks ( - id TEXT NOT NULL, + uuid TEXT NOT NULL, + sp_id BIGINT NOT NULL, sector_number BIGINT NOT NULL, reg_seal_proof INT NOT NULL, + piece_offset BIGINT NOT NULL, piece_size BIGINT NOT NULL, raw_size BIGINT NOT NULL, piece_cid TEXT NOT NULL, +<<<<<<< HEAD created_at TIMESTAMPTZ NOT NULL 
DEFAULT CURRENT_TIMESTAMP AT TIME ZONE 'UTC', +======= + + created_at TIMESTAMPTZ NOT NULL DEFAULT TIMEZONE('UTC', NOW()), + +>>>>>>> 48e953d (poller redesign) task_id BIGINT DEFAULT NULL, constraint market_indexing_tasks_identity_key @@ -200,6 +255,35 @@ CREATE TABLE direct_deals ( keep_unsealed_copy BOOLEAN ); +ALTER TABLE parked_piece_refs + ADD COLUMN host text; + +create table file_parked_pieces ( + id bigserial primary key, + created_at timestamp default current_timestamp, + piece_cid text not null, + piece_padded_size bigint not null, + piece_raw_size bigint not null, + complete boolean not null default false, + task_id bigint default null, + cleanup_task_id bigint default null, + unique (piece_cid) +); + +/* + * This table is used to keep track of the references to the file parked pieces + * so that we can delete them when they are no longer needed. + * + * All references into the file_parked_pieces table should be done through this table. + */ +create table file_parked_piece_refs ( + ref_id bigserial primary key, + piece_id bigint not null, + data_url text not null, + node text not null, + foreign key (piece_id) references file_parked_pieces(id) on delete cascade +); + diff --git a/lib/dealdata/urlpiecereader.go b/lib/dealdata/urlpiecereader.go index a7c5d683d..938c59caa 100644 --- a/lib/dealdata/urlpiecereader.go +++ b/lib/dealdata/urlpiecereader.go @@ -4,6 +4,7 @@ import ( "io" "net/http" "net/url" + "os" "golang.org/x/xerrors" ) @@ -62,8 +63,33 @@ func (u *UrlPieceReader) Read(p []byte) (n int, err error) { return 0, xerrors.Errorf("a non 200 response code: %s", resp.Status) } - // Set 'active' to the response body - u.active = resp.Body + if goUrl.Scheme == "file" { + fileUrl := goUrl.Path + file, err := os.Open(fileUrl) + if err != nil { + return 0, xerrors.Errorf("error opening file: %w", err) + } + u.active = file + } else { + req, err := http.NewRequest(http.MethodGet, u.Url, nil) + if err != nil { + return 0, xerrors.Errorf("error creating request: %w", err) + } + // Add custom headers for security and authentication + req.Header = u.Headers + // Create a client and make the request + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return 0, xerrors.Errorf("error making GET request: %w", err) + } + if resp.StatusCode != 200 { + return 0, xerrors.Errorf("a non 200 response code: %s", resp.Status) + } + + // Set 'active' to the response body + u.active = resp.Body + } } // Calculate the maximum number of bytes we can read without exceeding RawSize diff --git a/lib/indexing/indexstore/create.cql b/lib/indexing/indexstore/create.cql index 21157bb83..6f29a872e 100644 --- a/lib/indexing/indexstore/create.cql +++ b/lib/indexing/indexstore/create.cql @@ -1,9 +1,7 @@ CREATE TABLE IF NOT EXISTS PayloadToPiece ( - PieceCid BLOB, + PieceCid BLOB, -- 20 bytes trimmed PayloadMultihash BLOB, BlockOffset BIGINT, BlockSize BIGINT, PRIMARY KEY (PayloadMultihash, PieceCid) -); - -CREATE INDEX piece_to_payload ON PayloadToPiece(PieceCid, PayloadMultihash); \ No newline at end of file +); \ No newline at end of file diff --git a/lib/indexing/indexstore/indexstore.go b/lib/indexing/indexstore/indexstore.go index 5af4a5ce0..35807792a 100644 --- a/lib/indexing/indexstore/indexstore.go +++ b/lib/indexing/indexstore/indexstore.go @@ -14,8 +14,12 @@ import ( "github.com/yugabyte/gocql" "golang.org/x/sync/errgroup" "golang.org/x/xerrors" + + "github.com/filecoin-project/curio/deps/config" ) +const keyspace = "curio" + //go:embed create.cql var createCQL string @@ -87,17 
+91,20 @@ func isNotFoundErr(err error) bool { return strings.Contains(strings.ToLower(err.Error()), "not found") } -func NewIndexStore(hosts []string) (*IndexStore, error) { +func NewIndexStore(hosts []string, cfg *config.CurioConfig) (*IndexStore, error) { if len(hosts) == 0 { return nil, xerrors.Errorf("no hosts provided for cassandra") } cluster := gocql.NewCluster(hosts...) cluster.Timeout = time.Minute - cluster.Keyspace = "curio" store := &IndexStore{ cluster: cluster, + settings: settings{ + InsertBatchSize: cfg.Market.StorageMarketConfig.Indexing.InsertBatchSize, + InsertConcurrency: cfg.Market.StorageMarketConfig.Indexing.InsertConcurrency, + }, } return store, store.Start(context.Background()) @@ -111,13 +118,23 @@ func (i *IndexStore) Start(ctx context.Context) error { return xerrors.Errorf("creating cassandra session: %w", err) } - query := `CREATE KEYSPACE IF NOT EXISTS ` + i.cluster.Keyspace + + query := `CREATE KEYSPACE IF NOT EXISTS ` + keyspace + ` WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 3 }` err = session.Query(query).WithContext(ctx).Exec() if err != nil { return xerrors.Errorf("creating cassandra keyspace: %w", err) } + session.Close() + + // Recreate session with the keyspace + i.cluster.Keyspace = keyspace + session, err = i.cluster.CreateSession() + if err != nil { + return xerrors.Errorf("creating cassandra session: %w", err) + + } + lines := strings.Split(createCQL, ";") for _, line := range lines { line = strings.Trim(line, "\n \t") @@ -141,48 +158,44 @@ func (i *IndexStore) Start(ctx context.Context) error { // AddIndex adds multihash -> piece cid mappings, along with offset / size information for the piece. // It takes a context, the piece cid, and a slice of Record structs as arguments. // It returns an error if any error occurs during the execution. 
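
// A minimal, illustrative sketch of the pattern the rewritten AddIndex below
// follows (assumed names, not the actual Curio API): a fixed pool of workers
// drains a shared channel and flushes fixed-size batches, so memory stays
// bounded no matter how many records the producer sends before closing the
// channel. (flush is assumed to execute synchronously and not retain the slice.)
//
//	func insertAll(recs chan Record, workers, batchSize int, flush func([]Record) error) error {
//		var eg errgroup.Group
//		for w := 0; w < workers; w++ {
//			eg.Go(func() error {
//				batch := make([]Record, 0, batchSize)
//				for rec := range recs { // exits once the producer closes recs
//					batch = append(batch, rec)
//					if len(batch) == batchSize {
//						if err := flush(batch); err != nil {
//							return err
//						}
//						batch = batch[:0] // safe to reuse: flush does not retain the slice
//					}
//				}
//				if len(batch) > 0 {
//					return flush(batch) // flush the final partial batch
//				}
//				return nil
//			})
//		}
//		return eg.Wait() // first worker error, if any
//	}
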
-func (i *IndexStore) AddIndex(ctx context.Context, pieceCid cid.Cid, records []Record) error {
-
+func (i *IndexStore) AddIndex(ctx context.Context, pieceCid cid.Cid, recordsChan chan Record) error {
 	Qry := `INSERT INTO PayloadToPiece (PieceCid, PayloadMultihash, BlockOffset, BlockSize) VALUES (?, ?, ?, ?)`
 	pieceCidBytes := pieceCid.Bytes()
 
-	batchSize := len(records) / i.settings.InsertConcurrency // split the slice into go-routine batches
-
-	if batchSize == 0 {
-		batchSize = len(records)
-	}
-
-	log.Debugw("addIndex call", "BatchSize", batchSize, "Total Records", len(records))
-
 	var eg errgroup.Group
 
-	// Batch to allow multiple goroutines concurrency to do batch inserts
-	// These batches will be further batched based on InsertBatchSize in the
+	// Start worker goroutines based on the InsertConcurrency value
+	// These workers will be further batched based on InsertBatchSize in the
 	// goroutines for each BatchInsert operation
-	for start := 0; start < len(records); start += batchSize {
-		start := start
-		end := start + batchSize
-		if end >= len(records) {
-			end = len(records)
-		}
-
+	for worker := 0; worker < i.settings.InsertConcurrency; worker++ {
 		eg.Go(func() error {
 			var batch *gocql.Batch
-			recsb := records[start:end]
			// Running a loop on all records instead of creating batches requires less memory
-			for allIdx, rec := range recsb {
+			for {
 				if batch == nil {
 					batch = i.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx)
 					batch.Entries = make([]gocql.BatchEntry, 0, i.settings.InsertBatchSize)
 				}
 
+				rec, ok := <-recordsChan
+
+				if !ok {
+					if len(batch.Entries) > 0 {
+						err := i.session.ExecuteBatch(batch)
+						if err != nil {
+							return fmt.Errorf("executing batch insert for piece %s: %w", pieceCid, err)
+						}
+					}
+					return nil
+				}
+
 				batch.Entries = append(batch.Entries, gocql.BatchEntry{
 					Stmt:       Qry,
-					Args:       []any{pieceCidBytes, rec.Cid.Hash(), rec.Offset, rec.Size},
+					Args:       []any{pieceCidBytes, trimMultihash(rec.Cid.Hash()), rec.Offset, rec.Size},
 					Idempotent: true,
 				})
 
-				if allIdx == len(recsb)-1 || len(batch.Entries) == i.settings.InsertBatchSize {
+				if len(batch.Entries) == i.settings.InsertBatchSize {
 					err := func() error {
 						defer func(start time.Time) {
 							log.Debugw("addIndex Batch Insert", "took", time.Since(start), "entries", len(batch.Entries))
@@ -200,7 +213,6 @@ func (i *IndexStore) AddIndex(ctx context.Context, pieceCid cid.Cid, records []R
 					batch = nil
 				}
 			}
-
 			return nil
 		})
 	}
@@ -215,123 +227,51 @@ func (i *IndexStore) AddIndex(ctx context.Context, pieceCid cid.Cid, records []R
 // RemoveIndexes removes all multihash -> piece cid mappings, and all
 // offset / size information for the piece.
 func (i *IndexStore) RemoveIndexes(ctx context.Context, pieceCid cid.Cid) error {
-	Qry := `DELETE FROM PayloadToPiece WHERE PayloadMultihash = ? AND PieceCid = ?`
+	delQry := `DELETE FROM PayloadToPiece WHERE PayloadMultihash = ?
AND PieceCid = ?` pieceCidBytes := pieceCid.Bytes() // Get multihashes for piece - recs, err := i.GetIndex(ctx, pieceCid) - if err != nil { - return fmt.Errorf("removing indexes for piece %s: getting recs: %w", pieceCid, err) - } - - // Batch delete with concurrency - var eg errgroup.Group - for j := 0; j < i.settings.InsertConcurrency; j++ { - eg.Go(func() error { - var batch *gocql.Batch - var num int - select { - case <-ctx.Done(): - return ctx.Err() - case rec, ok := <-recs: - if !ok { - // Finished adding all the queued items, exit the thread - err := i.session.ExecuteBatch(batch) - if err != nil { - return fmt.Errorf("executing batch delete for piece %s: %w", pieceCid, err) - } - return nil - } - - if batch == nil { - batch = i.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx) - batch.Entries = make([]gocql.BatchEntry, 0, i.settings.InsertBatchSize) - } - - batch.Entries = append(batch.Entries, gocql.BatchEntry{ - Stmt: Qry, - Args: []any{pieceCidBytes, rec.Cid.Hash(), rec.Offset, rec.Size}, - Idempotent: true, - }) + getQry := `SELECT PayloadMultihash FROM PayloadToPiece WHERE PieceCid = ?` + iter := i.session.Query(getQry, pieceCidBytes).WithContext(ctx).Iter() - num++ + // Create batch for deletion + batch := i.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx) + batch.Entries = make([]gocql.BatchEntry, 0, i.settings.InsertBatchSize) - if num == i.settings.InsertBatchSize { - err := i.session.ExecuteBatch(batch) - if err != nil { - return fmt.Errorf("executing batch delete for piece %s: %w", pieceCid, err) - } - } + var payloadMHBz []byte + for iter.Scan(&payloadMHBz) { + // Add each delete operation to batch + batch.Entries = append(batch.Entries, gocql.BatchEntry{ + Stmt: delQry, + Args: []any{payloadMHBz, pieceCidBytes}, + Idempotent: true, + }) + // Execute batch + if len(batch.Entries) >= i.settings.InsertBatchSize { + err := i.session.ExecuteBatch(batch) + if err != nil { + return xerrors.Errorf("executing batch delete for piece %s: %w", pieceCid, err) } - return nil - }) - } - err = eg.Wait() - if err != nil { - return err + // Create a new batch after executing + batch = i.session.NewBatch(gocql.UnloggedBatch).WithContext(ctx) + batch.Entries = make([]gocql.BatchEntry, 0, i.settings.InsertBatchSize) + } } - return nil -} - -type IndexRecord struct { - Record - Error error `json:"error,omitempty"` -} - -// GetIndex retrieves the multihashes and offset/size information for a given piece CID. -// It returns a channel of `IndexRecord` structs, which include the CID and the offset/size information. -// If no multihashes are found for the piece, it returns an error. -func (i *IndexStore) GetIndex(ctx context.Context, pieceCid cid.Cid) (<-chan IndexRecord, error) { - qry := `SELECT PayloadMultihash, BlockOffset, BlockSize FROM PayloadToPiece WHERE PieceCid = ?` - iter := i.session.Query(qry, pieceCid.Bytes()).WithContext(ctx).Iter() - - records := make(chan IndexRecord) - - parseRecord := func(payload []byte, off, s uint64) { - _, pmh, err := multihash.MHFromBytes(payload) + // Execute remaining operations in the batch + if len(batch.Entries) > 0 { + err := i.session.ExecuteBatch(batch) if err != nil { - records <- IndexRecord{Error: err} - return - } - - records <- IndexRecord{ - Record: Record{ - Cid: cid.NewCidV1(cid.Raw, pmh), - OffsetSize: OffsetSize{ - Offset: off, - Size: s, - }, - }, + return xerrors.Errorf("executing batch delete for piece %s: %w", pieceCid, err) } } - var payloadMHBz []byte - var offset, size uint64 - - // Try to find first record. 
If not found then we don't have index
-	// for this piece
-	found := iter.Scan(&payloadMHBz, &offset, &size)
-	if !found {
-		return nil, fmt.Errorf("no multihashed found for piece %s", pieceCid.String())
+	if err := iter.Close(); err != nil {
+		return xerrors.Errorf("getting piece index for piece %s: %w", pieceCid, err)
 	}
-	parseRecord(payloadMHBz, offset, size)
-	go func() {
-		defer close(records)
-
-		for iter.Scan(&payloadMHBz, &offset, &size) {
-			parseRecord(payloadMHBz, offset, size)
-		}
-
-		if err := iter.Close(); err != nil {
-			err = fmt.Errorf("getting piece index for piece %s: %w", pieceCid, err)
-			records <- IndexRecord{Error: err}
-		}
-	}()
-
-	return records, nil
+	return nil
 }
 
 // PiecesContainingMultihash gets all pieces that contain a multihash (used when retrieving by payload CID)
diff --git a/lib/indexing/indexstore/indexstore_test.go b/lib/indexing/indexstore/indexstore_test.go
new file mode 100644
index 000000000..79e174fb8
--- /dev/null
+++ b/lib/indexing/indexstore/indexstore_test.go
@@ -0,0 +1,125 @@
+package indexstore
+
+import (
+	"context"
+	"io"
+	"os"
+	"testing"
+	"time"
+
+	carv2 "github.com/ipld/go-car/v2"
+	"github.com/ipld/go-car/v2/blockstore"
+	"github.com/multiformats/go-multihash"
+	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
+
+	"github.com/filecoin-project/go-commp-utils/writer"
+
+	"github.com/filecoin-project/curio/deps/config"
+	"github.com/filecoin-project/curio/lib/testutils"
+)
+
+func envElse(env, els string) string {
+	if v := os.Getenv(env); v != "" {
+		return v
+	}
+	return els
+}
+
+func TestNewIndexStore(t *testing.T) {
+	// Set up the indexStore for testing
+
+	ctx := context.Background()
+	cfg := config.DefaultCurioConfig()
+
+	idxStore, err := NewIndexStore([]string{envElse("CURIO_HARMONYDB_HOSTS", "127.0.0.1")}, cfg)
+	require.NoError(t, err)
+
+	// Create a car file and calculate commP
+	dir, err := os.MkdirTemp(os.TempDir(), "curio-indexstore")
+	require.NoError(t, err)
+	defer func() {
+		_ = os.RemoveAll(dir)
+	}()
+
+	rf, err := testutils.CreateRandomFile(dir, int(time.Now().Unix()), 8000000)
+	require.NoError(t, err)
+
+	caropts := []carv2.Option{
+		blockstore.WriteAsCarV1(true),
+	}
+
+	_, cn, err := testutils.CreateDenseCARWith(dir, rf, 1024, 1024, caropts)
+	require.NoError(t, err)
+
+	f, err := os.Open(cn)
+	require.NoError(t, err)
+
+	defer func() {
+		_ = f.Close()
+	}()
+
+	w := &writer.Writer{}
+	_, err = io.CopyBuffer(w, f, make([]byte, writer.CommPBuf))
+	require.NoError(t, err)
+
+	commp, err := w.Sum()
+	require.NoError(t, err)
+
+	_, err = f.Seek(0, io.SeekStart)
+	require.NoError(t, err)
+
+	// Create records
+	dealCfg := cfg.Market.StorageMarketConfig
+	chanSize := dealCfg.Indexing.InsertConcurrency * dealCfg.Indexing.InsertBatchSize
+
+	recs := make(chan Record, chanSize)
+	opts := []carv2.Option{carv2.ZeroLengthSectionAsEOF(true)}
+	blockReader, err := carv2.NewBlockReader(f, opts...)
+	require.NoError(t, err)
+
+	// Add index to the store
+	var eg errgroup.Group
+	eg.Go(func() error {
+		serr := idxStore.AddIndex(ctx, commp.PieceCID, recs)
+		return serr
+	})
+
+	var m multihash.Multihash
+	i := 0
+
+	blockMetadata, err := blockReader.SkipNext()
+	for err == nil {
+		if i == 0 {
+			m = blockMetadata.Hash()
+		}
+		recs <- Record{
+			Cid: blockMetadata.Cid,
+			OffsetSize: OffsetSize{
+				Offset: blockMetadata.SourceOffset,
+				Size:   blockMetadata.Size,
+			},
+		}
+		i++
+
+		blockMetadata, err = blockReader.SkipNext()
+	}
+	require.ErrorIs(t, err, io.EOF)
+	close(recs)
+	err = eg.Wait()
+	require.NoError(t, err)
+
+	// Try to find a multihash
+	pcids, err := idxStore.PiecesContainingMultihash(ctx, m)
+	require.NoError(t, err)
+	require.Len(t, pcids, 1)
+	require.Equal(t, pcids[0], commp.PieceCID)
+
+	// Remove all indexes from the store
+	err = idxStore.RemoveIndexes(ctx, pcids[0])
+	require.NoError(t, err)
+
+	// Drop the table
+	err = idxStore.session.Query("DROP TABLE PayloadToPiece").Exec()
+	require.NoError(t, err)
+}
diff --git a/lib/libp2p/libp2p.go b/lib/libp2p/libp2p.go
deleted file mode 100644
index 78c5688c1..000000000
--- a/lib/libp2p/libp2p.go
+++ /dev/null
@@ -1,207 +0,0 @@
-package libp2p
-
-import (
-	"context"
-	"fmt"
-	"strings"
-
-	logging "github.com/ipfs/go-log/v2"
-	"github.com/libp2p/go-libp2p"
-	"github.com/libp2p/go-libp2p/core/crypto"
-	"github.com/libp2p/go-libp2p/core/host"
-	"github.com/libp2p/go-libp2p/core/metrics"
-	"github.com/libp2p/go-libp2p/core/peer"
-	basichost "github.com/libp2p/go-libp2p/p2p/host/basic"
-	"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"
-	"github.com/multiformats/go-multiaddr"
-	mamask "github.com/whyrusleeping/multiaddr-filter"
-	"golang.org/x/xerrors"
-
-	"github.com/filecoin-project/go-address"
-
-	"github.com/filecoin-project/curio/build"
-	"github.com/filecoin-project/curio/harmony/harmonydb"
-)
-
-var log = logging.Logger("curio-libp2p")
-
-func NewLibp2pHost(ctx context.Context, db *harmonydb.DB, miners []string) ([]host.Host, error) {
-	cfg, err := getCfg(ctx, db, miners)
-	if err != nil {
-		return nil, err
-	}
-
-	var ret []host.Host
-
-	for miner, c := range cfg {
-		pstore, err := pstoremem.NewPeerstore()
-		if err != nil {
-			return nil, fmt.Errorf("creating peer store: %w", err)
-		}
-
-		pubK := c.priv.GetPublic()
-		id, err := peer.IDFromPublicKey(pubK)
-		if err != nil {
-			return nil, fmt.Errorf("getting peer ID: %w", err)
-		}
-
-		err = pstore.AddPrivKey(id, c.priv)
-		if err != nil {
-			return nil, fmt.Errorf("adding private key to peerstore: %w", err)
-		}
-		err = pstore.AddPubKey(id, pubK)
-		if err != nil {
-			return nil, fmt.Errorf("adding public key to peerstore: %w", err)
-		}
-
-		addrFactory, err := MakeAddrsFactory(c.AnnounceAddr, c.NoAnnounceAddr)
-		if err != nil {
-			return nil, fmt.Errorf("creating address factory: %w", err)
-		}
-
-		opts := []libp2p.Option{
-			libp2p.DefaultTransports,
-			libp2p.ListenAddrs(c.ListenAddr...),
-			libp2p.AddrsFactory(addrFactory),
-			libp2p.Peerstore(pstore),
-			libp2p.UserAgent("curio-" + build.UserVersion()),
-			libp2p.Ping(true),
-			libp2p.DisableRelay(),
-			libp2p.EnableNATService(),
-			libp2p.BandwidthReporter(metrics.NewBandwidthCounter()),
-		}
-
-		h, err := libp2p.New(opts...)
-		if err != nil {
-			return nil, xerrors.Errorf("creating libp2p host: %w", err)
-		}
-
-		// Start listening
-		err = h.Network().Listen(c.ListenAddr...)
- if err != nil { - return nil, xerrors.Errorf("failed to listen on addresses: %w", err) - } - - log.Infof("Libp2p started listening for miner %s", miner.String()) - ret = append(ret, h) - } - - return ret, err - -} - -type libp2pCfg struct { - priv crypto.PrivKey - ListenAddr []multiaddr.Multiaddr - AnnounceAddr []multiaddr.Multiaddr - NoAnnounceAddr []multiaddr.Multiaddr -} - -func getCfg(ctx context.Context, db *harmonydb.DB, miners []string) (map[address.Address]*libp2pCfg, error) { - mm := make(map[int64]address.Address) - var ms []int64 - - for _, miner := range miners { - maddr, err := address.NewFromString(miner) - if err != nil { - return nil, err - } - mid, err := address.IDFromAddress(maddr) - if err != nil { - return nil, err - } - mm[int64(mid)] = maddr - ms = append(ms, int64(mid)) - } - - var cfgs []struct { - SpID int64 `db:"sp_id"` - Key []byte `db:"priv_key"` - ListenAddr string `db:"listen_address"` - AnnounceAddr string `db:"announce_address"` - NoAnnounceAddr string `db:"no_announce_address"` - } - err := db.Select(ctx, &cfgs, `SELECT sp_id, priv_key, listen_address, announce_address FROM libp2p_keys WHERE sp_id = ANY($1)`, ms) - if err != nil { - return nil, xerrors.Errorf("getting libp2p details from DB: %w", err) - } - - if len(cfgs) != len(miners) { - return nil, fmt.Errorf("mismatched number of miners and libp2p configurations") - } - - ret := make(map[address.Address]*libp2pCfg) - - for _, cfg := range cfgs { - p, err := crypto.UnmarshalPrivateKey(cfg.Key) - if err != nil { - return nil, xerrors.Errorf("unmarshaling private key: %w", err) - } - - ret[mm[cfg.SpID]] = &libp2pCfg{ - priv: p, - } - - la := strings.Split(cfg.ListenAddr, ",") - for _, l := range la { - listenAddr, err := multiaddr.NewMultiaddr(l) - if err != nil { - return nil, xerrors.Errorf("parsing listen address: %w", err) - } - ret[mm[cfg.SpID]].ListenAddr = append(ret[mm[cfg.SpID]].ListenAddr, listenAddr) - } - - aa := strings.Split(cfg.AnnounceAddr, ",") - for _, a := range aa { - announceAddr, err := multiaddr.NewMultiaddr(a) - if err != nil { - return nil, xerrors.Errorf("parsing announce address: %w", err) - } - ret[mm[cfg.SpID]].AnnounceAddr = append(ret[mm[cfg.SpID]].AnnounceAddr, announceAddr) - } - - naa := strings.Split(cfg.NoAnnounceAddr, ",") - for _, na := range naa { - noAnnounceAddr, err := multiaddr.NewMultiaddr(na) - if err != nil { - return nil, xerrors.Errorf("parsing no announce address: %w", err) - } - ret[mm[cfg.SpID]].NoAnnounceAddr = append(ret[mm[cfg.SpID]].NoAnnounceAddr, noAnnounceAddr) - } - } - - return ret, nil -} - -func MakeAddrsFactory(announceAddrs, noAnnounce []multiaddr.Multiaddr) (basichost.AddrsFactory, error) { - filters := multiaddr.NewFilters() - noAnnAddrs := map[string]bool{} - for _, addr := range noAnnounce { - f, err := mamask.NewMask(addr.String()) - if err == nil { - filters.AddFilter(*f, multiaddr.ActionDeny) - continue - } - noAnnAddrs[string(addr.Bytes())] = true - } - - return func(allAddrs []multiaddr.Multiaddr) []multiaddr.Multiaddr { - var addrs []multiaddr.Multiaddr - if len(announceAddrs) > 0 { - addrs = announceAddrs - } else { - addrs = allAddrs - } - - var out []multiaddr.Multiaddr - for _, maddr := range addrs { - // check for exact matches - ok := noAnnAddrs[string(maddr.Bytes())] - // check for /ipcidr matches - if !ok && !filters.AddrBlocked(maddr) { - out = append(out, maddr) - } - } - return out - }, nil -} diff --git a/market/mk12/libp2pimpl/libp2pimpl.go b/market/mk12/libp2pimpl/libp2pimpl.go index 7a1d56d5c..0d7911333 100644 --- 
a/market/mk12/libp2pimpl/libp2pimpl.go +++ b/market/mk12/libp2pimpl/libp2pimpl.go @@ -269,7 +269,7 @@ func (p *DealProvider) getDealStatus(req mk12.DealStatusRequest, reqLog *zap.Sug ret.DealStatus.Status = "Sealed and Indexed" } } - // ANything before PSD is processing + // Anything before PSD is processing st, err := p.getSealedDealStatus(p.ctx, req.DealUUID.String(), false) if err != nil { reqLog.Errorw("failed to get sealed deal status", "err", err) diff --git a/market/storageingest/deal_ingest_seal.go b/market/storageingest/deal_ingest_seal.go index 99bc7e766..7a54c712e 100644 --- a/market/storageingest/deal_ingest_seal.go +++ b/market/storageingest/deal_ingest_seal.go @@ -9,6 +9,7 @@ import ( "net/url" "time" + logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" @@ -18,7 +19,6 @@ import ( verifregtypes "github.com/filecoin-project/go-state-types/builtin/v9/verifreg" "github.com/filecoin-project/go-state-types/dline" "github.com/filecoin-project/go-state-types/network" - logging "github.com/ipfs/go-log/v2" "github.com/filecoin-project/curio/build" "github.com/filecoin-project/curio/deps/config" diff --git a/market/storageingest/deal_ingest_snap.go b/market/storageingest/deal_ingest_snap.go index 445f6d7aa..09d91a3ef 100644 --- a/market/storageingest/deal_ingest_snap.go +++ b/market/storageingest/deal_ingest_snap.go @@ -499,12 +499,12 @@ func (p *PieceIngesterSnap) allocateToExisting(ctx context.Context, tx *harmonyd var allocated bool var rerr error - head, err := p.api.ChainHead(ctx) + openSectors, err := p.getOpenSectors(tx, p.addToID[maddr]) if err != nil { - return false, api.SectorOffset{}, xerrors.Errorf("getting chain head: %w", err) + return false, api.SectorOffset{}, err } - openSectors, err := p.getOpenSectors(tx, p.addToID[maddr]) + head, err := p.api.ChainHead(ctx) if err != nil { return false, api.SectorOffset{}, err } diff --git a/tasks/indexing/task_indexing.go b/tasks/indexing/task_indexing.go index 835b4e90f..eb107ef51 100644 --- a/tasks/indexing/task_indexing.go +++ b/tasks/indexing/task_indexing.go @@ -277,6 +277,9 @@ func (i *IndexingTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.T } func (i *IndexingTask) TypeDetails() harmonytask.TaskTypeDetails { + //dealCfg := i.cfg.Market.StorageMarketConfig + //chanSize := dealCfg.Indexing.InsertConcurrency * dealCfg.Indexing.InsertBatchSize * 56 // (56 = size of each index.Record) + return harmonytask.TaskTypeDetails{ Name: "Indexing", Cost: resources.Resources{ diff --git a/tasks/piece/task_park_piece.go b/tasks/piece/task_park_piece.go index 4fb9caf82..1a89ffcd5 100644 --- a/tasks/piece/task_park_piece.go +++ b/tasks/piece/task_park_piece.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "net/http" + "net/url" "strconv" "time" @@ -46,12 +47,20 @@ func NewParkPieceTask(db *harmonydb.DB, sc *ffi2.SealCalls, max int) (*ParkPiece ctx := context.Background() - // We should delete all incomplete pieces before we start + // We should delete all incomplete pieces that do not have a file URL scheme before we start // as we would have lost reader for these. The RPC caller will get an error // when Curio shuts down before parking a piece. They can always retry. 
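+	// Pieces whose ref has a file:// URL are kept: their source data lives on
+	// this node's local disk, so the park task can resume them after a restart.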
 	// Leaving the non-file pieces around would utilise unnecessary resources in the form of ParkPieceTask
-	_, err := db.Exec(ctx, `DELETE FROM parked_pieces WHERE complete = FALSE AND task_id IS NULL`)
+	_, err := db.Exec(ctx, `DELETE FROM parked_pieces pp
+			WHERE complete = FALSE
+			  AND task_id IS NULL
+			  AND NOT EXISTS (
+				  SELECT 1
+				  FROM parked_piece_refs ppr
+				  WHERE ppr.piece_id = pp.id
+					AND ppr.data_url LIKE 'file:///%'
+			  );`)
 	if err != nil {
 		return nil, xerrors.Errorf("failed to delete incomplete parked pieces: %w", err)
 	}
@@ -141,7 +150,8 @@ func (p *ParkPieceTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (d
 	err = p.db.Select(ctx, &refData, `
 		SELECT data_url, data_headers
 		FROM parked_piece_refs
-		WHERE piece_id = $1 AND data_url IS NOT NULL`, pieceData.PieceID)
+		WHERE piece_id = $1 AND data_url IS NOT NULL
+		ORDER BY CASE WHEN data_url LIKE 'file:///%' THEN 0 ELSE 1 END`, pieceData.PieceID)
 	if err != nil {
 		return false, xerrors.Errorf("fetching reference data: %w", err)
 	}
@@ -193,8 +203,38 @@
 }
 
 func (p *ParkPieceTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
-	id := ids[0]
-	return &id, nil
+	var pieces []struct {
+		Tid  int64  `db:"task_id"`
+		URL  string `db:"data_url"`
+		Host string `db:"host"`
+	}
+	err := p.db.Select(context.Background(), &pieces, `SELECT pp.task_id, ppr.host, ppr.data_url
+										FROM parked_pieces pp
+										JOIN parked_piece_refs ppr ON pp.id = ppr.piece_id
+										WHERE pp.task_id = ANY($1)
+										ORDER BY pp.task_id,
+											CASE WHEN ppr.data_url LIKE 'file:///%' THEN 0 ELSE 1 END;`, ids)
+	if err != nil {
+		return nil, xerrors.Errorf("failed to get pending task details from DB: %w", err)
+	}
+
+	for _, p := range pieces {
+		ret := harmonytask.TaskID(int(p.Tid))
+		goUrl, err := url.Parse(p.URL)
+		if err != nil {
+			return nil, xerrors.Errorf("failed to parse the URL: %w", err)
+		}
+		if goUrl.Scheme == "file" {
+			if p.Host == engine.Host() {
+				return &ret, nil
+			}
+			continue
+		}
+		return &ret, nil
+	}
+
+	return nil, xerrors.Errorf("host %s is not suitable for pending PiecePark tasks", engine.Host())
 }
 
 func (p *ParkPieceTask) TypeDetails() harmonytask.TaskTypeDetails {
diff --git a/tasks/storage-market/task_psd.go b/tasks/storage-market/task_psd.go
index 21cfa95fc..0ad56a177 100644
--- a/tasks/storage-market/task_psd.go
+++ b/tasks/storage-market/task_psd.go
@@ -5,7 +5,6 @@ import (
 	"encoding/json"
 	"fmt"
 
-	"github.com/filecoin-project/curio/harmony/taskhelp"
 	logging "github.com/ipfs/go-log/v2"
 	"golang.org/x/xerrors"
 
@@ -20,6 +19,7 @@ import (
 	"github.com/filecoin-project/curio/harmony/harmonydb"
 	"github.com/filecoin-project/curio/harmony/harmonytask"
 	"github.com/filecoin-project/curio/harmony/resources"
+	"github.com/filecoin-project/curio/harmony/taskhelp"
 	"github.com/filecoin-project/curio/lib/multictladdr"
 	"github.com/filecoin-project/curio/lib/promise"
 	"github.com/filecoin-project/curio/tasks/message"

From 8f94e1d8f5bb9436dc213a78196f37afc69002d2 Mon Sep 17 00:00:00 2001
From: LexLuthr 
Date: Thu, 15 Aug 2024 17:19:57 +0400
Subject: [PATCH 04/15] remove indexing table, add defaults

---
 .circleci/config.yml                          |   5 +-
 .../sql/20240730-market-migration.sql         | 188 ++++++++++--------
 market/storageingest/deal_ingest_seal.go      |   2 +
 market/storageingest/deal_ingest_snap.go      |   2 +
 tasks/indexing/task_indexing.go               |   4 +-
 tasks/storage-market/storage_market.go        |   2 +-
 6 files changed, 112 insertions(+), 91 deletions(-)

diff --git a/.circleci/config.yml 
b/.circleci/config.yml index 16235ffd3..83feedda5 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -144,7 +144,7 @@ jobs: description: Flags passed to go test. target: type: string - default: "./..." + default: "./... | grep -v itests" description: Import paths of packages to be tested. proofs-log-test: type: string @@ -287,6 +287,5 @@ workflows: name: test-idxStore requires: - build - suite: idxStore - target: "./lib/indexing/indexstore/indexstore_test.go" + suite: test-all get-params: true diff --git a/harmony/harmonydb/sql/20240730-market-migration.sql b/harmony/harmonydb/sql/20240730-market-migration.sql index be06f70cf..569f4884f 100644 --- a/harmony/harmonydb/sql/20240730-market-migration.sql +++ b/harmony/harmonydb/sql/20240730-market-migration.sql @@ -1,14 +1,9 @@ -- Table for Mk12 or Boost deals CREATE TABLE market_mk12_deals ( uuid TEXT NOT NULL, -<<<<<<< HEAD created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP AT TIME ZONE 'UTC', -======= sp_id BIGINT NOT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT TIMEZONE('UTC', NOW()), - ->>>>>>> 48e953d (poller redesign) signed_proposal_cid TEXT NOT NULL, proposal_signature BYTEA NOT NULL, proposal jsonb NOT NULL, @@ -41,33 +36,13 @@ CREATE TABLE market_mk12_deals ( unique (signed_proposal_cid) ); --- Table for old lotus market deals. This is just for deal --- which are still alive. It should not be used for any processing -CREATE TABLE market_legacy_deals ( - signed_proposal_cid TEXT, - proposal_signature BYTEA, - proposal jsonb, - piece_cid TEXT, - piece_size BIGINT, - offline BOOLEAN, - verified BOOLEAN, - sp_id BIGINT, - start_epoch BIGINT, - end_epoch BIGINT, - publish_cid TEXT, - fast_retrieval BOOLEAN, - chain_deal_id BIGINT, - created_at TIMESTAMPTZ, - sector_num BIGINT, - - primary key (sp_id, piece_cid, signed_proposal_cid) -); - -- This table is used for storing piece metadata (piece indexing) CREATE TABLE market_piece_metadata ( piece_cid TEXT NOT NULL PRIMARY KEY, + version INT NOT NULL DEFAULT 2, created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP AT TIME ZONE 'UTC', + indexed BOOLEAN NOT NULL DEFAULT FALSE, indexed_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP AT TIME ZONE 'UTC', @@ -157,13 +132,9 @@ CREATE TABLE market_mk12_deal_pipeline ( started BOOLEAN DEFAULT FALSE, piece_cid TEXT NOT NULL, -<<<<<<< HEAD piece_size BOOLEAN NOT NULL, -======= - piece_size BIGINT NOT NULL, - file_size BIGINT DEFAULT NULL, -- raw piece size + raw_size BIGINT DEFAULT NULL, ->>>>>>> 48e953d (poller redesign) offline BOOLEAN NOT NULL, url TEXT DEFAULT NULL, @@ -181,52 +152,94 @@ CREATE TABLE market_mk12_deal_pipeline ( after_find_deal BOOLEAN DEFAULT FALSE, sector BIGINT, + reg_seal_proof INT NOT NULL, sector_offset BIGINT, sealed BOOLEAN DEFAULT FALSE, + + should_index BOOLEAN DEFAULT FALSE, + indexing_created_at TIMESTAMPTZ, + indexing_task_id BIGINT DEFAULT NULL, indexed BOOLEAN DEFAULT FALSE, + complete BOOLEAN NOT NULL DEFAULT FALSE, + constraint market_mk12_deal_pipeline_identity_key unique (uuid) ); +-- This function creates indexing task based from move_storage tasks +CREATE OR REPLACE FUNCTION create_indexing_task(task_id BIGINT, sealing_table TEXT) +RETURNS VOID AS $$ +DECLARE +query TEXT; -- Holds the dynamic SQL query + pms RECORD; -- Holds each row returned by the query in the loop +BEGIN + -- Construct the dynamic SQL query based on the sealing_table + IF sealing_table = 'sectors_sdr_pipeline' THEN + query := format( + 'SELECT + dp.uuid, + ssp.reg_seal_proof + FROM + %I ssp + JOIN + 
market_mk12_deal_pipeline dp ON ssp.sp_id = dp.sp_id AND ssp.sector_num = dp.sector
+            WHERE
+                ssp.task_id_move_storage = $1', sealing_table);
+    ELSIF sealing_table = 'sectors_snap_pipeline' THEN
+        query := format(
+            'SELECT
+                dp.uuid,
+                (SELECT reg_seal_proof FROM sectors_meta WHERE sp_id = ssp.sp_id AND sector_num = ssp.sector_num) AS reg_seal_proof
+            FROM
+                %I ssp
+            JOIN
+                market_mk12_deal_pipeline dp ON ssp.sp_id = dp.sp_id AND ssp.sector_num = dp.sector
+            WHERE
+                ssp.task_id_move_storage = $1', sealing_table);
+ELSE
+    RAISE EXCEPTION 'Invalid sealing_table name: %', sealing_table;
+END IF;
+
+    -- Execute the dynamic SQL query with the task_id parameter
+FOR pms IN EXECUTE query USING task_id
+    LOOP
+        -- Update the market_mk12_deal_pipeline table with the reg_seal_proof and indexing_created_at values
+UPDATE market_mk12_deal_pipeline
+SET
+    reg_seal_proof = pms.reg_seal_proof,
+    indexing_created_at = NOW() AT TIME ZONE 'UTC'
+WHERE
+    uuid = pms.uuid;
+END LOOP;
+
+    -- If everything is successful, simply exit
+    RETURN;
+
+EXCEPTION
+    WHEN OTHERS THEN
+        -- Raise the exception for Go to catch; the failed transaction is rolled back automatically
+        RAISE EXCEPTION 'Failed to create indexing task: %', SQLERRM;
+END;
+$$ LANGUAGE plpgsql;
+
 -- This table can be used to track remote piece for offline deals
 -- The entries must be created by users
 CREATE TABLE market_offline_urls (
-    piece_cid TEXT NOT NULL,
+    uuid TEXT NOT NULL,
 
-    url TEXT NOT NULL,
-    headers jsonb NOT NULL DEFAULT '{}',
+    url TEXT NOT NULL,
+    headers jsonb NOT NULL DEFAULT '{}',
 
-    raw_size BIGINT NOT NULL,
+    raw_size BIGINT NOT NULL,
 
-    unique (piece_cid)
+    CONSTRAINT market_offline_urls_uuid_fk FOREIGN KEY (uuid)
+        REFERENCES market_mk12_deal_pipeline (uuid)
+        ON DELETE CASCADE,
+    CONSTRAINT market_offline_urls_uuid_unique UNIQUE (uuid)
 );
 
--- indexing tracker is separate from
-CREATE TABLE market_indexing_tasks (
-    uuid TEXT NOT NULL,
-
-    sp_id BIGINT NOT NULL,
-    sector_number BIGINT NOT NULL,
-    reg_seal_proof INT NOT NULL,
-
-    piece_offset BIGINT NOT NULL,
-    piece_size BIGINT NOT NULL,
-    raw_size BIGINT NOT NULL,
-    piece_cid TEXT NOT NULL,
-<<<<<<< HEAD
-    created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP AT TIME ZONE 'UTC',
-=======
-
-    created_at TIMESTAMPTZ NOT NULL DEFAULT TIMEZONE('UTC', NOW()),
-
->>>>>>> 48e953d (poller redesign)
-    task_id BIGINT DEFAULT NULL,
-
-    constraint market_indexing_tasks_identity_key
-        unique (id, sp_id, sector_number, piece_offset, piece_size, piece_cid, reg_seal_proof)
-)
-
 CREATE TABLE libp2p_keys (
     sp_id BIGINT NOT NULL,
     priv_key BYTEA NOT NULL,
@@ -255,33 +268,38 @@ CREATE TABLE direct_deals (
     keep_unsealed_copy BOOLEAN
 );
 
+-- Add host column to allow local file-based
+-- piece park
 ALTER TABLE parked_piece_refs
     ADD COLUMN host text;
 
-create table file_parked_pieces (
-    id bigserial primary key,
-    created_at timestamp default current_timestamp,
-    piece_cid text not null,
-    piece_padded_size bigint not null,
-    piece_raw_size bigint not null,
-    complete boolean not null default false,
-    task_id bigint default null,
-    cleanup_task_id bigint default null,
-    unique (piece_cid)
-);
+-- Table for old lotus market deals. This is just for deal
+-- which are still alive.
+CREATE TABLE market_legacy_deals (
+    signed_proposal_cid TEXT,
+    sp_id BIGINT,
 
-/*
- * This table is used to keep track of the references to the file parked pieces
- * so that we can delete them when they are no longer needed.
- * - * All references into the file_parked_pieces table should be done through this table. - */ -create table file_parked_piece_refs ( - ref_id bigserial primary key, - piece_id bigint not null, - data_url text not null, - node text not null, - foreign key (piece_id) references file_parked_pieces(id) on delete cascade + proposal_signature BYTEA, + proposal jsonb, + + piece_cid TEXT, + piece_size BIGINT, + + offline BOOLEAN, + verified BOOLEAN, + + start_epoch BIGINT, + end_epoch BIGINT, + + publish_cid TEXT, + chain_deal_id BIGINT, + + fast_retrieval BOOLEAN, + + created_at TIMESTAMPTZ, + sector_num BIGINT, + + primary key (sp_id, piece_cid, signed_proposal_cid) ); diff --git a/market/storageingest/deal_ingest_seal.go b/market/storageingest/deal_ingest_seal.go index 7a54c712e..8092902fb 100644 --- a/market/storageingest/deal_ingest_seal.go +++ b/market/storageingest/deal_ingest_seal.go @@ -9,6 +9,8 @@ import ( "net/url" "time" + "github.com/filecoin-project/curio/build" + "github.com/filecoin-project/curio/deps/config" logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" diff --git a/market/storageingest/deal_ingest_snap.go b/market/storageingest/deal_ingest_snap.go index 09d91a3ef..ad2dd5efa 100644 --- a/market/storageingest/deal_ingest_snap.go +++ b/market/storageingest/deal_ingest_snap.go @@ -9,6 +9,8 @@ import ( "net/url" "time" + "github.com/filecoin-project/curio/build" + "github.com/filecoin-project/curio/deps/config" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" diff --git a/tasks/indexing/task_indexing.go b/tasks/indexing/task_indexing.go index eb107ef51..a505f98c2 100644 --- a/tasks/indexing/task_indexing.go +++ b/tasks/indexing/task_indexing.go @@ -102,8 +102,8 @@ func (i *IndexingTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do return false, xerrors.Errorf("checking if piece is already indexed: %w", err) } - // Return early if already indexed - if indexed { + // Return early if already indexed or should not be indexed + if indexed || !task.ShouldIndex { err = i.recordCompletion(ctx, task, taskID, false) if err != nil { return false, err diff --git a/tasks/storage-market/storage_market.go b/tasks/storage-market/storage_market.go index cf0e71229..21a137a9a 100644 --- a/tasks/storage-market/storage_market.go +++ b/tasks/storage-market/storage_market.go @@ -407,7 +407,7 @@ func (d *CurioStorageDealMarket) findURLForOfflineDeals(ctx context.Context, dea if err != nil { return false, xerrors.Errorf("error making GET request: %w", err) } - + // Check the response code for 404 if resp.StatusCode != http.StatusOK { if resp.StatusCode != 404 { From 6a83233b4b57b7b294f792b42a86f8195e5ab5a8 Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Thu, 15 Aug 2024 18:07:20 +0400 Subject: [PATCH 05/15] fix ingest, market tasks --- Makefile | 11 ++++++----- lib/paths/mocks/index.go | 2 ++ market/storageingest/deal_ingest_seal.go | 2 -- market/storageingest/deal_ingest_snap.go | 2 -- tasks/storage-market/storage_market.go | 2 +- 5 files changed, 9 insertions(+), 10 deletions(-) diff --git a/Makefile b/Makefile index e84088a1f..1b8b4a4c1 100644 --- a/Makefile +++ b/Makefile @@ -66,10 +66,11 @@ build/.update-modules: # end git modules ## CUDA Library Path -CUDA_PATH := $(shell dirname $$(dirname $$(which nvcc))) -CUDA_LIB_PATH := $(CUDA_PATH)/lib64 -LIBRARY_PATH ?= $(CUDA_LIB_PATH) -export LIBRARY_PATH +setup_cuda: + $(eval CUDA_PATH := $(shell dirname $$(dirname $$(which nvcc)))) + $(eval CUDA_LIB_PATH := $(CUDA_PATH)/lib64) + export LIBRARY_PATH=$(CUDA_LIB_PATH) 
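+# Note: the shell-level export above only lasts for this recipe's own subshell;
+# it is the $(eval ...) lines that make the CUDA paths visible to later targets.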
+.PHONY: setup_cuda ## MAIN BINARIES @@ -97,7 +98,7 @@ BINS+=sptool ifeq ($(shell uname),Linux) -batchdep: build/.supraseal-install +batchdep: setup_cuda build/.supraseal-install batchdep: $(BUILD_DEPS) .PHONY: batchdep diff --git a/lib/paths/mocks/index.go b/lib/paths/mocks/index.go index 347051491..219bba06f 100644 --- a/lib/paths/mocks/index.go +++ b/lib/paths/mocks/index.go @@ -8,6 +8,8 @@ import ( context "context" reflect "reflect" + gomock "github.com/golang/mock/gomock" + abi "github.com/filecoin-project/go-state-types/abi" paths "github.com/filecoin-project/curio/lib/paths" diff --git a/market/storageingest/deal_ingest_seal.go b/market/storageingest/deal_ingest_seal.go index 8092902fb..7a54c712e 100644 --- a/market/storageingest/deal_ingest_seal.go +++ b/market/storageingest/deal_ingest_seal.go @@ -9,8 +9,6 @@ import ( "net/url" "time" - "github.com/filecoin-project/curio/build" - "github.com/filecoin-project/curio/deps/config" logging "github.com/ipfs/go-log/v2" "golang.org/x/xerrors" diff --git a/market/storageingest/deal_ingest_snap.go b/market/storageingest/deal_ingest_snap.go index ad2dd5efa..09d91a3ef 100644 --- a/market/storageingest/deal_ingest_snap.go +++ b/market/storageingest/deal_ingest_snap.go @@ -9,8 +9,6 @@ import ( "net/url" "time" - "github.com/filecoin-project/curio/build" - "github.com/filecoin-project/curio/deps/config" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" diff --git a/tasks/storage-market/storage_market.go b/tasks/storage-market/storage_market.go index 21a137a9a..cf0e71229 100644 --- a/tasks/storage-market/storage_market.go +++ b/tasks/storage-market/storage_market.go @@ -407,7 +407,7 @@ func (d *CurioStorageDealMarket) findURLForOfflineDeals(ctx context.Context, dea if err != nil { return false, xerrors.Errorf("error making GET request: %w", err) } - + // Check the response code for 404 if resp.StatusCode != http.StatusOK { if resp.StatusCode != 404 { From d2b3800cb579206426a5930ce4821256f729cab6 Mon Sep 17 00:00:00 2001 From: LexLuthr Date: Mon, 19 Aug 2024 16:20:24 +0400 Subject: [PATCH 06/15] incomplete basic UI code --- Makefile | 2 +- cmd/curio/rpc/rpc.go | 3 ++ cmd/curio/run.go | 1 + .../sql/20240730-market-migration.sql | 40 +++++++++++-------- lib/paths/mocks/index.go | 2 - web/static/ux/curio-ux.mjs | 1 - 6 files changed, 29 insertions(+), 20 deletions(-) diff --git a/Makefile b/Makefile index 1b8b4a4c1..e5aa1217f 100644 --- a/Makefile +++ b/Makefile @@ -134,7 +134,7 @@ debug: build all: build .PHONY: all -build: curio sptool +build: curio @[[ $$(type -P "curio") ]] && echo "Caution: you have \ an existing curio binary in your PATH. 
This may cause problems if you don't run 'sudo make install'" || true diff --git a/cmd/curio/rpc/rpc.go b/cmd/curio/rpc/rpc.go index 11d97050a..9058143fe 100644 --- a/cmd/curio/rpc/rpc.go +++ b/cmd/curio/rpc/rpc.go @@ -250,6 +250,7 @@ func (p *CurioAPI) LogSetLevel(ctx context.Context, subsystem, level string) err } func ListenAndServe(ctx context.Context, dependencies *deps.Deps, shutdownChan chan struct{}) error { + log.Errorf("ENTERED RPC SERVER") fh := &paths.FetchHandler{Local: dependencies.LocalStore, PfHandler: &paths.DefaultPartialFileHandler{}} remoteHandler := func(w http.ResponseWriter, r *http.Request) { if !auth.HasPerm(r.Context(), nil, lapi.PermAdmin) { @@ -261,6 +262,8 @@ func ListenAndServe(ctx context.Context, dependencies *deps.Deps, shutdownChan c fh.ServeHTTP(w, r) } + log.Errorf("CREATED STORAGE HANDLER") + var authVerify func(context.Context, string) ([]auth.Permission, error) { privateKey, err := base64.StdEncoding.DecodeString(dependencies.Cfg.Apis.StorageRPCSecret) diff --git a/cmd/curio/run.go b/cmd/curio/run.go index dedc78d3c..e8a221734 100644 --- a/cmd/curio/run.go +++ b/cmd/curio/run.go @@ -130,6 +130,7 @@ var runCmd = &cli.Command{ } defer taskEngine.GracefullyTerminate() + log.Infof("WILL START RPC SERVER NOW") err = rpc.ListenAndServe(ctx, dependencies, shutdownChan) // Monitor for shutdown. if err != nil { return err diff --git a/harmony/harmonydb/sql/20240730-market-migration.sql b/harmony/harmonydb/sql/20240730-market-migration.sql index 569f4884f..ce8fb6564 100644 --- a/harmony/harmonydb/sql/20240730-market-migration.sql +++ b/harmony/harmonydb/sql/20240730-market-migration.sql @@ -21,7 +21,6 @@ CREATE TABLE market_mk12_deals ( piece_cid TEXT NOT NULL, piece_size BIGINT NOT NULL, - length BIGINT DEFAULT NULL, fast_retrieval BOOLEAN NOT NULL, announce_to_ipni BOOLEAN NOT NULL, @@ -41,7 +40,12 @@ CREATE TABLE market_piece_metadata ( piece_cid TEXT NOT NULL PRIMARY KEY, version INT NOT NULL DEFAULT 2, +<<<<<<< HEAD created_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP AT TIME ZONE 'UTC', +======= + + created_at TIMESTAMPTZ NOT NULL DEFAULT TIMEZONE('UTC', NOW()), +>>>>>>> d1327dc (incomplete basic UI code) indexed BOOLEAN NOT NULL DEFAULT FALSE, indexed_at TIMESTAMPTZ NOT NULL DEFAULT CURRENT_TIMESTAMP AT TIME ZONE 'UTC', @@ -276,28 +280,32 @@ ALTER TABLE parked_piece_refs -- Table for old lotus market deals. This is just for deal -- which are still alive. 
It should not be used for any processing
 CREATE TABLE market_legacy_deals (
-    signed_proposal_cid TEXT,
-    sp_id BIGINT,
+    signed_proposal_cid TEXT NOT NULL,
+    sp_id BIGINT NOT NULL,
+    client_peer_id TEXT NOT NULL,
 
-    proposal_signature BYTEA,
-    proposal jsonb,
+    proposal_signature BYTEA NOT NULL,
+    proposal jsonb NOT NULL,
 
-    piece_cid TEXT,
-    piece_size BIGINT,
+    piece_cid TEXT NOT NULL,
+    piece_size BIGINT NOT NULL,
 
-    offline BOOLEAN,
-    verified BOOLEAN,
+    offline BOOLEAN NOT NULL,
+    verified BOOLEAN NOT NULL,
 
-    start_epoch BIGINT,
-    end_epoch BIGINT,
+    start_epoch BIGINT NOT NULL,
+    end_epoch BIGINT NOT NULL,
 
-    publish_cid TEXT,
-    chain_deal_id BIGINT,
+    publish_cid TEXT NOT NULL,
+    chain_deal_id BIGINT NOT NULL,
 
-    fast_retrieval BOOLEAN,
+    fast_retrieval BOOLEAN NOT NULL,
 
-    created_at TIMESTAMPTZ,
-    sector_num BIGINT,
+    created_at TIMESTAMPTZ NOT NULL,
+    sector_num BIGINT NOT NULL,
 
     primary key (sp_id, piece_cid, signed_proposal_cid)
 );
diff --git a/lib/paths/mocks/index.go b/lib/paths/mocks/index.go
index 219bba06f..082d22908 100644
--- a/lib/paths/mocks/index.go
+++ b/lib/paths/mocks/index.go
@@ -16,8 +16,6 @@ import (
 	storiface "github.com/filecoin-project/curio/lib/storiface"
 
 	fsutil "github.com/filecoin-project/lotus/storage/sealer/fsutil"
-
-	gomock "github.com/golang/mock/gomock"
 )
 
 // MockSectorIndex is a mock of SectorIndex interface.
diff --git a/web/static/ux/curio-ux.mjs b/web/static/ux/curio-ux.mjs
index 673545428..fb7a4d82f 100644
--- a/web/static/ux/curio-ux.mjs
+++ b/web/static/ux/curio-ux.mjs
@@ -164,7 +164,6 @@ class CurioUX extends LitElement {
         Curio
-<<<<<<< HEAD