diff --git a/franz.tests/UnitTest1.cs b/franz.tests/UnitTest1.cs index 1d91dd4..5d18402 100644 --- a/franz.tests/UnitTest1.cs +++ b/franz.tests/UnitTest1.cs @@ -9,20 +9,17 @@ namespace franz.tests public class Tests { Telefranz f1; - Telefranz f2; [SetUp] public void Setup() { f1 = new Telefranz("libfranztest1", "focalor:9092"); - f2 = new Telefranz("libfranztest2", "focalor:9092"); } [Test] public async Task reportsToSoundoff() { var f1Reported = false; - var f2Reported = false; f1.addHandler((silver_messages.global.report r) => { switch(r.name) { @@ -32,27 +29,28 @@ namespace franz.tests TestContext.Out.WriteLine("f1 reported"); break; } - case "libfranztest2": - { - f2Reported = true; - TestContext.Out.WriteLine("f2 reported"); - break; - } + //lol you can't have more than one kafka interface, right? so you can't do any (useful) tests on the sending of messages :P + // case "libfranztest2": + // { + // f2Reported = true; + // TestContext.Out.WriteLine("f2 reported"); + // break; + // } } }); await Task.Delay(2000); f1.ProduceMessage(new silver_messages.global.sound_off()); Task.WaitAny( Task.Run(async () => { - while(f1Reported == false || f2Reported == false) + while(f1Reported == false) { - //TestContext.Out.WriteLine("not ready, giving another 100ms"); + TestContext.Out.WriteLine("not ready, giving another 100ms"); await Task.Delay(100); } - //TestContext.Out.WriteLine("done, ready, green"); + TestContext.Out.WriteLine("done, ready, green"); }), Task.Run(async () => { - await Task.Delay(120000); + await Task.Delay(30000); TestContext.Out.WriteLine("time up"); Assert.Fail(); }) diff --git a/franz/Telefranz.cs b/franz/Telefranz.cs index e02ce9a..0f60b61 100644 --- a/franz/Telefranz.cs +++ b/franz/Telefranz.cs @@ -13,26 +13,36 @@ namespace franz public class Telefranz { static class _TelefranzConsumers where T : silver_messages.message, new() { - internal static KafkaConsumer consumer = null; - internal static readonly Dictionary, Action>> wrappings - = new Dictionary, Action>>(); - internal static uint references = 0; + internal static readonly Dictionary, Action>> wrappings + = new Dictionary, Action>>(); } private string handling_group { get; set; } = "Liszt"; - const string namespace_prefix = "silver_messages"; private silver_messages.global.report howToReport { get; set; } - private bool shouldListen = false; private ClusterClient clusterClient; - private Task listenTask = null; - private List listeningConsumers = new List(); + private Dictionary> topicConsumers {get;set;} + = new Dictionary>(); + private Dictionary topicSubscribers {get;set;} = new Dictionary(); + private Kafka.Public.Configuration clusterClientConfig = null; + private static ConsoleLogger consoleLogger = new ConsoleLogger(); public Telefranz(string name, string bootstrap_servers, List commands = null, List checks = null, List errors = null, List warnings = null) { - clusterClient = new ClusterClient(new Kafka.Public.Configuration { Seeds = bootstrap_servers, ClientId = name }, new ConsoleLogger()); - this.handling_group = name; + var serializer = new StringSerializer(); + var deserializer = new StringDeserializer(); + var serializationConfig = new SerializationConfig(){SerializeOnProduce = true}; + serializationConfig.SetDefaultSerializers(serializer, serializer); + serializationConfig.SetDefaultDeserializers(deserializer, deserializer); + clusterClientConfig = new Kafka.Public.Configuration { + Seeds = bootstrap_servers, + ClientId = name, + SerializationConfig = serializationConfig + }; + clusterClient = new ClusterClient(clusterClientConfig, consoleLogger); + + handling_group = name; this.howToReport = new silver_messages.global.report() { name = name, @@ -54,14 +64,14 @@ namespace franz }); addHandler((m) => { - if (m.name == this.handling_group) + if (m.name == handling_group) { Environment.Exit(0); } }); addHandler((m) => { - if (m.name == this.handling_group) + if (m.name == handling_group) { throw new NotImplementedException();//TODO } @@ -71,41 +81,48 @@ namespace franz public void addHandler(Action theAction) where T : silver_messages.message, new() { var dummy = new T(); - if(_TelefranzConsumers.consumer == null) - { - _TelefranzConsumers.consumer = new KafkaConsumer(dummy.topic, clusterClient); - } + Console.WriteLine($"created dummy, it reports as {dummy}. typeof(T) reports as {typeof(T)}. Topic is {dummy.topic}"); - Action> wrapped = (kr) => + Action> wrapped = (kr) => { - theAction(kr.Value); + Console.WriteLine("wrapped receives kafkarecord, is ready"); + var deserialized = JsonConvert.DeserializeObject(kr.Value); + if(deserialized != null) + { + Console.WriteLine("message deserialized for this handler :)"); + theAction(deserialized); + } + else + { + Console.WriteLine("message didn't deserialize for this handler :("); + } }; - _TelefranzConsumers.consumer.MessageReceived += wrapped; - _TelefranzConsumers.wrappings[theAction] = wrapped; - _TelefranzConsumers.references++; - if(_TelefranzConsumers.references == 1) - { - _TelefranzConsumers.consumer.Subscribe(this.handling_group, - new List(){dummy.topic}, - new ConsumerGroupConfiguration()); - listeningConsumers.Add(_TelefranzConsumers.consumer); - _TelefranzConsumers.consumer.ConsumeFromLatest(); + if(!topicConsumers.ContainsKey(dummy.topic)){ + Console.WriteLine($"making consumer for {handling_group}, topic {dummy.topic}"); + topicConsumers[dummy.topic] = new KafkaConsumer(dummy.topic, clusterClient); + topicConsumers[dummy.topic].ConsumeFromLatest(); + topicSubscribers.Add(dummy.topic, 0); } + topicConsumers[dummy.topic].MessageReceived += wrapped; + _TelefranzConsumers.wrappings[theAction] = wrapped; + topicSubscribers[dummy.topic]++; } public void removeHandler(Action theAction) where T : silver_messages.message, new() { - _TelefranzConsumers.consumer.MessageReceived -= _TelefranzConsumers.wrappings[theAction]; + var dummy = new T(); + topicConsumers[dummy.topic].MessageReceived -= _TelefranzConsumers.wrappings[theAction]; _TelefranzConsumers.wrappings.Remove(theAction); - _TelefranzConsumers.references--; - if(_TelefranzConsumers.references == 0) + topicSubscribers[dummy.topic]--; + if(topicSubscribers[dummy.topic] == 0) { - _TelefranzConsumers.consumer.StopConsume(); + topicConsumers[dummy.topic].StopConsume(); } } public void ProduceMessage(T message) where T : silver_messages.message { - clusterClient.Produce(message.topic, message); + Console.WriteLine($"producing message {message}"); + clusterClient.Produce(message.topic, message.ToString()); } } } \ No newline at end of file diff --git a/franz/silver_messages/directorial/message_directorial.cs b/franz/silver_messages/directorial/message_directorial.cs index 871a3cd..241ae0d 100644 --- a/franz/silver_messages/directorial/message_directorial.cs +++ b/franz/silver_messages/directorial/message_directorial.cs @@ -2,6 +2,6 @@ namespace silver_messages { public abstract class message_directorial : silver_messages.message { - public override string topic => "silver_messages.directorial"; + public override string topic => "silver_messages_directorial"; } } \ No newline at end of file diff --git a/franz/silver_messages/global/message_global.cs b/franz/silver_messages/global/message_global.cs index 746c109..1094543 100644 --- a/franz/silver_messages/global/message_global.cs +++ b/franz/silver_messages/global/message_global.cs @@ -2,6 +2,6 @@ namespace silver_messages { public class message_global : silver_messages.message { - public override string topic => "silver_messages.global"; + public override string topic => "silver_messages_global"; } } \ No newline at end of file diff --git a/franz/silver_messages/message.cs b/franz/silver_messages/message.cs index aff6720..ce1ec5d 100644 --- a/franz/silver_messages/message.cs +++ b/franz/silver_messages/message.cs @@ -1,16 +1,26 @@ using System; using System.Collections.Generic; +using System.IO; +using Kafka.Public; using Newtonsoft.Json; namespace silver_messages { - public abstract class message + public abstract class message : IMemorySerializable { public string type { get { return this.GetType().ToString(); } } public abstract string topic { get; } + + + public override string ToString() { return JsonConvert.SerializeObject(this); } + public void Serialize(MemoryStream toStream) + { + var stringBytes = System.Text.Encoding.UTF8.GetBytes(this.ToString()); + toStream.Write(stringBytes, 0, stringBytes.Length); + } } } diff --git a/franz/silver_messages/youtube/message_yt.cs b/franz/silver_messages/youtube/message_yt.cs index 67d6bb2..060c518 100644 --- a/franz/silver_messages/youtube/message_yt.cs +++ b/franz/silver_messages/youtube/message_yt.cs @@ -2,6 +2,6 @@ namespace silver_messages { public abstract class message_yt : silver_messages.message { - public override string topic => "silver_messages.youtube"; + public override string topic => "silver_messages_youtube"; } } \ No newline at end of file diff --git a/franz/silver_messages/youtube/metadata_needed.cs b/franz/silver_messages/youtube/metadata_needed.cs index 820bda8..78d26aa 100644 --- a/franz/silver_messages/youtube/metadata_needed.cs +++ b/franz/silver_messages/youtube/metadata_needed.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.IO; using silver_messages; namespace silver_messages.youtube { diff --git a/franz/silver_messages/youtube/request_metadata_needed.cs b/franz/silver_messages/youtube/request_metadata_needed.cs index 4f0308e..4e05937 100644 --- a/franz/silver_messages/youtube/request_metadata_needed.cs +++ b/franz/silver_messages/youtube/request_metadata_needed.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.IO; using silver_messages; namespace silver_messages.youtube { diff --git a/franz/silver_messages/youtube/update_metadata.cs b/franz/silver_messages/youtube/update_metadata.cs index 57df3ee..dbf0506 100644 --- a/franz/silver_messages/youtube/update_metadata.cs +++ b/franz/silver_messages/youtube/update_metadata.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.IO; using silver_messages; namespace silver_messages.youtube { diff --git a/franz/silver_messages/youtube/upload.cs b/franz/silver_messages/youtube/upload.cs index cf6b35c..83d42f4 100644 --- a/franz/silver_messages/youtube/upload.cs +++ b/franz/silver_messages/youtube/upload.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.IO; using silver_messages; namespace silver_messages.youtube { diff --git a/franz/silver_messages/youtube/upload_started.cs b/franz/silver_messages/youtube/upload_started.cs index e95550c..ccca306 100644 --- a/franz/silver_messages/youtube/upload_started.cs +++ b/franz/silver_messages/youtube/upload_started.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.IO; using silver_messages; namespace silver_messages.youtube {