Tye is a tool that makes developing, testing, and deploying microservices and distributed applications easier. Project Tye includes a local orchestrator to make developing microservices easier and the ability to deploy microservices to Kubernetes with min
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

104 lines
4.0 KiB

using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;
using Dapper;
using System.Text.Json;
using Npgsql;
using Microsoft.AspNetCore.SignalR;
namespace Worker
{
public class Worker : BackgroundService
{
private readonly ILogger<Worker> _logger;
private readonly IConfiguration _configuration;
private readonly IHubContext<ResultsHub> _hubContext;
public Worker(ILogger<Worker> logger, IConfiguration configuration, IHubContext<ResultsHub> hubContext)
{
_logger = logger;
_configuration = configuration;
_hubContext = hubContext;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Worker started running at: {time}", DateTimeOffset.Now);
try
{
using (var connection = new NpgsqlConnection(_configuration.GetConnectionString("postgres")))
using (var redisConnection = ConnectionMultiplexer.Connect(_configuration.GetConnectionString("redis")))
{
await Task.Delay(1000);
connection.Open();
var redis = redisConnection.GetDatabase();
await CreateTable(connection);
while (!stoppingToken.IsCancellationRequested)
{
try
{
var data = await redis.ListRightPopAsync("votes");
if (data.HasValue)
{
var vote = JsonSerializer.Deserialize<Vote>(data);
_logger.LogInformation($"Got new vote from redis {vote.vote} {vote.voterId}");
var command = @"INSERT INTO votes (Id, Vote) VALUES (@voterId, @vote)
ON CONFLICT (Id)
DO UPDATE SET Vote = @vote";
await connection.ExecuteAsync(command, vote);
var newResults = await connection.QueryAsync<VoteCount>("SELECT Vote, COUNT(Id) AS Count FROM votes GROUP BY Vote ORDER BY Vote");
_logger.LogInformation("Wrote results to postgres, calling other clients with updated results.");
await _hubContext.Clients.All.SendAsync("votesRecieved", newResults);
}
else
{
var newResults = await connection.QueryAsync<VoteCount>("SELECT Vote, COUNT(Id) AS Count FROM votes GROUP BY Vote ORDER BY Vote");
await _hubContext.Clients.All.SendAsync("votesRecieved", newResults);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "error processing vote.");
}
await Task.Delay(100, stoppingToken);
}
}
}
catch(Exception ex)
{
_logger.LogError(ex, "Exception starting worker");
}
}
private class Vote
{
public Guid voterId { get; set; }
public string vote { get; set; }
}
private async Task CreateTable(IDbConnection connection)
{
await connection.ExecuteAsync(@"CREATE TABLE IF NOT EXISTS votes (
Id VARCHAR(255) NOT NULL UNIQUE,
Vote VARCHAR(255) NOT NULL);");
}
}
}