From 42bcc8a454b2370b5616086f69a201cbc0280aba Mon Sep 17 00:00:00 2001 From: areller Date: Fri, 27 Mar 2020 22:39:57 -0400 Subject: [PATCH] Purging Hanging Replicas (On Startup and via Command) (#155) * Purge hanging replicas * Use dictionary for event serialization * Subscribing to replica events instead of waiting for arbitrary time in tests * Only return list of running containers from DockerAssert * use IDictionary instead of deserializing to ReplicaStatus when removing stale replicas * Exit logs loop in DockerRunner if container is killed --- src/Microsoft.Tye.Core/ProcessUtil.cs | 12 ++ src/Microsoft.Tye.Hosting/DockerRunner.cs | 42 ++++- src/Microsoft.Tye.Hosting/ProcessRunner.cs | 37 ++++- src/Microsoft.Tye.Hosting/ReplicaRegistry.cs | 104 +++++++++++++ src/Microsoft.Tye.Hosting/TyeHost.cs | 13 +- test/E2ETest/DockerAssert.cs | 26 ++++ test/E2ETest/TestHelpers.cs | 111 ++++++++++++- test/E2ETest/TyePurgeTests.cs | 155 +++++++++++++++++++ 8 files changed, 488 insertions(+), 12 deletions(-) create mode 100644 src/Microsoft.Tye.Hosting/ReplicaRegistry.cs create mode 100644 test/E2ETest/TyePurgeTests.cs diff --git a/src/Microsoft.Tye.Core/ProcessUtil.cs b/src/Microsoft.Tye.Core/ProcessUtil.cs index 15ee3d8c..103ea3a6 100644 --- a/src/Microsoft.Tye.Core/ProcessUtil.cs +++ b/src/Microsoft.Tye.Core/ProcessUtil.cs @@ -139,5 +139,17 @@ namespace Microsoft.Tye return await processLifetimeTask.Task; } + + /* Best-effort kill of the process with the given id; failures to find or terminate it are ignored. */ public static void KillProcess(int pid) + { + try + { + using var process = Process.GetProcessById(pid); /* release the Process handle when done */ + process.Kill(); + } + catch (ArgumentException) { } + catch (InvalidOperationException) { } + catch (System.ComponentModel.Win32Exception) { } /* the process could not be terminated (for example access denied); the purge stays best-effort */ + } } } diff --git a/src/Microsoft.Tye.Hosting/DockerRunner.cs b/src/Microsoft.Tye.Hosting/DockerRunner.cs index 32563885..993c5b45 100644 --- a/src/Microsoft.Tye.Hosting/DockerRunner.cs +++ b/src/Microsoft.Tye.Hosting/DockerRunner.cs @@ -15,16 +15,23 @@ namespace Microsoft.Tye.Hosting { public class DockerRunner : IApplicationProcessor { + private const string DOCKER_REPLICA_STORE = "docker"; + 
private static readonly TimeSpan DockerStopTimeout = TimeSpan.FromSeconds(30); private readonly ILogger _logger; - public DockerRunner(ILogger logger) + private readonly ReplicaRegistry _replicaRegistry; /* registry used to record started containers and to purge the ones recorded by a previous run */ + + public DockerRunner(ILogger logger, ReplicaRegistry replicaRegistry) { _logger = logger; + _replicaRegistry = replicaRegistry; } - public Task StartAsync(Application application) + public async Task StartAsync(Application application) { + await PurgeFromPreviousRun(); /* runs before any container of this run is started */ + var tasks = new Task[application.Services.Count]; var index = 0; foreach (var s in application.Services) @@ -32,7 +39,7 @@ namespace Microsoft.Tye.Hosting tasks[index++] = s.Value.Description.RunInfo is DockerRunInfo docker ? StartContainerAsync(application, s.Value, docker) : Task.CompletedTask; } - return Task.WhenAll(tasks); + await Task.WhenAll(tasks); } public Task StopAsync(Application application) @@ -157,6 +164,7 @@ namespace Microsoft.Tye.Hosting status.DockerCommand = command; + WriteReplicaToStore(replica); /* recorded before the docker command below is run */ var result = await ProcessUtil.RunAsync( "docker", command, @@ -199,12 +207,17 @@ namespace Microsoft.Tye.Hosting while (!dockerInfo.StoppingTokenSource.Token.IsCancellationRequested) { - await ProcessUtil.RunAsync("docker", $"logs -f {containerId}", + var logsRes = await ProcessUtil.RunAsync("docker", $"logs -f {containerId}", outputDataReceived: data => service.Logs.OnNext($"[{replica}]: {data}"), errorDataReceived: data => service.Logs.OnNext($"[{replica}]: {data}"), throwOnError: false, cancellationToken: dockerInfo.StoppingTokenSource.Token); + if (logsRes.ExitCode != 0) /* non-zero exit from 'docker logs -f': leave the log-following loop */ + { + break; + } + if (!dockerInfo.StoppingTokenSource.IsCancellationRequested) { try @@ -286,6 +299,27 @@ namespace Microsoft.Tye.Hosting service.Items[typeof(DockerInformation)] = dockerInfo; } + /* Runs 'docker rm -f' for every container recorded in the docker replica store, then deletes that store. */ private async Task PurgeFromPreviousRun() + { + var dockerReplicas = await _replicaRegistry.GetEvents(DOCKER_REPLICA_STORE); + foreach (var replica in dockerReplicas) + { + var container = replica["container"]; + 
await ProcessUtil.RunAsync("docker", $"rm -f {container}", throwOnError: false); + _logger.LogInformation("removed container {container} from previous run", container); + } + + _replicaRegistry.DeleteStore(DOCKER_REPLICA_STORE); + } + + /* Records the container name in the docker replica store so that a later run can remove it. */ private void WriteReplicaToStore(string container) + { + _replicaRegistry.WriteReplicaEvent(DOCKER_REPLICA_STORE, new Dictionary<string, string>() + { + ["container"] = container + }); + } + private static void PrintStdOutAndErr(Service service, string replica, ProcessResult result) { if (result.ExitCode != 0) diff --git a/src/Microsoft.Tye.Hosting/ProcessRunner.cs b/src/Microsoft.Tye.Hosting/ProcessRunner.cs index ee3e785c..3a5e7176 100644 --- a/src/Microsoft.Tye.Hosting/ProcessRunner.cs +++ b/src/Microsoft.Tye.Hosting/ProcessRunner.cs @@ -17,17 +17,24 @@ namespace Microsoft.Tye.Hosting { public class ProcessRunner : IApplicationProcessor { + private const string PROCESS_REPLICA_STORE = "process"; + private readonly ILogger _logger; private readonly ProcessRunnerOptions _options; - public ProcessRunner(ILogger logger, ProcessRunnerOptions options) + private readonly ReplicaRegistry _replicaRegistry; /* registry used to record started processes and to purge the ones recorded by a previous run */ + + public ProcessRunner(ILogger logger, ReplicaRegistry replicaRegistry, ProcessRunnerOptions options) { _logger = logger; + _replicaRegistry = replicaRegistry; _options = options; } - public Task StartAsync(Application application) + public async Task StartAsync(Application application) { + await PurgeFromPreviousRun(); /* runs before any process of this run is started */ + var tasks = new Task[application.Services.Count]; var index = 0; foreach (var s in application.Services) @@ -41,7 +48,7 @@ namespace Microsoft.Tye.Hosting }; } - return Task.WhenAll(tasks); + await Task.WhenAll(tasks); } public Task StopAsync(Application application) @@ -205,6 +212,7 @@ namespace Microsoft.Tye.Hosting status.Pid = pid; + WriteReplicaToStore(pid.ToString()); /* record the pid so a later run can kill this process if it is left behind */ service.ReplicaEvents.OnNext(new ReplicaEvent(ReplicaState.Started, status)); }, throwOnError: false, @@ -299,6 +307,29 @@ namespace Microsoft.Tye.Hosting return 
Task.WhenAll(tasks); } + /* Kills every process whose pid is recorded in the process replica store, then deletes that store. */ private async Task PurgeFromPreviousRun() + { + var processReplicas = await _replicaRegistry.GetEvents(PROCESS_REPLICA_STORE); + foreach (var replica in processReplicas) + { + if (replica.TryGetValue("pid", out var rawPid) && int.TryParse(rawPid, out var pid)) /* skip records with a missing or non-numeric pid instead of throwing */ + { + ProcessUtil.KillProcess(pid); + _logger.LogInformation("removed process {pid} from previous run", pid); + } + } + + _replicaRegistry.DeleteStore(PROCESS_REPLICA_STORE); + } + + /* Records the pid in the process replica store so that a later run can kill the process. */ private void WriteReplicaToStore(string pid) + { + _replicaRegistry.WriteReplicaEvent(PROCESS_REPLICA_STORE, new Dictionary<string, string>() + { + ["pid"] = pid + }); + } + private static string? GetDotnetRoot() { var process = Process.GetCurrentProcess(); diff --git a/src/Microsoft.Tye.Hosting/ReplicaRegistry.cs b/src/Microsoft.Tye.Hosting/ReplicaRegistry.cs new file mode 100644 index 00000000..43a307d8 --- /dev/null +++ b/src/Microsoft.Tye.Hosting/ReplicaRegistry.cs @@ -0,0 +1,104 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. 
+ +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text.Json; +using System.Threading.Tasks; +using Microsoft.Tye.Hosting.Model; +using Microsoft.Extensions.Logging; +using System.Threading; +using System.Collections.Concurrent; + +namespace Microsoft.Tye.Hosting +{ + /* File-backed record of started replicas, stored as one JSON object per line under a ".tye" folder next to the application source. */ public class ReplicaRegistry : IDisposable + { + private readonly ILogger _logger; + private readonly ConcurrentDictionary<string, SemaphoreSlim> _fileWriteSemaphores; /* one write semaphore per store name */ + private readonly string _tyeFolderPath; + + public ReplicaRegistry(Model.Application application, ILogger logger) + { + _logger = logger; + _fileWriteSemaphores = new ConcurrentDictionary<string, SemaphoreSlim>(); + _tyeFolderPath = Path.Join(Path.GetDirectoryName(application.Source), ".tye"); + + if (!Directory.Exists(_tyeFolderPath)) + { + Directory.CreateDirectory(_tyeFolderPath); + } + } + + /* Appends the record as one JSON line to the store file; returns false when the .tye folder is missing. */ public bool WriteReplicaEvent(string storeName, IDictionary<string, string> replicaRecord) + { + var filePath = Path.Join(_tyeFolderPath, GetStoreFile(storeName)); + var contents = JsonSerializer.Serialize(replicaRecord, new JsonSerializerOptions { WriteIndented = false }); + var semaphore = GetSempahoreForStore(storeName); + + semaphore.Wait(); + try + { + File.AppendAllText(filePath, contents + Environment.NewLine); + return true; + } + catch (DirectoryNotFoundException ex) + { + _logger.LogWarning(ex, "tye folder is not found. file: {file}", filePath); + return false; + } + finally + { + semaphore.Release(); + } + } + + /* Deletes the store file; returns false when the .tye folder is missing. */ public bool DeleteStore(string storeName) + { + var filePath = Path.Join(_tyeFolderPath, GetStoreFile(storeName)); + + try + { + File.Delete(filePath); /* delete the resolved store file; the bare store name would be resolved against the current directory */ + return true; + } + catch (DirectoryNotFoundException ex) + { + _logger.LogWarning(ex, "tye folder is not found. 
file: {file}", filePath); + return false; + } + } + + /* Reads the store file and deserializes every non-blank line into a string-to-string record; returns an empty list when the file does not exist. */ public async ValueTask<IList<IDictionary<string, string>>> GetEvents(string storeName) + { + var filePath = Path.Join(_tyeFolderPath, GetStoreFile(storeName)); + + if (!File.Exists(filePath)) + { + return Array.Empty<IDictionary<string, string>>(); + } + + var contents = await File.ReadAllTextAsync(filePath); + var events = contents.Split(Environment.NewLine); + + return events.Where(e => !string.IsNullOrEmpty(e.Trim())) + .Select(e => JsonSerializer.Deserialize<IDictionary<string, string>>(e)) + .ToList(); + } + + private SemaphoreSlim GetSempahoreForStore(string storeName) + { + return _fileWriteSemaphores.GetOrAdd(storeName, _ => new SemaphoreSlim(1, 1)); + } + + private string GetStoreFile(string storeName) => $"{storeName}_store"; + + /* Removes the whole .tye folder and everything in it. */ public void Dispose() + { + if (Directory.Exists(_tyeFolderPath)) { Directory.Delete(_tyeFolderPath, true); } /* guard so that Dispose does not throw when the folder is already gone */ + } + } +} diff --git a/src/Microsoft.Tye.Hosting/TyeHost.cs b/src/Microsoft.Tye.Hosting/TyeHost.cs index 1e7847ec..87afccba 100644 --- a/src/Microsoft.Tye.Hosting/TyeHost.cs +++ b/src/Microsoft.Tye.Hosting/TyeHost.cs @@ -37,6 +37,8 @@ namespace Microsoft.Tye.Hosting private readonly string[] _args; private readonly string[] _servicesToDebug; + private ReplicaRegistry? 
_replicaRegistry; + public TyeHost(Application application, string[] args) : this(application, args, new string[0]) { @@ -81,7 +83,9 @@ namespace Microsoft.Tye.Hosting var configuration = app.Configuration; - _processor = CreateApplicationProcessor(_args, _servicesToDebug, _logger, configuration); + _replicaRegistry = new ReplicaRegistry(_application, _logger); /* one registry instance, passed to both the Docker and the process runner below */ + + _processor = CreateApplicationProcessor(_replicaRegistry, _args, _servicesToDebug, _logger, configuration); await app.StartAsync(); @@ -262,7 +266,7 @@ namespace Microsoft.Tye.Hosting return false; } - private static AggregateApplicationProcessor CreateApplicationProcessor(string[] args, string[] servicesToDebug, Microsoft.Extensions.Logging.ILogger logger, IConfiguration configuration) + private static AggregateApplicationProcessor CreateApplicationProcessor(ReplicaRegistry replicaRegistry, string[] args, string[] servicesToDebug, Microsoft.Extensions.Logging.ILogger logger, IConfiguration configuration) { var diagnosticOptions = DiagnosticOptions.FromConfiguration(configuration); var diagnosticsCollector = new DiagnosticsCollector(logger, diagnosticOptions); @@ -276,8 +280,8 @@ namespace Microsoft.Tye.Hosting new PortAssigner(logger), new ProxyService(logger), new HttpProxyService(logger), - new DockerRunner(logger), - new ProcessRunner(logger, ProcessRunnerOptions.FromArgs(args, servicesToDebug)), + new DockerRunner(logger, replicaRegistry), + new ProcessRunner(logger, replicaRegistry, ProcessRunnerOptions.FromArgs(args, servicesToDebug)) }; // If the docker command is specified then transform the ProjectRunInfo into DockerRunInfo @@ -291,6 +295,7 @@ namespace Microsoft.Tye.Hosting public void Dispose() { + _replicaRegistry?.Dispose(); /* ReplicaRegistry.Dispose deletes the .tye folder; the field is null until StartAsync has assigned it */ DashboardWebApplication?.Dispose(); } } diff --git a/test/E2ETest/DockerAssert.cs b/test/E2ETest/DockerAssert.cs index eeb7be40..74effff8 100644 --- a/test/E2ETest/DockerAssert.cs +++ b/test/E2ETest/DockerAssert.cs @@ -3,6 +3,7 @@ // See the LICENSE file in the project 
root for more information. using System; +using System.Collections.Generic; using System.CommandLine.Invocation; using System.Linq; using System.Text; @@ -76,6 +77,31 @@ namespace E2ETest } } + /* Runs 'docker ps' with an ID-only format and returns the non-empty output lines; throws XunitException when the command exits with a non-zero code. */ public static async Task<string[]> GetRunningContainersIdsAsync(ITestOutputHelper output) + { + var builder = new StringBuilder(); + + output.WriteLine($"> docker ps --format \"{{{{.ID}}}}\""); + var exitCode = await Process.ExecuteAsync( + "docker", + $"ps --format \"{{{{.ID}}}}\"", + stdOut: OnOutput, + stdErr: OnOutput); + if (exitCode != 0) + { + throw new XunitException($"Running `docker ps` failed." + Environment.NewLine + builder.ToString()); + } + + var lines = builder.ToString().Split(new[] { '\r', '\n', }, StringSplitOptions.RemoveEmptyEntries); + return lines; + + void OnOutput(string text) /* both stdout and stderr are collected into the same builder and echoed to the test output */ + { + builder.AppendLine(text); + output.WriteLine(text); + } + } + private static async Task ListDockerImagesIdsAsync(ITestOutputHelper output, string repository) { // docker images -q '{repository}' returns just the ID of the image (one per line) diff --git a/test/E2ETest/TestHelpers.cs b/test/E2ETest/TestHelpers.cs index 8d85b551..4bcd19d2 100644 --- a/test/E2ETest/TestHelpers.cs +++ b/test/E2ETest/TestHelpers.cs @@ -3,16 +3,26 @@ // See the LICENSE file in the project root for more information. 
using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; +using System.Linq; +using System.Reactive.Linq; using System.Text; -using Microsoft.Tye; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Tye.Hosting; +using Microsoft.Tye.Hosting.Model; using Xunit; +using Xunit.Abstractions; +using Microsoft.Tye; namespace E2ETest { public static class TestHelpers { + private static readonly TimeSpan WaitForServicesTimeout = TimeSpan.FromSeconds(20); /* upper bound for waiting on replica state changes in the helpers below */ + // https://github.com/dotnet/aspnetcore/blob/5a0526dfd991419d5bce0d8ea525b50df2e37b04/src/Testing/src/TestPathUtilities.cs // This can get into a bad pattern for having crazy paths in places. Eventually, especially if we use helix, // we may want to avoid relying on sln position. @@ -64,5 +74,104 @@ namespace E2ETest DirectoryCopy.Copy(GetTestProjectDirectory(projectName).FullName, temp.DirectoryPath); return temp; } + + /* Starts the host and waits, up to WaitForServicesTimeout, until the count of Started events minus Stopped events equals the configured number of replicas; on timeout the host is stopped and the cancellation is rethrown. */ public static async Task StartHostAndWaitForReplicasToStart(TyeHost host) + { + var startedTask = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); /* do not resume the awaiting test inside the replica-event callback */ + var alreadyStarted = 0; + var totalReplicas = host.Application.Services.Sum(s => s.Value.Description.Replicas); + + void OnReplicaChange(ReplicaEvent ev) + { + if (ev.State == ReplicaState.Started) + { + Interlocked.Increment(ref alreadyStarted); + } + else if (ev.State == ReplicaState.Stopped) + { + Interlocked.Decrement(ref alreadyStarted); + } + + if (alreadyStarted == totalReplicas) + { + startedTask.TrySetResult(true); + } + } + + var servicesStateObserver = host.Application.Services.Select(srv => srv.Value.ReplicaEvents.Subscribe(OnReplicaChange)).ToList(); /* subscribe before starting the host */ + await host.StartAsync(); + + using var cancellation = new CancellationTokenSource(WaitForServicesTimeout); + try + { + using (cancellation.Token.Register(() => startedTask.TrySetCanceled())) + { + await startedTask.Task; + } + } + catch (TaskCanceledException) + { + await host.StopAsync(); + throw; + } + finally + { + foreach (var observer in 
servicesStateObserver) + { + observer.Dispose(); + } + } + } + + /* Triggers the startup purge through fresh Process and Docker runners and waits, up to WaitForServicesTimeout, for Stopped events of the named replicas. The tyeDir parameter is not used by this method. */ public static async Task PurgeHostAndWaitForGivenReplicasToStop(TyeHost host, string[] replicas, string tyeDir) + { + static async Task Purge(TyeHost host) + { + var logger = host.DashboardWebApplication!.Logger; + var replicaRegistry = new ReplicaRegistry(host.Application, logger); + var processRunner = new ProcessRunner(logger, replicaRegistry, new ProcessRunnerOptions()); + var dockerRunner = new DockerRunner(logger, replicaRegistry); + + await processRunner.StartAsync(new Application(new FileInfo(host.Application.Source), new Dictionary<string, Service>())); /* empty service list: only the purge step of StartAsync has work to do */ + await dockerRunner.StartAsync(new Application(new FileInfo(host.Application.Source), new Dictionary<string, Service>())); + } + + var stoppedTask = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously); /* do not resume the awaiting test inside the replica-event callback */ + var remaining = replicas.Length; + + void OnReplicaChange(ReplicaEvent ev) + { + if (replicas.Contains(ev.Replica.Name) && ev.State == ReplicaState.Stopped) + { + Interlocked.Decrement(ref remaining); + } + + if (remaining == 0) + { + stoppedTask.TrySetResult(true); + } + } + + var servicesStateObserver = host.Application.Services.Select(srv => srv.Value.ReplicaEvents.Subscribe(OnReplicaChange)).ToList(); + + // We purge existing replicas by restarting the host which will initiate the purging process + await Purge(host); + + using var cancellation = new CancellationTokenSource(WaitForServicesTimeout); + try + { + using (cancellation.Token.Register(() => stoppedTask.TrySetCanceled())) + { + await stoppedTask.Task; + } + } + finally + { + foreach (var observer in servicesStateObserver) + { + observer.Dispose(); + } + } + } } } diff --git a/test/E2ETest/TyePurgeTests.cs b/test/E2ETest/TyePurgeTests.cs new file mode 100644 index 00000000..d7e883a4 --- /dev/null +++ b/test/E2ETest/TyePurgeTests.cs @@ -0,0 +1,155 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. 
+// See the LICENSE file in the project root for more information. + +using System; +using System.Collections.Generic; +using System.CommandLine; +using System.Diagnostics; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.Extensions.Hosting; +using Microsoft.Tye; +using Microsoft.Tye.ConfigModel; +using Microsoft.Tye.Hosting; +using Microsoft.Tye.Hosting.Model; +using Xunit; +using Xunit.Abstractions; + +namespace E2ETest +{ + public class TyePurgeTests + { + private readonly ITestOutputHelper output; + private readonly TestOutputLogEventSink sink; + + public TyePurgeTests(ITestOutputHelper output) + { + this.output = output; + sink = new TestOutputLogEventSink(output); + } + + [Fact] + /* Starts the frontend-backend sample, purges it, and checks that none of the recorded pids is still running and that the .tye folder is gone after the host is disposed. */ public async Task FrontendBackendPurgeTest() + { + var projectDirectory = new DirectoryInfo(Path.Combine(TestHelpers.GetSolutionRootDirectory("tye"), "samples", "frontend-backend")); + using var tempDirectory = TempDirectory.Create(); + DirectoryCopy.Copy(projectDirectory.FullName, tempDirectory.DirectoryPath); + + var projectFile = new FileInfo(Path.Combine(tempDirectory.DirectoryPath, "tye.yaml")); + var tyeDir = new DirectoryInfo(Path.Combine(tempDirectory.DirectoryPath, ".tye")); + var outputContext = new OutputContext(sink, Verbosity.Debug); + var application = await ApplicationFactory.CreateAsync(outputContext, projectFile); + var host = new TyeHost(application.ToHostingApplication(), Array.Empty<string>()) + { + Sink = sink, + }; + + try + { + await TestHelpers.StartHostAndWaitForReplicasToStart(host); + try + { + var pids = GetAllAppPids(host.Application); + + Assert.True(Directory.Exists(tyeDir.FullName)); + Assert.Subset(new HashSet<int>(GetAllPids()), new HashSet<int>(pids)); /* every replica pid must be among the running processes */ + + await TestHelpers.PurgeHostAndWaitForGivenReplicasToStop(host, + GetAllReplicasNames(host.Application), tyeDir.FullName); + + var runningPids = new HashSet<int>(GetAllPids()); + Assert.True(pids.All(pid => !runningPids.Contains(pid))); + } + finally + { + await host.StopAsync(); + } 
+ } + finally + { + host.Dispose(); + Assert.False(Directory.Exists(tyeDir.FullName)); + } + } + + [ConditionalFact] + [SkipIfDockerNotRunning] + /* Same scenario as the frontend-backend purge test, run on the multi-project sample and additionally checking container ids against 'docker ps'. */ public async Task MultiProjectPurgeTest() + { + var projectDirectory = new DirectoryInfo(Path.Combine(TestHelpers.GetSolutionRootDirectory("tye"), "samples", "multi-project")); + using var tempDirectory = TempDirectory.Create(); + DirectoryCopy.Copy(projectDirectory.FullName, tempDirectory.DirectoryPath); + + var projectFile = new FileInfo(Path.Combine(tempDirectory.DirectoryPath, "tye.yaml")); + var tyeDir = new DirectoryInfo(Path.Combine(tempDirectory.DirectoryPath, ".tye")); + var outputContext = new OutputContext(sink, Verbosity.Debug); + var application = await ApplicationFactory.CreateAsync(outputContext, projectFile); + var host = new TyeHost(application.ToHostingApplication(), Array.Empty<string>()) + { + Sink = sink, + }; + + try + { + await TestHelpers.StartHostAndWaitForReplicasToStart(host); + try + { + var pids = GetAllAppPids(host.Application); + var containers = GetAllContainerIds(host.Application); + + Assert.True(Directory.Exists(tyeDir.FullName)); + Assert.Subset(new HashSet<int>(GetAllPids()), new HashSet<int>(pids)); + Assert.Subset(new HashSet<string>(await DockerAssert.GetRunningContainersIdsAsync(output)), + new HashSet<string>(containers)); + + await TestHelpers.PurgeHostAndWaitForGivenReplicasToStop(host, + GetAllReplicasNames(host.Application), tyeDir.FullName); + + var runningPids = new HashSet<int>(GetAllPids()); + Assert.True(pids.All(pid => !runningPids.Contains(pid))); + var runningContainers = + new HashSet<string>(await DockerAssert.GetRunningContainersIdsAsync(output)); + Assert.True(containers.All(c => !runningContainers.Contains(c))); + } + finally + { + await host.StopAsync(); + } + } + finally + { + host.Dispose(); + Assert.False(Directory.Exists(tyeDir.FullName)); + } + } + + private string[] GetAllReplicasNames(Microsoft.Tye.Hosting.Model.Application application) + { + var replicas = application.Services.SelectMany(s => 
s.Value.Replicas); + return replicas.Select(r => r.Value.Name).ToArray(); + } + + /* Returns the pid of every replica whose status is a ProcessStatus; -1 stands in for a replica whose Pid is null. */ private int[] GetAllAppPids(Microsoft.Tye.Hosting.Model.Application application) + { + var replicas = application.Services.SelectMany(s => s.Value.Replicas); + var ids = replicas.Where(r => r.Value is ProcessStatus).Select(r => ((ProcessStatus)r.Value).Pid ?? -1).ToArray(); + + return ids; + } + + /* Returns the ContainerId of every replica whose status is a DockerStatus. */ private string[] GetAllContainerIds(Microsoft.Tye.Hosting.Model.Application application) + { + var replicas = application.Services.SelectMany(s => s.Value.Replicas); + var ids = replicas.Where(r => r.Value is DockerStatus).Select(r => ((DockerStatus)r.Value).ContainerId!).ToArray(); /* the null-forgiving operator assumes ContainerId has been set by the time this is called */ + + return ids; + } + + /* Returns the ids of all processes reported by Process.GetProcesses(). */ private int[] GetAllPids() + { + return Process.GetProcesses().Select(p => p.Id).ToArray(); + } + } +}