Skip to content

Commit

Permalink
feat!: add inclusion claim to filecoin api
Browse files Browse the repository at this point in the history
BREAKING CHANGE: aggregator events context need assert service
  • Loading branch information
vasco-santos committed Feb 9, 2024
1 parent bb74ffc commit 4b28943
Show file tree
Hide file tree
Showing 10 changed files with 329 additions and 91 deletions.
1 change: 1 addition & 0 deletions packages/filecoin-api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
"@ucanto/server": "^9.0.1",
"@ucanto/transport": "^9.0.0",
"@web3-storage/capabilities": "workspace:^",
"@web3-storage/content-claims": "^4.0.2",
"@web3-storage/data-segment": "^4.0.0",
"p-map": "^6.0.0"
},
Expand Down
40 changes: 39 additions & 1 deletion packages/filecoin-api/src/aggregator/api.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import type { Signer, Principal, Link } from '@ucanto/interface'
import type { Signer, Principal, Link, ConnectionView } from '@ucanto/interface'
import { Failure, ServiceMethod } from '@ucanto/server'
import { InclusionProof } from '@web3-storage/capabilities/types'
import { PieceLink } from '@web3-storage/data-segment'
import {
AggregatorService,
DealerService,
InvocationConfig,
} from '@web3-storage/filecoin-client/types'
import { AssertInclusion } from '@web3-storage/content-claims/capability/api'
import {
Store,
UpdatableStore,
Expand Down Expand Up @@ -53,6 +56,20 @@ export interface ServiceContext {
inclusionStore: InclusionStore
}

export interface InvocationConfigWithRequiredAudience extends InvocationConfig {
/**
* The principal delegated to in the current UCAN.
*/
audience: Principal
}

export interface ServiceConfigWithRequiredAudience<
T extends Record<string, unknown>
> {
connection: ConnectionView<T>
invocationConfig: InvocationConfigWithRequiredAudience
}

export interface PieceMessageContext
extends Pick<ServiceContext, 'pieceStore'> {}

Expand Down Expand Up @@ -83,6 +100,23 @@ export interface InclusionInsertEventToIssuePieceAccept {
aggregatorService: ServiceConfig<AggregatorService>
}

export type AssertInclusionServiceMethod = ServiceMethod<
AssertInclusion,
object,
Failure
>

export interface InclusionInsertEventToIssueInclusionClaim {
/**
* Content claims connection to claim inclusion.
*/
assertService: ServiceConfigWithRequiredAudience<{
assert: {
inclusion: AssertInclusionServiceMethod
}
}>
}

export interface AggregateInsertEventToPieceAcceptQueueContext {
/**
* Store of CID => Buffer Record
Expand Down Expand Up @@ -226,6 +260,10 @@ export interface InclusionRecordKey
export interface InclusionRecordQueryByGroup
extends Pick<InclusionRecord, 'piece' | 'group'> {}

export interface InclusionRecordWithProof extends InclusionRecord {
proof: Link
}

export type BufferedPiece = {
/**
* Piece CID for the content.
Expand Down
6 changes: 4 additions & 2 deletions packages/filecoin-api/src/aggregator/buffer-reducing.js
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,12 @@ export function aggregatePieces(bufferedPieces, config) {
const remainingBufferedPieces = []

// start by adding prepend buffered pieces if available
for (const bufferedPiece of (config.prependBufferedPieces || [])) {
for (const bufferedPiece of config.prependBufferedPieces || []) {
const p = Piece.fromLink(bufferedPiece.piece)
if (builder.estimate(p).error) {
throw new Error('aggregate builder is not able to create aggregates with only prepend buffered pieces')
throw new Error(
'aggregate builder is not able to create aggregates with only prepend buffered pieces'
)
}
builder.write(p)
addedBufferedPieces.push(bufferedPiece)
Expand Down
39 changes: 38 additions & 1 deletion packages/filecoin-api/src/aggregator/events.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Aggregator, Dealer } from '@web3-storage/filecoin-client'
import { Assert } from '@web3-storage/content-claims/capability'
import { Aggregate, Piece } from '@web3-storage/data-segment'
import { CBOR } from '@ucanto/core'

Expand Down Expand Up @@ -114,7 +115,7 @@ export const handleBufferQueueMessage = async (context, records) => {
maxAggregateSize: context.config.maxAggregateSize,
minAggregateSize: context.config.minAggregateSize,
minUtilizationFactor: context.config.minUtilizationFactor,
prependBufferedPieces: context.config.prependBufferedPieces
prependBufferedPieces: context.config.prependBufferedPieces,
})

// Store buffered pieces if not enough to do aggregate and re-queue them
Expand Down Expand Up @@ -324,6 +325,42 @@ export const handleInclusionInsertToIssuePieceAccept = async (
return { ok: {} }
}

/**
* Handle issueing inclusion claims once piece is included in an aggregate.
*
* @param {import('./api.js').InclusionInsertEventToIssueInclusionClaim} context
* @param {import('./api.js').InclusionRecordWithProof} record
*/
export const handleInclusionInsertToIssueInclusionClaim = async (
context,
record
) => {
const claimResult = await Assert.inclusion
.invoke({
issuer: context.assertService.invocationConfig.issuer,
audience: context.assertService.invocationConfig.audience,
with: context.assertService.invocationConfig.with,
nb: {
content: record.aggregate,
includes: record.piece,
proof: record.proof,
},
expiration: Infinity,
proofs: context.assertService.invocationConfig.proofs,
})
.execute(context.assertService.connection)

if (claimResult.out.error) {
return {
error: claimResult.out.error,
}
}

return {
ok: {},
}
}

/**
* On Aggregate store insert, offer inserted aggregate for deal.
*
Expand Down
22 changes: 21 additions & 1 deletion packages/filecoin-api/test/aggregator.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@ import * as AggregatorEvents from './events/aggregator.js'

import { getStoreImplementations } from './context/store-implementations.js'
import { Queue } from './context/queue.js'
import { getMockService, getConnection } from './context/service.js'
import {
getMockService,
getContentClaimsMockService,
getConnection,
} from './context/service.js'
import { validateAuthorization } from './utils.js'

describe('Aggregator', () => {
Expand Down Expand Up @@ -67,13 +71,20 @@ describe('Aggregator', () => {
define(name, async () => {
const aggregatorSigner = await Signer.generate()
const dealerSigner = await Signer.generate()
const assertSigner = await Signer.generate()

const service = getMockService()
const contentClaimsService = getContentClaimsMockService()

const aggregatorConnection = getConnection(
aggregatorSigner,
service
).connection
const dealerConnection = getConnection(dealerSigner, service).connection
const assertConnect = getConnection(
assertSigner,
contentClaimsService
).connection

// resources
/** @type {Map<string, unknown[]>} */
Expand Down Expand Up @@ -120,8 +131,17 @@ describe('Aggregator', () => {
audience: aggregatorSigner,
},
},
assertService: {
connection: assertConnect,
invocationConfig: {
issuer: assertSigner,
with: assertSigner.did(),
audience: assertSigner,
},
},
queuedMessages,
service,
contentClaimsService,
errorReporter: {
catch(error) {
assert.fail(error)
Expand Down
15 changes: 15 additions & 0 deletions packages/filecoin-api/test/context/mocks.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,21 @@ const notImplemented = () => {
throw new Server.Failure('not implemented')
}

/**
* @param {{
* assert: {
* inclusion: import('../../src/aggregator/api.js').AssertInclusionServiceMethod
* }
* }} impl
*/
export function mockContentClaimsService(impl) {
return {
assert: {
inclusion: withCallParams(impl.assert.inclusion ?? notImplemented),
},
}
}

/**
* @param {Partial<{
* filecoin: Partial<import('../../src/types.js').StorefrontService['filecoin']>
Expand Down
16 changes: 15 additions & 1 deletion packages/filecoin-api/test/context/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import * as Client from '@ucanto/client'
import * as Server from '@ucanto/server'
import * as CAR from '@ucanto/transport/car'

import * as AssertCaps from '@web3-storage/content-claims/capability'
import * as StorefrontCaps from '@web3-storage/capabilities/filecoin/storefront'
import * as AggregatorCaps from '@web3-storage/capabilities/filecoin/aggregator'
import * as DealerCaps from '@web3-storage/capabilities/filecoin/dealer'
Expand All @@ -11,7 +12,7 @@ import * as DealTrackerCaps from '@web3-storage/capabilities/filecoin/deal-track
import * as API from '../../src/types.js'

import { validateAuthorization } from '../utils.js'
import { mockService } from './mocks.js'
import { mockService, mockContentClaimsService } from './mocks.js'

export { getStoreImplementations } from './store-implementations.js'
export { getQueueImplementations } from './queue-implementations.js'
Expand Down Expand Up @@ -218,6 +219,19 @@ export function getMockService() {
})
}

export function getContentClaimsMockService() {
return mockContentClaimsService({
assert: {
inclusion: Server.provideAdvanced({
capability: AssertCaps.Assert.inclusion,
handler: async ({ invocation, context }) => {
return Server.ok({})
},
}),
},
})
}

/**
* @param {any} service
* @param {any} id
Expand Down
Loading

0 comments on commit 4b28943

Please sign in to comment.