Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distributed tracing support #5078

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

#nullable enable

using System;
using System.Collections.Generic;
using System.Diagnostics.Tracing;
using System.Linq;
using System.Text;
using Microsoft.Diagnostics.NETCore.Client;

namespace Microsoft.Diagnostics.Monitoring.EventPipe
{
public sealed class ActivitySourceConfiguration : MonitoringSourceConfiguration
{
private readonly double _samplingRatio;
private readonly string[] _activitySourceNames;

public ActivitySourceConfiguration(
double samplingRatio,
IEnumerable<string>? activitySourceNames)
{
_samplingRatio = samplingRatio;
_activitySourceNames = activitySourceNames?.ToArray() ?? Array.Empty<string>();
}

public override IList<EventPipeProvider> GetProviders()
{
StringBuilder filterAndPayloadSpecs = new();
foreach (string activitySource in _activitySourceNames)
{
if (string.IsNullOrEmpty(activitySource))
{
continue;
}

// Note: It isn't currently possible to get Events or Links off
// of Activity using this mechanism:
// Events=Events.*Enumerate;Links=Links.*Enumerate; See:
// https://github.com/dotnet/runtime/issues/102924

string sampler = string.Empty;

if (_samplingRatio < 1D)
{
sampler = $"-ParentRatioSampler({_samplingRatio})";
}

filterAndPayloadSpecs.AppendLine($"[AS]{activitySource}/Stop{sampler}:-TraceId;SpanId;ParentSpanId;ActivityTraceFlags;TraceStateString;Kind;DisplayName;StartTimeTicks=StartTimeUtc.Ticks;DurationTicks=Duration.Ticks;Status;StatusDescription;Tags=TagObjects.*Enumerate;ActivitySourceVersion=Source.Version");
CodeBlanch marked this conversation as resolved.
Show resolved Hide resolved
}

// Note: Microsoft-Diagnostics-DiagnosticSource only supports a
// single listener. There can only be one
// ActivitySourceConfiguration, AspNetTriggerSourceConfiguration, or
// HttpRequestSourceConfiguration in play.
return new[] {
new EventPipeProvider(
DiagnosticSourceEventSource,
keywords: DiagnosticSourceEventSourceEvents | DiagnosticSourceEventSourceMessages,
eventLevel: EventLevel.Verbose,
arguments: new Dictionary<string, string>()
{
{ "FilterAndPayloadSpecs", filterAndPayloadSpecs.ToString() },
})
};
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public CounterConfiguration(CounterFilter filter)

internal record struct ProviderAndCounter(string ProviderName, string CounterName);

internal static class TraceEventExtensions
internal static partial class TraceEventExtensions
{
private static Dictionary<ProviderAndCounter, CounterMetadata> counterMetadataByName = new();
private static Dictionary<int, CounterMetadata> counterMetadataById = new();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

#nullable enable

using System;
using System.Diagnostics;

namespace Microsoft.Diagnostics.Monitoring.EventPipe
{
internal readonly struct ActivityData
{
public ActivityData(
ActivitySourceData source,
string operationName,
string? displayName,
ActivityKind kind,
ActivityTraceId traceId,
ActivitySpanId spanId,
ActivitySpanId parentSpanId,
ActivityTraceFlags traceFlags,
string? traceState,
DateTime startTimeUtc,
DateTime endTimeUtc,
ActivityStatusCode status,
string? statusDescription)
{
if (string.IsNullOrEmpty(operationName))
{
throw new ArgumentNullException(nameof(operationName));
}

Source = source;
OperationName = operationName;
DisplayName = displayName;
Kind = kind;
TraceId = traceId;
SpanId = spanId;
ParentSpanId = parentSpanId;
TraceFlags = traceFlags;
TraceState = traceState;
StartTimeUtc = startTimeUtc;
EndTimeUtc = endTimeUtc;
Status = status;
StatusDescription = statusDescription;
}

public readonly ActivitySourceData Source;

public readonly string OperationName;

public readonly string? DisplayName;

public readonly ActivityKind Kind;

public readonly ActivityTraceId TraceId;

public readonly ActivitySpanId SpanId;

public readonly ActivitySpanId ParentSpanId;

public readonly ActivityTraceFlags TraceFlags;

public readonly string? TraceState;

public readonly DateTime StartTimeUtc;

public readonly DateTime EndTimeUtc;

public readonly ActivityStatusCode Status;

public readonly string? StatusDescription;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

#nullable enable

using System;
using System.Collections.Generic;

namespace Microsoft.Diagnostics.Monitoring.EventPipe
{
internal ref struct ActivityPayload
{
public ActivityData ActivityData;

public ReadOnlySpan<KeyValuePair<string, object?>> Tags;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

#nullable enable

namespace Microsoft.Diagnostics.Monitoring.EventPipe
{
internal sealed class ActivitySourceData
{
public ActivitySourceData(
string name,
string? version)
{
Name = name;
Version = version;
}

public string Name { get; }
public string? Version { get; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

#nullable enable

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Diagnostics.NETCore.Client;
using Microsoft.Diagnostics.Tracing;

namespace Microsoft.Diagnostics.Monitoring.EventPipe
{
internal class DistributedTracesPipeline : EventSourcePipeline<DistributedTracesPipelineSettings>
{
private readonly IActivityLogger[] _loggers;

public DistributedTracesPipeline(DiagnosticsClient client,
DistributedTracesPipelineSettings settings,
IEnumerable<IActivityLogger> loggers) : base(client, settings)
{
_loggers = loggers?.ToArray() ?? throw new ArgumentNullException(nameof(loggers));
}

protected override MonitoringSourceConfiguration CreateConfiguration()
=> new ActivitySourceConfiguration(Settings.SamplingRatio, Settings.Sources);

protected override async Task OnRun(CancellationToken token)
{
double samplingRatio = Settings.SamplingRatio;
if (samplingRatio < 1D)
{
await ValidateEventSourceVersion().ConfigureAwait(false);
}

await base.OnRun(token).ConfigureAwait(false);
}

private async Task ValidateEventSourceVersion()
{
int majorVersion = 0;

using CancellationTokenSource cancellationTokenSource = new();

DiagnosticsEventPipeProcessor processor = new(
new ActivitySourceConfiguration(1D, activitySourceNames: null),
async (EventPipeEventSource eventSource, Func<Task> stopSessionAsync, CancellationToken token) => {
eventSource.Dynamic.All += traceEvent => {
try
{
if ("Version".Equals(traceEvent.EventName))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might want to check the provider name as well so you don't get confused if more than one provider has a Version event.

{
majorVersion = (int)traceEvent.PayloadValue(0);
}

if (!cancellationTokenSource.IsCancellationRequested)
{
// Note: Version should be the first message
// written so cancel once we have received a
// message.
cancellationTokenSource.Cancel();
}
}
catch (Exception)
{
}
};

using EventTaskSource<Action> sourceCompletedTaskSource = new(
taskComplete => taskComplete,
handler => eventSource.Completed += handler,
handler => eventSource.Completed -= handler,
token);

await sourceCompletedTaskSource.Task.ConfigureAwait(false);
});

try
{
await processor.Process(Client, TimeSpan.FromSeconds(10), resumeRuntime: false, token: cancellationTokenSource.Token).ConfigureAwait(false);
}
catch (OperationCanceledException)
{
}

await processor.DisposeAsync().ConfigureAwait(false);

if (majorVersion < 9)
{
throw new PipelineException("Sampling ratio can only be set when listening to processes running System.Diagnostics.DiagnosticSource 9 or greater");
}
}

protected override async Task OnEventSourceAvailable(EventPipeEventSource eventSource, Func<Task> stopSessionAsync, CancellationToken token)
{
await ExecuteCounterLoggerActionAsync((logger) => logger.PipelineStarted(token)).ConfigureAwait(false);

eventSource.Dynamic.All += traceEvent => {
try
{
if (traceEvent.TryGetActivityPayload(out ActivityPayload activity))
{
foreach (IActivityLogger logger in _loggers)
{
try
{
logger.Log(
in activity.ActivityData,
activity.Tags);
}
catch (ObjectDisposedException)
{
}
}
}
}
catch (Exception)
{
}
};

using EventTaskSource<Action> sourceCompletedTaskSource = new(
taskComplete => taskComplete,
handler => eventSource.Completed += handler,
handler => eventSource.Completed -= handler,
token);

await sourceCompletedTaskSource.Task.ConfigureAwait(false);

await ExecuteCounterLoggerActionAsync((logger) => logger.PipelineStopped(token)).ConfigureAwait(false);
}

private async Task ExecuteCounterLoggerActionAsync(Func<IActivityLogger, Task> action)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
private async Task ExecuteCounterLoggerActionAsync(Func<IActivityLogger, Task> action)
private async Task ExecuteActivityLoggerActionAsync(Func<IActivityLogger, Task> action)

Did you intend to name this 'CounterLogger'? I'm guessing this is a copy/paste that didn't get renamed.

{
foreach (IActivityLogger logger in _loggers)
{
try
{
await action(logger).ConfigureAwait(false);
}
catch (ObjectDisposedException)
{
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

#nullable enable

namespace Microsoft.Diagnostics.Monitoring.EventPipe
{
internal class DistributedTracesPipelineSettings : EventSourcePipelineSettings
{
public double SamplingRatio { get; set; } = 1.0D;

public string[]? Sources { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

#nullable enable

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Diagnostics.Monitoring.EventPipe
{
internal interface IActivityLogger
{
void Log(
in ActivityData activity,
ReadOnlySpan<KeyValuePair<string, object?>> tags);

Task PipelineStarted(CancellationToken token);
Task PipelineStopped(CancellationToken token);
}
}
Loading
Loading