Skip to content

Commit

Permalink
Upgrade to [email protected] and socket improvements (keith1024)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jon Eyrick authored Feb 8, 2018
2 parents 80964e6 + 5a7e644 commit 43cb166
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 67 deletions.
121 changes: 55 additions & 66 deletions node-binance-api.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,51 +194,29 @@ LIMIT_MAKER
}, 'POST');
};
////////////////////////////
const _handleSocketClose = function(reconnect, error) {
/*
switch ( error ) {
case 1000: // CLOSE_NORMAL
options.log("WebSocket: closed");
break;
default: // Abnormal closure
this.reconnect(error);
break;
}
this.onclose(e);
*/
if ( reconnect && options.reconnect ) {
if ( this.endpoint && parseInt(this.endpoint.length, 10) === 60 ) options.log('Account data WebSocket reconnecting..');
else options.log('WebSocket reconnecting: '+this.endpoint);
const _handleSocketClose = function(reconnect, code, reason) {
delete subscriptions[this.endpoint];
options.log('WebSocket closed: '+this.endpoint+
(code ? ' ('+code+')' : '')+
(reason ? ' '+reason : '')
);
if ( options.reconnect && this.reconnect && reconnect ) {
if ( parseInt(this.endpoint.length, 10) === 60 ) options.log('Account data WebSocket reconnecting...');
else options.log('WebSocket reconnecting: '+this.endpoint+'...');
try {
reconnect();
} catch ( error ) {
options.log('WebSocket reconnect error: '+error.message);
}
} else options.log('WebSocket connection closed! '+this.endpoint);
};
const _handleSocketUnexpectedResponse = function(reconnect, req, res) {
//Thanks vaielab! https://github.com/jaggedsoft/node-binance-api/issues/77
options.log('WebSocket Unexpected response: '+this.endpoint);
if ( reconnect && options.reconnect ) {
options.log('..WebSocket reconnecting: '+this.endpoint);
reconnect();
}
return true;
}
};
const _handleSocketError = function(reconnect, error) {
options.log(error);
options.log("WebSocket Error: "+this.endpoint, error.code);
/*
switch ( error.code ) {
case 'ECONNREFUSED':
this.reconnect(error);
break;
default:
this.onerror(error);
break;
}
*/
return true;
const _handleSocketError = function(error) {
// Errors ultimately result in a `close` event.
// see: https://github.com/websockets/ws/blob/828194044bf247af852b31c49e2800d557fedeff/lib/websocket.js#L126
options.log('WebSocket error: '+this.endpoint+
(error.code ? ' ('+error.code+')' : '')+
(error.message ? ' '+error.message : '')
);
};
const _handleSocketHeartbeat = function() {
this.isAlive = true;
Expand All @@ -252,59 +230,58 @@ LIMIT_MAKER
const ws = subscriptions[endpointId];
if ( ws.isAlive ) {
ws.isAlive = false;
ws.ping(noop);
if ( ws.readyState === WebSocket.OPEN) ws.ping(noop);
} else {
if ( options.verbose ) options.log("Terminating inactive/broken WebSocket: "+ws.endpoint);
ws.terminate();
if ( ws.readyState === WebSocket.OPEN) ws.terminate();
}
}
}, 30000);
const subscribe = function(endpoint, callback, reconnect = false) {
if ( options.verbose ) options.log("Subscribed to "+endpoint);
const ws = new WebSocket(stream+endpoint);
ws.reconnect = options.reconnect;
ws.endpoint = endpoint;
ws.isAlive = false;
ws.on('open', function() {
//options.log('subscribe('+this.endpoint+')');
this.isAlive = true;
subscriptions[this.endpoint] = this;
});
ws.on('pong', _handleSocketHeartbeat);
ws.on('error', _handleSocketError);
ws.on('close', _handleSocketClose.bind(ws, reconnect));
ws.on('unexpected-response', _handleSocketUnexpectedResponse.bind(ws, reconnect));
ws.on('error', _handleSocketError.bind(ws, reconnect));
ws.on('message', function(data) {
//options.log(data);
try {
callback(JSON.parse(data));
} catch (error) {
options.log('Parse error: '+error.message);
}
});
subscriptions[endpoint] = ws;
return ws;
};
const subscribeCombined = function(streams, callback, reconnect = false) {
const queryParams = streams.join('/');
const ws = new WebSocket(combineStream+queryParams);
ws.reconnect = options.reconnect;
ws.endpoint = stringHash(queryParams);
ws.isAlive = false;
if ( options.verbose ) options.log('CombinedStream: Subscribed to ['+ws.endpoint+'] '+queryParams);
ws.on('open', function() {
//options.log('CombinedStream: WebSocket connection open: '+this.endpoint, queryParms);
this.isAlive = true;
subscriptions[this.endpoint] = this;
});
ws.on('pong', _handleSocketHeartbeat);
ws.on('error', _handleSocketError);
ws.on('close', _handleSocketClose.bind(ws, reconnect));
ws.on('unexpected-response', _handleSocketUnexpectedResponse.bind(ws, reconnect));
ws.on('error', _handleSocketError.bind(ws, reconnect));
ws.on('message', function(data) {
try {
callback(JSON.parse(data).data);
} catch (error) {
options.log('CombinedStream: Parse error: '+error.message);
}
});
subscriptions[ws.endpoint] = ws;
return ws;
};
const userDataHandler = function(data) {
Expand Down Expand Up @@ -807,7 +784,7 @@ LIMIT_MAKER
else if ( symbol.substr(-4) === 'USDT' ) return 'USDT';
},
websockets: {
userData: function userData(callback, execution_callback = false) {
userData: function userData(callback, execution_callback = false, subscribed_callback = false) {
let reconnect = function() {
if ( options.reconnect ) userData(callback, execution_callback);
};
Expand All @@ -822,7 +799,8 @@ LIMIT_MAKER
}, 60 * 30 * 1000); // 30 minute keepalive
options.balance_callback = callback;
options.execution_callback = execution_callback;
subscribe(options.listenKey, userDataHandler, reconnect);
const subscription = subscribe(options.listenKey, userDataHandler, reconnect);
if ( subscribed_callback ) subscribed_callback(subscription.endpoint);
},'POST');
},
subscribe: function(url, callback, reconnect = false) {
Expand All @@ -831,29 +809,30 @@ LIMIT_MAKER
subscriptions: function() {
return subscriptions;
},
terminate: function(endpoint, disable_reconnect = true) {
if ( disable_reconnect ) options.reconnect = false; // Disable auto reconnect by default
terminate: function(endpoint) {
let ws = subscriptions[endpoint];
if ( !ws ) return;
options.log('WebSocket terminated:', endpoint);
ws.reconnect = false;
ws.terminate();
delete subscriptions[endpoint];
},
depth: function depth(symbols, callback) {
let reconnect = function() {
if ( options.reconnect ) depth(symbols, callback);
};

let subscription = undefined;
if ( Array.isArray(symbols) ) {
if ( !isArrayUnique(symbols) ) throw Error('depth: "symbols" cannot contain duplicate elements.');
let streams = symbols.map(function(symbol) {
return symbol.toLowerCase()+'@depth';
});
subscribeCombined(streams, callback, reconnect);
subscription = subscribeCombined(streams, callback, reconnect);
} else {
let symbol = symbols;
subscribe(symbol.toLowerCase()+'@depth', callback, reconnect);
subscription = subscribe(symbol.toLowerCase()+'@depth', callback, reconnect);
}
return subscription.endpoint;
},
depthCache: function depthCacheFunction(symbols, callback, limit = 500) {
let reconnect = function() {
Expand Down Expand Up @@ -895,37 +874,41 @@ LIMIT_MAKER
// If an array of symbols are sent we use a combined stream connection rather.
// This is transparent to the developer, and results in a single socket connection.
// This essentially eliminates "unexpected response" errors when subscribing to a lot of data.
let subscription = undefined;
if ( Array.isArray(symbols) ) {
if ( !isArrayUnique(symbols) ) throw Error('depthCache: "symbols" cannot contain duplicate elements.');

symbols.forEach(symbolDepthInit);
let streams = symbols.map(function (symbol) {
return symbol.toLowerCase()+'@depth';
});
subscribeCombined(streams, handleDepthStreamData, reconnect);
subscription = subscribeCombined(streams, handleDepthStreamData, reconnect);
symbols.forEach(getSymbolDepthSnapshot);
} else {
let symbol = symbols;
symbolDepthInit(symbol);
subscribe(symbol.toLowerCase()+'@depth', handleDepthStreamData, reconnect);
subscription = subscribe(symbol.toLowerCase()+'@depth', handleDepthStreamData, reconnect);
getSymbolDepthSnapshot(symbol);
}
return subscription.endpoint;
},
trades: function trades(symbols, callback) {
let reconnect = function() {
if ( options.reconnect ) trades(symbols, callback);
};

let subscription = undefined;
if ( Array.isArray(symbols) ) {
if ( !isArrayUnique(symbols) ) throw Error('trades: "symbols" cannot contain duplicate elements.');
let streams = symbols.map(function(symbol) {
return symbol.toLowerCase()+'@aggTrade';
});
subscribeCombined(streams, callback, reconnect);
subscription = subscribeCombined(streams, callback, reconnect);
} else {
let symbol = symbols;
subscribe(symbol.toLowerCase()+'@aggTrade', callback, reconnect);
subscription = subscribe(symbol.toLowerCase()+'@aggTrade', callback, reconnect);
}
return subscription.endpoint;
},
chart: function chart(symbols, interval, callback) {
let reconnect = function() {
Expand Down Expand Up @@ -968,20 +951,22 @@ LIMIT_MAKER
});
};

let subscription = undefined;
if ( Array.isArray(symbols) ) {
if ( !isArrayUnique(symbols) ) throw Error('chart: "symbols" cannot contain duplicate elements.');
symbols.forEach(symbolChartInit);
let streams = symbols.map(function(symbol) {
return symbol.toLowerCase()+`@kline_`+interval;
});
subscribeCombined(streams, handleKlineStreamData, reconnect);
subscription = subscribeCombined(streams, handleKlineStreamData, reconnect);
symbols.forEach(getSymbolKlineSnapshot);
} else {
let symbol = symbols;
symbolChartInit(symbol);
subscribe(symbol.toLowerCase()+'@kline_'+interval, handleKlineStreamData, reconnect);
subscription = subscribe(symbol.toLowerCase()+'@kline_'+interval, handleKlineStreamData, reconnect);
getSymbolKlineSnapshot(symbol);
}
return subscription.endpoint;
},
candlesticks: function candlesticks(symbols, interval, callback) {
let reconnect = function() {
Expand All @@ -990,45 +975,49 @@ LIMIT_MAKER
// If an array of symbols are sent we use a combined stream connection rather.
// This is transparent to the developer, and results in a single socket connection.
// This essentially eliminates "unexpected response" errors when subscribing to a lot of data.
let subscription = undefined;
if ( Array.isArray(symbols) ) {
if ( !isArrayUnique(symbols) ) throw Error('candlesticks: "symbols" cannot contain duplicate elements.');
let streams = symbols.map(function (symbol) {
return symbol.toLowerCase()+'@kline_'+interval;
});
subscribeCombined(streams, callback, reconnect);
subscription = subscribeCombined(streams, callback, reconnect);
} else {
let symbol = symbols.toLowerCase();
subscribe(symbol+'@kline_'+interval, callback, reconnect);
subscription = subscribe(symbol+'@kline_'+interval, callback, reconnect);
}
return subscription.endpoint;
},
prevDay: function prevDay(symbols, callback) {
let reconnect = function() {
if ( options.reconnect ) prevDay(symbols, callback);
};

let subscription = undefined;
// Combine stream for array of symbols
if ( Array.isArray(symbols) ) {
if ( !isArrayUnique(symbols) ) throw Error('prevDay: "symbols" cannot contain duplicate elements.');
let streams = symbols.map(function(symbol) {
return symbol.toLowerCase()+'@ticker';
});
subscribeCombined(streams, function(data) {
subscription = subscribeCombined(streams, function(data) {
prevDayStreamHandler(data, callback);
}, reconnect);
// Raw stream for a single symbol
} else if ( symbols ) {
let symbol = symbols;
subscribe(symbol.toLowerCase()+'@ticker', function(data) {
subscription = subscribe(symbol.toLowerCase()+'@ticker', function(data) {
prevDayStreamHandler(data, callback);
}, reconnect);
// Raw stream of all listed symbols
} else {
subscribe('!ticker@arr', function(data) {
subscription = subscribe('!ticker@arr', function(data) {
for ( let line of data ) {
prevDayStreamHandler(line, callback);
}
}, reconnect);
}
return subscription.endpoint;
}
}
};
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"dependencies": {
"request": "^2.83.0",
"string-hash": "^1.1.3",
"ws": "3.2.0"
"ws": "^4.0.0"
},
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
Expand Down

0 comments on commit 43cb166

Please sign in to comment.