-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmaster_client.js
224 lines (198 loc) · 7.72 KB
/
master_client.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
var net = require('net');
var redis = require('redis');
var logger = require('./logger.js');
var CommandQueue = require('./command_queue.js');
module.exports = MasterClient;
/**
* A redis cluster abstraction which is initialized as a connection to a master instance.
* If the connection fails, it queues writes and sends reads to a selected slave instance.
*
* @param {string} master_name The name given to the clister by the sentinels.
* @param {number} master_port The port of the master redis instance.
* @param {string} master_host The host of the master redis instance.
* @param {Array} slaves The array of configuration info for the slave instances.
* @param {number} timeout Milliseconds to hold queued requests while waiting to exit failsafe state. Requests will
* resolve with an error after the timeout finishes.
*/
function MasterClient( master_name, master_port, master_host, slaves, timeout ) {
if( !arguments.length ) return;
this.name = master_name;
this.failover_timeout = timeout || 5000;
onSync(master_port, master_host, slaves);
}
MasterClient.prototype.__proto__ = redis.RedisClient.prototype;
MasterClient.prototype.onSync = function(master_port, master_host, slaves){
if (master_host){
this.slaves = slaves;
var net_client = net.createConnection( master_port, master_host );
redis.RedisClient.call( this, net_client );
this.host = master_host;
this.port = master_port;
this.listen_for_errors();
if (this.cq)
this.consume_command_queue();
}else{
if (this.cq)
this.emit('errorConfig');
else
this.generate_command_queue();
}
}
MasterClient.prototype.listen_for_errors = function() {
if (this._events && this._events.error && this._events.error.length) return;
this.on('error', function(e) {
logger.error(e.message);
this.enter_failsafe_state();
// send any commands waiting in the offline_queue
this.send_offline_queue();
});
}
MasterClient.prototype.enter_failsafe_state = function(next) {
next = next || none;
if (this.cq) return next();
logger.warn('Master client: '+this.name+' entering failsafe');
this.generate_command_queue();
this.connect_to_valid_slave(0, next);
this.emit('+failsafe');
};
MasterClient.prototype.exit_failsafe_state = function(new_master_config) {
if (!this.cq) return;
delete this.failsafe_state;
logger.info('Master client '+this.name+' exiting failsafe');
this.connect_to_redis_instance( new_master_config.port, new_master_config.host );
this.once( 'connect', function() {
logger.info(
'Master client: '+this.name+' connected to '+new_master_config.host+':'+new_master_config.port
);
this.consume_command_queue();
this.emit('-failsafe');
});
};
MasterClient.prototype.connect_to_valid_slave = function(num_trials, next) {
this.failsafe_state = 'rw';
// arbitrary upper bound chosen to account for slaves being altered while fallback is taking place
if (num_trials < this.slaves.length + 3) {
this._connect_to_slave(num_trials, next);
} else {
next();
}
};
MasterClient.prototype._connect_to_slave = function (num_trials, next) {
var slave_index = num_trials % (this.slaves.length || 1);
var slave = this.slaves[slave_index];
if(!slave) return this.connect_to_valid_slave(num_trials + 1, next);
this.connect_to_redis_instance( slave.port, slave.ip );
this.once('error', on_error);
this.once('connect', on_connect);
function on_error() {
this.on_slave_error(slave, num_trials, next);
}
function on_connect() {
this.on_slave_connect(slave, num_trials, next);
}
};
MasterClient.prototype.on_slave_error = function (slave, num_trials, next) {
logger.warn('Master client: '+this.name+' failed to connect to slave: '+slave.ip+':'+slave.port);
this.removeAllListeners('connect');
this.connect_to_valid_slave(num_trials + 1, next);
}
MasterClient.prototype.on_slave_connect = function (slave, num_trials, next) {
logger.info('Master client: '+this.name+' connected to slave: '+slave.ip+':'+slave.port);
this.removeAllListeners('error');
this.listen_for_errors();
this.failsafe_state = 'w';
this.cq.exec_reads();
next();
}
/**
* Reconnect the MasterClient to the specified port/host combo.
*
* Deletes the old stream object and creates a new one, reassigning all necessary
* events to it.
*
* @param {number} port
* @param {string} host
*/
MasterClient.prototype.connect_to_redis_instance = function( port, host ) {
var self = this;
//this.options = {no_ready_check: true};
this.stream.removeAllListeners();
this.port = port;
this.host = host;
this.ready = false;
this.connected = false;
this.stream = net.createConnection(port, host);
// add handles to the new stream necessary to reconnect
this.stream.on("connect", this.on_connect.bind(this));
this.stream.on("data", this.on_data.bind(this));
this.stream.on("error", this.on_error.bind(this));
this.stream.on("close", this.connection_gone.bind(this, "close"));
this.stream.on("end", this.connection_gone.bind(this, "end"));
this.stream.on("drain", function () {
self.should_buffer = false;
self.emit("drain");
});
};
// Store the original send_command
MasterClient.prototype.super_send_command = redis.RedisClient.prototype.send_command;
// Create a new send_command with middleware to handle failures
MasterClient.prototype.send_command = function(command, args, next, trials) {
trials = trials || 0; // track number of retries
if (typeof args[args.length-1] == 'function') next = args.pop();
else if (typeof next != 'function') next = function(){};
// check failsafe_state instead of cq. It reads better
if (this.cq) this.send_command_with_failsafe(command, args, next, trials);
else this.send_command_without_failsafe(command, args, next, trials);
};
/**
* Route the command to the command queue or a slave.
*/
MasterClient.prototype.send_command_with_failsafe = function(command, args, next) {
var self = this;
if (is_write_command(command) || this.failsafe_state == 'rw')
return this.cq[command](args, next);
this.super_send_command(command, args, function(error, response) {
if (error) self.connect_to_valid_slave( 0, function() {
self.send_command(command, args, next);
});
else next(error, response);
});
};
/**
* Route command to super with appropriate middleware to handle failures.
*/
MasterClient.prototype.send_command_without_failsafe = function(command, args, next, trials) {
var self = this;
this.super_send_command(command, args, function( error, response ) {
// TODO: how many trials will we allow before deciding the process has failed beyond recovery?
if (error && trials < 1) {
return self.enter_failsafe_state( function() {
self.send_command(command, args, next);
});
}
else if (error) return self.send_command(command, args, next, trials + 1);
next(error, response);
});
};
/**
* Creates a new command queue. Overridden for mocking in test.
*/
MasterClient.prototype.generate_command_queue = function() {
this.cq = new CommandQueue(this);
};
/**
* Delete the command queue and consume queued commands
*/
MasterClient.prototype.consume_command_queue = function() {
var cq = this.cq;
delete this.cq;
cq.exec();
};
function none(){}
/**
* Returns true if the command would perform a write to redis
*/
// TODO: use the command queue version of this instead
function is_write_command(command) {
return /(pop)|(set)|(del)/i.test(command);
}