diff --git a/index.js b/index.js index b59d7f68..347da175 100644 --- a/index.js +++ b/index.js @@ -500,6 +500,9 @@ module.exports = class Autobase extends ReadyResource { async append (value) { if (!this.opened) await this.ready() + + if (this.closing) throw new Error('Autobase is closing') + await this._advanced // ensure all local state has been applied, only needed until persistent batches // if a reset is scheduled await those @@ -759,11 +762,7 @@ module.exports = class Autobase extends ReadyResource { } _onUpgrade (version) { - if (version > this.maxSupportedVersion) { - this._onError(new Error('Autobase upgrade required')) - return false - } - return true + if (version > this.maxSupportedVersion) throw new Error('Autobase upgrade required') } _addLocalHeads () { @@ -1436,7 +1435,7 @@ module.exports = class Autobase extends ReadyResource { // autobase version was bumped let upgraded = false if (update.version > this.version) { - if (!this._onUpgrade(update.version)) return // failed + this._onUpgrade(update.version) // throws if not supported upgraded = true } @@ -1501,12 +1500,7 @@ module.exports = class Autobase extends ReadyResource { if (this.system.bootstrapping) await this._bootstrap() if (applyBatch.length && this._hasApply === true) { - try { - await this._handlers.apply(applyBatch, this.view, this) - } catch (err) { - this._onError(err) - return null - } + await this._handlers.apply(applyBatch, this.view, this) } update.indexers = !!this.system.indexerUpdate @@ -1531,7 +1525,7 @@ module.exports = class Autobase extends ReadyResource { // autobase version was bumped let upgraded = false if (update.version > this.version) { - if (!this._onUpgrade(update.version)) return // failed + this._onUpgrade(update.version) // throws if not supported upgraded = true } diff --git a/test/upgrade.js b/test/upgrade.js index f87ebf25..2afc5fae 100644 --- a/test/upgrade.js +++ b/test/upgrade.js @@ -20,6 +20,7 @@ test('upgrade - do not proceed', async t => { apply: applyv0, open, encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -29,6 +30,7 @@ test('upgrade - do not proceed', async t => { apply: applyv0, open, encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -49,6 +51,7 @@ test('upgrade - do not proceed', async t => { apply: applyv1, open, encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -58,11 +61,7 @@ test('upgrade - do not proceed', async t => { await a1.append({ version: 1, data: '3' }) - const error = new Promise((resolve, reject) => b0.on('error', reject)) - - replicateAndSync([a1, b0]) - - await t.exception(error, /Upgrade required/) + await t.exception(replicateAndSync([a1, b0]), /Upgrade required/) t.is(a1.view.data.indexedLength, 4) t.is(b0.view.data.indexedLength, 3) @@ -75,6 +74,7 @@ test('upgrade - proceed', async t => { apply: applyv0, open, encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -84,6 +84,7 @@ test('upgrade - proceed', async t => { apply: applyv0, open, encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -104,6 +105,7 @@ test('upgrade - proceed', async t => { apply: applyv1, open, encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -113,11 +115,7 @@ test('upgrade - proceed', async t => { await a1.append({ version: 1, data: '3' }) - const error = new Promise((resolve, reject) => b0.on('error', reject)) - - replicateAndSync([a1, b0]) - - await t.exception(error, /Upgrade required/) + await t.exception(replicateAndSync([a1, b0]), /Upgrade required/) t.is(a1.view.data.indexedLength, 4) t.is(b0.view.data.indexedLength, 3) @@ -128,6 +126,7 @@ test('upgrade - proceed', async t => { apply: applyv1, open, encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -144,6 +143,7 @@ test('upgrade - consensus', async t => { apply: applyv0, open, encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -153,6 +153,7 @@ test('upgrade - consensus', async t => { apply: applyv0, open, encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -186,17 +187,13 @@ test('upgrade - consensus', async t => { await a1.append({ version: 1, data: '3' }) - const error = new Promise((resolve, reject) => b0.on('error', reject)) - - replicateAndSync([a1, b0]) - t.is(a1.view.data.indexedLength, 3) t.is(b0.view.data.indexedLength, 3) t.is(a1.view.data.length, 4) t.is(b0.view.data.length, 3) - await t.exception(error, /Upgrade required/) + await t.exception(replicateAndSync([a1, b0]), /Upgrade required/) t.is(b0.view.data.indexedLength, 3) // should not advance @@ -206,6 +203,7 @@ test('upgrade - consensus', async t => { apply: applyv1, open, encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -223,6 +221,7 @@ test('upgrade - consensus 3 writers', async t => { apply: applyv0, open, encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -232,6 +231,7 @@ test('upgrade - consensus 3 writers', async t => { apply: applyv0, open, encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -241,6 +241,7 @@ test('upgrade - consensus 3 writers', async t => { apply: applyv0, open, encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -266,6 +267,7 @@ test('upgrade - consensus 3 writers', async t => { apply: applyv1, open, encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -276,41 +278,25 @@ test('upgrade - consensus 3 writers', async t => { await b0.append({ version: 0, data: '4' }) await replicateAndSync([a1, b0]) - const berror = new Promise((resolve, reject) => b0.once('error', reject)) - const cerror = new Promise((resolve, reject) => c0.once('error', reject)) - await a1.append({ version: 1, data: '5' }) - await replicateAndSync([a1, b0, c0]) + + // error should throw on apply + await t.exception(replicateAndSync([a1, b0]), /Upgrade required/) + await t.exception(replicateAndSync([a1, c0]), /Upgrade required/) t.is(a1.view.data.indexedLength, 3) t.is(b0.view.data.indexedLength, 3) t.is(c0.view.data.indexedLength, 3) - await b0.append(null) - await replicateAndSync([b0, c0]) - await c0.append(null) - await replicateAndSync([b0, c0]) - await b0.append(null) - await replicateAndSync([a1, b0, c0]) + await t.exception(b0.append(null)) + await t.exception(c0.append(null)) t.is(a1.view.data.indexedLength, 3) t.is(b0.view.data.indexedLength, 3) t.is(c0.view.data.indexedLength, 3) - await c0.append(null) - await replicateAndSync([a1, b0, c0]) - await b0.append(null) - await replicateAndSync([a1, b0, c0]) - t.is(await b0.view.version.get(b0.view.version.indexedLength - 1), 0) - await a1.append(null) - replicateAndSync([a1, b0, c0]) - - await t.exception(berror, /Upgrade required/) - await t.exception(cerror, /Upgrade required/) - - t.is(b0.view.data.indexedLength, 3) // should not advance t.is(b0.view.data.length, 4) await b0.close() @@ -319,6 +305,7 @@ test('upgrade - consensus 3 writers', async t => { apply: applyv1, open, encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -341,6 +328,7 @@ test('upgrade - writer cannot append while behind', async t => { apply: applyv0, open, encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -350,6 +338,7 @@ test('upgrade - writer cannot append while behind', async t => { apply: applyv0, open, encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -359,6 +348,7 @@ test('upgrade - writer cannot append while behind', async t => { apply: applyv0, open, encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -384,6 +374,7 @@ test('upgrade - writer cannot append while behind', async t => { apply: applyv1, open, encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -395,6 +386,7 @@ test('upgrade - writer cannot append while behind', async t => { apply: applyv1, open, encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -406,14 +398,10 @@ test('upgrade - writer cannot append while behind', async t => { t.is(a1.view.data.indexedLength, 4) t.is(b1.view.data.indexedLength, 4) - const error = new Promise((resolve, reject) => c0.on('error', reject)) - - replicateAndSync([a1, c0]) - - await t.exception(error, /Upgrade required/) + await t.exception(replicateAndSync([a1, c0]), /Upgrade required/) const len = c0.local.length - await c0.append({ version: 0, data: '5' }) + await t.exception(c0.append({ version: 0, data: '5' })) t.is(c0.local.length, len) // did not append t.is(c0.view.data.indexedLength, 3) @@ -436,6 +424,7 @@ test('upgrade - onindex hook', async t => { apply: applyv0, open, encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -445,6 +434,7 @@ test('upgrade - onindex hook', async t => { apply: applyv0, open, encryptionKey, + ackInterval: 0, onindex: async () => { const view = b0.view.version if (!view.indexedLength) return @@ -470,6 +460,7 @@ test('upgrade - onindex hook', async t => { apply: applyv1, open, encryptionKey, + ackInterval: 0, onindex: async () => { const view = a1.view.version if (!view.indexedLength) return @@ -484,11 +475,7 @@ test('upgrade - onindex hook', async t => { await a1.append({ version: 1, data: '3' }) - const error = new Promise((resolve, reject) => b0.on('error', reject)) - - replicateAndSync([a1, b0]) - - await t.exception(error, /Upgrade required/) + await t.exception(replicateAndSync([a1, b0]), /Upgrade required/) t.is(a1.view.data.indexedLength, 4) t.is(b0.view.data.indexedLength, 3) @@ -502,6 +489,7 @@ test('upgrade - onindex hook', async t => { apply: applyv1, open, encryptionKey, + ackInterval: 0, onindex: async () => { const view = b1.view.version if (!view.indexedLength) return @@ -523,6 +511,7 @@ test('autobase upgrade - do not proceed', async t => { apply, open: store => store.get('view', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -534,6 +523,7 @@ test('autobase upgrade - do not proceed', async t => { apply, open: store => store.get('view', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -554,6 +544,7 @@ test('autobase upgrade - do not proceed', async t => { apply, open: store => store.get('view', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -566,11 +557,7 @@ test('autobase upgrade - do not proceed', async t => { await a1.append({ data: '3' }) - const error = new Promise((resolve, reject) => b0.on('error', reject)) - - replicateAndSync([a1, b0]) - - await t.exception(error, /Autobase upgrade required/) + await t.exception(replicateAndSync([a1, b0]), /Autobase upgrade required/) t.is(a1.view.indexedLength, 4) t.is(b0.view.indexedLength, 3) @@ -583,6 +570,7 @@ test('autobase upgrade - proceed', async t => { apply, open: store => store.get('view', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -594,6 +582,7 @@ test('autobase upgrade - proceed', async t => { apply, open: store => store.get('view', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -614,6 +603,7 @@ test('autobase upgrade - proceed', async t => { apply, open: store => store.get('view', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -626,11 +616,7 @@ test('autobase upgrade - proceed', async t => { await a1.append({ data: '3' }) - const error = new Promise((resolve, reject) => b0.on('error', reject)) - - replicateAndSync([a1, b0]) - - await t.exception(error, /Autobase upgrade required/) + await t.exception(replicateAndSync([a1, b0]), /Autobase upgrade required/) t.is(a1.view.indexedLength, 4) t.is(b0.view.indexedLength, 3) @@ -641,6 +627,7 @@ test('autobase upgrade - proceed', async t => { apply, open: store => store.get('view', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -660,6 +647,7 @@ test('autobase upgrade - consensus', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -671,6 +659,7 @@ test('autobase upgrade - consensus', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -695,6 +684,7 @@ test('autobase upgrade - consensus', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -723,6 +713,7 @@ test('autobase upgrade - consensus', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -744,6 +735,7 @@ test('autobase upgrade - consensus 3 writers', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -755,6 +747,7 @@ test('autobase upgrade - consensus 3 writers', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -764,6 +757,7 @@ test('autobase upgrade - consensus 3 writers', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -790,6 +784,7 @@ test('autobase upgrade - consensus 3 writers', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -797,6 +792,7 @@ test('autobase upgrade - consensus 3 writers', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -810,14 +806,10 @@ test('autobase upgrade - consensus 3 writers', async t => { await a1.append({ data: '5' }) await c1.append({ data: '6' }) - const berror = new Promise((resolve, reject) => b0.once('error', reject)) - - confirm([a1, b0, c1]) - - await t.exception(berror, /Autobase upgrade required/) + await t.exception(confirm([a1, b0, c1]), /Autobase upgrade required/) t.is((await b0.system.getIndexedInfo()).version, version) - t.ok(b0.closed) + t.ok(b0.closing) t.is(b0.view.indexedLength, 3) // should not advance @@ -827,6 +819,7 @@ test('autobase upgrade - consensus 3 writers', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -852,6 +845,7 @@ test('autobase upgrade - downgrade', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -869,6 +863,7 @@ test('autobase upgrade - downgrade', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -894,6 +889,7 @@ test('autobase upgrade - downgrade', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -909,6 +905,7 @@ test('autobase upgrade - downgrade then restart', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -926,6 +923,7 @@ test('autobase upgrade - downgrade then restart', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -950,6 +948,7 @@ test('autobase upgrade - downgrade then restart', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -962,6 +961,7 @@ test('autobase upgrade - downgrade then restart', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -981,6 +981,7 @@ test('autobase upgrade - downgrade then restart', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -1004,6 +1005,7 @@ test('autobase upgrade - upgrade before writer joins', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' } @@ -1035,6 +1037,7 @@ test('autobase upgrade - fix borked version', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' } @@ -1099,11 +1102,11 @@ test('autobase upgrade - fix borked version', async t => { const aerr = new Promise((resolve, reject) => a1.once('error', reject)) const berr = new Promise((resolve, reject) => b1.once('error', reject)) - a1.append('three', /Block/) - b1.append('three', /Block/) + a1.append('three') + b1.append('three') - await t.exception(aerr) - await t.exception(berr) + await t.exception(aerr, /Block/) + await t.exception(berr, /Block/) await a1.close() await b1.close() @@ -1139,6 +1142,7 @@ test('autobase upgrade - downgrade then fix bork', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' } @@ -1368,6 +1372,7 @@ test('autobase upgrade - non monotonic version', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -1379,6 +1384,7 @@ test('autobase upgrade - non monotonic version', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -1395,6 +1401,7 @@ test('autobase upgrade - non monotonic version', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -1418,6 +1425,7 @@ test('autobase upgrade - non monotonic version', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' }) @@ -1429,6 +1437,7 @@ test('autobase upgrade - non monotonic version', async t => { apply, open: store => store.get('test', { valueEncoding: 'json' }), encryptionKey, + ackInterval: 0, valueEncoding: 'json' })