175 lines
6.3 KiB
C#
175 lines
6.3 KiB
C#
using Confluent.Kafka;
|
|
using Newtonsoft.Json;
|
|
using System;
|
|
using System.Collections.Generic;
|
|
using System.Linq;
|
|
using System.Reflection;
|
|
using System.Threading;
|
|
using System.Threading.Tasks;
|
|
|
|
namespace franz
|
|
{
|
|
public class Telefranz : IDisposable
|
|
{
|
|
private string handling_group { get; set; } = "Liszt";
|
|
const string namespace_prefix = "silver_messages";
|
|
private silver_messages.global.report howToReport { get; set; }
|
|
private bool shouldListen = false;
|
|
private IProducer<Null, string> producer { get; set; }
|
|
private IConsumer<Ignore, string> consumer { get; set; }
|
|
private static List<Type> allMessageTypes { get; set; } = null;
|
|
|
|
private Task listenTask = null;
|
|
private Dictionary<Type, List<Action<silver_messages.message>>> handlers { get; set; }
|
|
private Dictionary<object, Action<silver_messages.message>> handlerWrappings
|
|
= new Dictionary<object, Action<silver_messages.message>>();
|
|
|
|
public Telefranz(string name, string bootstrap_servers, string rdkafka_location,
|
|
List<string> commands = null, List<string> checks = null,
|
|
List<string> errors = null, List<string> warnings = null)
|
|
{
|
|
Library.Load(rdkafka_location);
|
|
this.handling_group = name;
|
|
this.howToReport = new silver_messages.global.report()
|
|
{
|
|
name = name,
|
|
errors = errors ?? new List<string>(),
|
|
warnings = warnings ?? new List<string>()
|
|
};
|
|
if (commands?.Count > 0)
|
|
{
|
|
howToReport.capabilites.commands = commands;
|
|
}
|
|
if (checks?.Count > 0)
|
|
{
|
|
howToReport.capabilites.checks = checks;
|
|
}
|
|
var producerConfig = new ProducerConfig
|
|
{
|
|
BootstrapServers = bootstrap_servers
|
|
};
|
|
var consumerConfig = new ConsumerConfig
|
|
{
|
|
BootstrapServers = bootstrap_servers,
|
|
GroupId = this.handling_group
|
|
};
|
|
producer = new ProducerBuilder<Null, string>(producerConfig).Build();
|
|
consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
|
|
|
|
staticSetup();
|
|
|
|
handlers = new Dictionary<Type, List<Action<silver_messages.message>>>();
|
|
foreach (var messageType in allMessageTypes)
|
|
{
|
|
handlers.Add(messageType, new List<Action<silver_messages.message>>());
|
|
}
|
|
addHandler<silver_messages.global.sound_off>((m) =>
|
|
{
|
|
ProduceMessage(this.howToReport);
|
|
});
|
|
addHandler<silver_messages.global.stop>((m) =>
|
|
{
|
|
if (m.name == this.handling_group)
|
|
{
|
|
Environment.Exit(0);
|
|
}
|
|
});
|
|
addHandler<silver_messages.global.restart>((m) =>
|
|
{
|
|
if (m.name == this.handling_group)
|
|
{
|
|
throw new NotImplementedException();//TODO
|
|
}
|
|
});
|
|
}
|
|
|
|
public void addHandler<t>(Action<t> theAction) where t : silver_messages.message
|
|
{
|
|
Action<silver_messages.message> wrapped = (m) =>
|
|
{
|
|
theAction(m as t);
|
|
};
|
|
|
|
handlerWrappings.Add(theAction, wrapped);
|
|
if (handlers[typeof(t)].Count == 0)
|
|
{
|
|
var sublist = consumer.Subscription;
|
|
sublist.Add(typeof(t).ToString());
|
|
consumer.Subscribe(sublist);
|
|
}
|
|
handlers[typeof(t)].Add(wrapped);
|
|
}
|
|
public void removeHandler<t>(Action<t> theAction) where t : silver_messages.message
|
|
{
|
|
handlers[typeof(t)].Remove(handlerWrappings[theAction as System.Action]);
|
|
handlerWrappings.Remove(theAction as System.Action);
|
|
|
|
if (handlers[typeof(t)].Count == 0)
|
|
{
|
|
var sublist = consumer.Subscription;
|
|
consumer.Unsubscribe();
|
|
sublist.Remove(typeof(t).ToString());
|
|
consumer.Subscribe(sublist);
|
|
}
|
|
}
|
|
|
|
public void ProduceMessage(silver_messages.message message)
|
|
{
|
|
var msgText = JsonConvert.SerializeObject(message, type: message.GetType(), null);
|
|
producer.Produce(message.GetType().ToString(), new Message<Null, string> { Value = msgText });
|
|
}
|
|
public void StartListening()
|
|
{
|
|
StopListening();
|
|
listenTask = Task.Run(() =>
|
|
{
|
|
shouldListen = true;
|
|
while (shouldListen)
|
|
{
|
|
var cr = consumer.Consume();
|
|
if (cr != null)
|
|
{
|
|
var typeHaver = JsonConvert.DeserializeAnonymousType(cr.Message.Value, new { type = "" });
|
|
var messageType = handlers.Keys.FirstOrDefault(hkey => hkey.ToString() == typeHaver.type && handlers[hkey].Count > 0);
|
|
if (messageType != null)
|
|
{
|
|
var message = JsonConvert.DeserializeObject(cr.Message.Value, messageType);
|
|
foreach (var handling in handlers[messageType])
|
|
{
|
|
handling(message as silver_messages.message);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
});
|
|
}
|
|
public void StopListening()
|
|
{
|
|
shouldListen = false;
|
|
}
|
|
|
|
private static void staticSetup()
|
|
{
|
|
if (allMessageTypes != null)
|
|
{
|
|
return;
|
|
}
|
|
allMessageTypes = new List<Type>();
|
|
|
|
|
|
foreach (var messageType in from t in Assembly.GetExecutingAssembly().GetTypes()
|
|
where t.IsClass && t.Namespace?.StartsWith(namespace_prefix) == true
|
|
select t)
|
|
{
|
|
allMessageTypes.Add(messageType);
|
|
}
|
|
}
|
|
|
|
public void Dispose()
|
|
{
|
|
StopListening();
|
|
producer.Dispose();
|
|
consumer.Dispose();
|
|
}
|
|
}
|
|
} |