Skip to content
This repository has been archived by the owner on Feb 17, 2022. It is now read-only.

Commit

Permalink
Merge pull request #25 from aiden/should_retry
Browse files Browse the repository at this point in the history
Add shouldRetry to Service.withRetry
  • Loading branch information
hchauvin authored Oct 11, 2019
2 parents eeac2bd + 1b408c2 commit f0a8f1c
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 8 deletions.
32 changes: 31 additions & 1 deletion src/client/__tests__/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,13 @@
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/
import { Service, ServiceRetrier, serviceInstance } from '../service';
import {
Service,
ServiceRetrier,
serviceInstance,
DEFAULT_SHOULD_RETRY,
} from '../service';
import { ClientProtocolError, ClientRpcError } from '../errors';
import * as sinon from 'sinon';
import { Stream } from '../stream';
import { EventEmitter } from 'events';
Expand Down Expand Up @@ -271,6 +277,30 @@ describe('rpc_ts', () => {
);
});
});

describe('DEFAULT_SHOULD_RETRY', () => {
it('retries on errors by default', () => {
const err = new Error('__error__');
expect(DEFAULT_SHOULD_RETRY(err)).to.be.true;
});

it('fails immediately on a protocol error', () => {
const err = new ClientProtocolError('__url__', '__message__');
expect(DEFAULT_SHOULD_RETRY(err)).to.be.false;
});

it('fails immediately on an invalid argument error', () => {
const err = new ClientRpcError(
ModuleRpcCommon.RpcErrorType.invalidArgument,
);
expect(DEFAULT_SHOULD_RETRY(err)).to.be.false;
});

it('retries on a "service unavailable" error', () => {
const err = new ClientRpcError(ModuleRpcCommon.RpcErrorType.unavailable);
expect(DEFAULT_SHOULD_RETRY(err)).to.be.true;
});
});
});

const testServiceDefinition = {
Expand Down
21 changes: 21 additions & 0 deletions src/client/__tests__/stream_retrier.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,27 @@ describe('rpc_ts', () => {
['error', '__error__'],
]);
});

it('abandons immediately if shouldRetry yields false', async () => {
const retryingStream = streamRetrier.retryStream(
() => {
const stream = new MockStream();
setTimeout(() => {
stream.emit('error', new Error('__error__'));
});
return stream;
},
undefined,
(err: Error) => err.message !== '__error__',
);
const events = getRetryingStreamEvents(retryingStream);
retryingStream.start();
await waitForRetryingStream(retryingStream);
expect(events).to.deep.equal([
['retryingError', '__error__', 0, true],
['error', '__error__'],
]);
});
});

it('can be canceled', async () => {
Expand Down
43 changes: 42 additions & 1 deletion src/client/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,42 @@ import { mapValuesWithStringKeys } from '../utils/utils';
import * as events from 'events';
import { retryStream } from './stream_retrier';

/**
* Default list of [[ModuleRpcCommon.RpcErrorType]] error types that
* should lead to an immediate failure and not to a retry.
*/
export const ERROR_TYPES_TO_FAIL_IMMEDIATELY: ModuleRpcCommon.RpcErrorType[] = [
ModuleRpcCommon.RpcErrorType.invalidArgument,
ModuleRpcCommon.RpcErrorType.notFound,
ModuleRpcCommon.RpcErrorType.alreadyExists,
ModuleRpcCommon.RpcErrorType.permissionDenied,
ModuleRpcCommon.RpcErrorType.failedPrecondition,
ModuleRpcCommon.RpcErrorType.unimplemented,
ModuleRpcCommon.RpcErrorType.internal,
ModuleRpcCommon.RpcErrorType.unauthenticated,
];

/**
* Default `shouldRetry` predicate.
*
* With this predicate, all errors lead to a retry except errors of
* class [[ClientProtocolError]] and of class [[ClientRpcError]] with
* type included in a specific subset of [[ModuleRpcCommon.RpcErrorType]].
*
* @see [[Service.withRetry]]
*/
export const DEFAULT_SHOULD_RETRY = (err: Error): boolean => {
if (err instanceof ClientProtocolError) {
return false;
} else if (
err instanceof ClientRpcError &&
ERROR_TYPES_TO_FAIL_IMMEDIATELY.includes(err.errorType)
) {
return false;
}
return true;
};

/**
* Implements a service from a [[StreamProducer]]. The StreamProducer
* is provided by a streaming protocol, for instance through [[getGrpcWebClient]] in the
Expand Down Expand Up @@ -130,11 +166,14 @@ export class Service<
* the methods of this service.
*
* @param backoffOptions Options for the exponential backoff.
* @param shouldRetry Determines whether an error should be retried (the
* function returns `true`) or the method should fail immediately.
*/
withRetry(
backoffOptions: Partial<BackoffOptions> = {},
shouldRetry: (err: Error) => boolean = DEFAULT_SHOULD_RETRY,
): ServiceRetrier<serviceDefinition, ResponseContext> {
return new ServiceRetrierImpl(this, backoffOptions);
return new ServiceRetrierImpl(this, backoffOptions, shouldRetry);
}

/**
Expand Down Expand Up @@ -331,6 +370,7 @@ class ServiceRetrierImpl<
constructor(
private readonly baseService: Service<serviceDefinition, ResponseContext>,
private readonly backoffOptions: Partial<BackoffOptions>,
private readonly shouldRetry: (err: Error) => boolean,
) {
super();
}
Expand All @@ -353,6 +393,7 @@ class ServiceRetrierImpl<
return retryStream<ModuleRpcCommon.ResponseFor<serviceDefinition, method>>(
() => this.baseService.stream(method, request),
this.backoffOptions,
this.shouldRetry,
)
.on('ready', () => {
this.emit('serviceReady', method, request);
Expand Down
21 changes: 15 additions & 6 deletions src/client/stream_retrier.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import { sleep } from '../utils/utils';
* stream emits a 'complete' or 'canceled' event, or the maximum number of retries,
* as per the exponential backoff schedule, has been reached.
* @param backoffOptions Options for the exponential backoff.
* @param shouldRetry Determines whether an error should be retried (the
* function returns `true`) or the method should fail immediately.
*
* @return A stream that is in effect a "concatenation" of all the streams initiated
* by `getStream`, with special events emitted to inform on the retrying process.
Expand All @@ -38,11 +40,16 @@ import { sleep } from '../utils/utils';
export function retryStream<Message>(
getStream: () => Stream<Message>,
backoffOptions?: Partial<BackoffOptions>,
shouldRetry: (err: Error) => boolean = () => true,
): RetryingStream<Message> {
return new RetryingStreamImpl<Message>(getStream, {
...DEFAULT_BACKOFF_OPTIONS,
...backoffOptions,
});
return new RetryingStreamImpl<Message>(
getStream,
{
...DEFAULT_BACKOFF_OPTIONS,
...backoffOptions,
},
shouldRetry,
);
}

/**
Expand Down Expand Up @@ -115,6 +122,7 @@ class RetryingStreamImpl<Message> extends events.EventEmitter
constructor(
private readonly getStream: () => Stream<Message>,
private readonly options: BackoffOptions,
private readonly shouldRetry: (err: Error) => boolean,
) {
super();
}
Expand All @@ -138,8 +146,9 @@ class RetryingStreamImpl<Message> extends events.EventEmitter
})
.on('error', async err => {
if (
this.options.maxRetries >= 0 &&
this.retriesSinceLastReady >= this.options.maxRetries
(this.options.maxRetries >= 0 &&
this.retriesSinceLastReady >= this.options.maxRetries) ||
!this.shouldRetry(err)
) {
this.state = RetryingStreamState.abandoned;
this.emit('retryingError', err, this.retriesSinceLastReady, true);
Expand Down

0 comments on commit f0a8f1c

Please sign in to comment.