diff --git a/franz.tests/Program.cs b/franz.tests/Program.cs index 38afda8..e9a5fd6 100644 --- a/franz.tests/Program.cs +++ b/franz.tests/Program.cs @@ -8,9 +8,9 @@ namespace franz.tests static async Task Main(string[] args) { Console.WriteLine("Hello World!"); - var f1 = new Telefranz("libfranztest1", "focalor:9092"); + Telefranz.Configure("libfranztest1", "focalor:9092"); var f1Reported = false; - f1.addHandler((r) => { + Telefranz.Instance.addHandler((r) => { Console.WriteLine($"someone reporting. {r}"); if(r.name =="libfranztest1") { @@ -19,24 +19,6 @@ namespace franz.tests } }); await Task.Delay(1000); - f1.ProduceMessage(new silver_messages.global.sound_off()); - Task.WaitAny( - Task.Run(async () => { - while(f1Reported == false) - { - Console.WriteLine("not ready, giving another 100ms"); - await Task.Delay(100); - } - Console.WriteLine("done, ready, green"); - Environment.Exit(0); - }), - Task.Run(async () => { - await Task.Delay(30000); - Console.WriteLine("time up"); - await Task.Delay(500); - Environment.Exit(-1); - }) - ); Console.WriteLine("done I guess?"); } } diff --git a/franz/Telefranz.cs b/franz/Telefranz.cs index 5c63f34..e79fcfd 100644 --- a/franz/Telefranz.cs +++ b/franz/Telefranz.cs @@ -12,35 +12,38 @@ namespace franz { public class Telefranz { - static class _TelefranzConsumers where T : silver_messages.message { - internal static readonly Dictionary, Action>> wrappings + private static Telefranz instance = null; + private static readonly object createLock = new object(); + + static class _TelefranzConsumers where T : silver_messages.message + { + internal static readonly Dictionary, Action>> wrappings = new Dictionary, Action>>(); } private string handling_group { get; set; } = "Liszt"; private silver_messages.global.report howToReport { get; set; } private ClusterClient clusterClient; - private Dictionary> topicConsumers {get;set;} + private Dictionary> topicConsumers { get; set; } = new Dictionary>(); - private Dictionary topicSubscribers {get;set;} = new Dictionary(); + private Dictionary topicSubscribers { get; set; } = new Dictionary(); private Kafka.Public.Configuration clusterClientConfig = null; private static ConsoleLogger consoleLogger = new ConsoleLogger(); - - public Telefranz(string name, string bootstrap_servers, + private Telefranz(string name, string bootstrap_servers, List commands = null, List checks = null, List errors = null, List warnings = null) { - var serializer = new StringSerializer(); var deserializer = new StringDeserializer(); - var serializationConfig = new SerializationConfig(){SerializeOnProduce = true}; + var serializationConfig = new SerializationConfig() { SerializeOnProduce = true }; serializationConfig.SetDefaultSerializers(serializer, serializer); serializationConfig.SetDefaultDeserializers(deserializer, deserializer); - clusterClientConfig = new Kafka.Public.Configuration { - Seeds = bootstrap_servers, + this.clusterClientConfig = new Kafka.Public.Configuration + { + Seeds = bootstrap_servers, ClientId = name, SerializationConfig = serializationConfig }; - clusterClient = new ClusterClient(clusterClientConfig, consoleLogger); + this.clusterClient = new ClusterClient(clusterClientConfig, consoleLogger); handling_group = name; this.howToReport = new silver_messages.global.report() @@ -77,28 +80,56 @@ namespace franz } }); } + public static void Configure(string name, string bootstrap_servers, + List commands = null, List checks = null, + List errors = null, List warnings = null) + { + lock (createLock) + { + if (instance == null) + { + instance = new Telefranz(name, bootstrap_servers, commands, checks, errors, warnings); + } + } + } + + public static Telefranz Instance + { + get + { + lock (createLock) + { + if (instance == null) + { + throw new NotInitializedException("Configure me first"); + } + } + return instance; + } + } public void addHandler(Action theAction) where T : silver_messages.message { var topic = typeof(T).ToString(); - + Action> wrapped = (kr) => { var deserialized = JsonConvert.DeserializeObject(kr.Value); - if(deserialized != null && deserialized.type == deserialized.GetType().ToString()) + if (deserialized != null && deserialized.type == deserialized.GetType().ToString()) { try { theAction(deserialized); } - catch(Exception e) + catch (Exception e) { consoleLogger.LogError("if I don't catch this error the whole thing dies so..."); consoleLogger.LogError(JsonConvert.SerializeObject(e)); } } }; - if(!topicConsumers.ContainsKey(typeof(T).ToString())){ + if (!topicConsumers.ContainsKey(typeof(T).ToString())) + { topicConsumers[topic] = new KafkaConsumer(topic, clusterClient); topicConsumers[topic].ConsumeFromLatest(); topicSubscribers.Add(topic, 0); @@ -112,7 +143,7 @@ namespace franz topicConsumers[typeof(T).ToString()].MessageReceived -= _TelefranzConsumers.wrappings[theAction]; _TelefranzConsumers.wrappings.Remove(theAction); topicSubscribers[typeof(T).ToString()]--; - if(topicSubscribers[typeof(T).ToString()] == 0) + if (topicSubscribers[typeof(T).ToString()] == 0) { topicConsumers[typeof(T).ToString()].StopConsume(); } @@ -122,6 +153,10 @@ namespace franz { clusterClient.Produce(typeof(T).ToString(), message.ToString()); } - public void HiImAFunction(){} + public void HiImAFunction() { } + } + public class NotInitializedException : Exception + { + public NotInitializedException(string message) : base(message) { } } } \ No newline at end of file diff --git a/franz/franz.csproj b/franz/franz.csproj index 4897ec2..b62fcff 100644 --- a/franz/franz.csproj +++ b/franz/franz.csproj @@ -3,7 +3,7 @@ net5.0 silvermeddlists.franz - 0.0.3 + 0.0.4 adam Silver Meddlists