franz/franz/Telefranz.cs
2022-12-13 12:34:19 -05:00

208 lines
7.8 KiB
C#

using Confluent.Kafka;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
namespace franz
{
public class Telefranz
{
private class TopicConsumption
{
private string topic { get; set; }
public IConsumer<string, string> topicConsumer { get; set; }
public List<Action<object>> externalSubscribers { get; set; } = new List<Action<object>>();
private CancellationTokenSource cancellationSource { get; set; } = new CancellationTokenSource();
private Task consumptionTask = null;
private System.Type messageType;
public TopicConsumption(System.Type messageType)
{
this.messageType = messageType;
topic = messageType.ToString();
topicConsumer = new ConsumerBuilder<string, string>(Telefranz.config).Build();
}
#pragma warning disable 1998
private async void ConsumeTaskFor(string topic, CancellationToken token)
#pragma warning restore 1998
{
try
{
while (!token.IsCancellationRequested)
{
var cr = topicConsumer.Consume();
var deserialized = JsonConvert.DeserializeObject(cr.Message.Value, messageType);
if (deserialized != null && (deserialized as gray_messages.message).type == deserialized.GetType().ToString())
{
foreach (var waitingAction in externalSubscribers)
{
try
{
waitingAction(deserialized);
}
catch (Exception e)
{
Console.Error.WriteLine("if I don't catch this error the whole thing dies so...");
Console.Error.WriteLine(JsonConvert.SerializeObject(e));
}
}
}
}
}
finally
{
topicConsumer.Close();
cancellationSource.Cancel();
}
}
internal void Unsubscribe(Action<object> wrapped)
{
externalSubscribers.Remove(wrapped);
if (externalSubscribers.Count == 0)
{
topicConsumer.Unsubscribe();
}
}
internal void Subscribe(Action<object> wrapped)
{
if(cancellationSource.IsCancellationRequested)
{
consumptionTask = null;
}
if(consumptionTask == null || consumptionTask.IsCanceled || consumptionTask.IsCompleted)
{
consumptionTask = Task.Run(() => {this.ConsumeTaskFor(topic, cancellationSource.Token);});
topicConsumer.Unsubscribe();
topicConsumer.Subscribe(topic);
}
externalSubscribers.Add(wrapped);
}
}
private static Telefranz instance = null;
private static readonly object createLock = new object();
private string handling_group { get; set; } = "Liszt";
private IProducer<string, string> producer { get; set; }
private gray_messages.global.report howToReport { get; set; }
private Dictionary<object, Action<object>> wrappedEventMap = new Dictionary<object, Action<object>>();
private Dictionary<string, TopicConsumption> Consumptions { get; set; } = new Dictionary<string, TopicConsumption>();
protected static ConsumerConfig config { get; set; }
private Telefranz(string name, string bootstrap_servers,
List<string> commands = null, List<string> checks = null,
List<string> errors = null, List<string> warnings = null)
{
config = new ConsumerConfig
{
BootstrapServers = bootstrap_servers,
GroupId = name,
AutoOffsetReset = AutoOffsetReset.Earliest
};
producer = new ProducerBuilder<string, string>(
new ProducerConfig(){BootstrapServers = config.BootstrapServers}
).Build();
handling_group = name;
this.howToReport = new gray_messages.global.report()
{
name = name,
host = Dns.GetHostName(),
errors = errors ?? new List<string>(),
warnings = warnings ?? new List<string>()
};
if (commands?.Count > 0)
{
howToReport.capabilites.commands = commands;
}
if (checks?.Count > 0)
{
howToReport.capabilites.checks = checks;
}
addHandler<gray_messages.global.sound_off>((m) =>
{
ProduceMessage(this.howToReport);
});
addHandler<gray_messages.global.stop>((m) =>
{
if (m.name == handling_group)
{
Environment.Exit(0);
}
});
}
public static void Configure(string name, string bootstrap_servers,
List<string> commands = null, List<string> checks = null,
List<string> errors = null, List<string> warnings = null)
{
lock (createLock)
{
if (instance == null)
{
instance = new Telefranz(name, bootstrap_servers, commands, checks, errors, warnings);
}
}
}
public static Telefranz Instance
{
get
{
lock (createLock)
{
if (instance == null)
{
throw new NotInitializedException("Configure me first");
}
}
return instance;
}
}
public void addHandler<T>(Action<T> theAction) where T : gray_messages.message, new()
{
var topic = typeof(T).ToString();
if (!Consumptions.ContainsKey(topic))
{
Consumptions[topic] = new TopicConsumption(typeof(T));
}
var wrapped = (object msg) => { theAction(msg as T); };
wrappedEventMap[theAction] = wrapped;
Consumptions[topic].Subscribe(wrapped);
}
public void removeHandler<T>(Action<T> theAction) where T : gray_messages.message
{
var topic = typeof(T).ToString();
var wrapped = wrappedEventMap[theAction];
Consumptions[topic].Unsubscribe(wrapped);
wrappedEventMap.Remove(theAction);
}
public void ProduceMessage<T>(T message) where T : gray_messages.message
{
//Console.WriteLine(message.ToString());
producer.Produce(typeof(T).ToString(), new Message<string, string> { Key = handling_group, Value = message.ToString() },
(deliveryReport) =>
{
if (deliveryReport.Error.Code != ErrorCode.NoError)
{
Console.Error.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
}
});
}
}
public class NotInitializedException : Exception
{
public NotInitializedException(string message) : base(message) { }
}
}