Browse Source

Purging Hanging Replicas (On Startup and via Command) (#155)

* Purge hanging replicas
* Use dictionary for event serialization
* Subscribing to replica events instead of waiting for arbitrary time in tests
* Only return list of running containers from DockerAssert
* use IDictionary instead of deserializing to ReplicaStatus when removing stale replicas
* Exit logs loop in DockerRunner if container is killed
pull/242/head
areller 6 years ago
committed by GitHub
parent
commit
42bcc8a454
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      src/Microsoft.Tye.Core/ProcessUtil.cs
  2. 42
      src/Microsoft.Tye.Hosting/DockerRunner.cs
  3. 37
      src/Microsoft.Tye.Hosting/ProcessRunner.cs
  4. 104
      src/Microsoft.Tye.Hosting/ReplicaRegistry.cs
  5. 13
      src/Microsoft.Tye.Hosting/TyeHost.cs
  6. 26
      test/E2ETest/DockerAssert.cs
  7. 111
      test/E2ETest/TestHelpers.cs
  8. 155
      test/E2ETest/TyePurgeTests.cs

10
src/Microsoft.Tye.Core/ProcessUtil.cs

@ -139,5 +139,15 @@ namespace Microsoft.Tye
return await processLifetimeTask.Task;
}
public static void KillProcess(int pid)
{
try
{
Process.GetProcessById(pid)?.Kill();
}
catch (ArgumentException) { }
catch (InvalidOperationException) { }
}
}
}

42
src/Microsoft.Tye.Hosting/DockerRunner.cs

@ -15,16 +15,23 @@ namespace Microsoft.Tye.Hosting
{
public class DockerRunner : IApplicationProcessor
{
private const string DOCKER_REPLICA_STORE = "docker";
private static readonly TimeSpan DockerStopTimeout = TimeSpan.FromSeconds(30);
private readonly ILogger _logger;
public DockerRunner(ILogger logger)
private readonly ReplicaRegistry _replicaRegistry;
public DockerRunner(ILogger logger, ReplicaRegistry replicaRegistry)
{
_logger = logger;
_replicaRegistry = replicaRegistry;
}
public Task StartAsync(Application application)
public async Task StartAsync(Application application)
{
await PurgeFromPreviousRun();
var tasks = new Task[application.Services.Count];
var index = 0;
foreach (var s in application.Services)
@ -32,7 +39,7 @@ namespace Microsoft.Tye.Hosting
tasks[index++] = s.Value.Description.RunInfo is DockerRunInfo docker ? StartContainerAsync(application, s.Value, docker) : Task.CompletedTask;
}
return Task.WhenAll(tasks);
await Task.WhenAll(tasks);
}
public Task StopAsync(Application application)
@ -157,6 +164,7 @@ namespace Microsoft.Tye.Hosting
status.DockerCommand = command;
WriteReplicaToStore(replica);
var result = await ProcessUtil.RunAsync(
"docker",
command,
@ -199,12 +207,17 @@ namespace Microsoft.Tye.Hosting
while (!dockerInfo.StoppingTokenSource.Token.IsCancellationRequested)
{
await ProcessUtil.RunAsync("docker", $"logs -f {containerId}",
var logsRes = await ProcessUtil.RunAsync("docker", $"logs -f {containerId}",
outputDataReceived: data => service.Logs.OnNext($"[{replica}]: {data}"),
errorDataReceived: data => service.Logs.OnNext($"[{replica}]: {data}"),
throwOnError: false,
cancellationToken: dockerInfo.StoppingTokenSource.Token);
if (logsRes.ExitCode != 0)
{
break;
}
if (!dockerInfo.StoppingTokenSource.IsCancellationRequested)
{
try
@ -286,6 +299,27 @@ namespace Microsoft.Tye.Hosting
service.Items[typeof(DockerInformation)] = dockerInfo;
}
private async Task PurgeFromPreviousRun()
{
var dockerReplicas = await _replicaRegistry.GetEvents(DOCKER_REPLICA_STORE);
foreach (var replica in dockerReplicas)
{
var container = replica["container"];
await ProcessUtil.RunAsync("docker", $"rm -f {container}", throwOnError: false);
_logger.LogInformation("removed contaienr {container} from previous run", container);
}
_replicaRegistry.DeleteStore(DOCKER_REPLICA_STORE);
}
private void WriteReplicaToStore(string container)
{
_replicaRegistry.WriteReplicaEvent(DOCKER_REPLICA_STORE, new Dictionary<string, string>()
{
["container"] = container
});
}
private static void PrintStdOutAndErr(Service service, string replica, ProcessResult result)
{
if (result.ExitCode != 0)

37
src/Microsoft.Tye.Hosting/ProcessRunner.cs

@ -17,17 +17,24 @@ namespace Microsoft.Tye.Hosting
{
public class ProcessRunner : IApplicationProcessor
{
private const string PROCESS_REPLICA_STORE = "process";
private readonly ILogger _logger;
private readonly ProcessRunnerOptions _options;
public ProcessRunner(ILogger logger, ProcessRunnerOptions options)
private readonly ReplicaRegistry _replicaRegistry;
public ProcessRunner(ILogger logger, ReplicaRegistry replicaRegistry, ProcessRunnerOptions options)
{
_logger = logger;
_replicaRegistry = replicaRegistry;
_options = options;
}
public Task StartAsync(Application application)
public async Task StartAsync(Application application)
{
await PurgeFromPreviousRun();
var tasks = new Task[application.Services.Count];
var index = 0;
foreach (var s in application.Services)
@ -41,7 +48,7 @@ namespace Microsoft.Tye.Hosting
};
}
return Task.WhenAll(tasks);
await Task.WhenAll(tasks);
}
public Task StopAsync(Application application)
@ -205,6 +212,7 @@ namespace Microsoft.Tye.Hosting
status.Pid = pid;
WriteReplicaToStore(pid.ToString());
service.ReplicaEvents.OnNext(new ReplicaEvent(ReplicaState.Started, status));
},
throwOnError: false,
@ -299,6 +307,29 @@ namespace Microsoft.Tye.Hosting
return Task.WhenAll(tasks);
}
private async Task PurgeFromPreviousRun()
{
var processReplicas = await _replicaRegistry.GetEvents(PROCESS_REPLICA_STORE);
foreach (var replica in processReplicas)
{
if (int.TryParse(replica["pid"], out var pid))
{
ProcessUtil.KillProcess(pid);
_logger.LogInformation("removed process {pid} from previous run", pid);
}
}
_replicaRegistry.DeleteStore(PROCESS_REPLICA_STORE);
}
private void WriteReplicaToStore(string pid)
{
_replicaRegistry.WriteReplicaEvent(PROCESS_REPLICA_STORE, new Dictionary<string, string>()
{
["pid"] = pid
});
}
private static string? GetDotnetRoot()
{
var process = Process.GetCurrentProcess();

104
src/Microsoft.Tye.Hosting/ReplicaRegistry.cs

@ -0,0 +1,104 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text.Json;
using System.Threading.Tasks;
using Microsoft.Tye.Hosting.Model;
using Microsoft.Extensions.Logging;
using System.Threading;
using System.Collections.Concurrent;
namespace Microsoft.Tye.Hosting
{
public class ReplicaRegistry : IDisposable
{
private readonly ILogger _logger;
private readonly ConcurrentDictionary<string, SemaphoreSlim> _fileWriteSemaphores;
private readonly string _tyeFolderPath;
public ReplicaRegistry(Model.Application application, ILogger logger)
{
_logger = logger;
_fileWriteSemaphores = new ConcurrentDictionary<string, SemaphoreSlim>();
_tyeFolderPath = Path.Join(Path.GetDirectoryName(application.Source), ".tye");
if (!Directory.Exists(_tyeFolderPath))
{
Directory.CreateDirectory(_tyeFolderPath);
}
}
public bool WriteReplicaEvent(string storeName, IDictionary<string, string> replicaRecord)
{
var filePath = Path.Join(_tyeFolderPath, GetStoreFile(storeName));
var contents = JsonSerializer.Serialize(replicaRecord, new JsonSerializerOptions { WriteIndented = false });
var semaphore = GetSempahoreForStore(storeName);
semaphore.Wait();
try
{
File.AppendAllText(filePath, contents + Environment.NewLine);
return true;
}
catch (DirectoryNotFoundException ex)
{
_logger.LogWarning(ex, "tye folder is not found. file: {file}", filePath);
return false;
}
finally
{
semaphore.Release();
}
}
public bool DeleteStore(string storeName)
{
var filePath = Path.Join(_tyeFolderPath, GetStoreFile(storeName));
try
{
File.Delete(storeName);
return true;
}
catch (DirectoryNotFoundException ex)
{
_logger.LogWarning(ex, "tye folder is not found. file: {file}", filePath);
return false;
}
}
public async ValueTask<IList<IDictionary<string, string>>> GetEvents(string storeName)
{
var filePath = Path.Join(_tyeFolderPath, GetStoreFile(storeName));
if (!File.Exists(filePath))
{
return Array.Empty<IDictionary<string, string>>();
}
var contents = await File.ReadAllTextAsync(filePath);
var events = contents.Split(Environment.NewLine);
return events.Where(e => !string.IsNullOrEmpty(e.Trim()))
.Select(e => JsonSerializer.Deserialize<IDictionary<string, string>>(e))
.ToList();
}
private SemaphoreSlim GetSempahoreForStore(string storeName)
{
return _fileWriteSemaphores.GetOrAdd(storeName, _ => new SemaphoreSlim(1, 1));
}
private string GetStoreFile(string storeName) => $"{storeName}_store";
public void Dispose()
{
Directory.Delete(_tyeFolderPath, true);
}
}
}

13
src/Microsoft.Tye.Hosting/TyeHost.cs

@ -37,6 +37,8 @@ namespace Microsoft.Tye.Hosting
private readonly string[] _args;
private readonly string[] _servicesToDebug;
private ReplicaRegistry? _replicaRegistry;
public TyeHost(Application application, string[] args)
: this(application, args, new string[0])
{
@ -81,7 +83,9 @@ namespace Microsoft.Tye.Hosting
var configuration = app.Configuration;
_processor = CreateApplicationProcessor(_args, _servicesToDebug, _logger, configuration);
_replicaRegistry = new ReplicaRegistry(_application, _logger);
_processor = CreateApplicationProcessor(_replicaRegistry, _args, _servicesToDebug, _logger, configuration);
await app.StartAsync();
@ -262,7 +266,7 @@ namespace Microsoft.Tye.Hosting
return false;
}
private static AggregateApplicationProcessor CreateApplicationProcessor(string[] args, string[] servicesToDebug, Microsoft.Extensions.Logging.ILogger logger, IConfiguration configuration)
private static AggregateApplicationProcessor CreateApplicationProcessor(ReplicaRegistry replicaRegistry, string[] args, string[] servicesToDebug, Microsoft.Extensions.Logging.ILogger logger, IConfiguration configuration)
{
var diagnosticOptions = DiagnosticOptions.FromConfiguration(configuration);
var diagnosticsCollector = new DiagnosticsCollector(logger, diagnosticOptions);
@ -276,8 +280,8 @@ namespace Microsoft.Tye.Hosting
new PortAssigner(logger),
new ProxyService(logger),
new HttpProxyService(logger),
new DockerRunner(logger),
new ProcessRunner(logger, ProcessRunnerOptions.FromArgs(args, servicesToDebug)),
new DockerRunner(logger, replicaRegistry),
new ProcessRunner(logger, replicaRegistry, ProcessRunnerOptions.FromArgs(args, servicesToDebug))
};
// If the docker command is specified then transform the ProjectRunInfo into DockerRunInfo
@ -291,6 +295,7 @@ namespace Microsoft.Tye.Hosting
public void Dispose()
{
_replicaRegistry?.Dispose();
DashboardWebApplication?.Dispose();
}
}

26
test/E2ETest/DockerAssert.cs

@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.
using System;
using System.Collections.Generic;
using System.CommandLine.Invocation;
using System.Linq;
using System.Text;
@ -76,6 +77,31 @@ namespace E2ETest
}
}
public static async Task<string[]> GetRunningContainersIdsAsync(ITestOutputHelper output)
{
var builder = new StringBuilder();
output.WriteLine($"> docker ps --format \"{{{{.ID}}}}\"");
var exitCode = await Process.ExecuteAsync(
"docker",
$"ps --format \"{{{{.ID}}}}\"",
stdOut: OnOutput,
stdErr: OnOutput);
if (exitCode != 0)
{
throw new XunitException($"Running `docker ps` failed." + Environment.NewLine + builder.ToString());
}
var lines = builder.ToString().Split(new[] { '\r', '\n', }, StringSplitOptions.RemoveEmptyEntries);
return lines;
void OnOutput(string text)
{
builder.AppendLine(text);
output.WriteLine(text);
}
}
private static async Task<string[]> ListDockerImagesIdsAsync(ITestOutputHelper output, string repository)
{
// docker images -q '{repository}' returns just the ID of the image (one per line)

111
test/E2ETest/TestHelpers.cs

@ -3,16 +3,26 @@
// See the LICENSE file in the project root for more information.
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Reactive.Linq;
using System.Text;
using Microsoft.Tye;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Tye.Hosting;
using Microsoft.Tye.Hosting.Model;
using Xunit;
using Xunit.Abstractions;
using Microsoft.Tye;
namespace E2ETest
{
public static class TestHelpers
{
private static readonly TimeSpan WaitForServicesTimeout = TimeSpan.FromSeconds(20);
// https://github.com/dotnet/aspnetcore/blob/5a0526dfd991419d5bce0d8ea525b50df2e37b04/src/Testing/src/TestPathUtilities.cs
// This can get into a bad pattern for having crazy paths in places. Eventually, especially if we use helix,
// we may want to avoid relying on sln position.
@ -64,5 +74,104 @@ namespace E2ETest
DirectoryCopy.Copy(GetTestProjectDirectory(projectName).FullName, temp.DirectoryPath);
return temp;
}
public static async Task StartHostAndWaitForReplicasToStart(TyeHost host)
{
var startedTask = new TaskCompletionSource<bool>();
var alreadyStarted = 0;
var totalReplicas = host.Application.Services.Sum(s => s.Value.Description.Replicas);
void OnReplicaChange(ReplicaEvent ev)
{
if (ev.State == ReplicaState.Started)
{
Interlocked.Increment(ref alreadyStarted);
}
else if (ev.State == ReplicaState.Stopped)
{
Interlocked.Decrement(ref alreadyStarted);
}
if (alreadyStarted == totalReplicas)
{
startedTask.TrySetResult(true);
}
}
var servicesStateObserver = host.Application.Services.Select(srv => srv.Value.ReplicaEvents.Subscribe(OnReplicaChange)).ToList();
await host.StartAsync();
using var cancellation = new CancellationTokenSource(WaitForServicesTimeout);
try
{
using (cancellation.Token.Register(() => startedTask.TrySetCanceled()))
{
await startedTask.Task;
}
}
catch (TaskCanceledException)
{
await host.StopAsync();
throw;
}
finally
{
foreach (var observer in servicesStateObserver)
{
observer.Dispose();
}
}
}
public static async Task PurgeHostAndWaitForGivenReplicasToStop(TyeHost host, string[] replicas, string tyeDir)
{
static async Task Purge(TyeHost host)
{
var logger = host.DashboardWebApplication!.Logger;
var replicaRegistry = new ReplicaRegistry(host.Application, logger);
var processRunner = new ProcessRunner(logger, replicaRegistry, new ProcessRunnerOptions());
var dockerRunner = new DockerRunner(logger, replicaRegistry);
await processRunner.StartAsync(new Application(new FileInfo(host.Application.Source), new Dictionary<string, Service>()));
await dockerRunner.StartAsync(new Application(new FileInfo(host.Application.Source), new Dictionary<string, Service>()));
}
var stoppedTask = new TaskCompletionSource<bool>();
var remaining = replicas.Length;
void OnReplicaChange(ReplicaEvent ev)
{
if (replicas.Contains(ev.Replica.Name) && ev.State == ReplicaState.Stopped)
{
Interlocked.Decrement(ref remaining);
}
if (remaining == 0)
{
stoppedTask.TrySetResult(true);
}
}
var servicesStateObserver = host.Application.Services.Select(srv => srv.Value.ReplicaEvents.Subscribe(OnReplicaChange)).ToList();
// We purge existing replicas by restarting the host which will initiate the purging process
await Purge(host);
using var cancellation = new CancellationTokenSource(WaitForServicesTimeout);
try
{
using (cancellation.Token.Register(() => stoppedTask.TrySetCanceled()))
{
await stoppedTask.Task;
}
}
finally
{
foreach (var observer in servicesStateObserver)
{
observer.Dispose();
}
}
}
}
}

155
test/E2ETest/TyePurgeTests.cs

@ -0,0 +1,155 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
using System;
using System.Collections.Generic;
using System.CommandLine;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Tye;
using Microsoft.Tye.ConfigModel;
using Microsoft.Tye.Hosting;
using Microsoft.Tye.Hosting.Model;
using Xunit;
using Xunit.Abstractions;
namespace E2ETest
{
public class TyePurgeTests
{
private readonly ITestOutputHelper output;
private readonly TestOutputLogEventSink sink;
public TyePurgeTests(ITestOutputHelper output)
{
this.output = output;
sink = new TestOutputLogEventSink(output);
}
[Fact]
public async Task FrontendBackendPurgeTest()
{
var projectDirectory = new DirectoryInfo(Path.Combine(TestHelpers.GetSolutionRootDirectory("tye"), "samples", "frontend-backend"));
using var tempDirectory = TempDirectory.Create();
DirectoryCopy.Copy(projectDirectory.FullName, tempDirectory.DirectoryPath);
var projectFile = new FileInfo(Path.Combine(tempDirectory.DirectoryPath, "tye.yaml"));
var tyeDir = new DirectoryInfo(Path.Combine(tempDirectory.DirectoryPath, ".tye"));
var outputContext = new OutputContext(sink, Verbosity.Debug);
var application = await ApplicationFactory.CreateAsync(outputContext, projectFile);
var host = new TyeHost(application.ToHostingApplication(), Array.Empty<string>())
{
Sink = sink,
};
try
{
await TestHelpers.StartHostAndWaitForReplicasToStart(host);
try
{
var pids = GetAllAppPids(host.Application);
Assert.True(Directory.Exists(tyeDir.FullName));
Assert.Subset(new HashSet<int>(GetAllPids()), new HashSet<int>(pids));
await TestHelpers.PurgeHostAndWaitForGivenReplicasToStop(host,
GetAllReplicasNames(host.Application), tyeDir.FullName);
var runningPids = new HashSet<int>(GetAllPids());
Assert.True(pids.All(pid => !runningPids.Contains(pid)));
}
finally
{
await host.StopAsync();
}
}
finally
{
host.Dispose();
Assert.False(Directory.Exists(tyeDir.FullName));
}
}
[ConditionalFact]
[SkipIfDockerNotRunning]
public async Task MultiProjectPurgeTest()
{
var projectDirectory = new DirectoryInfo(Path.Combine(TestHelpers.GetSolutionRootDirectory("tye"), "samples", "multi-project"));
using var tempDirectory = TempDirectory.Create();
DirectoryCopy.Copy(projectDirectory.FullName, tempDirectory.DirectoryPath);
var projectFile = new FileInfo(Path.Combine(tempDirectory.DirectoryPath, "tye.yaml"));
var tyeDir = new DirectoryInfo(Path.Combine(tempDirectory.DirectoryPath, ".tye"));
var outputContext = new OutputContext(sink, Verbosity.Debug);
var application = await ApplicationFactory.CreateAsync(outputContext, projectFile);
var host = new TyeHost(application.ToHostingApplication(), Array.Empty<string>())
{
Sink = sink,
};
try
{
await TestHelpers.StartHostAndWaitForReplicasToStart(host);
try
{
var pids = GetAllAppPids(host.Application);
var containers = GetAllContainerIds(host.Application);
Assert.True(Directory.Exists(tyeDir.FullName));
Assert.Subset(new HashSet<int>(GetAllPids()), new HashSet<int>(pids));
Assert.Subset(new HashSet<string>(await DockerAssert.GetRunningContainersIdsAsync(output)),
new HashSet<string>(containers));
await TestHelpers.PurgeHostAndWaitForGivenReplicasToStop(host,
GetAllReplicasNames(host.Application), tyeDir.FullName);
var runningPids = new HashSet<int>(GetAllPids());
Assert.True(pids.All(pid => !runningPids.Contains(pid)));
var runningContainers =
new HashSet<string>(await DockerAssert.GetRunningContainersIdsAsync(output));
Assert.True(containers.All(c => !runningContainers.Contains(c)));
}
finally
{
await host.StopAsync();
}
}
finally
{
host.Dispose();
Assert.False(Directory.Exists(tyeDir.FullName));
}
}
private string[] GetAllReplicasNames(Microsoft.Tye.Hosting.Model.Application application)
{
var replicas = application.Services.SelectMany(s => s.Value.Replicas);
return replicas.Select(r => r.Value.Name).ToArray();
}
private int[] GetAllAppPids(Microsoft.Tye.Hosting.Model.Application application)
{
var replicas = application.Services.SelectMany(s => s.Value.Replicas);
var ids = replicas.Where(r => r.Value is ProcessStatus).Select(r => ((ProcessStatus)r.Value).Pid ?? -1).ToArray();
return ids;
}
private string[] GetAllContainerIds(Microsoft.Tye.Hosting.Model.Application application)
{
var replicas = application.Services.SelectMany(s => s.Value.Replicas);
var ids = replicas.Where(r => r.Value is DockerStatus).Select(r => ((DockerStatus)r.Value).ContainerId!).ToArray();
return ids;
}
private int[] GetAllPids()
{
return Process.GetProcesses().Select(p => p.Id).ToArray();
}
}
}
Loading…
Cancel
Save