From 07a5f0a5f466fb42937aa01c64531b286ef62b86 Mon Sep 17 00:00:00 2001 From: "Adam R. Grey" Date: Sat, 21 Aug 2021 06:14:51 -0400 Subject: [PATCH] telefranz is a singleton kind of wrecked my ability to use telefranz.tests, but I'm not going to ruin my entire design just for the fun of writing twice as much code for no benefit --- franz.tests/Program.cs | 22 ++------------ franz/Telefranz.cs | 69 +++++++++++++++++++++++++++++++----------- franz/franz.csproj | 2 +- 3 files changed, 55 insertions(+), 38 deletions(-) diff --git a/franz.tests/Program.cs b/franz.tests/Program.cs index 38afda8..e9a5fd6 100644 --- a/franz.tests/Program.cs +++ b/franz.tests/Program.cs @@ -8,9 +8,9 @@ namespace franz.tests static async Task Main(string[] args) { Console.WriteLine("Hello World!"); - var f1 = new Telefranz("libfranztest1", "focalor:9092"); + Telefranz.Configure("libfranztest1", "focalor:9092"); var f1Reported = false; - f1.addHandler((r) => { + Telefranz.Instance.addHandler((r) => { Console.WriteLine($"someone reporting. {r}"); if(r.name =="libfranztest1") { @@ -19,24 +19,6 @@ namespace franz.tests } }); await Task.Delay(1000); - f1.ProduceMessage(new silver_messages.global.sound_off()); - Task.WaitAny( - Task.Run(async () => { - while(f1Reported == false) - { - Console.WriteLine("not ready, giving another 100ms"); - await Task.Delay(100); - } - Console.WriteLine("done, ready, green"); - Environment.Exit(0); - }), - Task.Run(async () => { - await Task.Delay(30000); - Console.WriteLine("time up"); - await Task.Delay(500); - Environment.Exit(-1); - }) - ); Console.WriteLine("done I guess?"); } } diff --git a/franz/Telefranz.cs b/franz/Telefranz.cs index 5c63f34..e79fcfd 100644 --- a/franz/Telefranz.cs +++ b/franz/Telefranz.cs @@ -12,35 +12,38 @@ namespace franz { public class Telefranz { - static class _TelefranzConsumers where T : silver_messages.message { - internal static readonly Dictionary, Action>> wrappings + private static Telefranz instance = null; + private static readonly object createLock = new object(); + + static class _TelefranzConsumers where T : silver_messages.message + { + internal static readonly Dictionary, Action>> wrappings = new Dictionary, Action>>(); } private string handling_group { get; set; } = "Liszt"; private silver_messages.global.report howToReport { get; set; } private ClusterClient clusterClient; - private Dictionary> topicConsumers {get;set;} + private Dictionary> topicConsumers { get; set; } = new Dictionary>(); - private Dictionary topicSubscribers {get;set;} = new Dictionary(); + private Dictionary topicSubscribers { get; set; } = new Dictionary(); private Kafka.Public.Configuration clusterClientConfig = null; private static ConsoleLogger consoleLogger = new ConsoleLogger(); - - public Telefranz(string name, string bootstrap_servers, + private Telefranz(string name, string bootstrap_servers, List commands = null, List checks = null, List errors = null, List warnings = null) { - var serializer = new StringSerializer(); var deserializer = new StringDeserializer(); - var serializationConfig = new SerializationConfig(){SerializeOnProduce = true}; + var serializationConfig = new SerializationConfig() { SerializeOnProduce = true }; serializationConfig.SetDefaultSerializers(serializer, serializer); serializationConfig.SetDefaultDeserializers(deserializer, deserializer); - clusterClientConfig = new Kafka.Public.Configuration { - Seeds = bootstrap_servers, + this.clusterClientConfig = new Kafka.Public.Configuration + { + Seeds = bootstrap_servers, ClientId = name, SerializationConfig = serializationConfig }; - clusterClient = new ClusterClient(clusterClientConfig, consoleLogger); + this.clusterClient = new ClusterClient(clusterClientConfig, consoleLogger); handling_group = name; this.howToReport = new silver_messages.global.report() @@ -77,28 +80,56 @@ namespace franz } }); } + public static void Configure(string name, string bootstrap_servers, + List commands = null, List checks = null, + List errors = null, List warnings = null) + { + lock (createLock) + { + if (instance == null) + { + instance = new Telefranz(name, bootstrap_servers, commands, checks, errors, warnings); + } + } + } + + public static Telefranz Instance + { + get + { + lock (createLock) + { + if (instance == null) + { + throw new NotInitializedException("Configure me first"); + } + } + return instance; + } + } public void addHandler(Action theAction) where T : silver_messages.message { var topic = typeof(T).ToString(); - + Action> wrapped = (kr) => { var deserialized = JsonConvert.DeserializeObject(kr.Value); - if(deserialized != null && deserialized.type == deserialized.GetType().ToString()) + if (deserialized != null && deserialized.type == deserialized.GetType().ToString()) { try { theAction(deserialized); } - catch(Exception e) + catch (Exception e) { consoleLogger.LogError("if I don't catch this error the whole thing dies so..."); consoleLogger.LogError(JsonConvert.SerializeObject(e)); } } }; - if(!topicConsumers.ContainsKey(typeof(T).ToString())){ + if (!topicConsumers.ContainsKey(typeof(T).ToString())) + { topicConsumers[topic] = new KafkaConsumer(topic, clusterClient); topicConsumers[topic].ConsumeFromLatest(); topicSubscribers.Add(topic, 0); @@ -112,7 +143,7 @@ namespace franz topicConsumers[typeof(T).ToString()].MessageReceived -= _TelefranzConsumers.wrappings[theAction]; _TelefranzConsumers.wrappings.Remove(theAction); topicSubscribers[typeof(T).ToString()]--; - if(topicSubscribers[typeof(T).ToString()] == 0) + if (topicSubscribers[typeof(T).ToString()] == 0) { topicConsumers[typeof(T).ToString()].StopConsume(); } @@ -122,6 +153,10 @@ namespace franz { clusterClient.Produce(typeof(T).ToString(), message.ToString()); } - public void HiImAFunction(){} + public void HiImAFunction() { } + } + public class NotInitializedException : Exception + { + public NotInitializedException(string message) : base(message) { } } } \ No newline at end of file diff --git a/franz/franz.csproj b/franz/franz.csproj index 4897ec2..b62fcff 100644 --- a/franz/franz.csproj +++ b/franz/franz.csproj @@ -3,7 +3,7 @@ net5.0 silvermeddlists.franz - 0.0.3 + 0.0.4 adam Silver Meddlists