diff --git a/node-binance-api.js b/node-binance-api.js index 8f744a3a..e6ec95db 100644 --- a/node-binance-api.js +++ b/node-binance-api.js @@ -194,51 +194,29 @@ LIMIT_MAKER }, 'POST'); }; //////////////////////////// - const _handleSocketClose = function(reconnect, error) { - /* - switch ( error ) { - case 1000: // CLOSE_NORMAL - options.log("WebSocket: closed"); - break; - default: // Abnormal closure - this.reconnect(error); - break; - } - this.onclose(e); - */ - if ( reconnect && options.reconnect ) { - if ( this.endpoint && parseInt(this.endpoint.length, 10) === 60 ) options.log('Account data WebSocket reconnecting..'); - else options.log('WebSocket reconnecting: '+this.endpoint); + const _handleSocketClose = function(reconnect, code, reason) { + delete subscriptions[this.endpoint]; + options.log('WebSocket closed: '+this.endpoint+ + (code ? ' ('+code+')' : '')+ + (reason ? ' '+reason : '') + ); + if ( options.reconnect && this.reconnect && reconnect ) { + if ( parseInt(this.endpoint.length, 10) === 60 ) options.log('Account data WebSocket reconnecting...'); + else options.log('WebSocket reconnecting: '+this.endpoint+'...'); try { reconnect(); } catch ( error ) { options.log('WebSocket reconnect error: '+error.message); } - } else options.log('WebSocket connection closed! '+this.endpoint); - }; - const _handleSocketUnexpectedResponse = function(reconnect, req, res) { - //Thanks vaielab! https://github.com/jaggedsoft/node-binance-api/issues/77 - options.log('WebSocket Unexpected response: '+this.endpoint); - if ( reconnect && options.reconnect ) { - options.log('..WebSocket reconnecting: '+this.endpoint); - reconnect(); - } - return true; + } }; - const _handleSocketError = function(reconnect, error) { - options.log(error); - options.log("WebSocket Error: "+this.endpoint, error.code); - /* - switch ( error.code ) { - case 'ECONNREFUSED': - this.reconnect(error); - break; - default: - this.onerror(error); - break; - } - */ - return true; + const _handleSocketError = function(error) { + // Errors ultimately result in a `close` event. + // see: https://github.com/websockets/ws/blob/828194044bf247af852b31c49e2800d557fedeff/lib/websocket.js#L126 + options.log('WebSocket error: '+this.endpoint+ + (error.code ? ' ('+error.code+')' : '')+ + (error.message ? ' '+error.message : '') + ); }; const _handleSocketHeartbeat = function() { this.isAlive = true; @@ -252,51 +230,51 @@ LIMIT_MAKER const ws = subscriptions[endpointId]; if ( ws.isAlive ) { ws.isAlive = false; - ws.ping(noop); + if ( ws.readyState === WebSocket.OPEN) ws.ping(noop); } else { if ( options.verbose ) options.log("Terminating inactive/broken WebSocket: "+ws.endpoint); - ws.terminate(); + if ( ws.readyState === WebSocket.OPEN) ws.terminate(); } } }, 30000); const subscribe = function(endpoint, callback, reconnect = false) { if ( options.verbose ) options.log("Subscribed to "+endpoint); const ws = new WebSocket(stream+endpoint); + ws.reconnect = options.reconnect; ws.endpoint = endpoint; ws.isAlive = false; ws.on('open', function() { //options.log('subscribe('+this.endpoint+')'); this.isAlive = true; + subscriptions[this.endpoint] = this; }); ws.on('pong', _handleSocketHeartbeat); + ws.on('error', _handleSocketError); ws.on('close', _handleSocketClose.bind(ws, reconnect)); - ws.on('unexpected-response', _handleSocketUnexpectedResponse.bind(ws, reconnect)); - ws.on('error', _handleSocketError.bind(ws, reconnect)); ws.on('message', function(data) { - //options.log(data); try { callback(JSON.parse(data)); } catch (error) { options.log('Parse error: '+error.message); } }); - subscriptions[endpoint] = ws; return ws; }; const subscribeCombined = function(streams, callback, reconnect = false) { const queryParams = streams.join('/'); const ws = new WebSocket(combineStream+queryParams); + ws.reconnect = options.reconnect; ws.endpoint = stringHash(queryParams); ws.isAlive = false; if ( options.verbose ) options.log('CombinedStream: Subscribed to ['+ws.endpoint+'] '+queryParams); ws.on('open', function() { //options.log('CombinedStream: WebSocket connection open: '+this.endpoint, queryParms); this.isAlive = true; + subscriptions[this.endpoint] = this; }); ws.on('pong', _handleSocketHeartbeat); + ws.on('error', _handleSocketError); ws.on('close', _handleSocketClose.bind(ws, reconnect)); - ws.on('unexpected-response', _handleSocketUnexpectedResponse.bind(ws, reconnect)); - ws.on('error', _handleSocketError.bind(ws, reconnect)); ws.on('message', function(data) { try { callback(JSON.parse(data).data); @@ -304,7 +282,6 @@ LIMIT_MAKER options.log('CombinedStream: Parse error: '+error.message); } }); - subscriptions[ws.endpoint] = ws; return ws; }; const userDataHandler = function(data) { @@ -807,7 +784,7 @@ LIMIT_MAKER else if ( symbol.substr(-4) === 'USDT' ) return 'USDT'; }, websockets: { - userData: function userData(callback, execution_callback = false) { + userData: function userData(callback, execution_callback = false, subscribed_callback = false) { let reconnect = function() { if ( options.reconnect ) userData(callback, execution_callback); }; @@ -822,7 +799,8 @@ LIMIT_MAKER }, 60 * 30 * 1000); // 30 minute keepalive options.balance_callback = callback; options.execution_callback = execution_callback; - subscribe(options.listenKey, userDataHandler, reconnect); + const subscription = subscribe(options.listenKey, userDataHandler, reconnect); + if ( subscribed_callback ) subscribed_callback(subscription.endpoint); },'POST'); }, subscribe: function(url, callback, reconnect = false) { @@ -831,29 +809,30 @@ LIMIT_MAKER subscriptions: function() { return subscriptions; }, - terminate: function(endpoint, disable_reconnect = true) { - if ( disable_reconnect ) options.reconnect = false; // Disable auto reconnect by default + terminate: function(endpoint) { let ws = subscriptions[endpoint]; if ( !ws ) return; options.log('WebSocket terminated:', endpoint); + ws.reconnect = false; ws.terminate(); - delete subscriptions[endpoint]; }, depth: function depth(symbols, callback) { let reconnect = function() { if ( options.reconnect ) depth(symbols, callback); }; + let subscription = undefined; if ( Array.isArray(symbols) ) { if ( !isArrayUnique(symbols) ) throw Error('depth: "symbols" cannot contain duplicate elements.'); let streams = symbols.map(function(symbol) { return symbol.toLowerCase()+'@depth'; }); - subscribeCombined(streams, callback, reconnect); + subscription = subscribeCombined(streams, callback, reconnect); } else { let symbol = symbols; - subscribe(symbol.toLowerCase()+'@depth', callback, reconnect); + subscription = subscribe(symbol.toLowerCase()+'@depth', callback, reconnect); } + return subscription.endpoint; }, depthCache: function depthCacheFunction(symbols, callback, limit = 500) { let reconnect = function() { @@ -895,6 +874,7 @@ LIMIT_MAKER // If an array of symbols are sent we use a combined stream connection rather. // This is transparent to the developer, and results in a single socket connection. // This essentially eliminates "unexpected response" errors when subscribing to a lot of data. + let subscription = undefined; if ( Array.isArray(symbols) ) { if ( !isArrayUnique(symbols) ) throw Error('depthCache: "symbols" cannot contain duplicate elements.'); @@ -902,30 +882,33 @@ LIMIT_MAKER let streams = symbols.map(function (symbol) { return symbol.toLowerCase()+'@depth'; }); - subscribeCombined(streams, handleDepthStreamData, reconnect); + subscription = subscribeCombined(streams, handleDepthStreamData, reconnect); symbols.forEach(getSymbolDepthSnapshot); } else { let symbol = symbols; symbolDepthInit(symbol); - subscribe(symbol.toLowerCase()+'@depth', handleDepthStreamData, reconnect); + subscription = subscribe(symbol.toLowerCase()+'@depth', handleDepthStreamData, reconnect); getSymbolDepthSnapshot(symbol); } + return subscription.endpoint; }, trades: function trades(symbols, callback) { let reconnect = function() { if ( options.reconnect ) trades(symbols, callback); }; + let subscription = undefined; if ( Array.isArray(symbols) ) { if ( !isArrayUnique(symbols) ) throw Error('trades: "symbols" cannot contain duplicate elements.'); let streams = symbols.map(function(symbol) { return symbol.toLowerCase()+'@aggTrade'; }); - subscribeCombined(streams, callback, reconnect); + subscription = subscribeCombined(streams, callback, reconnect); } else { let symbol = symbols; - subscribe(symbol.toLowerCase()+'@aggTrade', callback, reconnect); + subscription = subscribe(symbol.toLowerCase()+'@aggTrade', callback, reconnect); } + return subscription.endpoint; }, chart: function chart(symbols, interval, callback) { let reconnect = function() { @@ -968,20 +951,22 @@ LIMIT_MAKER }); }; + let subscription = undefined; if ( Array.isArray(symbols) ) { if ( !isArrayUnique(symbols) ) throw Error('chart: "symbols" cannot contain duplicate elements.'); symbols.forEach(symbolChartInit); let streams = symbols.map(function(symbol) { return symbol.toLowerCase()+`@kline_`+interval; }); - subscribeCombined(streams, handleKlineStreamData, reconnect); + subscription = subscribeCombined(streams, handleKlineStreamData, reconnect); symbols.forEach(getSymbolKlineSnapshot); } else { let symbol = symbols; symbolChartInit(symbol); - subscribe(symbol.toLowerCase()+'@kline_'+interval, handleKlineStreamData, reconnect); + subscription = subscribe(symbol.toLowerCase()+'@kline_'+interval, handleKlineStreamData, reconnect); getSymbolKlineSnapshot(symbol); } + return subscription.endpoint; }, candlesticks: function candlesticks(symbols, interval, callback) { let reconnect = function() { @@ -990,45 +975,49 @@ LIMIT_MAKER // If an array of symbols are sent we use a combined stream connection rather. // This is transparent to the developer, and results in a single socket connection. // This essentially eliminates "unexpected response" errors when subscribing to a lot of data. + let subscription = undefined; if ( Array.isArray(symbols) ) { if ( !isArrayUnique(symbols) ) throw Error('candlesticks: "symbols" cannot contain duplicate elements.'); let streams = symbols.map(function (symbol) { return symbol.toLowerCase()+'@kline_'+interval; }); - subscribeCombined(streams, callback, reconnect); + subscription = subscribeCombined(streams, callback, reconnect); } else { let symbol = symbols.toLowerCase(); - subscribe(symbol+'@kline_'+interval, callback, reconnect); + subscription = subscribe(symbol+'@kline_'+interval, callback, reconnect); } + return subscription.endpoint; }, prevDay: function prevDay(symbols, callback) { let reconnect = function() { if ( options.reconnect ) prevDay(symbols, callback); }; + let subscription = undefined; // Combine stream for array of symbols if ( Array.isArray(symbols) ) { if ( !isArrayUnique(symbols) ) throw Error('prevDay: "symbols" cannot contain duplicate elements.'); let streams = symbols.map(function(symbol) { return symbol.toLowerCase()+'@ticker'; }); - subscribeCombined(streams, function(data) { + subscription = subscribeCombined(streams, function(data) { prevDayStreamHandler(data, callback); }, reconnect); // Raw stream for a single symbol } else if ( symbols ) { let symbol = symbols; - subscribe(symbol.toLowerCase()+'@ticker', function(data) { + subscription = subscribe(symbol.toLowerCase()+'@ticker', function(data) { prevDayStreamHandler(data, callback); }, reconnect); // Raw stream of all listed symbols } else { - subscribe('!ticker@arr', function(data) { + subscription = subscribe('!ticker@arr', function(data) { for ( let line of data ) { prevDayStreamHandler(line, callback); } }, reconnect); } + return subscription.endpoint; } } }; diff --git a/package.json b/package.json index 5ca9a4eb..316a3b61 100644 --- a/package.json +++ b/package.json @@ -6,7 +6,7 @@ "dependencies": { "request": "^2.83.0", "string-hash": "^1.1.3", - "ws": "3.2.0" + "ws": "^4.0.0" }, "scripts": { "test": "echo \"Error: no test specified\" && exit 1"