diff --git a/README.md b/README.md index 106b31a..aa2b1be 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,6 @@ # franz -allows you to speak kafka \ No newline at end of file +allows you to speak kafka + +RIP kafka-sharp. T_T +documentation for confluent kafka: https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.html \ No newline at end of file diff --git a/franz.tests/Program.cs b/franz.tests/Program.cs index fed7b27..8cc91e5 100644 --- a/franz.tests/Program.cs +++ b/franz.tests/Program.cs @@ -8,13 +8,17 @@ namespace franz.tests { static void Main(string[] args) { - Console.WriteLine("Hello World, Im' " + Dns.GetHostName()); + Console.WriteLine("Hello World, I'm " + Dns.GetHostName()); Telefranz.Configure("tester", "alloces:9092", new System.Collections.Generic.List(){"a!"}); + + Console.WriteLine("telefranz configured"); Telefranz.Instance.addHandler(r => { + Console.WriteLine("a report (response to soundoff) has come in"); Console.WriteLine(r); }); Telefranz.Instance.addHandler((ec) => { + Console.WriteLine("message received: execute_command"); if(ec.command == "a!"){ Console.WriteLine($"a! command executing! {string.Join("; ", ec.args)}"); } diff --git a/franz/Telefranz.cs b/franz/Telefranz.cs index 4ce7f5a..15a6983 100644 --- a/franz/Telefranz.cs +++ b/franz/Telefranz.cs @@ -1,5 +1,4 @@ -using Kafka.Public; -using Kafka.Public.Loggers; +using Confluent.Kafka; using Newtonsoft.Json; using System; using System.Collections.Generic; @@ -11,41 +10,101 @@ using System.Threading.Tasks; namespace franz { + public class Telefranz { + private class TopicConsumption + { + private string topic { get; set; } + public IConsumer topicConsumer { get; set; } + public List> externalSubscribers { get; set; } = new List>(); + private CancellationTokenSource cancellationSource { get; set; } = new CancellationTokenSource(); + private Task consumptionTask = null; + private System.Type messageType; + public TopicConsumption(System.Type messageType) + { + this.messageType = messageType; + topic = messageType.ToString(); + topicConsumer = new ConsumerBuilder(Telefranz.config).Build(); + } + private async void ConsumeTaskFor(string topic, CancellationToken token) + { + try + { + while (!token.IsCancellationRequested) + { + var cr = topicConsumer.Consume(); + var deserialized = JsonConvert.DeserializeObject(cr.Message.Value, messageType); + if (deserialized != null && (deserialized as gray_messages.message).type == deserialized.GetType().ToString()) + { + foreach (var waitingAction in externalSubscribers) + { + try + { + waitingAction(deserialized); + } + catch (Exception e) + { + Console.Error.WriteLine("if I don't catch this error the whole thing dies so..."); + Console.Error.WriteLine(JsonConvert.SerializeObject(e)); + } + } + } + } + } + finally + { + topicConsumer.Close(); + cancellationSource.Cancel(); + } + } + + internal void Unsubscribe(Action wrapped) + { + externalSubscribers.Remove(wrapped); + if (externalSubscribers.Count == 0) + { + topicConsumer.Unsubscribe(); + } + } + + internal void Subscribe(Action wrapped) + { + if(cancellationSource.IsCancellationRequested) + { + consumptionTask = null; + } + if(consumptionTask == null || consumptionTask.IsCanceled || consumptionTask.IsCompleted) + { + consumptionTask = Task.Run(() => {this.ConsumeTaskFor(topic, cancellationSource.Token);}); + topicConsumer.Unsubscribe(); + topicConsumer.Subscribe(topic); + } + externalSubscribers.Add(wrapped); + } + } private static Telefranz instance = null; private static readonly object createLock = new object(); - - static class _TelefranzConsumers where T : gray_messages.message - { - internal static readonly Dictionary, Action>> wrappings - = new Dictionary, Action>>(); - } private string handling_group { get; set; } = "Liszt"; + private IProducer producer { get; set; } private gray_messages.global.report howToReport { get; set; } - private ClusterClient clusterClient; - private Dictionary> topicConsumers { get; set; } - = new Dictionary>(); - private Dictionary topicSubscribers { get; set; } = new Dictionary(); - private Kafka.Public.Configuration clusterClientConfig = null; - private static quieterLogger consoleLogger = new quieterLogger(); + ///yours to mine + private Dictionary> wrappedEventMap = new Dictionary>(); + private Dictionary Consumptions { get; set; } = new Dictionary(); + protected static ConsumerConfig config { get; set; } private Telefranz(string name, string bootstrap_servers, List commands = null, List checks = null, List errors = null, List warnings = null) { - var serializer = new StringSerializer(); - var deserializer = new StringDeserializer(); - var serializationConfig = new SerializationConfig() { SerializeOnProduce = true }; - serializationConfig.SetDefaultSerializers(serializer, serializer); - serializationConfig.SetDefaultDeserializers(deserializer, deserializer); - this.clusterClientConfig = new Kafka.Public.Configuration + config = new ConsumerConfig { - Seeds = bootstrap_servers, - ClientId = name, - SerializationConfig = serializationConfig + BootstrapServers = bootstrap_servers, + GroupId = name, + AutoOffsetReset = AutoOffsetReset.Earliest }; - this.clusterClient = new ClusterClient(clusterClientConfig, consoleLogger); - + producer = new ProducerBuilder( + new ProducerConfig(){BootstrapServers = config.BootstrapServers} + ).Build(); handling_group = name; this.howToReport = new gray_messages.global.report() { @@ -75,6 +134,7 @@ namespace franz } }); } + public static void Configure(string name, string bootstrap_servers, List commands = null, List checks = null, List errors = null, List warnings = null) @@ -86,7 +146,6 @@ namespace franz instance = new Telefranz(name, bootstrap_servers, commands, checks, errors, warnings); } } - } public static Telefranz Instance @@ -103,59 +162,40 @@ namespace franz return instance; } } - public void addHandler(Action theAction) where T : gray_messages.message + public void addHandler(Action theAction) where T : gray_messages.message, new() { var topic = typeof(T).ToString(); - Action> wrapped = (kr) => + if (!Consumptions.ContainsKey(topic)) { - var deserialized = JsonConvert.DeserializeObject(kr.Value); - if (deserialized != null && deserialized.type == deserialized.GetType().ToString()) - { - try - { - theAction(deserialized); - } - catch (Exception e) - { - consoleLogger.LogError("if I don't catch this error the whole thing dies so..."); - consoleLogger.LogError(JsonConvert.SerializeObject(e)); - } - } - }; - if (!topicConsumers.ContainsKey(typeof(T).ToString())) - { - topicConsumers[topic] = new KafkaConsumer(topic, clusterClient); - topicConsumers[topic].ConsumeFromLatest(); - topicSubscribers.Add(topic, 0); + Consumptions[topic] = new TopicConsumption(typeof(T)); } - topicConsumers[topic].MessageReceived += wrapped; - _TelefranzConsumers.wrappings[theAction] = wrapped; - topicSubscribers[topic]++; + var wrapped = (object msg) => { theAction(msg as T); }; + wrappedEventMap[theAction] = wrapped; + Consumptions[topic].Subscribe(wrapped); } - public bool removeHandler(Action theAction) where T : gray_messages.message + + public void removeHandler(Action theAction) where T : gray_messages.message { - topicConsumers[typeof(T).ToString()].MessageReceived -= _TelefranzConsumers.wrappings[theAction]; - if (_TelefranzConsumers.wrappings.ContainsKey(theAction)) - { - _TelefranzConsumers.wrappings.Remove(theAction); - topicSubscribers[typeof(T).ToString()]--; - if (topicSubscribers[typeof(T).ToString()] == 0) - { - topicConsumers[typeof(T).ToString()].StopConsume(); - } - return true; - } - else - { - return false; - } + var topic = typeof(T).ToString(); + var wrapped = wrappedEventMap[theAction]; + Consumptions[topic].Unsubscribe(wrapped); + + wrappedEventMap.Remove(theAction); } public void ProduceMessage(T message) where T : gray_messages.message { Console.WriteLine(message.ToString()); - clusterClient.Produce(typeof(T).ToString(), message.ToString()); + + producer.Produce(typeof(T).ToString(), new Message { Key = handling_group, Value = message.ToString() }, + (deliveryReport) => + { + if (deliveryReport.Error.Code != ErrorCode.NoError) + { + Console.Error.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}"); + } + }); } } public class NotInitializedException : Exception diff --git a/franz/franz.csproj b/franz/franz.csproj index b37e5e5..8827975 100644 --- a/franz/franz.csproj +++ b/franz/franz.csproj @@ -8,7 +8,7 @@ - + diff --git a/franz/gray_messages/message.cs b/franz/gray_messages/message.cs index a1b982b..0fb8793 100644 --- a/franz/gray_messages/message.cs +++ b/franz/gray_messages/message.cs @@ -1,13 +1,12 @@ using System; using System.Collections.Generic; using System.IO; -using Kafka.Public; using Newtonsoft.Json; namespace gray_messages { //protip: you can derive from message and I'll dispatch it back to you just as you would expect - public abstract class message : IMemorySerializable + public abstract class message { public string type { get { return this.GetType().ToString(); } } @@ -15,10 +14,5 @@ namespace gray_messages { return JsonConvert.SerializeObject(this); } - public void Serialize(MemoryStream toStream) - { - var stringBytes = System.Text.Encoding.UTF8.GetBytes(this.ToString()); - toStream.Write(stringBytes, 0, stringBytes.Length); - } } } diff --git a/franz/quieterLogger.cs b/franz/quieterLogger.cs deleted file mode 100644 index db27046..0000000 --- a/franz/quieterLogger.cs +++ /dev/null @@ -1,26 +0,0 @@ -using System; -using Kafka.Public; -using Kafka.Public.Loggers; - -namespace franz -{ - internal class quieterLogger : ILogger - { - public void LogDebug(string message) - { - Console.WriteLine($"[{DateTime.Now}] [DEBUG]: {message}"); - } - public void LogError(string message) - { - Console.Error.WriteLine($"[{DateTime.Now}] [ERROR]: {message}"); - } - public void LogInformation(string message) - { - //Console.WriteLine($"[INFO]: {message}"); - } - public void LogWarning(string message) - { - Console.WriteLine($"[{DateTime.Now}] [Warning]: {message}"); - } - } -} \ No newline at end of file