Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Boot always loads heads in order #144

Merged
merged 33 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
d3f8c26
update heads on each drain cycle
chm-diederichs Jun 6, 2024
3173a10
load heads backwards from last recorded heads
chm-diederichs Jun 6, 2024
60472e3
no need to wait for bump in append anymore
chm-diederichs Jun 6, 2024
53ac11b
add test for appending before indexed nodes are flushed
chm-diederichs Jun 11, 2024
5f02837
only track initial batch size and let addRemoteHeads run
chm-diederichs Jun 11, 2024
1b34d0f
no longer need to await initial tick
chm-diederichs Jun 11, 2024
33cf6c4
add test for unindexed local nodes
chm-diederichs Jun 11, 2024
1cb04ff
clear active writers on close
chm-diederichs Jun 11, 2024
c695c4a
append may be applied in current tick
chm-diederichs Jun 11, 2024
ab3aa4f
no need to set state
chm-diederichs Jun 11, 2024
e110a62
review by @mafintosh
chm-diederichs Jun 11, 2024
47610cc
fix bad argument
chm-diederichs Jun 12, 2024
8c71ace
add more tests for mixed indexed/unindexed nodes
chm-diederichs Jun 12, 2024
edc4b74
pop heads rather than shift
chm-diederichs Jun 12, 2024
6965295
rename for clarity
chm-diederichs Jun 12, 2024
c3c5b41
explicit check
chm-diederichs Jun 12, 2024
aac5e0f
always set boot record from mem state
chm-diederichs Jun 12, 2024
b9c41ec
only read boot record in open
chm-diederichs Jun 12, 2024
3064bc9
always return heads from load system info
chm-diederichs Jun 12, 2024
c65ab22
catchup adds heads to linearizer directly
chm-diederichs Jun 13, 2024
496b14c
gc writers after catchup
chm-diederichs Jun 13, 2024
2cae1f0
only track start and end per writer
chm-diederichs Jun 13, 2024
76e9752
no longer use tip
chm-diederichs Jun 13, 2024
8ca813b
verify view lengths
chm-diederichs Jun 13, 2024
d11dc82
test should await first update
chm-diederichs Jun 13, 2024
9368595
let _getWriterByKey do system lookups
chm-diederichs Jun 13, 2024
4945b55
add test for crosslinked non-indexer nodes
chm-diederichs Jun 13, 2024
f00122a
condition should be >=
chm-diederichs Jun 13, 2024
0631593
catchup triggers drain to flush indexes
chm-diederichs Jun 13, 2024
35e9c5d
close base and store concurrently
chm-diederichs Jun 13, 2024
08415e9
stagger teardown timings
chm-diederichs Jun 14, 2024
94429b6
ready resource allows close if open throws
chm-diederichs Jun 14, 2024
bd6ab3d
teardown still needs to be staggered
chm-diederichs Jun 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 91 additions & 26 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,10 @@ module.exports = class Autobase extends ReadyResource {
this._ackTimer = null
this._acking = false

this._initialHeads = []
this._initialSystem = null
this._initialViews = null

this._waiting = new SignalPromise()

this.system = new SystemView(this._viewStore.get({ name: '_system', exclusive: true }))
Expand Down Expand Up @@ -271,7 +273,7 @@ module.exports = class Autobase extends ReadyResource {
this.local.setUserData('autobase/local', this.local.key)
}

const { bootstrap, system } = await this._loadSystemInfo()
const { bootstrap, system, heads } = await this._loadSystemInfo()

this.version = system
? system.version
Expand All @@ -280,22 +282,24 @@ module.exports = class Autobase extends ReadyResource {
: this.maxSupportedVersion

this.bootstrap = bootstrap

this._initialSystem = system
this._initialHeads = heads

await this._makeLinearizer(system)
}

async _loadSystemInfo () {
const pointer = await this.local.getUserData('autobase/boot')
const bootstrap = this.bootstrap || (await this.local.getUserData('referrer')) || this.local.key
if (!pointer) return { bootstrap, system: null }
if (!pointer) return { bootstrap, system: null, heads: [] }

const { indexed, views } = c.decode(messages.BootRecord, pointer)
const { indexed, views, heads } = c.decode(messages.BootRecord, pointer)
const { key, length } = indexed

this._systemPointer = length

if (!length) return { bootstrap, system: null }
if (!length) return { bootstrap, system: null, heads: [] }

const encryptionKey = AutoStore.getBlockKey(bootstrap, this.encryptionKey, '_system')
const actualCore = this.store.get({ key, exclusive: false, compat: false, encryptionKey, isBlockKey: true })
Expand All @@ -308,7 +312,7 @@ module.exports = class Autobase extends ReadyResource {
if (length === 0 || !(await core.has(length - 1))) {
await this.local.setUserData('autobase/boot', null)
this._systemPointer = 0
return { bootstrap, system: null }
return { bootstrap, system: null, heads: [] }
}

const system = new SystemView(core, length)
Expand All @@ -326,7 +330,8 @@ module.exports = class Autobase extends ReadyResource {

return {
bootstrap,
system
system,
heads
}
}

Expand Down Expand Up @@ -469,6 +474,64 @@ module.exports = class Autobase extends ReadyResource {
if (this.reindexing) this._setReindexed()

this.queueFastForward()

await this._catchup(this._initialHeads)
}

async _catchup (nodes) {
if (!nodes.length) return

const visited = new Set()
const writers = new Map()

while (nodes.length) {
const { key, length } = nodes.pop()

const hex = b4a.toString(key, 'hex')
const ref = hex + ':' + length

if (visited.has(ref)) continue
visited.add(ref)

let w = writers.get(hex)
if (!w) {
const writer = await this._getWriterByKey(key, -1, 0, true, false, null)

w = { writer, end: writer.length }

writers.set(hex, w)
}

if (w.writer.length >= length) continue

if (length > w.end) w.end = length

const block = await w.writer.core.get(length - 1)

for (const dep of block.node.heads) {
nodes.push(dep)
}
}

while (writers.size) {
for (const [hex, info] of writers) {
const { writer, end } = info

if (writer === null || writer.length === end) {
writers.delete(hex)
continue
}

if (writer.available <= writer.length) await writer.update()

const node = writer.advance()
if (!node) continue

this.linearizer.addHead(node)
}
mafintosh marked this conversation as resolved.
Show resolved Hide resolved
}

await this._drain() // runs for one tick
}

_reindexersIdle () {
Expand Down Expand Up @@ -518,7 +581,7 @@ module.exports = class Autobase extends ReadyResource {

if (this._hasClose) await this._handlers.close(this.view)
if (this._primaryBootstrap) await this._primaryBootstrap.close()
for (const w of this.activeWriters) await w.close()
await this.activeWriters.clear()
await this.corePool.clear()
await this.store.close()
await closing
Expand Down Expand Up @@ -685,21 +748,13 @@ module.exports = class Autobase extends ReadyResource {
if (!this.opened) await this.ready()
if (this.closing) throw new Error('Autobase is closing')

await this._advanced // ensure all local state has been applied, only needed until persistent batches

// if a reset is scheduled await those
while (this._queueViewReset && !this.closing) await this._bump()

if (this.localWriter === null) {
throw new Error('Not writable')
}

// make sure all local nodes are processed before continuing
while (!this.closing && this.localWriter.core.length > this.localWriter.length) {
await this.localWriter.waitForSynced()
await this._bump() // make sure its all flushed...
}

if (this._appending === null) this._appending = []

if (Array.isArray(value)) {
Expand All @@ -708,7 +763,11 @@ module.exports = class Autobase extends ReadyResource {
this._append(value)
}

await this._bump()
// await in case append is in current tick
if (this._advancing) await this._advancing

// only bump if there are unflushed nodes
if (this._appending !== null) return this._bump()
}

_append (value) {
Expand Down Expand Up @@ -1079,16 +1138,19 @@ module.exports = class Autobase extends ReadyResource {

this._systemPointer = length

const cores = this._viewStore.getIndexedCores()
const views = new Array(cores.length - 1)
for (const core of cores) {
if (core.systemIndex === -1) continue
views[core.systemIndex] = core.name
}
const views = this._viewStore.indexedViewsByName()

await this._setBootRecord(this.system.core.key, length, this.system.heads, views)
}

async _updateBootRecordHeads (heads) {
if (this._systemPointer === 0) return // first tick

const views = this._viewStore.indexedViewsByName()

await this._setBootRecord(this.system.core.key, this._systemPointer, heads, views)
}

async _setBootRecord (key, length, heads, views) {
const pointer = c.encode(messages.BootRecord, {
indexed: { key, length },
Expand All @@ -1101,7 +1163,7 @@ module.exports = class Autobase extends ReadyResource {

async _drain () {
while (!this.closing) {
if (this.fastForwardTo !== null) {
if (this.opened && this.fastForwardTo !== null) {
await this._applyFastForward()
this.system.requestWakeup()
}
Expand All @@ -1111,8 +1173,8 @@ module.exports = class Autobase extends ReadyResource {
await this._loadLocalWriter(this.system)
}

const remoteAdded = await this._addRemoteHeads()
const localNodes = this._appending === null ? null : this._addLocalHeads()
const remoteAdded = this.opened ? await this._addRemoteHeads() : null
const localNodes = this.opened && this._appending !== null ? this._addLocalHeads() : null

if (this._maybeStaticFastForward === true && this.fastForwardEnabled === true) await this._checkStaticFastForward()
if (this.closing) return
Expand All @@ -1127,6 +1189,8 @@ module.exports = class Autobase extends ReadyResource {

if (this.closing) return

if (this.opened) await this._updateBootRecordHeads(this.system.heads)

if (this.localWriter !== null && localNodes !== null) {
await this._flushLocal(localNodes)
}
Expand All @@ -1153,6 +1217,7 @@ module.exports = class Autobase extends ReadyResource {
if (!changed) {
if (this._checkWriters.length > 0) {
await this._gcWriters()
if (!this.opened) break // at most one tick preready
continue // rerun the update loop as a writer might have been added
}
if (remoteAdded >= REMOTE_ADD_BATCH) continue
Expand Down Expand Up @@ -1244,7 +1309,7 @@ module.exports = class Autobase extends ReadyResource {
this._waiting.notify(null)
}

await this._gcWriters()
if (!this.closing) await this._gcWriters()
}

_ackIsNeeded () {
Expand Down
8 changes: 8 additions & 0 deletions lib/active-writers.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,12 @@ module.exports = class ActiveWriters {
delete (writer) {
this.map.delete(b4a.toString(writer.core.key, 'hex'))
}

clear () {
const p = []
for (const w of this.map.values()) p.push(w.close())
this.map.clear()

return Promise.all(p)
}
}
12 changes: 12 additions & 0 deletions lib/store.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,18 @@ module.exports = class AutoStore {
return cores
}

indexedViewsByName () {
const views = []

for (let i = 0; i < this.base.system.views.length; i++) {
const core = this.getByIndex(i)
if (!core || !core.pendingIndexedLength) break
views.push(core.name)
}

return views
}

async flush () {
while (this.waiting.length) {
const core = this.waiting.pop()
Expand Down
11 changes: 8 additions & 3 deletions test/helpers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,14 @@ function createBase (store, key, t, opts = {}) {
base.maxSupportedVersion = opts.maxSupportedVersion
}

t.teardown(async () => {
await base.close().catch(() => {})
await base._viewStore.close().catch(() => {})
t.teardown(() => {
return new Promise(resolve => {
const c = []
c.push(base.close())
setImmediate(() => c.push(base._viewStore.close()))

Promise.all(c).then(resolve)
})
}, { order: 1 })

return base
Expand Down
Loading
Loading