diff --git a/index.js b/index.js index beb8bc2d..4f5c43f5 100644 --- a/index.js +++ b/index.js @@ -143,8 +143,10 @@ module.exports = class Autobase extends ReadyResource { this._ackTimer = null this._acking = false + this._initialHeads = [] this._initialSystem = null this._initialViews = null + this._waiting = new SignalPromise() this.system = new SystemView(this._viewStore.get({ name: '_system', exclusive: true })) @@ -271,7 +273,7 @@ module.exports = class Autobase extends ReadyResource { this.local.setUserData('autobase/local', this.local.key) } - const { bootstrap, system } = await this._loadSystemInfo() + const { bootstrap, system, heads } = await this._loadSystemInfo() this.version = system ? system.version @@ -280,7 +282,9 @@ module.exports = class Autobase extends ReadyResource { : this.maxSupportedVersion this.bootstrap = bootstrap + this._initialSystem = system + this._initialHeads = heads await this._makeLinearizer(system) } @@ -288,14 +292,14 @@ module.exports = class Autobase extends ReadyResource { async _loadSystemInfo () { const pointer = await this.local.getUserData('autobase/boot') const bootstrap = this.bootstrap || (await this.local.getUserData('referrer')) || this.local.key - if (!pointer) return { bootstrap, system: null } + if (!pointer) return { bootstrap, system: null, heads: [] } - const { indexed, views } = c.decode(messages.BootRecord, pointer) + const { indexed, views, heads } = c.decode(messages.BootRecord, pointer) const { key, length } = indexed this._systemPointer = length - if (!length) return { bootstrap, system: null } + if (!length) return { bootstrap, system: null, heads: [] } const encryptionKey = AutoStore.getBlockKey(bootstrap, this.encryptionKey, '_system') const actualCore = this.store.get({ key, exclusive: false, compat: false, encryptionKey, isBlockKey: true }) @@ -308,7 +312,7 @@ module.exports = class Autobase extends ReadyResource { if (length === 0 || !(await core.has(length - 1))) { await this.local.setUserData('autobase/boot', null) this._systemPointer = 0 - return { bootstrap, system: null } + return { bootstrap, system: null, heads: [] } } const system = new SystemView(core, length) @@ -326,7 +330,8 @@ module.exports = class Autobase extends ReadyResource { return { bootstrap, - system + system, + heads } } @@ -469,6 +474,64 @@ module.exports = class Autobase extends ReadyResource { if (this.reindexing) this._setReindexed() this.queueFastForward() + + await this._catchup(this._initialHeads) + } + + async _catchup (nodes) { + if (!nodes.length) return + + const visited = new Set() + const writers = new Map() + + while (nodes.length) { + const { key, length } = nodes.pop() + + const hex = b4a.toString(key, 'hex') + const ref = hex + ':' + length + + if (visited.has(ref)) continue + visited.add(ref) + + let w = writers.get(hex) + if (!w) { + const writer = await this._getWriterByKey(key, -1, 0, true, false, null) + + w = { writer, end: writer.length } + + writers.set(hex, w) + } + + if (w.writer.length >= length) continue + + if (length > w.end) w.end = length + + const block = await w.writer.core.get(length - 1) + + for (const dep of block.node.heads) { + nodes.push(dep) + } + } + + while (writers.size) { + for (const [hex, info] of writers) { + const { writer, end } = info + + if (writer === null || writer.length === end) { + writers.delete(hex) + continue + } + + if (writer.available <= writer.length) await writer.update() + + const node = writer.advance() + if (!node) continue + + this.linearizer.addHead(node) + } + } + + await this._drain() // runs for one tick } _reindexersIdle () { @@ -518,7 +581,7 @@ module.exports = class Autobase extends ReadyResource { if (this._hasClose) await this._handlers.close(this.view) if (this._primaryBootstrap) await this._primaryBootstrap.close() - for (const w of this.activeWriters) await w.close() + await this.activeWriters.clear() await this.corePool.clear() await this.store.close() await closing @@ -685,8 +748,6 @@ module.exports = class Autobase extends ReadyResource { if (!this.opened) await this.ready() if (this.closing) throw new Error('Autobase is closing') - await this._advanced // ensure all local state has been applied, only needed until persistent batches - // if a reset is scheduled await those while (this._queueViewReset && !this.closing) await this._bump() @@ -694,12 +755,6 @@ module.exports = class Autobase extends ReadyResource { throw new Error('Not writable') } - // make sure all local nodes are processed before continuing - while (!this.closing && this.localWriter.core.length > this.localWriter.length) { - await this.localWriter.waitForSynced() - await this._bump() // make sure its all flushed... - } - if (this._appending === null) this._appending = [] if (Array.isArray(value)) { @@ -708,7 +763,11 @@ module.exports = class Autobase extends ReadyResource { this._append(value) } - await this._bump() + // await in case append is in current tick + if (this._advancing) await this._advancing + + // only bump if there are unflushed nodes + if (this._appending !== null) return this._bump() } _append (value) { @@ -1079,16 +1138,19 @@ module.exports = class Autobase extends ReadyResource { this._systemPointer = length - const cores = this._viewStore.getIndexedCores() - const views = new Array(cores.length - 1) - for (const core of cores) { - if (core.systemIndex === -1) continue - views[core.systemIndex] = core.name - } + const views = this._viewStore.indexedViewsByName() await this._setBootRecord(this.system.core.key, length, this.system.heads, views) } + async _updateBootRecordHeads (heads) { + if (this._systemPointer === 0) return // first tick + + const views = this._viewStore.indexedViewsByName() + + await this._setBootRecord(this.system.core.key, this._systemPointer, heads, views) + } + async _setBootRecord (key, length, heads, views) { const pointer = c.encode(messages.BootRecord, { indexed: { key, length }, @@ -1101,7 +1163,7 @@ module.exports = class Autobase extends ReadyResource { async _drain () { while (!this.closing) { - if (this.fastForwardTo !== null) { + if (this.opened && this.fastForwardTo !== null) { await this._applyFastForward() this.system.requestWakeup() } @@ -1111,8 +1173,8 @@ module.exports = class Autobase extends ReadyResource { await this._loadLocalWriter(this.system) } - const remoteAdded = await this._addRemoteHeads() - const localNodes = this._appending === null ? null : this._addLocalHeads() + const remoteAdded = this.opened ? await this._addRemoteHeads() : null + const localNodes = this.opened && this._appending !== null ? this._addLocalHeads() : null if (this._maybeStaticFastForward === true && this.fastForwardEnabled === true) await this._checkStaticFastForward() if (this.closing) return @@ -1127,6 +1189,8 @@ module.exports = class Autobase extends ReadyResource { if (this.closing) return + if (this.opened) await this._updateBootRecordHeads(this.system.heads) + if (this.localWriter !== null && localNodes !== null) { await this._flushLocal(localNodes) } @@ -1153,6 +1217,7 @@ module.exports = class Autobase extends ReadyResource { if (!changed) { if (this._checkWriters.length > 0) { await this._gcWriters() + if (!this.opened) break // at most one tick preready continue // rerun the update loop as a writer might have been added } if (remoteAdded >= REMOTE_ADD_BATCH) continue @@ -1244,7 +1309,7 @@ module.exports = class Autobase extends ReadyResource { this._waiting.notify(null) } - await this._gcWriters() + if (!this.closing) await this._gcWriters() } _ackIsNeeded () { diff --git a/lib/active-writers.js b/lib/active-writers.js index e5afe72c..a46e9e2c 100644 --- a/lib/active-writers.js +++ b/lib/active-writers.js @@ -28,4 +28,12 @@ module.exports = class ActiveWriters { delete (writer) { this.map.delete(b4a.toString(writer.core.key, 'hex')) } + + clear () { + const p = [] + for (const w of this.map.values()) p.push(w.close()) + this.map.clear() + + return Promise.all(p) + } } diff --git a/lib/store.js b/lib/store.js index dc2558df..f2610696 100644 --- a/lib/store.js +++ b/lib/store.js @@ -127,6 +127,18 @@ module.exports = class AutoStore { return cores } + indexedViewsByName () { + const views = [] + + for (let i = 0; i < this.base.system.views.length; i++) { + const core = this.getByIndex(i) + if (!core || !core.pendingIndexedLength) break + views.push(core.name) + } + + return views + } + async flush () { while (this.waiting.length) { const core = this.waiting.pop() diff --git a/test/helpers/index.js b/test/helpers/index.js index c4d85736..d8b3ebe4 100644 --- a/test/helpers/index.js +++ b/test/helpers/index.js @@ -78,9 +78,14 @@ function createBase (store, key, t, opts = {}) { base.maxSupportedVersion = opts.maxSupportedVersion } - t.teardown(async () => { - await base.close().catch(() => {}) - await base._viewStore.close().catch(() => {}) + t.teardown(() => { + return new Promise(resolve => { + const c = [] + c.push(base.close()) + setImmediate(() => c.push(base._viewStore.close())) + + Promise.all(c).then(resolve) + }) }, { order: 1 }) return base diff --git a/test/suspend.js b/test/suspend.js index c7eaf137..8cb94721 100644 --- a/test/suspend.js +++ b/test/suspend.js @@ -486,6 +486,10 @@ test('suspend - open new index after reopen', async t => { }) await b2.ready() + await b2.update() + + t.is(b2.view.first.length, b.view.first.length) + t.is(b2.view.second.length, b.view.second.length) for (let i = 0; i < b2.view.first.length; i++) { t.alike(await b2.view.first.get(i), order[i]) @@ -573,7 +577,6 @@ test('suspend - reopen multiple indexes', async t => { }) await b2.ready() - await b2.update() for (let i = 0; i < b2.view.first.length; i++) { @@ -953,6 +956,189 @@ test('suspend - recover from bad sys core', async t => { t.is(b1.system.core.length, len) }) +test('suspend - restart with unindexed nodes', async t => { + const { bases, stores } = await create(3, t, { storage: () => tmpDir(t) }) + + const [a, b, c] = bases + + await addWriterAndSync(a, b) + await replicateAndSync([a, b, c]) + + await addWriterAndSync(a, c, false) + await confirm([a, b, c]) + + // bigger than autobase max batch size + for (let i = 0; i < 100; i++) await b.append('b' + i) + + await replicateAndSync([b, c]) + + await c.close() + + const c1 = createBase(stores[2], null, t) + + await c1.ready() + + await c1.append('c0') + + await replicateAndSync([a, c1]) + + const exp = { key: b.local.key, length: b.local.length } + + const last = await c1.local.get(0) + t.alike(last.node.heads, [exp]) + + t.is(await a.view.get(a.view.length - 1), 'c0') +}) + +test('suspend - restart with indexed and unindexed nodes', async t => { + const { bases, stores } = await create(3, t, { storage: () => tmpDir(t) }) + + const [a, b, c] = bases + + await addWriterAndSync(a, b) + await replicateAndSync([a, b, c]) + + await addWriterAndSync(a, c, false) + await confirm([a, b, c]) + + // bigger than autobase max batch size + for (let i = 0; i < 100; i++) await b.append('b' + i) + + await confirm([a, b, c]) + + // bigger than autobase max batch size + for (let i = 100; i < 200; i++) await b.append('b' + i) + + await replicateAndSync([b, c]) + + await c.close() + + const c1 = createBase(stores[2], null, t) + + await c1.ready() + + await c1.append('c0') + + await replicateAndSync([a, c1]) + + const exp = { key: b.local.key, length: b.local.length } + + const last = await c1.local.get(0) + t.alike(last.node.heads, [exp]) + + t.is(await a.view.get(a.view.length - 1), 'c0') +}) + +test('suspend - restart with unindexed local nodes', async t => { + const { bases, stores } = await create(3, t, { storage: () => tmpDir(t) }) + + const [a, b, c] = bases + + await addWriterAndSync(a, b) + await replicateAndSync([a, b, c]) + + await addWriterAndSync(a, c, false) + await confirm([a, b, c]) + + // bigger than autobase max batch size + for (let i = 0; i < 100; i++) await c.append('c' + i) + + await replicateAndSync([b, c]) + + await c.close() + + const c1 = createBase(stores[2], null, t) + + await c1.append('c101') + + const exp = { key: c.local.key, length: c.local.length - 1 } + + const last = await c1.local.get(c1.local.length - 1) + t.alike(last.node.heads, [exp]) + + await replicateAndSync([a, c1]) + + t.is(await a.view.get(a.view.length - 1), 'c101') +}) + +test('suspend - restart with indexed and unindexed local nodes', async t => { + const { bases, stores } = await create(3, t, { storage: () => tmpDir(t) }) + + const [a, b, c] = bases + + await addWriterAndSync(a, b) + await replicateAndSync([a, b, c]) + + await addWriterAndSync(a, c, false) + await confirm([a, b, c]) + + // writer has indexed nodes + for (let i = 0; i < 100; i++) await c.append('c' + i) + + await confirm([a, b, c]) + + // bigger than autobase max batch size + for (let i = 100; i < 200; i++) await c.append('c' + i) + + await replicateAndSync([b, c]) + + await c.close() + + const c1 = createBase(stores[2], null, t) + + await c1.append('c101') + + const exp = { key: c.local.key, length: c.local.length - 1 } + + const last = await c1.local.get(c1.local.length - 1) + t.alike(last.node.heads, [exp]) + + await replicateAndSync([a, c1]) + + t.is(await a.view.get(a.view.length - 1), 'c101') +}) + +test('suspend - restart with crosslinked non-indexer nodes', async t => { + const { bases, stores } = await create(3, t, { storage: () => tmpDir(t) }) + + const [a, b, c] = bases + + await addWriterAndSync(a, b, false) + await addWriterAndSync(a, c, false) + + await replicateAndSync([a, b, c]) + + let n = 0 + + // writer has indexed nodes + for (let i = 0; i < 100; i++) await c.append('c' + n++) + + await confirm([a, b, c]) + + // bigger than autobase max batch size + for (let i = 0; i < 40; i++) await b.append('b' + n++) + await replicateAndSync([b, c]) + for (let i = 0; i < 40; i++) await c.append('c' + n++) + await replicateAndSync([b, c]) + for (let i = 0; i < 40; i++) await b.append('b' + n++) + await replicateAndSync([b, c]) + + await c.close() + + const c1 = createBase(stores[2], null, t) + + await c1.append('c' + n) + + const exp = { key: b.local.key, length: b.local.length } + + const last = await c1.local.get(c1.local.length - 1) + t.alike(last.node.heads, [exp]) + + await replicateAndSync([a, c1]) + + t.is(await a.view.get(a.view.length - 1), 'c' + n) +}) + function openMultiple (store) { return { first: store.get('first', { valueEncoding: 'json' }),