Skip to content

Commit

Permalink
Merge pull request #282 from microsoft/develop
Browse files Browse the repository at this point in the history
CO 2.2.6
  • Loading branch information
GitTorre authored Oct 9, 2023
2 parents 143a7c6 + 8b6c558 commit 9b0266a
Show file tree
Hide file tree
Showing 13 changed files with 108 additions and 107 deletions.
8 changes: 4 additions & 4 deletions Build-COSFPkgs.ps1
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ function Build-SFPkg {
try {
Push-Location $scriptPath

Build-SFPkg "Microsoft.ServiceFabricApps.ClusterObserver.Linux.SelfContained.2.2.5" "$scriptPath\bin\release\ClusterObserver\linux-x64\self-contained\ClusterObserverType"
Build-SFPkg "Microsoft.ServiceFabricApps.ClusterObserver.Linux.FrameworkDependent.2.2.5" "$scriptPath\bin\release\ClusterObserver\linux-x64\framework-dependent\ClusterObserverType"
Build-SFPkg "Microsoft.ServiceFabricApps.ClusterObserver.Linux.SelfContained.2.2.6" "$scriptPath\bin\release\ClusterObserver\linux-x64\self-contained\ClusterObserverType"
Build-SFPkg "Microsoft.ServiceFabricApps.ClusterObserver.Linux.FrameworkDependent.2.2.6" "$scriptPath\bin\release\ClusterObserver\linux-x64\framework-dependent\ClusterObserverType"

Build-SFPkg "Microsoft.ServiceFabricApps.ClusterObserver.Windows.SelfContained.2.2.5" "$scriptPath\bin\release\ClusterObserver\win-x64\self-contained\ClusterObserverType"
Build-SFPkg "Microsoft.ServiceFabricApps.ClusterObserver.Windows.FrameworkDependent.2.2.5" "$scriptPath\bin\release\ClusterObserver\win-x64\framework-dependent\ClusterObserverType"
Build-SFPkg "Microsoft.ServiceFabricApps.ClusterObserver.Windows.SelfContained.2.2.6" "$scriptPath\bin\release\ClusterObserver\win-x64\self-contained\ClusterObserverType"
Build-SFPkg "Microsoft.ServiceFabricApps.ClusterObserver.Windows.FrameworkDependent.2.2.6" "$scriptPath\bin\release\ClusterObserver\win-x64\framework-dependent\ClusterObserverType"
}
finally {
Pop-Location
Expand Down
3 changes: 2 additions & 1 deletion ClusterObserver.nuspec.template
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
<package xmlns="http://schemas.microsoft.com/packaging/2013/05/nuspec.xsd">
<metadata minClientVersion="3.3.0">
<id>%PACKAGE_ID%</id>
<version>2.2.5</version>
<version>2.2.6</version>
<releaseNotes>
- Bug Fix in App Parameter Version-less Upgrade feature.
- Performance and Code improvements.
</releaseNotes>
<authors>Microsoft</authors>
Expand Down
20 changes: 10 additions & 10 deletions ClusterObserver/ClusterObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1337,23 +1337,23 @@ private async Task<RepairTaskList> GetRepairTasksCurrentlyProcessingAsync(Cancel
try
{
var repairTasks = await FabricClientRetryHelper.ExecuteFabricActionWithRetryAsync(
() => FabricClientInstance.RepairManager.GetRepairTaskListAsync(
null,
RepairTaskStateFilter.Active |
RepairTaskStateFilter.Approved |
RepairTaskStateFilter.Executing,
null,
ignoreDefaultQueryTimeout ? TimeSpan.FromSeconds(1) : ConfigurationSettings.AsyncTimeout,
cancellationToken),
cancellationToken);
() => FabricClientInstance.RepairManager.GetRepairTaskListAsync(
null,
RepairTaskStateFilter.Active |
RepairTaskStateFilter.Approved |
RepairTaskStateFilter.Executing,
null,
ignoreDefaultQueryTimeout ? TimeSpan.FromSeconds(1) : ConfigurationSettings.AsyncTimeout,
cancellationToken),
cancellationToken);

return repairTasks;
}
catch (Exception e) when (e is FabricException or TimeoutException)
{

}
catch (Exception e) when (e is not (OperationCanceledException or TaskCanceledException))
catch (Exception e) when (e is not (OperationCanceledException or TaskCanceledException or OutOfMemoryException))
{
ObserverLogger.LogWarning(e.ToString());
}
Expand Down
5 changes: 2 additions & 3 deletions ClusterObserver/ClusterObserver.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@
<IsServiceFabricServiceProject>True</IsServiceFabricServiceProject>
<RuntimeIdentifiers>win-x64;linux-x64</RuntimeIdentifiers>
<TargetLatestRuntimePatch>True</TargetLatestRuntimePatch>
<Copyright>Copyright © 2022</Copyright>
<Product>ClusterObserver</Product>
<Version>2.2.5</Version>
<FileVersion>2.2.5</FileVersion>
<Version>2.2.6</Version>
<FileVersion>2.2.6</FileVersion>
<AutoGenerateBindingRedirects>true</AutoGenerateBindingRedirects>
<GenerateAssemblyInfo>false</GenerateAssemblyInfo>
<StartupObject>ClusterObserver.Program</StartupObject>
Expand Down
147 changes: 74 additions & 73 deletions ClusterObserver/ClusterObserverManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Fabric.Health;
using System.IO;
using System.Linq;
using System.Net.Http.Headers;
using System.Threading;
using System.Threading.Tasks;
using ClusterObserver.Utilities;
Expand Down Expand Up @@ -39,7 +40,7 @@ public sealed class ClusterObserverManager : IDisposable
private bool appParamsUpdating;

// Folks often use their own version numbers. This is for internal diagnostic telemetry.
private const string InternalVersionNumber = "2.2.5";
private const string InternalVersionNumber = "2.2.6";

public bool EnableOperationalTelemetry
{
Expand Down Expand Up @@ -172,7 +173,7 @@ private async void ShutdownHandler(object sender, ConsoleCancelEventArgs console
return;
}

await Task.Delay(shutdownGracePeriodInSeconds).ConfigureAwait(true);
await Task.Delay(shutdownGracePeriodInSeconds);

shutdownSignaled = true;
await StopAsync();
Expand Down Expand Up @@ -295,32 +296,40 @@ public async Task StartAsync()
}
}
}
catch
catch (Exception e) when (e is not OutOfMemoryException)
{
// Telemetry is non-critical and should not take down CO.
// TelemetryLib will log exception details to file in top level FO log folder.
}
}

while (true)
// Run until the SF RunAsync CancellationToken is cancelled.
while (!token.IsCancellationRequested)
{
if (!appParamsUpdating && (shutdownSignaled || token.IsCancellationRequested))
try
{
Logger.LogInfo("Shutdown signaled. Stopping.");
await StopAsync();
break;
}
if (!appParamsUpdating && shutdownSignaled)
{
Logger.LogInfo("Shutdown signaled. Stopping.");
break;
}

await RunAsync();
await Task.Delay(TimeSpan.FromSeconds(ObserverExecutionLoopSleepSeconds > 0 ? ObserverExecutionLoopSleepSeconds : 15), token);
}
}
catch (Exception e) when (e is OperationCanceledException or TaskCanceledException)
{
if (!appParamsUpdating && (shutdownSignaled || token.IsCancellationRequested))
{
await StopAsync();
await RunAsync();
Logger.LogInfo($"Waiting {(ObserverExecutionLoopSleepSeconds > 0 ? ObserverExecutionLoopSleepSeconds : 15)} seconds until next observer run loop.");
await Task.Delay(TimeSpan.FromSeconds(ObserverExecutionLoopSleepSeconds > 0 ? ObserverExecutionLoopSleepSeconds : 15), token);
}
catch (Exception e) when (e is ArgumentException or FabricException or OperationCanceledException or TaskCanceledException or TimeoutException)
{
if (token.IsCancellationRequested)
{
Logger.LogInfo("RunAsync CancellationToken has been canceled by the SF runtime. Stopping.");
break;
}
}
}

// Closing. Stop all observers.
await StopAsync();
}
catch (Exception e)
{
Expand All @@ -332,7 +341,7 @@ public async Task StartAsync()
{
_ = TelemetryClient?.ReportHealthAsync(
"ClusterObserverServiceHealth",
HealthState.Warning,
HealthState.Error,
message,
ClusterObserverConstants.ClusterObserverManagerName,
token);
Expand All @@ -345,9 +354,8 @@ public async Task StartAsync()
ClusterObserverConstants.ClusterObserverETWEventName,
new
{
HealthState = "Warning",
HealthEventDescription = message,
Metric = "ClusterObserverServiceHealth",
Level = "Critical",
Message = message,
Source = ClusterObserverConstants.ClusterObserverName
});
}
Expand All @@ -371,7 +379,7 @@ public async Task StartAsync()
string filepath = Path.Combine(Logger.LogFolderBasePath, $"co_critical_error_telemetry.log");
_ = telemetryEvents.EmitCriticalErrorEvent(data, ClusterObserverConstants.ClusterObserverName, filepath);
}
catch
catch (Exception ex) when (ex is not OutOfMemoryException)
{
// Telemetry is non-critical and should not take down CO.
}
Expand All @@ -393,22 +401,22 @@ private static ClusterObserverOperationalEventData GetClusterObserverInternalTel
Version = InternalVersionNumber
};
}
catch (Exception e) when (e is ArgumentException)
catch (ArgumentException)
{

}

return telemetryData;
}

public async Task StopAsync()
public async Task StopAsync(bool isAppParamUpdate = false)
{
if (!shutdownSignaled)
if (!shutdownSignaled && !isAppParamUpdate)
{
shutdownSignaled = true;
}

await SignalAbortToRunningObserverAsync().ConfigureAwait(true);
await SignalAbortToRunningObserverAsync();
}

private Task SignalAbortToRunningObserverAsync()
Expand Down Expand Up @@ -463,17 +471,25 @@ private async Task RunAsync()

try
{
Logger.LogInfo($"Starting {observer.ObserverName}");
Logger.LogInfo($"Started {observer.ObserverName} run.");
IsObserverRunning = true;

// Synchronous call.
bool isCompleted = observer.ObserveAsync(linkedSFRuntimeObserverTokenSource != null ? linkedSFRuntimeObserverTokenSource.Token : token).Wait(observerExecTimeout);
bool isCompleted =
observer.ObserveAsync(linkedSFRuntimeObserverTokenSource != null ? linkedSFRuntimeObserverTokenSource.Token : token).Wait(observerExecTimeout);

// The observer is taking too long (hung?)
if (!isCompleted)
if (!isCompleted && !(token.IsCancellationRequested || shutdownSignaled || appParamsUpdating))
{
string observerHealthWarning = $"{observer.ObserverName} has exceeded its specified run time of {observerExecTimeout.TotalSeconds} seconds. Aborting.";
string observerHealthWarning =
$"{observer.ObserverName} has exceeded its specified run time of {observerExecTimeout.TotalSeconds} seconds. Aborting.";
await SignalAbortToRunningObserverAsync();

// Refresh CO CancellationTokenSources.
cts?.Dispose();
linkedSFRuntimeObserverTokenSource?.Dispose();
cts = new CancellationTokenSource();
linkedSFRuntimeObserverTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, token);

Logger.LogWarning(observerHealthWarning);

Expand All @@ -495,12 +511,12 @@ private async Task RunAsync()
{
HealthState = "Warning",
HealthEventDescription = observerHealthWarning,
Metric = "ClusterObserverServiceHealth",
Source = ClusterObserverConstants.ClusterObserverName
});
}
}

Logger.LogInfo($"Completed {observer.ObserverName} run.");
}
catch (AggregateException ae)
{
Expand Down Expand Up @@ -557,82 +573,61 @@ private async Task RunAsync()
}
}

/// <summary>
/// App parameter config update handler. This will recreate CO instance with new ConfigSettings applied.
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
private async void CodePackageActivationContext_ConfigurationPackageModifiedEvent(object sender, PackageModifiedEventArgs<ConfigurationPackage> e)
{
appParamsUpdating = true;
Logger.LogWarning("Application Parameter upgrade started...");

try
{

await StopAsync();
appParamsUpdating = true;
await StopAsync(isAppParamUpdate: true);
var newSettings = e.NewPackage.Settings;

// Observer settings.
// ClusterObserver and plugin observer settings.
foreach (var observer in Observers)
{
if (token.IsCancellationRequested)
{
return;
}

observer.ConfigurationSettings = new ConfigSettings(e.NewPackage.Settings, $"{observer.ObserverName}Configuration");

// The ObserverLogger instance (member of each observer type) checks its EnableVerboseLogging setting before writing Info events (it won't write if this setting is false, thus non-verbose).
// So, we set it here in case the parameter update includes a change to this config setting.
if (e.NewPackage.Settings.Sections[$"{observer.ObserverName}Configuration"].Parameters.Contains(ObserverConstants.EnableVerboseLoggingParameter)
&& e.OldPackage.Settings.Sections[$"{observer.ObserverName}Configuration"].Parameters.Contains(ObserverConstants.EnableVerboseLoggingParameter))
{
string newLoggingSetting = e.NewPackage.Settings.Sections[$"{observer.ObserverName}Configuration"].Parameters[ObserverConstants.EnableVerboseLoggingParameter].Value.ToLower();
string oldLoggingSetting = e.OldPackage.Settings.Sections[$"{observer.ObserverName}Configuration"].Parameters[ObserverConstants.EnableVerboseLoggingParameter].Value.ToLower();
string configSectionName = observer.ConfigurationSettings.ConfigSection.Name;
observer.ConfigPackage = e.NewPackage;
observer.ConfigurationSettings = new ConfigSettings(newSettings, configSectionName);
observer.ObserverLogger.EnableVerboseLogging = observer.ConfigurationSettings.EnableVerboseLogging;

if (newLoggingSetting != oldLoggingSetting)
{
observer.ObserverLogger.EnableVerboseLogging = observer.ConfigurationSettings.EnableVerboseLogging;
}
}
// Reset last run time so the observer restarts (if enabled) after the app parameter update completes.
observer.LastRunDateTime = DateTime.MinValue;
}

// ClusterObserverManager settings.
SetPropertiesFromConfigurationParameters(e.NewPackage.Settings);

cts ??= new CancellationTokenSource();
linkedSFRuntimeObserverTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, token);
SetPropertiesFromConfigurationParameters(newSettings);
}
catch (Exception err)
catch (Exception ex) when (ex is not OutOfMemoryException)
{
var healthReport = new HealthReport
{
AppName = new Uri(FabricServiceContext.CodePackageActivationContext.ApplicationName),
Code = FOErrorWarningCodes.Ok,
EntityType = EntityType.Application,
HealthMessage = $"Error updating ClusterObserver with new configuration settings:{Environment.NewLine}{err}",
HealthMessage = $"Error updating ClusterObserver with new configuration settings:{Environment.NewLine}{ex}",
NodeName = FabricServiceContext.NodeContext.NodeName,
State = HealthState.Ok,
Property = "CO_Configuration_Upate_Error",
Property = "Configuration_Upate_Error",
EmitLogEvent = true
};

ObserverHealthReporter healthReporter = new(Logger);
healthReporter.ReportHealthToServiceFabric(healthReport);
}

// Refresh CO CancellationTokenSources.
cts?.Dispose();
linkedSFRuntimeObserverTokenSource?.Dispose();
cts = new CancellationTokenSource();
linkedSFRuntimeObserverTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, this.token);
Logger.LogWarning("Application Parameter upgrade completed...");
appParamsUpdating = false;
}

private void Dispose(bool disposing)
{
if (hasDisposed)
{
return;
}

if (!disposing)
if (hasDisposed || !disposing)
{
return;
}
Expand All @@ -648,6 +643,12 @@ private void Dispose(bool disposing)
cts = null;
}

if (linkedSFRuntimeObserverTokenSource != null)
{
linkedSFRuntimeObserverTokenSource.Dispose();
linkedSFRuntimeObserverTokenSource = null;
}

// Flush and Dispose all NLog targets. No more logging.
Logger.Flush();
Logger.ShutDown();
Expand Down
2 changes: 1 addition & 1 deletion ClusterObserver/FabricClusterObserver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ protected override async Task RunAsync(CancellationToken cancellationToken)
ConfigureServices(services);

await using ServiceProvider serviceProvider = services.BuildServiceProvider();
using var observerManager = new ClusterObserverManager(serviceProvider, cancellationToken);
using ClusterObserverManager observerManager = new(serviceProvider, cancellationToken);
await observerManager.StartAsync();
}

Expand Down
Loading

0 comments on commit 9b0266a

Please sign in to comment.