From d8b9b2a85b297208b7fe699e842167dbce752160 Mon Sep 17 00:00:00 2001 From: Quentin McGaw Date: Fri, 19 Jan 2024 16:37:47 +0000 Subject: [PATCH] feat(natpmp): rpc error contain all failed attempt messages --- internal/natpmp/portmapping_test.go | 3 +- internal/natpmp/rpc.go | 61 ++++++++++++++++++++++++++--- internal/natpmp/rpc_test.go | 42 +++++++++++++++++++- 3 files changed, 98 insertions(+), 8 deletions(-) diff --git a/internal/natpmp/portmapping_test.go b/internal/natpmp/portmapping_test.go index c25006191..c07edfc50 100644 --- a/internal/natpmp/portmapping_test.go +++ b/internal/natpmp/portmapping_test.go @@ -49,7 +49,8 @@ func Test_Client_AddPortMapping(t *testing.T) { initialConnectionDuration: time.Millisecond, exchanges: []udpExchange{{close: true}}, err: ErrConnectionTimeout, - errMessage: "executing remote procedure call: connection timeout: after 1ms", + errMessage: "executing remote procedure call: connection timeout: failed attempts: " + + "read udp 127.0.0.1:[1-9][0-9]{0,4}->127.0.0.1:[1-9][0-9]{0,4}: i/o timeout \\(try 1\\)", }, "add_udp": { ctx: context.Background(), diff --git a/internal/natpmp/rpc.go b/internal/natpmp/rpc.go index 88f6e3d83..f94677462 100644 --- a/internal/natpmp/rpc.go +++ b/internal/natpmp/rpc.go @@ -6,6 +6,8 @@ import ( "fmt" "net" "net/netip" + "sort" + "strings" "time" ) @@ -65,9 +67,8 @@ func (c *Client) rpc(ctx context.Context, gateway netip.Addr, // Note it does not double if the source IP mismatches the gateway IP. connectionDuration := c.initialConnectionDuration - var totalRetryDuration time.Duration - var retryCount uint + var failedAttempts []string for retryCount = 0; retryCount < c.maxRetries; retryCount++ { deadline := time.Now().Add(connectionDuration) err = connection.SetDeadline(deadline) @@ -87,8 +88,8 @@ func (c *Client) rpc(ctx context.Context, gateway netip.Addr, } var netErr net.Error if errors.As(err, &netErr) && netErr.Timeout() { - totalRetryDuration += connectionDuration connectionDuration *= 2 + failedAttempts = append(failedAttempts, netErr.Error()) continue } return nil, fmt.Errorf("reading from udp connection: %w", err) @@ -98,6 +99,9 @@ func (c *Client) rpc(ctx context.Context, gateway netip.Addr, // Upon receiving a response packet, the client MUST check the source IP // address, and silently discard the packet if the address is not the // address of the gateway to which the request was sent. + failedAttempts = append(failedAttempts, + fmt.Sprintf("received response from %s instead of gateway IP %s", + receivedRemoteAddress.IP, gatewayAddress.IP)) continue } @@ -106,8 +110,8 @@ func (c *Client) rpc(ctx context.Context, gateway netip.Addr, } if retryCount == c.maxRetries { - return nil, fmt.Errorf("%w: after %s", - ErrConnectionTimeout, totalRetryDuration) + return nil, fmt.Errorf("%w: failed attempts: %s", + ErrConnectionTimeout, dedupFailedAttempts(failedAttempts)) } // Opcodes between 0 and 127 are client requests. Opcodes from 128 to @@ -121,3 +125,50 @@ func (c *Client) rpc(ctx context.Context, gateway netip.Addr, return response, nil } + +func dedupFailedAttempts(failedAttempts []string) (errorMessage string) { + type data struct { + message string + indices []int + } + messageToData := make(map[string]data, len(failedAttempts)) + for i, message := range failedAttempts { + metadata, ok := messageToData[message] + if !ok { + metadata.message = message + } + metadata.indices = append(metadata.indices, i) + sort.Slice(metadata.indices, func(i, j int) bool { + return metadata.indices[i] < metadata.indices[j] + }) + messageToData[message] = metadata + } + + // Sort by first index + dataSlice := make([]data, 0, len(messageToData)) + for _, metadata := range messageToData { + dataSlice = append(dataSlice, metadata) + } + sort.Slice(dataSlice, func(i, j int) bool { + return dataSlice[i].indices[0] < dataSlice[j].indices[0] + }) + + dedupedFailedAttempts := make([]string, 0, len(dataSlice)) + for _, data := range dataSlice { + newMessage := fmt.Sprintf("%s (%s)", data.message, + indicesToTryString(data.indices)) + dedupedFailedAttempts = append(dedupedFailedAttempts, newMessage) + } + return strings.Join(dedupedFailedAttempts, "; ") +} + +func indicesToTryString(indices []int) string { + if len(indices) == 1 { + return fmt.Sprintf("try %d", indices[0]+1) + } + tries := make([]string, len(indices)) + for i, index := range indices { + tries[i] = fmt.Sprintf("%d", index+1) + } + return fmt.Sprintf("tries %s", strings.Join(tries, ", ")) +} diff --git a/internal/natpmp/rpc_test.go b/internal/natpmp/rpc_test.go index 7ff534eed..08d5a3ae3 100644 --- a/internal/natpmp/rpc_test.go +++ b/internal/natpmp/rpc_test.go @@ -53,8 +53,9 @@ func Test_Client_rpc(t *testing.T) { exchanges: []udpExchange{ {request: []byte{0, 1}, close: true}, }, - err: ErrConnectionTimeout, - errMessage: "connection timeout: after 1ms", + err: ErrConnectionTimeout, + errMessage: "connection timeout: failed attempts: " + + "read udp 127.0.0.1:[1-9][0-9]{0,4}->127.0.0.1:[1-9][0-9]{0,4}: i/o timeout \\(try 1\\)", }, "response_too_small": { ctx: context.Background(), @@ -164,3 +165,40 @@ func Test_Client_rpc(t *testing.T) { }) } } + +func Test_dedupFailedAttempts(t *testing.T) { + t.Parallel() + + testCases := map[string]struct { + failedAttempts []string + expected string + }{ + "empty": {}, + "single_attempt": { + failedAttempts: []string{"test"}, + expected: "test (try 1)", + }, + "multiple_same_attempts": { + failedAttempts: []string{"test", "test", "test"}, + expected: "test (tries 1, 2, 3)", + }, + "multiple_different_attempts": { + failedAttempts: []string{"test1", "test2", "test3"}, + expected: "test1 (try 1); test2 (try 2); test3 (try 3)", + }, + "soup_mix": { + failedAttempts: []string{"test1", "test2", "test1", "test3", "test2"}, + expected: "test1 (tries 1, 3); test2 (tries 2, 5); test3 (try 4)", + }, + } + + for name, testCase := range testCases { + testCase := testCase + t.Run(name, func(t *testing.T) { + t.Parallel() + + actual := dedupFailedAttempts(testCase.failedAttempts) + assert.Equal(t, testCase.expected, actual) + }) + } +}