switch to confluent kafka
This commit is contained in:
parent
db8e8f5636
commit
e472087189
@ -1,3 +1,6 @@
|
||||
# franz
|
||||
|
||||
allows you to speak kafka
|
||||
|
||||
RIP kafka-sharp. T_T
|
||||
documentation for confluent kafka: https://docs.confluent.io/platform/current/clients/confluent-kafka-dotnet/_site/api/Confluent.Kafka.html
|
@ -8,13 +8,17 @@ namespace franz.tests
|
||||
{
|
||||
static void Main(string[] args)
|
||||
{
|
||||
Console.WriteLine("Hello World, Im' " + Dns.GetHostName());
|
||||
Console.WriteLine("Hello World, I'm " + Dns.GetHostName());
|
||||
Telefranz.Configure("tester", "alloces:9092", new System.Collections.Generic.List<string>(){"a!"});
|
||||
|
||||
Console.WriteLine("telefranz configured");
|
||||
|
||||
Telefranz.Instance.addHandler<gray_messages.global.report>(r => {
|
||||
Console.WriteLine("a report (response to soundoff) has come in");
|
||||
Console.WriteLine(r);
|
||||
});
|
||||
Telefranz.Instance.addHandler<gray_messages.directorial.execute_command>((ec) => {
|
||||
Console.WriteLine("message received: execute_command");
|
||||
if(ec.command == "a!"){
|
||||
Console.WriteLine($"a! command executing! {string.Join("; ", ec.args)}");
|
||||
}
|
||||
|
@ -1,5 +1,4 @@
|
||||
using Kafka.Public;
|
||||
using Kafka.Public.Loggers;
|
||||
using Confluent.Kafka;
|
||||
using Newtonsoft.Json;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
@ -11,41 +10,101 @@ using System.Threading.Tasks;
|
||||
|
||||
namespace franz
|
||||
{
|
||||
|
||||
public class Telefranz
|
||||
{
|
||||
private class TopicConsumption
|
||||
{
|
||||
private string topic { get; set; }
|
||||
public IConsumer<string, string> topicConsumer { get; set; }
|
||||
public List<Action<object>> externalSubscribers { get; set; } = new List<Action<object>>();
|
||||
private CancellationTokenSource cancellationSource { get; set; } = new CancellationTokenSource();
|
||||
private Task consumptionTask = null;
|
||||
private System.Type messageType;
|
||||
public TopicConsumption(System.Type messageType)
|
||||
{
|
||||
this.messageType = messageType;
|
||||
topic = messageType.ToString();
|
||||
topicConsumer = new ConsumerBuilder<string, string>(Telefranz.config).Build();
|
||||
}
|
||||
private async void ConsumeTaskFor(string topic, CancellationToken token)
|
||||
{
|
||||
try
|
||||
{
|
||||
while (!token.IsCancellationRequested)
|
||||
{
|
||||
var cr = topicConsumer.Consume();
|
||||
var deserialized = JsonConvert.DeserializeObject(cr.Message.Value, messageType);
|
||||
if (deserialized != null && (deserialized as gray_messages.message).type == deserialized.GetType().ToString())
|
||||
{
|
||||
foreach (var waitingAction in externalSubscribers)
|
||||
{
|
||||
try
|
||||
{
|
||||
waitingAction(deserialized);
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
Console.Error.WriteLine("if I don't catch this error the whole thing dies so...");
|
||||
Console.Error.WriteLine(JsonConvert.SerializeObject(e));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
topicConsumer.Close();
|
||||
cancellationSource.Cancel();
|
||||
}
|
||||
}
|
||||
|
||||
internal void Unsubscribe(Action<object> wrapped)
|
||||
{
|
||||
externalSubscribers.Remove(wrapped);
|
||||
if (externalSubscribers.Count == 0)
|
||||
{
|
||||
topicConsumer.Unsubscribe();
|
||||
}
|
||||
}
|
||||
|
||||
internal void Subscribe(Action<object> wrapped)
|
||||
{
|
||||
if(cancellationSource.IsCancellationRequested)
|
||||
{
|
||||
consumptionTask = null;
|
||||
}
|
||||
if(consumptionTask == null || consumptionTask.IsCanceled || consumptionTask.IsCompleted)
|
||||
{
|
||||
consumptionTask = Task.Run(() => {this.ConsumeTaskFor(topic, cancellationSource.Token);});
|
||||
topicConsumer.Unsubscribe();
|
||||
topicConsumer.Subscribe(topic);
|
||||
}
|
||||
externalSubscribers.Add(wrapped);
|
||||
}
|
||||
}
|
||||
private static Telefranz instance = null;
|
||||
private static readonly object createLock = new object();
|
||||
|
||||
static class _TelefranzConsumers<T> where T : gray_messages.message
|
||||
{
|
||||
internal static readonly Dictionary<Action<T>, Action<KafkaRecord<string, string>>> wrappings
|
||||
= new Dictionary<Action<T>, Action<KafkaRecord<string, string>>>();
|
||||
}
|
||||
private string handling_group { get; set; } = "Liszt";
|
||||
private IProducer<string, string> producer { get; set; }
|
||||
private gray_messages.global.report howToReport { get; set; }
|
||||
private ClusterClient clusterClient;
|
||||
private Dictionary<string, KafkaConsumer<string, string>> topicConsumers { get; set; }
|
||||
= new Dictionary<string, KafkaConsumer<string, string>>();
|
||||
private Dictionary<string, int> topicSubscribers { get; set; } = new Dictionary<string, int>();
|
||||
private Kafka.Public.Configuration clusterClientConfig = null;
|
||||
private static quieterLogger consoleLogger = new quieterLogger();
|
||||
///yours to mine
|
||||
private Dictionary<object, Action<object>> wrappedEventMap = new Dictionary<object, Action<object>>();
|
||||
private Dictionary<string, TopicConsumption> Consumptions { get; set; } = new Dictionary<string, TopicConsumption>();
|
||||
protected static ConsumerConfig config { get; set; }
|
||||
private Telefranz(string name, string bootstrap_servers,
|
||||
List<string> commands = null, List<string> checks = null,
|
||||
List<string> errors = null, List<string> warnings = null)
|
||||
{
|
||||
var serializer = new StringSerializer();
|
||||
var deserializer = new StringDeserializer();
|
||||
var serializationConfig = new SerializationConfig() { SerializeOnProduce = true };
|
||||
serializationConfig.SetDefaultSerializers(serializer, serializer);
|
||||
serializationConfig.SetDefaultDeserializers(deserializer, deserializer);
|
||||
this.clusterClientConfig = new Kafka.Public.Configuration
|
||||
config = new ConsumerConfig
|
||||
{
|
||||
Seeds = bootstrap_servers,
|
||||
ClientId = name,
|
||||
SerializationConfig = serializationConfig
|
||||
BootstrapServers = bootstrap_servers,
|
||||
GroupId = name,
|
||||
AutoOffsetReset = AutoOffsetReset.Earliest
|
||||
};
|
||||
this.clusterClient = new ClusterClient(clusterClientConfig, consoleLogger);
|
||||
|
||||
producer = new ProducerBuilder<string, string>(
|
||||
new ProducerConfig(){BootstrapServers = config.BootstrapServers}
|
||||
).Build();
|
||||
handling_group = name;
|
||||
this.howToReport = new gray_messages.global.report()
|
||||
{
|
||||
@ -75,6 +134,7 @@ namespace franz
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public static void Configure(string name, string bootstrap_servers,
|
||||
List<string> commands = null, List<string> checks = null,
|
||||
List<string> errors = null, List<string> warnings = null)
|
||||
@ -86,7 +146,6 @@ namespace franz
|
||||
instance = new Telefranz(name, bootstrap_servers, commands, checks, errors, warnings);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static Telefranz Instance
|
||||
@ -103,59 +162,40 @@ namespace franz
|
||||
return instance;
|
||||
}
|
||||
}
|
||||
public void addHandler<T>(Action<T> theAction) where T : gray_messages.message
|
||||
public void addHandler<T>(Action<T> theAction) where T : gray_messages.message, new()
|
||||
{
|
||||
var topic = typeof(T).ToString();
|
||||
|
||||
Action<KafkaRecord<string, string>> wrapped = (kr) =>
|
||||
if (!Consumptions.ContainsKey(topic))
|
||||
{
|
||||
var deserialized = JsonConvert.DeserializeObject<T>(kr.Value);
|
||||
if (deserialized != null && deserialized.type == deserialized.GetType().ToString())
|
||||
{
|
||||
try
|
||||
{
|
||||
theAction(deserialized);
|
||||
Consumptions[topic] = new TopicConsumption(typeof(T));
|
||||
}
|
||||
catch (Exception e)
|
||||
var wrapped = (object msg) => { theAction(msg as T); };
|
||||
wrappedEventMap[theAction] = wrapped;
|
||||
Consumptions[topic].Subscribe(wrapped);
|
||||
}
|
||||
|
||||
public void removeHandler<T>(Action<T> theAction) where T : gray_messages.message
|
||||
{
|
||||
consoleLogger.LogError("if I don't catch this error the whole thing dies so...");
|
||||
consoleLogger.LogError(JsonConvert.SerializeObject(e));
|
||||
}
|
||||
}
|
||||
};
|
||||
if (!topicConsumers.ContainsKey(typeof(T).ToString()))
|
||||
{
|
||||
topicConsumers[topic] = new KafkaConsumer<string, string>(topic, clusterClient);
|
||||
topicConsumers[topic].ConsumeFromLatest();
|
||||
topicSubscribers.Add(topic, 0);
|
||||
}
|
||||
topicConsumers[topic].MessageReceived += wrapped;
|
||||
_TelefranzConsumers<T>.wrappings[theAction] = wrapped;
|
||||
topicSubscribers[topic]++;
|
||||
}
|
||||
public bool removeHandler<T>(Action<T> theAction) where T : gray_messages.message
|
||||
{
|
||||
topicConsumers[typeof(T).ToString()].MessageReceived -= _TelefranzConsumers<T>.wrappings[theAction];
|
||||
if (_TelefranzConsumers<T>.wrappings.ContainsKey(theAction))
|
||||
{
|
||||
_TelefranzConsumers<T>.wrappings.Remove(theAction);
|
||||
topicSubscribers[typeof(T).ToString()]--;
|
||||
if (topicSubscribers[typeof(T).ToString()] == 0)
|
||||
{
|
||||
topicConsumers[typeof(T).ToString()].StopConsume();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
var topic = typeof(T).ToString();
|
||||
var wrapped = wrappedEventMap[theAction];
|
||||
Consumptions[topic].Unsubscribe(wrapped);
|
||||
|
||||
wrappedEventMap.Remove(theAction);
|
||||
}
|
||||
|
||||
public void ProduceMessage<T>(T message) where T : gray_messages.message
|
||||
{
|
||||
Console.WriteLine(message.ToString());
|
||||
clusterClient.Produce(typeof(T).ToString(), message.ToString());
|
||||
|
||||
producer.Produce(typeof(T).ToString(), new Message<string, string> { Key = handling_group, Value = message.ToString() },
|
||||
(deliveryReport) =>
|
||||
{
|
||||
if (deliveryReport.Error.Code != ErrorCode.NoError)
|
||||
{
|
||||
Console.Error.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
public class NotInitializedException : Exception
|
||||
|
@ -8,7 +8,7 @@
|
||||
</PropertyGroup>
|
||||
|
||||
<ItemGroup>
|
||||
<PackageReference Include="kafka-sharp" Version="1.4.3" />
|
||||
<PackageReference Include="Confluent.Kafka" Version="1.9.3" />
|
||||
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
|
||||
</ItemGroup>
|
||||
|
||||
|
@ -1,13 +1,12 @@
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.IO;
|
||||
using Kafka.Public;
|
||||
using Newtonsoft.Json;
|
||||
|
||||
namespace gray_messages
|
||||
{
|
||||
//protip: you can derive from message and I'll dispatch it back to you just as you would expect
|
||||
public abstract class message : IMemorySerializable
|
||||
public abstract class message
|
||||
{
|
||||
public string type { get { return this.GetType().ToString(); } }
|
||||
|
||||
@ -15,10 +14,5 @@ namespace gray_messages
|
||||
{
|
||||
return JsonConvert.SerializeObject(this);
|
||||
}
|
||||
public void Serialize(MemoryStream toStream)
|
||||
{
|
||||
var stringBytes = System.Text.Encoding.UTF8.GetBytes(this.ToString());
|
||||
toStream.Write(stringBytes, 0, stringBytes.Length);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,26 +0,0 @@
|
||||
using System;
|
||||
using Kafka.Public;
|
||||
using Kafka.Public.Loggers;
|
||||
|
||||
namespace franz
|
||||
{
|
||||
internal class quieterLogger : ILogger
|
||||
{
|
||||
public void LogDebug(string message)
|
||||
{
|
||||
Console.WriteLine($"[{DateTime.Now}] [DEBUG]: {message}");
|
||||
}
|
||||
public void LogError(string message)
|
||||
{
|
||||
Console.Error.WriteLine($"[{DateTime.Now}] [ERROR]: {message}");
|
||||
}
|
||||
public void LogInformation(string message)
|
||||
{
|
||||
//Console.WriteLine($"[INFO]: {message}");
|
||||
}
|
||||
public void LogWarning(string message)
|
||||
{
|
||||
Console.WriteLine($"[{DateTime.Now}] [Warning]: {message}");
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user