diff --git a/franz.tests/UnitTest1.cs b/franz.tests/UnitTest1.cs index 3d20ed1..1d91dd4 100644 --- a/franz.tests/UnitTest1.cs +++ b/franz.tests/UnitTest1.cs @@ -14,8 +14,8 @@ namespace franz.tests [SetUp] public void Setup() { - f1 = new Telefranz("libfranztest1", "focalor:9092", "/usr/lib/librdkafka.so"); - f2 = new Telefranz("libfranztest2", "focalor:9092", "/usr/lib/librdkafka.so"); + f1 = new Telefranz("libfranztest1", "focalor:9092"); + f2 = new Telefranz("libfranztest2", "focalor:9092"); } [Test] @@ -40,8 +40,6 @@ namespace franz.tests } } }); - f1.StartListening(); - f2.StartListening(); await Task.Delay(2000); f1.ProduceMessage(new silver_messages.global.sound_off()); Task.WaitAny( diff --git a/franz/Telefranz.cs b/franz/Telefranz.cs index f2edd67..e02ce9a 100644 --- a/franz/Telefranz.cs +++ b/franz/Telefranz.cs @@ -1,4 +1,5 @@ -using Confluent.Kafka; +using Kafka.Public; +using Kafka.Public.Loggers; using Newtonsoft.Json; using System; using System.Collections.Generic; @@ -9,26 +10,28 @@ using System.Threading.Tasks; namespace franz { - public class Telefranz : IDisposable + public class Telefranz { + static class _TelefranzConsumers where T : silver_messages.message, new() { + internal static KafkaConsumer consumer = null; + internal static readonly Dictionary, Action>> wrappings + = new Dictionary, Action>>(); + internal static uint references = 0; + } private string handling_group { get; set; } = "Liszt"; const string namespace_prefix = "silver_messages"; private silver_messages.global.report howToReport { get; set; } private bool shouldListen = false; - private IProducer producer { get; set; } - private IConsumer consumer { get; set; } - private static List allMessageTypes { get; set; } = null; - + private ClusterClient clusterClient; private Task listenTask = null; - private Dictionary>> handlers { get; set; } - private Dictionary> handlerWrappings - = new Dictionary>(); + private List listeningConsumers = new List(); - public Telefranz(string name, string bootstrap_servers, string rdkafka_location, + public Telefranz(string name, string bootstrap_servers, List commands = null, List checks = null, List errors = null, List warnings = null) { - Library.Load(rdkafka_location); + clusterClient = new ClusterClient(new Kafka.Public.Configuration { Seeds = bootstrap_servers, ClientId = name }, new ConsoleLogger()); + this.handling_group = name; this.howToReport = new silver_messages.global.report() { @@ -44,25 +47,7 @@ namespace franz { howToReport.capabilites.checks = checks; } - var producerConfig = new ProducerConfig - { - BootstrapServers = bootstrap_servers - }; - var consumerConfig = new ConsumerConfig - { - BootstrapServers = bootstrap_servers, - GroupId = this.handling_group - }; - producer = new ProducerBuilder(producerConfig).Build(); - consumer = new ConsumerBuilder(consumerConfig).Build(); - staticSetup(); - - handlers = new Dictionary>>(); - foreach (var messageType in allMessageTypes) - { - handlers.Add(messageType, new List>()); - } addHandler((m) => { ProduceMessage(this.howToReport); @@ -83,93 +68,44 @@ namespace franz }); } - public void addHandler(Action theAction) where t : silver_messages.message + public void addHandler(Action theAction) where T : silver_messages.message, new() { - Action wrapped = (m) => + var dummy = new T(); + if(_TelefranzConsumers.consumer == null) { - theAction(m as t); + _TelefranzConsumers.consumer = new KafkaConsumer(dummy.topic, clusterClient); + } + + Action> wrapped = (kr) => + { + theAction(kr.Value); }; - - handlerWrappings.Add(theAction, wrapped); - if (handlers[typeof(t)].Count == 0) + _TelefranzConsumers.consumer.MessageReceived += wrapped; + _TelefranzConsumers.wrappings[theAction] = wrapped; + _TelefranzConsumers.references++; + if(_TelefranzConsumers.references == 1) { - var sublist = consumer.Subscription; - sublist.Add(typeof(t).ToString()); - consumer.Subscribe(sublist); + _TelefranzConsumers.consumer.Subscribe(this.handling_group, + new List(){dummy.topic}, + new ConsumerGroupConfiguration()); + listeningConsumers.Add(_TelefranzConsumers.consumer); + _TelefranzConsumers.consumer.ConsumeFromLatest(); } - handlers[typeof(t)].Add(wrapped); } - public void removeHandler(Action theAction) where t : silver_messages.message + public void removeHandler(Action theAction) where T : silver_messages.message, new() { - handlers[typeof(t)].Remove(handlerWrappings[theAction as System.Action]); - handlerWrappings.Remove(theAction as System.Action); - - if (handlers[typeof(t)].Count == 0) + _TelefranzConsumers.consumer.MessageReceived -= _TelefranzConsumers.wrappings[theAction]; + _TelefranzConsumers.wrappings.Remove(theAction); + _TelefranzConsumers.references--; + if(_TelefranzConsumers.references == 0) { - var sublist = consumer.Subscription; - consumer.Unsubscribe(); - sublist.Remove(typeof(t).ToString()); - consumer.Subscribe(sublist); + _TelefranzConsumers.consumer.StopConsume(); } } - public void ProduceMessage(silver_messages.message message) + public void ProduceMessage(T message) where T : silver_messages.message { - var msgText = JsonConvert.SerializeObject(message, type: message.GetType(), null); - producer.Produce(message.GetType().ToString(), new Message { Value = msgText }); - } - public void StartListening() - { - StopListening(); - listenTask = Task.Run(() => - { - shouldListen = true; - while (shouldListen) - { - var cr = consumer.Consume(); - if (cr != null) - { - var typeHaver = JsonConvert.DeserializeAnonymousType(cr.Message.Value, new { type = "" }); - var messageType = handlers.Keys.FirstOrDefault(hkey => hkey.ToString() == typeHaver.type && handlers[hkey].Count > 0); - if (messageType != null) - { - var message = JsonConvert.DeserializeObject(cr.Message.Value, messageType); - foreach (var handling in handlers[messageType]) - { - handling(message as silver_messages.message); - } - } - } - } - }); - } - public void StopListening() - { - shouldListen = false; - } - - private static void staticSetup() - { - if (allMessageTypes != null) - { - return; - } - allMessageTypes = new List(); - - - foreach (var messageType in from t in Assembly.GetExecutingAssembly().GetTypes() - where t.IsClass && t.Namespace?.StartsWith(namespace_prefix) == true - select t) - { - allMessageTypes.Add(messageType); - } - } - - public void Dispose() - { - StopListening(); - producer.Dispose(); - consumer.Dispose(); + clusterClient.Produce(message.topic, message); } } } \ No newline at end of file diff --git a/franz/franz.csproj b/franz/franz.csproj index bc37bbe..546eec7 100644 --- a/franz/franz.csproj +++ b/franz/franz.csproj @@ -9,7 +9,7 @@ - + diff --git a/franz/silver_messages/directorial/check_complete.cs b/franz/silver_messages/directorial/check_complete.cs index 845a192..2a97660 100644 --- a/franz/silver_messages/directorial/check_complete.cs +++ b/franz/silver_messages/directorial/check_complete.cs @@ -4,7 +4,7 @@ using silver_messages; namespace silver_messages.directorial { - public class check_complete : message + public class check_complete : message_directorial { public string check { get; set; } public string result { get; set; } diff --git a/franz/silver_messages/directorial/command_completed.cs b/franz/silver_messages/directorial/command_completed.cs index f43f741..fe88c36 100644 --- a/franz/silver_messages/directorial/command_completed.cs +++ b/franz/silver_messages/directorial/command_completed.cs @@ -4,7 +4,7 @@ using silver_messages; namespace silver_messages.directorial { - public class command_completed : message + public class command_completed : message_directorial { //name of the command that was called public string command { get; set; } diff --git a/franz/silver_messages/directorial/command_error.cs b/franz/silver_messages/directorial/command_error.cs index ace432f..35d396a 100644 --- a/franz/silver_messages/directorial/command_error.cs +++ b/franz/silver_messages/directorial/command_error.cs @@ -3,7 +3,7 @@ using System.Collections.Generic; using silver_messages; namespace silver_messages.directorial { - public class command_error : message + public class command_error : message_directorial { //name of the command that was called public string command { get; set; } diff --git a/franz/silver_messages/directorial/command_expired.cs b/franz/silver_messages/directorial/command_expired.cs index 3bf7dd3..e406cfd 100644 --- a/franz/silver_messages/directorial/command_expired.cs +++ b/franz/silver_messages/directorial/command_expired.cs @@ -3,7 +3,7 @@ using System.Collections.Generic; using silver_messages; namespace silver_messages.directorial { - public class command_expired : message + public class command_expired : message_directorial { //name of the command that was called public string command { get; set; } diff --git a/franz/silver_messages/directorial/command_output.cs b/franz/silver_messages/directorial/command_output.cs index 7d67258..4724a15 100644 --- a/franz/silver_messages/directorial/command_output.cs +++ b/franz/silver_messages/directorial/command_output.cs @@ -3,7 +3,7 @@ using System.Collections.Generic; using silver_messages; namespace silver_messages.directorial { - public class command_output : message + public class command_output : message_directorial { //name of the command that was called public string command { get; set; } diff --git a/franz/silver_messages/directorial/execute_check.cs b/franz/silver_messages/directorial/execute_check.cs index 93eccb1..98d2777 100644 --- a/franz/silver_messages/directorial/execute_check.cs +++ b/franz/silver_messages/directorial/execute_check.cs @@ -7,7 +7,7 @@ namespace silver_messages.directorial /* * run it and send a check_complete */ - public class execute_check : message + public class execute_check : message_directorial { public string check { get; set; } public List args { get; set; } = new List(); diff --git a/franz/silver_messages/directorial/execute_command.cs b/franz/silver_messages/directorial/execute_command.cs index 125afe8..c1bf557 100644 --- a/franz/silver_messages/directorial/execute_command.cs +++ b/franz/silver_messages/directorial/execute_command.cs @@ -12,7 +12,7 @@ namespace silver_messages.directorial * command_error as you get them, and if it ends some other way, send command_completed * with empty stdout and stderr. */ - public class execute_command : message + public class execute_command : message_directorial { public string command { get; set; } public List args { get; set; } = new List(); diff --git a/franz/silver_messages/directorial/message_directorial.cs b/franz/silver_messages/directorial/message_directorial.cs new file mode 100644 index 0000000..871a3cd --- /dev/null +++ b/franz/silver_messages/directorial/message_directorial.cs @@ -0,0 +1,7 @@ +namespace silver_messages +{ + public abstract class message_directorial : silver_messages.message + { + public override string topic => "silver_messages.directorial"; + } +} \ No newline at end of file diff --git a/franz/silver_messages/global/message_global.cs b/franz/silver_messages/global/message_global.cs new file mode 100644 index 0000000..746c109 --- /dev/null +++ b/franz/silver_messages/global/message_global.cs @@ -0,0 +1,7 @@ +namespace silver_messages +{ + public class message_global : silver_messages.message + { + public override string topic => "silver_messages.global"; + } +} \ No newline at end of file diff --git a/franz/silver_messages/global/report.cs b/franz/silver_messages/global/report.cs index ba7a0b6..a15f610 100644 --- a/franz/silver_messages/global/report.cs +++ b/franz/silver_messages/global/report.cs @@ -2,7 +2,7 @@ using System; using System.Collections.Generic; namespace silver_messages.global { - public class report : message + public class report : message_global { //your name public string name { get; set; } diff --git a/franz/silver_messages/global/restart.cs b/franz/silver_messages/global/restart.cs index abb914e..ff4f8e2 100644 --- a/franz/silver_messages/global/restart.cs +++ b/franz/silver_messages/global/restart.cs @@ -4,7 +4,7 @@ using silver_messages; namespace silver_messages.global { - public class restart : message + public class restart : message_global { //check if this is you (i.e., your handling group). If so, restart. Yourself, not the hardware. public string name { get; set; } diff --git a/franz/silver_messages/global/sound_off.cs b/franz/silver_messages/global/sound_off.cs index d8529cf..38a0681 100644 --- a/franz/silver_messages/global/sound_off.cs +++ b/franz/silver_messages/global/sound_off.cs @@ -5,6 +5,6 @@ using silver_messages; namespace silver_messages.global { //if you receive this, respond with a report - public class sound_off : message { } + public class sound_off : message_global { } } diff --git a/franz/silver_messages/global/stop.cs b/franz/silver_messages/global/stop.cs index 10dc27b..b2cfc10 100644 --- a/franz/silver_messages/global/stop.cs +++ b/franz/silver_messages/global/stop.cs @@ -4,7 +4,7 @@ using silver_messages; namespace silver_messages.global { - public class stop : message + public class stop : message_global { //check if this is you (i.e., your handling group). If so, stop. Yourself, not the hardware. public string name { get; set; } diff --git a/franz/silver_messages/message.cs b/franz/silver_messages/message.cs index e6cdd38..aff6720 100644 --- a/franz/silver_messages/message.cs +++ b/franz/silver_messages/message.cs @@ -7,6 +7,7 @@ namespace silver_messages public abstract class message { public string type { get { return this.GetType().ToString(); } } + public abstract string topic { get; } public override string ToString() { return JsonConvert.SerializeObject(this); diff --git a/franz/silver_messages/youtube/message_yt.cs b/franz/silver_messages/youtube/message_yt.cs new file mode 100644 index 0000000..67d6bb2 --- /dev/null +++ b/franz/silver_messages/youtube/message_yt.cs @@ -0,0 +1,7 @@ +namespace silver_messages +{ + public abstract class message_yt : silver_messages.message + { + public override string topic => "silver_messages.youtube"; + } +} \ No newline at end of file diff --git a/franz/silver_messages/youtube/metadata_needed.cs b/franz/silver_messages/youtube/metadata_needed.cs index 9f375d5..820bda8 100644 --- a/franz/silver_messages/youtube/metadata_needed.cs +++ b/franz/silver_messages/youtube/metadata_needed.cs @@ -7,7 +7,7 @@ namespace silver_messages.youtube * for all the scheduled, not-yet released videos, what required metadata * do they need */ - public class metadata_needed : message + public class metadata_needed : message_yt { //key is a yt id public Dictionary needed{get;set;} = new Dictionary(); diff --git a/franz/silver_messages/youtube/request_metadata_needed.cs b/franz/silver_messages/youtube/request_metadata_needed.cs index cb37d2a..4f0308e 100644 --- a/franz/silver_messages/youtube/request_metadata_needed.cs +++ b/franz/silver_messages/youtube/request_metadata_needed.cs @@ -7,5 +7,5 @@ namespace silver_messages.youtube * check all the scheduled, not-yet released videos for required metadata. * respond with a metadata_needed - empty if none. */ - public class request_metadata_needed : message { } + public class request_metadata_needed : message_yt { } } \ No newline at end of file diff --git a/franz/silver_messages/youtube/update_metadata.cs b/franz/silver_messages/youtube/update_metadata.cs index 4bf5150..57df3ee 100644 --- a/franz/silver_messages/youtube/update_metadata.cs +++ b/franz/silver_messages/youtube/update_metadata.cs @@ -6,7 +6,7 @@ namespace silver_messages.youtube /* * someone will send as much metadata as they have - set non-null fields to the provided values */ - public class update_metadata : message + public class update_metadata : message_yt { public string yt_id { get; set; } public yt_metadata metadata { get; set; } diff --git a/franz/silver_messages/youtube/upload.cs b/franz/silver_messages/youtube/upload.cs index 365bfdd..cf6b35c 100644 --- a/franz/silver_messages/youtube/upload.cs +++ b/franz/silver_messages/youtube/upload.cs @@ -6,7 +6,7 @@ namespace silver_messages.youtube /** if you receive this, start the upload (of filename with metadata) * and then respond with an upload_started */ - public class upload : message + public class upload : message_yt { public string filename { get; set; } public yt_metadata metadata { get; set; } diff --git a/franz/silver_messages/youtube/upload_started.cs b/franz/silver_messages/youtube/upload_started.cs index 1b0ad16..e95550c 100644 --- a/franz/silver_messages/youtube/upload_started.cs +++ b/franz/silver_messages/youtube/upload_started.cs @@ -3,7 +3,7 @@ using System.Collections.Generic; using silver_messages; namespace silver_messages.youtube { - public class upload_started : message + public class upload_started : message_yt { public string filename { get; set; } public string yt_id { get; set; }