Skip to content

Commit

Permalink
Merge pull request #3226 from filecoin-project/fix/mpool-state-incons…
Browse files Browse the repository at this point in the history
…istencies

Address potential mpool state inconsistencies
  • Loading branch information
arajasek authored Aug 21, 2020
2 parents 5c315b4 + 817358f commit 9e2f56b
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 9 deletions.
3 changes: 3 additions & 0 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ type FullNode interface {
MpoolGetNonce(context.Context, address.Address) (uint64, error)
MpoolSub(context.Context) (<-chan MpoolUpdate, error)

// MpoolClear clears pending messages from the mpool
MpoolClear(context.Context, bool) error

// MpoolGetConfig returns (a copy of) the current mpool config
MpoolGetConfig(context.Context) (*types.MpoolConfig, error)
// MpoolSetConfig sets the mpool config to (a copy of) the supplied config
Expand Down
8 changes: 7 additions & 1 deletion api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ type FullNodeStruct struct {

MpoolSelect func(context.Context, types.TipSetKey, float64) ([]*types.SignedMessage, error) `perm:"read"`

MpoolPending func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"`
MpoolPending func(context.Context, types.TipSetKey) ([]*types.SignedMessage, error) `perm:"read"`
MpoolClear func(context.Context, bool) error `perm:"write"`

MpoolPush func(context.Context, *types.SignedMessage) (cid.Cid, error) `perm:"write"`
MpoolPushMessage func(context.Context, *types.Message, *api.MessageSendSpec) (*types.SignedMessage, error) `perm:"sign"`
MpoolGetNonce func(context.Context, address.Address) (uint64, error) `perm:"read"`
Expand Down Expand Up @@ -494,6 +496,10 @@ func (c *FullNodeStruct) MpoolPending(ctx context.Context, tsk types.TipSetKey)
return c.Internal.MpoolPending(ctx, tsk)
}

func (c *FullNodeStruct) MpoolClear(ctx context.Context, local bool) error {
return c.Internal.MpoolClear(ctx, local)
}

func (c *FullNodeStruct) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
return c.Internal.MpoolPush(ctx, smsg)
}
Expand Down
64 changes: 56 additions & 8 deletions chain/messagepool/messagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/filecoin-project/specs-actors/actors/abi"
"github.com/filecoin-project/specs-actors/actors/crypto"
"github.com/hashicorp/go-multierror"
lru "github.com/hashicorp/golang-lru"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
Expand Down Expand Up @@ -744,30 +745,42 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
}
}

var merr error

for _, ts := range revert {
pts, err := mp.api.LoadTipSet(ts.Parents())
if err != nil {
return err
log.Errorf("error loading reverted tipset parent: %s", err)
merr = multierror.Append(merr, err)
continue
}

mp.curTs = pts

msgs, err := mp.MessagesForBlocks(ts.Blocks())
if err != nil {
return err
log.Errorf("error retrieving messages for reverted block: %s", err)
merr = multierror.Append(merr, err)
continue
}

mp.curTs = pts

for _, msg := range msgs {
add(msg)
}
}

for _, ts := range apply {
mp.curTs = ts

for _, b := range ts.Blocks() {
bmsgs, smsgs, err := mp.api.MessagesForBlock(b)
if err != nil {
return xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
xerr := xerrors.Errorf("failed to get messages for apply block %s(height %d) (msgroot = %s): %w", b.Cid(), b.Height, b.Messages, err)
log.Errorf("error retrieving messages for block: %s", xerr)
merr = multierror.Append(merr, xerr)
continue
}

for _, msg := range smsgs {
rm(msg.Message.From, msg.Message.Nonce)
maybeRepub(msg.Cid())
Expand All @@ -778,8 +791,6 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
maybeRepub(msg.Cid())
}
}

mp.curTs = ts
}

if repubTrigger {
Expand Down Expand Up @@ -862,7 +873,7 @@ func (mp *MessagePool) HeadChange(revert []*types.TipSet, apply []*types.TipSet)
}
}

return nil
return merr
}

type statBucket struct {
Expand Down Expand Up @@ -962,3 +973,40 @@ func (mp *MessagePool) loadLocal() error {

return nil
}

func (mp *MessagePool) Clear(local bool) {
mp.lk.Lock()
defer mp.lk.Unlock()

// remove everything if local is true, including removing local messages from
// the datastore
if local {
for a := range mp.localAddrs {
mset, ok := mp.pending[a]
if !ok {
continue
}

for _, m := range mset.msgs {
err := mp.localMsgs.Delete(datastore.NewKey(string(m.Cid().Bytes())))
if err != nil {
log.Warnf("error deleting local message: %s", err)
}
}
}

mp.pending = make(map[address.Address]*msgSet)
mp.republished = nil

return
}

// remove everything except the local messages
for a := range mp.pending {
_, isLocal := mp.localAddrs[a]
if isLocal {
continue
}
delete(mp.pending, a)
}
}
33 changes: 33 additions & 0 deletions cli/mpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var mpoolCmd = &cli.Command{
Usage: "Manage message pool",
Subcommands: []*cli.Command{
mpoolPending,
mpoolClear,
mpoolSub,
mpoolStat,
mpoolReplaceCmd,
Expand Down Expand Up @@ -83,6 +84,38 @@ var mpoolPending = &cli.Command{
},
}

var mpoolClear = &cli.Command{
Name: "clear",
Usage: "Clear all pending messages from the mpool (USE WITH CARE)",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "local",
Usage: "also clear local messages",
},
&cli.BoolFlag{
Name: "really-do-it",
Usage: "must be specified for the action to take effect",
},
},
Action: func(cctx *cli.Context) error {
api, closer, err := GetFullNodeAPI(cctx)
if err != nil {
return err
}
defer closer()

really := cctx.Bool("really-do-it")
if !really {
return fmt.Errorf("--really-do-it must be specified for this action to have an effect; you have been warned.")
}

local := cctx.Bool("local")

ctx := ReqContext(cctx)
return api.MpoolClear(ctx, local)
},
}

var mpoolSub = &cli.Command{
Name: "sub",
Usage: "Subscribe to mpool changes",
Expand Down
15 changes: 15 additions & 0 deletions documentation/en/mpool.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ The full node API defines the following methods for interacting with the mpool:
MpoolSub(context.Context) (<-chan MpoolUpdate, error)
MpoolGetConfig(context.Context) (*types.MpoolConfig, error)
MpoolSetConfig(context.Context, *types.MpoolConfig) error
MpoolClear(context.Context, local bool) error
```

### MpoolPending
Expand Down Expand Up @@ -61,6 +62,13 @@ Returns (a copy of) the current mpool configuration.

Sets the mpool configuration to (a copy of) the supplied configuration object.

### MpoolClear

Clears pending messages from the mpool; if `local` is `true` then local messages are also cleared and removed from the datastore.

This should be used with extreme care and only in the case of errors during head changes that
would leave the mpool in an inconsistent state.


## Command Line Interfae

Expand All @@ -75,6 +83,7 @@ lotus mpool stat [--local]
lotus mpool replace [--gas-feecap <feecap>] [--gas-premium <premium>] [--gas-limit <limit>] [from] [nonce]
lotus mpool find [--from <address>] [--to <address>] [--method <int>]
lotus mpool config [<configuration>]
lotus mpool clear [--local]
```

### lotus mpool pending
Expand All @@ -98,6 +107,12 @@ Searches for messages in the mpool.
### lotus mpool config
Gets or sets the current mpool configuration.

### lotus mpool clear
Unconditionally clears pending messages from the mpool.
If the `--local` flag is passed, then local messages are also cleared; otherwise local messages are retained.

*Warning*: this command should only be used in the case of head change errors leaving the mpool in an state.

## Configuration

The mpool a few parameters that can be configured by the user, either through the API
Expand Down
5 changes: 5 additions & 0 deletions node/impl/full/mpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ func (a *MpoolAPI) MpoolPending(ctx context.Context, tsk types.TipSetKey) ([]*ty
}
}

func (a *MpoolAPI) MpoolClear(ctx context.Context, local bool) error {
a.Mpool.Clear(local)
return nil
}

func (a *MpoolAPI) MpoolPush(ctx context.Context, smsg *types.SignedMessage) (cid.Cid, error) {
return a.Mpool.Push(smsg)
}
Expand Down

0 comments on commit 9e2f56b

Please sign in to comment.