From 659741ff159973ccdbcf1dd9074f98a6adf00a91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 28 Nov 2024 16:13:49 +0100 Subject: [PATCH] wip: feat: paramseed - cluster-p2p paramfetch --- .../sql/20241128-paramfetch-reseed.sql | 7 + lib/fastparamfetch/paramfetch.go | 122 ++++++++++++----- lib/fastparamfetch/paramserve.go | 124 ++++++++++++++++++ 3 files changed, 221 insertions(+), 32 deletions(-) create mode 100644 harmony/harmonydb/sql/20241128-paramfetch-reseed.sql create mode 100644 lib/fastparamfetch/paramserve.go diff --git a/harmony/harmonydb/sql/20241128-paramfetch-reseed.sql b/harmony/harmonydb/sql/20241128-paramfetch-reseed.sql new file mode 100644 index 000000000..5db43d5c6 --- /dev/null +++ b/harmony/harmonydb/sql/20241128-paramfetch-reseed.sql @@ -0,0 +1,7 @@ +CREATE TABLE paramfetch_urls ( + machine INT, + cid TEXT, + + PRIMARY KEY (machine, cid), + FOREIGN KEY (machine) REFERENCES harmony_machines (id) ON DELETE CASCADE +); diff --git a/lib/fastparamfetch/paramfetch.go b/lib/fastparamfetch/paramfetch.go index 46e7290a3..644c9ce19 100644 --- a/lib/fastparamfetch/paramfetch.go +++ b/lib/fastparamfetch/paramfetch.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "encoding/json" "errors" + "fmt" "io" "net/http" "net/url" @@ -16,6 +17,7 @@ import ( "sync" "time" + "github.com/ipfs/go-cid" fslock "github.com/ipfs/go-fs-lock" logging "github.com/ipfs/go-log/v2" "github.com/minio/blake2b-simd" @@ -25,7 +27,6 @@ import ( var log = logging.Logger("paramfetch") -// const gateway = "http://198.211.99.118/ipfs/" const gateway = "https://proofs.filecoin.io/ipfs/" const paramdir = "/var/tmp/filecoin-proof-parameters" const dirEnv = "FIL_PROOFS_PARAMETER_CACHE" @@ -49,6 +50,8 @@ type fetch struct { fsLockRelease func() fsLockOnce sync.Once lockFail bool // true if we failed to acquire the lock at least once, meaning that is was claimed by another process + + ps *ParamServe } func getParamDir() string { @@ -58,7 +61,7 @@ func getParamDir() string { return os.Getenv(dirEnv) } -func GetParams(ctx context.Context, paramBytes []byte, srsBytes []byte, storageSize uint64) error { +func GetParams(ctx context.Context, ps *ParamServe, paramBytes []byte, srsBytes []byte, storageSize uint64) error { if err := os.Mkdir(getParamDir(), 0755); err != nil && !os.IsExist(err) { return err } @@ -69,7 +72,9 @@ func GetParams(ctx context.Context, paramBytes []byte, srsBytes []byte, storageS return err } - ft := &fetch{} + ft := &fetch{ + ps: ps, + } defer func() { if ft.fsLockRelease != nil { @@ -152,7 +157,7 @@ func (ft *fetch) maybeFetchAsync(ctx context.Context, name string, info paramFil return } - if err := doFetch(ctx, path, info); err != nil { + if err := ft.doFetch(ctx, path, info); err != nil { ft.errs = append(ft.errs, xerrors.Errorf("fetching file %s failed: %w", path, err)) return } @@ -166,7 +171,7 @@ func (ft *fetch) maybeFetchAsync(ctx context.Context, name string, info paramFil return } - if err := doFetch(ctx, path, info); err != nil { + if err := ft.doFetch(ctx, path, info); err != nil { ft.errs = append(ft.errs, xerrors.Errorf("fetching file %s failed: %w", path, err)) return } @@ -226,6 +231,16 @@ func (ft *fetch) checkFile(path string, info paramFile) error { checked[path] = struct{}{} checkedLk.Unlock() + // Call ps.allowCid + if ft.ps != nil { + c, err := cid.Parse(info.Cid) + if err != nil { + log.Errorf("Invalid CID %s: %v", info.Cid, err) + } else { + ft.ps.allowCid(context.Background(), c, path) + } + } + return nil } @@ -250,26 +265,78 @@ func (ft *fetch) wait(ctx context.Context) error { return multierr.Combine(ft.errs...) } -func doFetch(ctx context.Context, out string, info paramFile) error { +func (ft *fetch) doFetch(ctx context.Context, out string, info paramFile) error { + c, err := cid.Parse(info.Cid) + if err != nil { + return err + } + + var urls []string + + if ft.ps != nil { + // Get URLs from paramserve + u, err := ft.ps.urlsForCid(ctx, c) + if err != nil { + log.Warnf("Failed to get URLs for CID %s: %v", c.String(), err) + } else { + for _, hostAndPort := range u { + // Build URL + urlStr := fmt.Sprintf("http://%s/params/ipfs/%s", hostAndPort, info.Cid) + urls = append(urls, urlStr) + } + } + } + + // Append the default gateway at the end gw := os.Getenv("IPFS_GATEWAY") if gw == "" { gw = gateway } - log.Infof("Fetching %s from %s", out, gw) + urls = append(urls, gw+info.Cid) + + for _, urlStr := range urls { + log.Infof("Fetching %s from %s", out, urlStr) + u, err := url.Parse(urlStr) + if err != nil { + log.Warnf("Invalid URL %s: %v", urlStr, err) + continue + } + // Try aria2c first + if err := fetchWithAria2c(ctx, out, u.String()); err == nil { + return nil + } else { + log.Warnf("aria2c fetch failed: %s", err) + } - url, err := url.Parse(gw + info.Cid) + // Try HTTP client + if err := fetchWithHTTPClient(ctx, out, u); err == nil { + return nil + } else { + log.Warnf("HTTP fetch failed: %s", err) + } + } + + return xerrors.Errorf("failed to fetch %s from any source", info.Cid) +} + +func fetchWithAria2c(ctx context.Context, out, url string) error { + aria2cPath, err := exec.LookPath("aria2c") if err != nil { - return err + return xerrors.New("aria2c not found in PATH") } - log.Infof("GET %s", url) - // Try aria2c first - if err := fetchWithAria2c(ctx, out, url.String()); err == nil { - return nil - } else { - log.Warnf("aria2c fetch failed: %s", err) + cmd := exec.CommandContext(ctx, aria2cPath, "--continue", "-x16", "-s16", "--dir", filepath.Dir(out), "-o", filepath.Base(out), url) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + if err := cmd.Run(); err != nil { + return err } + return nil +} + +func fetchWithHTTPClient(ctx context.Context, out string, u *url.URL) error { outf, err := os.OpenFile(out, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) if err != nil { return err @@ -283,7 +350,7 @@ func doFetch(ctx context.Context, out string, info paramFile) error { header := http.Header{} header.Set("Range", "bytes="+strconv.FormatInt(fStat.Size(), 10)+"-") - req, err := http.NewRequestWithContext(ctx, "GET", url.String(), nil) + req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil) if err != nil { return err } @@ -296,24 +363,15 @@ func doFetch(ctx context.Context, out string, info paramFile) error { } defer resp.Body.Close() - _, err = io.Copy(outf, resp.Body) - - return err -} - -func fetchWithAria2c(ctx context.Context, out, url string) error { - aria2cPath, err := exec.LookPath("aria2c") - if err != nil { - return xerrors.New("aria2c not found in PATH") + if resp.StatusCode == http.StatusNotFound { + return xerrors.New("file not found on server") } - cmd := exec.CommandContext(ctx, aria2cPath, "--continue", "-x16", "-s16", "--dir", filepath.Dir(out), "-o", filepath.Base(out), url) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - - if err := cmd.Run(); err != nil { - return err + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusPartialContent { + return xerrors.Errorf("unexpected HTTP status: %s", resp.Status) } - return nil + _, err = io.Copy(outf, resp.Body) + + return err } diff --git a/lib/fastparamfetch/paramserve.go b/lib/fastparamfetch/paramserve.go new file mode 100644 index 000000000..7c0243181 --- /dev/null +++ b/lib/fastparamfetch/paramserve.go @@ -0,0 +1,124 @@ +package fastparamfetch + +import ( + "context" + "fmt" + "net/http" + "sync" + + "github.com/filecoin-project/curio/deps" + "github.com/filecoin-project/curio/harmony/harmonydb" + "github.com/gorilla/mux" + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" +) + +type ParamServe struct { + db *harmonydb.DB + + lk sync.Mutex + allow map[string]bool // file CIDs + + cidToFile map[string]string // mapping from CID string to file path + + machineID int +} + +func NewParamServe(db *harmonydb.DB, machineID int) *ParamServe { + return &ParamServe{ + db: db, + allow: make(map[string]bool), + cidToFile: make(map[string]string), + machineID: machineID, + } +} + +func (ps *ParamServe) allowCid(ctx context.Context, c cid.Cid, path string) { + ps.lk.Lock() + defer ps.lk.Unlock() + + ps.allow[c.String()] = true + ps.cidToFile[c.String()] = path + + // Insert into the database that this machine has this CID + err := ps.insertCidForMachine(ctx, c.String()) + if err != nil { + log.Errorf("Failed to insert CID %s for machine: %v", c.String(), err) + } +} + +func (ps *ParamServe) insertCidForMachine(ctx context.Context, cidStr string) error { + // Insert into paramfetch_urls (machine, cid) + _, err := ps.db.Exec(ctx, `INSERT INTO paramfetch_urls (machine, cid) VALUES ($1, $2) ON CONFLICT DO NOTHING`, ps.machineID, cidStr) + return err +} + +func (ps *ParamServe) urlsForCid(ctx context.Context, c cid.Cid) ([]string, error) { + rows, err := ps.db.Query(ctx, `SELECT harmony_machines.host_and_port FROM harmony_machines + JOIN paramfetch_urls ON harmony_machines.id = paramfetch_urls.machine + WHERE paramfetch_urls.cid = $1`, c.String()) + if err != nil { + return nil, err + } + defer rows.Close() + + var urls []string + for rows.Next() { + var hostAndPort string + if err := rows.Scan(&hostAndPort); err != nil { + return nil, err + } + urls = append(urls, hostAndPort) + } + if err := rows.Err(); err != nil { + return nil, err + } + return urls, nil +} + +func (ps *ParamServe) getFilePathForCid(c cid.Cid) (string, error) { + ps.lk.Lock() + defer ps.lk.Unlock() + + filePath, ok := ps.cidToFile[c.String()] + if !ok { + return "", xerrors.Errorf("file path for CID %s not found", c.String()) + } + return filePath, nil +} + +func (ps *ParamServe) ServeHTTP(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + cidStr := vars["cid"] + if cidStr == "" { + http.Error(w, "CID not specified", http.StatusBadRequest) + return + } + + // Parse the CID + c, err := cid.Parse(cidStr) + if err != nil { + http.Error(w, "Invalid CID", http.StatusBadRequest) + return + } + + ps.lk.Lock() + allowed := ps.allow[c.String()] + ps.lk.Unlock() + if !allowed { + http.Error(w, "CID not allowed", http.StatusNotFound) + return + } + + filePath, err := ps.getFilePathForCid(c) + if err != nil { + http.Error(w, "File not found", http.StatusNotFound) + return + } + + http.ServeFile(w, r, filePath) +} + +func Routes(r *mux.Router, deps *deps.Deps, serve *ParamServe) { + r.Methods("GET", "HEAD").Path("/params/ipfs/{cid}").HandlerFunc(serve.ServeHTTP) +}