Skip to content

Commit

Permalink
ApplyMetadata() on SingleStreamProjection is applied during FetchForW…
Browse files Browse the repository at this point in the history
…riting/FetchLatest
  • Loading branch information
jeremydmiller committed Feb 4, 2025
1 parent 0a6d5dd commit f40ba6c
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 4 deletions.
96 changes: 96 additions & 0 deletions src/EventSourcingTests/Aggregation/using_apply_metadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,100 @@ public async Task apply_metadata()

// RIP Glenn Frey, take it easy!
item.LastModifiedBy.ShouldBe("Glenn Frey");
item.Version.ShouldBe(4);
}

[Theory]
[InlineData(ProjectionLifecycle.Live)]
[InlineData(ProjectionLifecycle.Inline)]
[InlineData(ProjectionLifecycle.Async)]
public async Task use_with_fetch_latest(ProjectionLifecycle lifecycle)
{
StoreOptions(opts =>
{
opts.Projections.Add(new ItemProjection(), lifecycle);

// THIS IS NECESSARY FOR THIS SAMPLE!
opts.Events.MetadataConfig.HeadersEnabled = true;
});

// Setting a header value on the session, which will get tagged on each
// event captured by the current session
theSession.SetHeader("last-modified-by", "Glenn Frey");

var id = theSession.Events.StartStream<Item>(new ItemStarted("Blue item")).Id;
await theSession.SaveChangesAsync();

theSession.Events.Append(id, new ItemWorked(), new ItemWorked(), new ItemFinished());
await theSession.SaveChangesAsync();

var item = await theSession.Events.FetchLatest<Item>(id);

// RIP Glenn Frey, take it easy!
item.LastModifiedBy.ShouldBe("Glenn Frey");
item.Version.ShouldBe(4);
}

[Theory]
[InlineData(ProjectionLifecycle.Live)]
[InlineData(ProjectionLifecycle.Inline)]
[InlineData(ProjectionLifecycle.Async)]
public async Task use_with_fetch_for_writing(ProjectionLifecycle lifecycle)
{
StoreOptions(opts =>
{
opts.Projections.Add(new ItemProjection(), lifecycle);

// THIS IS NECESSARY FOR THIS SAMPLE!
opts.Events.MetadataConfig.HeadersEnabled = true;
});

// Setting a header value on the session, which will get tagged on each
// event captured by the current session
theSession.SetHeader("last-modified-by", "Glenn Frey");

var id = theSession.Events.StartStream<Item>(new ItemStarted("Blue item")).Id;
await theSession.SaveChangesAsync();

theSession.Events.Append(id, new ItemWorked(), new ItemWorked(), new ItemFinished());
await theSession.SaveChangesAsync();

var item = await theSession.Events.FetchForWriting<Item>(id);

// RIP Glenn Frey, take it easy!
item.Aggregate.LastModifiedBy.ShouldBe("Glenn Frey");
item.Aggregate.Version.ShouldBe(4);
}

[Theory]
[InlineData(ProjectionLifecycle.Live)]
[InlineData(ProjectionLifecycle.Inline)]
[InlineData(ProjectionLifecycle.Async)]
public async Task use_with_fetch_for_writing_for_specific_version(ProjectionLifecycle lifecycle)
{
StoreOptions(opts =>
{
opts.Projections.Add(new ItemProjection(), lifecycle);

// THIS IS NECESSARY FOR THIS SAMPLE!
opts.Events.MetadataConfig.HeadersEnabled = true;
});

// Setting a header value on the session, which will get tagged on each
// event captured by the current session
theSession.SetHeader("last-modified-by", "Glenn Frey");

var id = theSession.Events.StartStream<Item>(new ItemStarted("Blue item")).Id;
await theSession.SaveChangesAsync();

theSession.Events.Append(id, new ItemWorked(), new ItemWorked(), new ItemFinished());
await theSession.SaveChangesAsync();

var item = await theSession.Events.FetchForWriting<Item>(id, 4);

// RIP Glenn Frey, take it easy!
item.Aggregate.LastModifiedBy.ShouldBe("Glenn Frey");
item.Aggregate.Version.ShouldBe(4);
}
}

Expand All @@ -54,6 +148,8 @@ public class Item
public bool Completed { get; set; }
public string LastModifiedBy { get; set; }
public DateTimeOffset? LastModified { get; set; }

public int Version { get; set; }
}

public record ItemStarted(string Description);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,11 +306,11 @@ private void buildLiveAggregationType(GeneratedAssembly assembly)
var snapshot = buildMethod.Arguments.Single(x => x.VariableType == typeof(T));
buildMethod.Frames.Code($"if (!events.Any()) return {snapshot.Usage};");

var callCreateAggregateFrame = new CallCreateAggregateFrame(_createMethods);
var callCreateAggregateFrame = new CallCreateAggregateFrame(_createMethods, GetType());
callCreateAggregateFrame.CoalesceAssignTo(snapshot);

buildMethod.Frames.Add(callCreateAggregateFrame);
buildMethod.Frames.Add(new CallApplyAggregateFrame(_applyMethods) { InsideForEach = true });
buildMethod.Frames.Add(new CallApplyAggregateFrame(_applyMethods, GetType()) { InsideForEach = true });

buildMethod.Frames.Return(typeof(T));

Expand Down
10 changes: 9 additions & 1 deletion src/Marten/Events/CodeGeneration/CallApplyAggregateFrame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ namespace Marten.Events.CodeGeneration;

internal class CallApplyAggregateFrame: Frame
{
private readonly Type _projectionType;
private Variable _aggregate;
private Variable _cancellation;
private Variable _session;
private Variable _usedEventOnCreate;
private Variable _projection;

public CallApplyAggregateFrame(ApplyMethodCollection methods): base(methods.IsAsync)
public CallApplyAggregateFrame(ApplyMethodCollection methods, Type projectionType): base(methods.IsAsync)
{
_projectionType = projectionType;
AggregateType = methods.AggregateType;
}

Expand All @@ -30,6 +33,9 @@ public override IEnumerable<Variable> FindVariables(IMethodVariables chain)
_session = chain.TryFindVariable(typeof(IQuerySession), VariableSource.All) ??
chain.FindVariable(typeof(IDocumentSession));

_projection = chain.FindVariable(_projectionType);
yield return _projection;

_usedEventOnCreate = chain.FindVariableByName(typeof(bool), CallCreateAggregateFrame.UsedEventOnCreateName);

yield return _session;
Expand Down Expand Up @@ -62,6 +68,8 @@ public override void GenerateCode(GeneratedMethod method, ISourceWriter writer)
$"{_aggregate.Usage} = {ApplyMethodCollection.MethodName}(@event, {_aggregate.Usage}, {_session.Usage});");
}

writer.WriteLine($"if ({_aggregate.Usage} != null) {_projection.Usage}.ApplyMetadata({_aggregate.Usage}, @event);");

if (InsideForEach)
{
writer.FinishBlock();
Expand Down
9 changes: 8 additions & 1 deletion src/Marten/Events/CodeGeneration/CallCreateAggregateFrame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,14 @@ namespace Marten.Events.CodeGeneration;

internal class CallCreateAggregateFrame: Frame
{
private readonly Type _projectionType;
private Variable _cancellation;
private Variable _session;
private Variable _projection;

public CallCreateAggregateFrame(CreateMethodCollection methods): base(methods.IsAsync)
public CallCreateAggregateFrame(CreateMethodCollection methods, Type projectionType): base(methods.IsAsync)
{
_projectionType = projectionType;
Aggregate = new Variable(methods.AggregateType, this);
UsedEventOnCreate = new Variable(typeof(bool), UsedEventOnCreateName, this);
}
Expand Down Expand Up @@ -46,6 +49,9 @@ public override IEnumerable<Variable> FindVariables(IMethodVariables chain)
chain.FindVariable(typeof(IDocumentSession));
yield return _session;

_projection = chain.FindVariable(_projectionType);
yield return _projection;

if (IsAsync)
{
_cancellation = chain.TryFindVariable(typeof(CancellationToken), VariableSource.All) ??
Expand Down Expand Up @@ -104,6 +110,7 @@ public override void GenerateCode(GeneratedMethod method, ISourceWriter writer)
writer.WriteLine($"{UsedEventOnCreate.Usage} = false;");
// creates default or throws if not possible
writer.WriteLine($"{Aggregate.Usage} = {CreateDefaultMethod.MethodName}({FirstEventExpression});");
writer.WriteLine($"if ({Aggregate.Usage} != null) {_projection.Usage}.ApplyMetadata({Aggregate.Usage}, {FirstEventExpression});");
writer.FinishBlock();

Next?.GenerateCode(method, writer);
Expand Down

0 comments on commit f40ba6c

Please sign in to comment.