Skip to content

Commit

Permalink
Boot always loads heads in order (#144)
Browse files Browse the repository at this point in the history
* update heads on each drain cycle

* load heads backwards from last recorded heads

* no need to wait for bump in append anymore

* add test for appending before indexed nodes are flushed

* only track initial batch size and let addRemoteHeads run

* no longer need to await initial tick

* add test for unindexed local nodes

* clear active writers on close

* append may be applied in current tick

* review by @mafintosh

* add more tests for mixed indexed/unindexed nodes

* pop heads rather than shift

* always set boot record from mem state

* only read boot record in open

* always return heads from load system info

* catchup adds heads to linearizer directly

* gc writers after catchup

* only track start and end per writer

* let _getWriterByKey do system lookups

* add test for crosslinked non-indexer nodes

* catchup triggers drain to flush indexes

* stagger teardown timings
  • Loading branch information
chm-diederichs authored Jun 14, 2024
1 parent b0f2e6a commit bfbd46a
Show file tree
Hide file tree
Showing 5 changed files with 306 additions and 30 deletions.
117 changes: 91 additions & 26 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,10 @@ module.exports = class Autobase extends ReadyResource {
this._ackTimer = null
this._acking = false

this._initialHeads = []
this._initialSystem = null
this._initialViews = null

this._waiting = new SignalPromise()

this.system = new SystemView(this._viewStore.get({ name: '_system', exclusive: true }))
Expand Down Expand Up @@ -271,7 +273,7 @@ module.exports = class Autobase extends ReadyResource {
this.local.setUserData('autobase/local', this.local.key)
}

const { bootstrap, system } = await this._loadSystemInfo()
const { bootstrap, system, heads } = await this._loadSystemInfo()

this.version = system
? system.version
Expand All @@ -280,22 +282,24 @@ module.exports = class Autobase extends ReadyResource {
: this.maxSupportedVersion

this.bootstrap = bootstrap

this._initialSystem = system
this._initialHeads = heads

await this._makeLinearizer(system)
}

async _loadSystemInfo () {
const pointer = await this.local.getUserData('autobase/boot')
const bootstrap = this.bootstrap || (await this.local.getUserData('referrer')) || this.local.key
if (!pointer) return { bootstrap, system: null }
if (!pointer) return { bootstrap, system: null, heads: [] }

const { indexed, views } = c.decode(messages.BootRecord, pointer)
const { indexed, views, heads } = c.decode(messages.BootRecord, pointer)
const { key, length } = indexed

this._systemPointer = length

if (!length) return { bootstrap, system: null }
if (!length) return { bootstrap, system: null, heads: [] }

const encryptionKey = AutoStore.getBlockKey(bootstrap, this.encryptionKey, '_system')
const actualCore = this.store.get({ key, exclusive: false, compat: false, encryptionKey, isBlockKey: true })
Expand All @@ -308,7 +312,7 @@ module.exports = class Autobase extends ReadyResource {
if (length === 0 || !(await core.has(length - 1))) {
await this.local.setUserData('autobase/boot', null)
this._systemPointer = 0
return { bootstrap, system: null }
return { bootstrap, system: null, heads: [] }
}

const system = new SystemView(core, length)
Expand All @@ -326,7 +330,8 @@ module.exports = class Autobase extends ReadyResource {

return {
bootstrap,
system
system,
heads
}
}

Expand Down Expand Up @@ -469,6 +474,64 @@ module.exports = class Autobase extends ReadyResource {
if (this.reindexing) this._setReindexed()

this.queueFastForward()

await this._catchup(this._initialHeads)
}

async _catchup (nodes) {
if (!nodes.length) return

const visited = new Set()
const writers = new Map()

while (nodes.length) {
const { key, length } = nodes.pop()

const hex = b4a.toString(key, 'hex')
const ref = hex + ':' + length

if (visited.has(ref)) continue
visited.add(ref)

let w = writers.get(hex)
if (!w) {
const writer = await this._getWriterByKey(key, -1, 0, true, false, null)

w = { writer, end: writer.length }

writers.set(hex, w)
}

if (w.writer.length >= length) continue

if (length > w.end) w.end = length

const block = await w.writer.core.get(length - 1)

for (const dep of block.node.heads) {
nodes.push(dep)
}
}

while (writers.size) {
for (const [hex, info] of writers) {
const { writer, end } = info

if (writer === null || writer.length === end) {
writers.delete(hex)
continue
}

if (writer.available <= writer.length) await writer.update()

const node = writer.advance()
if (!node) continue

this.linearizer.addHead(node)
}
}

await this._drain() // runs for one tick
}

_reindexersIdle () {
Expand Down Expand Up @@ -518,7 +581,7 @@ module.exports = class Autobase extends ReadyResource {

if (this._hasClose) await this._handlers.close(this.view)
if (this._primaryBootstrap) await this._primaryBootstrap.close()
for (const w of this.activeWriters) await w.close()
await this.activeWriters.clear()
await this.corePool.clear()
await this.store.close()
await closing
Expand Down Expand Up @@ -685,21 +748,13 @@ module.exports = class Autobase extends ReadyResource {
if (!this.opened) await this.ready()
if (this.closing) throw new Error('Autobase is closing')

await this._advanced // ensure all local state has been applied, only needed until persistent batches

// if a reset is scheduled await those
while (this._queueViewReset && !this.closing) await this._bump()

if (this.localWriter === null) {
throw new Error('Not writable')
}

// make sure all local nodes are processed before continuing
while (!this.closing && this.localWriter.core.length > this.localWriter.length) {
await this.localWriter.waitForSynced()
await this._bump() // make sure its all flushed...
}

if (this._appending === null) this._appending = []

if (Array.isArray(value)) {
Expand All @@ -708,7 +763,11 @@ module.exports = class Autobase extends ReadyResource {
this._append(value)
}

await this._bump()
// await in case append is in current tick
if (this._advancing) await this._advancing

// only bump if there are unflushed nodes
if (this._appending !== null) return this._bump()
}

_append (value) {
Expand Down Expand Up @@ -1079,16 +1138,19 @@ module.exports = class Autobase extends ReadyResource {

this._systemPointer = length

const cores = this._viewStore.getIndexedCores()
const views = new Array(cores.length - 1)
for (const core of cores) {
if (core.systemIndex === -1) continue
views[core.systemIndex] = core.name
}
const views = this._viewStore.indexedViewsByName()

await this._setBootRecord(this.system.core.key, length, this.system.heads, views)
}

async _updateBootRecordHeads (heads) {
if (this._systemPointer === 0) return // first tick

const views = this._viewStore.indexedViewsByName()

await this._setBootRecord(this.system.core.key, this._systemPointer, heads, views)
}

async _setBootRecord (key, length, heads, views) {
const pointer = c.encode(messages.BootRecord, {
indexed: { key, length },
Expand All @@ -1101,7 +1163,7 @@ module.exports = class Autobase extends ReadyResource {

async _drain () {
while (!this.closing) {
if (this.fastForwardTo !== null) {
if (this.opened && this.fastForwardTo !== null) {
await this._applyFastForward()
this.system.requestWakeup()
}
Expand All @@ -1111,8 +1173,8 @@ module.exports = class Autobase extends ReadyResource {
await this._loadLocalWriter(this.system)
}

const remoteAdded = await this._addRemoteHeads()
const localNodes = this._appending === null ? null : this._addLocalHeads()
const remoteAdded = this.opened ? await this._addRemoteHeads() : null
const localNodes = this.opened && this._appending !== null ? this._addLocalHeads() : null

if (this._maybeStaticFastForward === true && this.fastForwardEnabled === true) await this._checkStaticFastForward()
if (this.closing) return
Expand All @@ -1127,6 +1189,8 @@ module.exports = class Autobase extends ReadyResource {

if (this.closing) return

if (this.opened) await this._updateBootRecordHeads(this.system.heads)

if (this.localWriter !== null && localNodes !== null) {
await this._flushLocal(localNodes)
}
Expand All @@ -1153,6 +1217,7 @@ module.exports = class Autobase extends ReadyResource {
if (!changed) {
if (this._checkWriters.length > 0) {
await this._gcWriters()
if (!this.opened) break // at most one tick preready
continue // rerun the update loop as a writer might have been added
}
if (remoteAdded >= REMOTE_ADD_BATCH) continue
Expand Down Expand Up @@ -1244,7 +1309,7 @@ module.exports = class Autobase extends ReadyResource {
this._waiting.notify(null)
}

await this._gcWriters()
if (!this.closing) await this._gcWriters()
}

_ackIsNeeded () {
Expand Down
8 changes: 8 additions & 0 deletions lib/active-writers.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,12 @@ module.exports = class ActiveWriters {
delete (writer) {
this.map.delete(b4a.toString(writer.core.key, 'hex'))
}

clear () {
const p = []
for (const w of this.map.values()) p.push(w.close())
this.map.clear()

return Promise.all(p)
}
}
12 changes: 12 additions & 0 deletions lib/store.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,18 @@ module.exports = class AutoStore {
return cores
}

indexedViewsByName () {
const views = []

for (let i = 0; i < this.base.system.views.length; i++) {
const core = this.getByIndex(i)
if (!core || !core.pendingIndexedLength) break
views.push(core.name)
}

return views
}

async flush () {
while (this.waiting.length) {
const core = this.waiting.pop()
Expand Down
11 changes: 8 additions & 3 deletions test/helpers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,14 @@ function createBase (store, key, t, opts = {}) {
base.maxSupportedVersion = opts.maxSupportedVersion
}

t.teardown(async () => {
await base.close().catch(() => {})
await base._viewStore.close().catch(() => {})
t.teardown(() => {
return new Promise(resolve => {
const c = []
c.push(base.close())
setImmediate(() => c.push(base._viewStore.close()))

Promise.all(c).then(resolve)
})
}, { order: 1 })

return base
Expand Down
Loading

0 comments on commit bfbd46a

Please sign in to comment.