-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconnectionFactory.ts
205 lines (163 loc) · 6.03 KB
/
connectionFactory.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
import { currentEnvironment, isLocal, isRemote, log, Metrics } from '@paradoxical-io/common-server';
import { Pool } from 'mysql';
import { Connection, createConnection } from 'typeorm';
import { MysqlDriver } from 'typeorm/driver/mysql/MysqlDriver';
import { LoggerOptions } from 'typeorm/logger/LoggerOptions';
import { CrudBase } from './crudBase';
import { AWS_RDS_PROFILE } from './ssl/profiles';
/**
 * Public configurations for MySQL.
 *
 * These are the caller-supplied settings consumed by `ConnectionFactory.mysql`.
 */
export interface MySQLOptions {
// Database host. The factory also inspects this string: a value containing
// 'read-replica' disables schema synchronize, and a value containing 'proxy'
// switches SSL to `rejectUnauthorized: false` instead of the RDS profile.
hostname: string;
// TCP port of the MySQL server.
port: number;
// Login user. Included in the factory's log context and in its error message.
username: string;
// Login password. Passed to the driver only; the factory does not log it.
password: string;
// Schema to connect to. When omitted, pool metrics are tagged with schema 'unknown'.
database?: string;
// Size of the connection pool (forwarded to the driver as `connectionLimit`).
// When omitted the factory supplies an environment-dependent default.
connectionCount?: number;
// Name given to the created TypeORM connection.
connectionName?: string;
}
/**
 * PrivateOptions are internal options configured for the factory that consumers
 * cannot configure through the public `MySQLOptions` type.
 */
interface PrivateOptions {
// Whether the connection is made over SSL. The factory defaults this to
// enabled in every environment except 'local'.
useSSL?: boolean;
}
/**
 * All options: the public ones and the private ones merged together. This is
 * the shape the factory works with internally after applying its defaults.
 */
type AllMySQLOptions = MySQLOptions & PrivateOptions;
/**
 * Builds TypeORM connections (MySQL for real environments, SQLite for tests)
 * that are pre-configured with the entity classes handed to the constructor.
 */
export class ConnectionFactory {
  /**
   * Takes a set of model constructors. These must be class constructor references that subclass CrudBase
   * @param entities entity classes registered on every connection this factory creates
   */
  constructor(private entities?: Array<{ new (): CrudBase }>) {}

  /**
   * Parses the raw value of the pool size environment variable.
   *
   * Environment variables are always strings, so the value is validated as a
   * plain run of decimal digits before being converted. Anything else (unset,
   * empty, non-numeric, fractional, negative or zero) yields `undefined` so the
   * caller falls back to its built-in default.
   *
   * @param raw the raw environment variable value, if any
   * @returns a positive integer pool size, or `undefined` when the value is unusable
   */
  private static parsePoolSizeOverride(raw: string | undefined): number | undefined {
    const trimmed = raw?.trim();
    if (!trimmed || !/^\d+$/.test(trimmed)) {
      return undefined;
    }

    const parsed = Number.parseInt(trimmed, 10);
    return Number.isSafeInteger(parsed) && parsed > 0 ? parsed : undefined;
  }

  /**
   * Creates a pooled MySQL connection and wires pool activity metrics onto it.
   *
   * Defaults applied when the caller does not specify them:
   * - `connectionCount`: `PARADOX_MYSQL_CONNECTION_POOL_SIZE` when it holds a
   *   positive integer, otherwise 20 in 'prod' and 5 elsewhere.
   * - SSL: enabled everywhere except the 'local' environment.
   *
   * @param options public connection settings
   * @returns the established TypeORM connection
   * @throws Error when the connection cannot be created; the underlying error is logged
   */
  async mysql(options: MySQLOptions): Promise<Connection> {
    const defaultConnectionSize = currentEnvironment() === 'prod' ? 20 : 5;

    // The previous check ran Number.isInteger on the raw string, which is always
    // false, so the override was silently ignored. Parse first, then validate.
    const overrideSize = ConnectionFactory.parsePoolSizeOverride(process.env.PARADOX_MYSQL_CONNECTION_POOL_SIZE);

    const defaults: Partial<AllMySQLOptions> = {
      connectionCount: overrideSize ?? defaultConnectionSize,
      useSSL: currentEnvironment() !== 'local',
    };

    // Caller-provided values win over the defaults.
    const opts: AllMySQLOptions = Object.assign({}, defaults, options);

    const dbLog = log.with({
      host: `${opts.hostname}:${opts.port}`,
      database: opts.database,
    });

    dbLog
      .with({
        username: opts.username,
        useSSL: opts.useSSL,
        connectionCount: opts.connectionCount,
        name: options.connectionName,
      })
      .info(`Creating mysql connection pool`);

    // NOTE(review): isLocal / isRemote are used as values here and below, not
    // invoked. Confirm they are boolean exports rather than functions, which
    // would always be truthy.
    let loggingOptions: LoggerOptions;
    if (log.isQuiet()) {
      loggingOptions = false;
    } else if (isLocal && process.env.PARADOX_MYSQL_LOGGING_LEVEL) {
      loggingOptions = process.env.PARADOX_MYSQL_LOGGING_LEVEL as LoggerOptions;
    } else {
      loggingOptions = ['warn', 'info', 'log'];
    }

    // Schema synchronize is never allowed against a read replica, and otherwise
    // only for non-prod remote environments or a local environment.
    const isReadReplica = opts.hostname.includes('read-replica');
    const sync =
      !isReadReplica &&
      ((currentEnvironment() !== 'prod' && isRemote) || (currentEnvironment() === 'local' && isLocal));

    if (sync) {
      dbLog.info('Database synchronize is allowed!');
    } else {
      dbLog.info('Database synchronize is not allowed!');
    }

    if (isReadReplica) {
      dbLog.info('DB url is a read replica ');
    }

    // https://github.com/mysqljs/mysql#ssl-options
    let sslOptions: { rejectUnauthorized: false } | typeof AWS_RDS_PROFILE | undefined;
    if (opts.useSSL) {
      // Hosts whose name contains 'proxy' skip certificate verification;
      // every other host uses the AWS RDS profile.
      if (opts.hostname?.includes('proxy')) {
        sslOptions = { rejectUnauthorized: false };
      } else {
        sslOptions = AWS_RDS_PROFILE;
      }
    }

    try {
      const conn = await createConnection({
        type: 'mysql',
        host: opts.hostname,
        port: opts.port,
        username: opts.username,
        password: opts.password,
        database: opts.database,

        /**
         * Support bigints
         */
        supportBigNumbers: true,
        logging: loggingOptions,
        charset: 'utf8mb4',
        synchronize: sync,
        timezone: 'Z',
        name: opts.connectionName,
        ssl: sslOptions,
        entities: this.entities,

        // log slow queries that take 2 seconds
        maxQueryExecutionTime: 2000,

        // these get passed to the underlying driver
        // https://github.com/mysqljs/mysql#pooling-connections
        extra: {
          connectionLimit: opts.connectionCount,
        },
      });

      if (conn.driver instanceof MysqlDriver) {
        const tags = { schema: opts.database || 'unknown' };
        const pool: Pool = (conn.driver as MysqlDriver).pool;

        // Checkout timestamp (epoch ms) per connection thread id, used to time
        // how long each pooled connection stays checked out.
        const acquiredAt = new Map<number, number>();

        // attach listeners on the mysql driver for metrics on pool activity
        pool.on('acquire', pooled => {
          if (pooled.threadId) {
            acquiredAt.set(pooled.threadId, new Date().getTime());
          }

          Metrics.instance.increment('mysql.connection.active', tags);
        });

        pool.on('release', pooled => {
          Metrics.instance.increment('mysql.connection.active', -1, tags);

          if (pooled.threadId) {
            const startedAt = acquiredAt.get(pooled.threadId);
            if (startedAt !== undefined) {
              Metrics.instance.timing('mysql.connection.active_time', new Date().getTime() - startedAt, tags);
              acquiredAt.delete(pooled.threadId);
            }
          }
        });

        pool.on('enqueue', () => Metrics.instance.increment('mysql.connection.enqueue', tags));
        pool.on('connection', () => Metrics.instance.increment('mysql.connection.new_connection', tags));
        pool.on('error', () => Metrics.instance.increment('mysql.connection.error', tags));
      }

      return conn;
    } catch (err) {
      // The original error is logged; callers receive a generic error that
      // carries only the user and host.
      const message = `Error while creating connection to ${opts.username}@${opts.hostname}`;

      dbLog.error(message, err);

      throw new Error(message);
    }
  }

  /**
   * Creates a SQLite connection, in memory by default.
   *
   * When `PARADOX_DEBUG_SQLITE_DB` is set the database is written to
   * `<cwd>/.db/runs/<name>.db` instead. Under Jest (`JEST_TEST` set) the file
   * name includes the current test name and a timestamp.
   *
   * @param synchronize when true (the default) the schema is synchronized after connecting
   * @returns the established TypeORM connection
   */
  async sqlite(synchronize = true): Promise<Connection> {
    // Random id doubles as the unique TypeORM connection name.
    const id = Math.random().toString();

    const name = process.env.JEST_TEST ? `${expect.getState().currentTestName}_${new Date().getTime()}_${id}` : id;

    const path = `${process.cwd()}/.db/runs/${name}.db`;

    // if the env var is set will dump the sqlite db to disk, otherwise will use it in memory
    const dbName = process.env.PARADOX_DEBUG_SQLITE_DB ? path : ':memory:';

    const conn = await createConnection({
      type: 'sqlite',
      name: id,
      database: dbName,
      logging: process.env.PARADOX_SQLITE_LOGGING === undefined ? ['warn', 'info', 'log'] : 'all',
      // Synchronize is run explicitly below so callers can opt out.
      synchronize: false,
      maxQueryExecutionTime: 1000,
      entities: this.entities,
    });

    if (synchronize) {
      await conn.synchronize();
    }

    return conn;
  }
}