Skip to content

Commit

Permalink
Add is-indexer event that fires immediately (#155)
Browse files Browse the repository at this point in the history
* rename internal prop to isActiveIndexer

* set/unset isIndexer prop and emit in apply and emit after

* add some tests

* add comment
  • Loading branch information
chm-diederichs authored Jun 14, 2024
1 parent fff4363 commit 95cf0f2
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 29 deletions.
47 changes: 34 additions & 13 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ module.exports = class Autobase extends ReadyResource {

this.local = null
this.localWriter = null
this.isIndexer = false

this.activeWriters = new ActiveWriters()
this.corePool = new CorePool()
Expand Down Expand Up @@ -181,8 +182,8 @@ module.exports = class Autobase extends ReadyResource {
return this._primaryBootstrap === null ? this.local.discoveryKey : this._primaryBootstrap.discoveryKey
}

get isIndexer () {
return this.localWriter ? this.localWriter.isIndexer : false
get isActiveIndexer () {
return this.localWriter ? this.localWriter.isActiveIndexer : false
}

replicate (init, opts) {
Expand Down Expand Up @@ -618,7 +619,7 @@ module.exports = class Autobase extends ReadyResource {
const unqueued = this._wakeup.unqueue(w.core.key, w.core.length)
this._coupler.remove(w.core)

if (!unqueued || w.isIndexer || this.localWriter === w) continue
if (!unqueued || w.isActiveIndexer || this.localWriter === w) continue

await this._closeWriter(w, false)
}
Expand Down Expand Up @@ -711,7 +712,7 @@ module.exports = class Autobase extends ReadyResource {
// if no one is waiting for our index manifest, wait for FF before pushing an ack
if (!isPendingIndexer && this._isFastForwarding()) return

const isIndexer = this.localWriter.isIndexer || isPendingIndexer
const isIndexer = this.localWriter.isActiveIndexer || isPendingIndexer

if (!isIndexer || this._acking || this.closing) return

Expand Down Expand Up @@ -959,7 +960,7 @@ module.exports = class Autobase extends ReadyResource {

_updateLinearizer (indexers, heads) {
this.linearizer = new Linearizer(indexers, { heads, writers: this.activeWriters })
this._addCheckpoints = !!(this.localWriter && (this.localWriter.isIndexer || this._isPending()))
this._addCheckpoints = !!(this.localWriter && (this.localWriter.isActiveIndexer || this._isPending()))
this._updateAckThreshold()
}

Expand All @@ -978,7 +979,8 @@ module.exports = class Autobase extends ReadyResource {

this.activeWriters.add(bootstrap)
this._checkWriters.push(bootstrap)
bootstrap.isIndexer = true
if (bootstrap === this.localWriter) this._setLocalIndexer()
bootstrap.isActiveIndexer = true
bootstrap.inflateBackground()
await bootstrap.ready()
this._resumeWriter(bootstrap)
Expand All @@ -1001,7 +1003,7 @@ module.exports = class Autobase extends ReadyResource {

for (const head of sys.indexers) {
const writer = await this._getWriterByKey(head.key, head.length, 0, false, false, sys)
writer.isIndexer = true
writer.isActiveIndexer = true
writer.inflateBackground()
indexers.push(writer)
}
Expand Down Expand Up @@ -1035,7 +1037,7 @@ module.exports = class Autobase extends ReadyResource {
this._resumeWriter(w)
}

if (!this.localWriter || !this.localWriter.isIndexer) return
if (!this.localWriter || !this.localWriter.isActiveIndexer) return

if (!hasWriter(this.linearizer.indexers, this.localWriter)) {
this._clearLocalIndexer()
Expand All @@ -1057,18 +1059,31 @@ module.exports = class Autobase extends ReadyResource {
if (!this.localWriter) return

this._closeWriter(this.localWriter, true)
if (this.localWriter.isIndexer) this._clearLocalIndexer()
if (this.localWriter.isActiveIndexer) this._clearLocalIndexer()

this.localWriter = null
this._pendingLocalRemoval = false

this.emit('unwritable')
}

_setLocalIndexer () {
assert(this.localWriter !== null)
this.isIndexer = true
this._addCheckpoints = true // unset once indexer is cleared
this.emit('is-indexer')
}

_unsetLocalIndexer () {
assert(this.localWriter !== null)
this.isIndexer = false
this.emit('is-non-indexer')
}

_clearLocalIndexer () {
if (!this.localWriter) return

this.localWriter.isIndexer = false
this.localWriter.isActiveIndexer = false

if (this._ackTimer) this._ackTimer.stop()
this._ackTimer = null
Expand Down Expand Up @@ -1195,7 +1210,7 @@ module.exports = class Autobase extends ReadyResource {
await this._flushLocal(localNodes)
}

if (this._pendingLocalRemoval && !this.localWriter.isIndexer) this._unsetLocalWriter()
if (this._pendingLocalRemoval && !this.localWriter.isActiveIndexer) this._unsetLocalWriter()

if (this.closing) return

Expand Down Expand Up @@ -1798,7 +1813,10 @@ module.exports = class Autobase extends ReadyResource {
}

// If we are getting added as indexer, already start adding checkpoints while we get confirmed...
if (writer === this.localWriter && isIndexer) this._addCheckpoints = true
if (writer === this.localWriter) {
if (isIndexer) this._setLocalIndexer()
else this._unsetLocalIndexer() // unset if demoted
}

// fetch any nodes needed for dependents
this._queueBump()
Expand All @@ -1809,7 +1827,10 @@ module.exports = class Autobase extends ReadyResource {
assert(this._applying !== null, 'System changes are only allowed in apply')
await this.system.remove(key)

if (b4a.equals(key, this.local.key)) this._pendingLocalRemoval = true
if (b4a.equals(key, this.local.key)) {
this._pendingLocalRemoval = true
if (this.isIndexer) this._unsetLocalIndexer()
}

const w = this.activeWriters.get(key)
if (w) w.isRemoved = true
Expand Down
6 changes: 3 additions & 3 deletions lib/consensus.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ module.exports = class Consensus {
}

addHead (node) {
if (!node.writer.isIndexer) return
if (!node.writer.isActiveIndexer) return
if (this._isMerge(node)) this.merges.add(node)
this.updated = true
return node
Expand Down Expand Up @@ -52,7 +52,7 @@ module.exports = class Consensus {
}

_isMerge (node) {
if (!node.writer.isIndexer) return false
if (!node.writer.isActiveIndexer) return false

const deps = []

Expand Down Expand Up @@ -151,7 +151,7 @@ module.exports = class Consensus {
}

_acks (target) {
const acks = target.writer.isIndexer ? [target] : [] // TODO: can be cached on the target node in future (ie if we add one we dont have to check it again)
const acks = target.writer.isActiveIndexer ? [target] : [] // TODO: can be cached on the target node in future (ie if we add one we dont have to check it again)

for (const idx of this.indexers) {
if (idx === target.writer) continue
Expand Down
2 changes: 1 addition & 1 deletion lib/extension.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ module.exports = class WakeupExtension {
const writers = []

for (const w of this.base.activeWriters) {
if (w.isIndexer || w.flushed()) continue
if (w.isActiveIndexer || w.flushed()) continue
writers.push({ key: w.core.key, length: w.length })
}

Expand Down
8 changes: 4 additions & 4 deletions lib/linearizer.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class Node {
}
}

if (this.writer.isIndexer) this.clock.set(this.writer.core.key, this.length)
if (this.writer.isActiveIndexer) this.clock.set(this.writer.core.key, this.length)
}

tieBreak (node) {
Expand Down Expand Up @@ -121,7 +121,7 @@ module.exports = class Linearizer {
}

this.tip.add(node)
if (node.writer.isIndexer) this.consensus.addHead(node)
if (node.writer.isActiveIndexer) this.consensus.addHead(node)

this.size++
this.heads.add(node)
Expand Down Expand Up @@ -160,7 +160,7 @@ module.exports = class Linearizer {
/* Ack methods */

shouldAck (writer, pending = false) {
if (!writer || !writer.isIndexer) return false
if (!writer || !writer.isActiveIndexer) return false

// all indexers have to flushed to the dag before we ack as a quick "debounce"
for (const w of this.indexers) {
Expand All @@ -171,7 +171,7 @@ module.exports = class Linearizer {

// if ANY head is not an indexer ack
for (const head of this.heads) {
if (!head.writer.isIndexer) return true
if (!head.writer.isActiveIndexer) return true
if (head.writer === writer) isHead = true
}

Expand Down
7 changes: 4 additions & 3 deletions lib/writer.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ module.exports = class Writer extends ReadyResource {
this.nodes = new NodeBuffer(length)
this.node = null
this.isIndexer = false
this.isActiveIndexer = false
this.available = length
this.length = length
this.seenLength = 0
Expand Down Expand Up @@ -84,7 +85,7 @@ module.exports = class Writer extends ReadyResource {
async inflateExistingCheckpoints () {
await this.ready()

if (this.core.length === 0 || !this.isIndexer || this.nodes.length === 0) {
if (this.core.length === 0 || !this.isActiveIndexer || this.nodes.length === 0) {
return
}

Expand Down Expand Up @@ -201,7 +202,7 @@ module.exports = class Writer extends ReadyResource {
this.node = null
}

if (this.digestLength < this.core.length && this.isIndexer) await this._checkDigest()
if (this.digestLength < this.core.length && this.isActiveIndexer) await this._checkDigest()

return this.length < this.available
}
Expand Down Expand Up @@ -341,7 +342,7 @@ module.exports = class Writer extends ReadyResource {
if (!(await this.core.has(seq))) return false
const { node, checkpoint, maxSupportedVersion } = await this.core.get(seq, { wait: false })

if (this.isIndexer && checkpoint) {
if (this.isActiveIndexer && checkpoint) {
this._addCheckpoints(checkpoint)
}

Expand Down
28 changes: 26 additions & 2 deletions test/basic.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,22 @@ test('basic - two writers', async t => {
const { bases } = await create(3, t, { open: null })
const [base1, base2, base3] = bases

await addWriter(base1, base2)
let added = false
base2.once('is-indexer', () => { added = true })

await addWriter(base1, base2)
await confirm([base1, base2, base3])

await addWriter(base2, base3)
t.ok(added)

added = false
base3.once('is-indexer', () => { added = true })

await addWriter(base2, base3)
await confirm([base1, base2, base3])

t.ok(added)

t.is(base2.system.members, 3)
t.is(base2.system.members, base3.system.members)
t.is(base2.system.members, base2.activeWriters.size)
Expand Down Expand Up @@ -1461,8 +1469,15 @@ test('basic - readd removed indexer', async t => {
const { bases } = await create(2, t, { apply: applyWithRemove })
const [a, b] = bases

let added = false
b.on('is-indexer', () => { added = true })
b.on('is-non-indexer', () => { added = false })

await addWriterAndSync(a, b)

t.ok(added)
t.is(b.isIndexer, true)

await b.append('b1')

await confirm([a, b])
Expand All @@ -1471,6 +1486,12 @@ test('basic - readd removed indexer', async t => {
t.is(b.view.getBackingCore().session.manifest.signers.length, 2)

await a.append({ remove: b4a.toString(b.local.key, 'hex') })

await replicateAndSync([a, b])

t.absent(added)
t.is(b.isIndexer, false)

await confirm([a, b])

t.is(b.writable, false)
Expand All @@ -1489,7 +1510,10 @@ test('basic - readd removed indexer', async t => {
t.is(b.view.indexedLength, 2)

await addWriterAndSync(a, b)

t.ok(added)
t.is(b.writable, true)
t.is(b.isIndexer, true)

await b.append('b1')

Expand Down
6 changes: 3 additions & 3 deletions test/fuzz/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class SkeletonWriter {
this.seq = 0
this.head = null
this.majority = majority
this.isIndexer = !!indexer
this.isActiveIndexer = !!indexer
}

add (links = []) {
Expand Down Expand Up @@ -81,13 +81,13 @@ class SkeletonWriter {
class Writer {
constructor (key, indexer) {
this.core = { key }
this.isIndexer = true
this.isActiveIndexer = true
this.length = 0
this.nodes = {
offset: 0,
nodes: []
}
this.isIndexer = !!indexer
this.isActiveIndexer = !!indexer
}

get indexed () {
Expand Down

0 comments on commit 95cf0f2

Please sign in to comment.