[wip] switch to kafkasharp

This commit is contained in:
Adam R. Grey 2021-06-29 04:20:09 -04:00
parent 634713171d
commit b9f8b4ec9f
23 changed files with 81 additions and 125 deletions

View File

@ -14,8 +14,8 @@ namespace franz.tests
[SetUp] [SetUp]
public void Setup() public void Setup()
{ {
f1 = new Telefranz("libfranztest1", "focalor:9092", "/usr/lib/librdkafka.so"); f1 = new Telefranz("libfranztest1", "focalor:9092");
f2 = new Telefranz("libfranztest2", "focalor:9092", "/usr/lib/librdkafka.so"); f2 = new Telefranz("libfranztest2", "focalor:9092");
} }
[Test] [Test]
@ -40,8 +40,6 @@ namespace franz.tests
} }
} }
}); });
f1.StartListening();
f2.StartListening();
await Task.Delay(2000); await Task.Delay(2000);
f1.ProduceMessage(new silver_messages.global.sound_off()); f1.ProduceMessage(new silver_messages.global.sound_off());
Task.WaitAny( Task.WaitAny(

View File

@ -1,4 +1,5 @@
using Confluent.Kafka; using Kafka.Public;
using Kafka.Public.Loggers;
using Newtonsoft.Json; using Newtonsoft.Json;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
@ -9,26 +10,28 @@ using System.Threading.Tasks;
namespace franz namespace franz
{ {
public class Telefranz : IDisposable public class Telefranz
{ {
static class _TelefranzConsumers<T> where T : silver_messages.message, new() {
internal static KafkaConsumer<string, T> consumer = null;
internal static readonly Dictionary<Action<T>, Action<KafkaRecord<string, T>>> wrappings
= new Dictionary<Action<T>, Action<KafkaRecord<string, T>>>();
internal static uint references = 0;
}
private string handling_group { get; set; } = "Liszt"; private string handling_group { get; set; } = "Liszt";
const string namespace_prefix = "silver_messages"; const string namespace_prefix = "silver_messages";
private silver_messages.global.report howToReport { get; set; } private silver_messages.global.report howToReport { get; set; }
private bool shouldListen = false; private bool shouldListen = false;
private IProducer<Null, string> producer { get; set; } private ClusterClient clusterClient;
private IConsumer<Ignore, string> consumer { get; set; }
private static List<Type> allMessageTypes { get; set; } = null;
private Task listenTask = null; private Task listenTask = null;
private Dictionary<Type, List<Action<silver_messages.message>>> handlers { get; set; } private List<object> listeningConsumers = new List<object>();
private Dictionary<object, Action<silver_messages.message>> handlerWrappings
= new Dictionary<object, Action<silver_messages.message>>();
public Telefranz(string name, string bootstrap_servers, string rdkafka_location, public Telefranz(string name, string bootstrap_servers,
List<string> commands = null, List<string> checks = null, List<string> commands = null, List<string> checks = null,
List<string> errors = null, List<string> warnings = null) List<string> errors = null, List<string> warnings = null)
{ {
Library.Load(rdkafka_location); clusterClient = new ClusterClient(new Kafka.Public.Configuration { Seeds = bootstrap_servers, ClientId = name }, new ConsoleLogger());
this.handling_group = name; this.handling_group = name;
this.howToReport = new silver_messages.global.report() this.howToReport = new silver_messages.global.report()
{ {
@ -44,25 +47,7 @@ namespace franz
{ {
howToReport.capabilites.checks = checks; howToReport.capabilites.checks = checks;
} }
var producerConfig = new ProducerConfig
{
BootstrapServers = bootstrap_servers
};
var consumerConfig = new ConsumerConfig
{
BootstrapServers = bootstrap_servers,
GroupId = this.handling_group
};
producer = new ProducerBuilder<Null, string>(producerConfig).Build();
consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
staticSetup();
handlers = new Dictionary<Type, List<Action<silver_messages.message>>>();
foreach (var messageType in allMessageTypes)
{
handlers.Add(messageType, new List<Action<silver_messages.message>>());
}
addHandler<silver_messages.global.sound_off>((m) => addHandler<silver_messages.global.sound_off>((m) =>
{ {
ProduceMessage(this.howToReport); ProduceMessage(this.howToReport);
@ -83,93 +68,44 @@ namespace franz
}); });
} }
public void addHandler<t>(Action<t> theAction) where t : silver_messages.message public void addHandler<T>(Action<T> theAction) where T : silver_messages.message, new()
{ {
Action<silver_messages.message> wrapped = (m) => var dummy = new T();
if(_TelefranzConsumers<T>.consumer == null)
{ {
theAction(m as t); _TelefranzConsumers<T>.consumer = new KafkaConsumer<string, T>(dummy.topic, clusterClient);
}
Action<KafkaRecord<string, T>> wrapped = (kr) =>
{
theAction(kr.Value);
}; };
_TelefranzConsumers<T>.consumer.MessageReceived += wrapped;
handlerWrappings.Add(theAction, wrapped); _TelefranzConsumers<T>.wrappings[theAction] = wrapped;
if (handlers[typeof(t)].Count == 0) _TelefranzConsumers<T>.references++;
if(_TelefranzConsumers<T>.references == 1)
{ {
var sublist = consumer.Subscription; _TelefranzConsumers<T>.consumer.Subscribe(this.handling_group,
sublist.Add(typeof(t).ToString()); new List<string>(){dummy.topic},
consumer.Subscribe(sublist); new ConsumerGroupConfiguration());
listeningConsumers.Add(_TelefranzConsumers<T>.consumer);
_TelefranzConsumers<T>.consumer.ConsumeFromLatest();
} }
handlers[typeof(t)].Add(wrapped);
} }
public void removeHandler<t>(Action<t> theAction) where t : silver_messages.message public void removeHandler<T>(Action<T> theAction) where T : silver_messages.message, new()
{ {
handlers[typeof(t)].Remove(handlerWrappings[theAction as System.Action]); _TelefranzConsumers<T>.consumer.MessageReceived -= _TelefranzConsumers<T>.wrappings[theAction];
handlerWrappings.Remove(theAction as System.Action); _TelefranzConsumers<T>.wrappings.Remove(theAction);
_TelefranzConsumers<T>.references--;
if (handlers[typeof(t)].Count == 0) if(_TelefranzConsumers<T>.references == 0)
{ {
var sublist = consumer.Subscription; _TelefranzConsumers<T>.consumer.StopConsume();
consumer.Unsubscribe();
sublist.Remove(typeof(t).ToString());
consumer.Subscribe(sublist);
} }
} }
public void ProduceMessage(silver_messages.message message) public void ProduceMessage<T>(T message) where T : silver_messages.message
{ {
var msgText = JsonConvert.SerializeObject(message, type: message.GetType(), null); clusterClient.Produce(message.topic, message);
producer.Produce(message.GetType().ToString(), new Message<Null, string> { Value = msgText });
}
public void StartListening()
{
StopListening();
listenTask = Task.Run(() =>
{
shouldListen = true;
while (shouldListen)
{
var cr = consumer.Consume();
if (cr != null)
{
var typeHaver = JsonConvert.DeserializeAnonymousType(cr.Message.Value, new { type = "" });
var messageType = handlers.Keys.FirstOrDefault(hkey => hkey.ToString() == typeHaver.type && handlers[hkey].Count > 0);
if (messageType != null)
{
var message = JsonConvert.DeserializeObject(cr.Message.Value, messageType);
foreach (var handling in handlers[messageType])
{
handling(message as silver_messages.message);
}
}
}
}
});
}
public void StopListening()
{
shouldListen = false;
}
private static void staticSetup()
{
if (allMessageTypes != null)
{
return;
}
allMessageTypes = new List<Type>();
foreach (var messageType in from t in Assembly.GetExecutingAssembly().GetTypes()
where t.IsClass && t.Namespace?.StartsWith(namespace_prefix) == true
select t)
{
allMessageTypes.Add(messageType);
}
}
public void Dispose()
{
StopListening();
producer.Dispose();
consumer.Dispose();
} }
} }
} }

View File

@ -9,7 +9,7 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.7.0" /> <PackageReference Include="kafka-sharp" Version="1.4.3" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.1" /> <PackageReference Include="Newtonsoft.Json" Version="13.0.1" />
</ItemGroup> </ItemGroup>

View File

@ -4,7 +4,7 @@ using silver_messages;
namespace silver_messages.directorial namespace silver_messages.directorial
{ {
public class check_complete : message public class check_complete : message_directorial
{ {
public string check { get; set; } public string check { get; set; }
public string result { get; set; } public string result { get; set; }

View File

@ -4,7 +4,7 @@ using silver_messages;
namespace silver_messages.directorial namespace silver_messages.directorial
{ {
public class command_completed : message public class command_completed : message_directorial
{ {
//name of the command that was called //name of the command that was called
public string command { get; set; } public string command { get; set; }

View File

@ -3,7 +3,7 @@ using System.Collections.Generic;
using silver_messages; using silver_messages;
namespace silver_messages.directorial namespace silver_messages.directorial
{ {
public class command_error : message public class command_error : message_directorial
{ {
//name of the command that was called //name of the command that was called
public string command { get; set; } public string command { get; set; }

View File

@ -3,7 +3,7 @@ using System.Collections.Generic;
using silver_messages; using silver_messages;
namespace silver_messages.directorial namespace silver_messages.directorial
{ {
public class command_expired : message public class command_expired : message_directorial
{ {
//name of the command that was called //name of the command that was called
public string command { get; set; } public string command { get; set; }

View File

@ -3,7 +3,7 @@ using System.Collections.Generic;
using silver_messages; using silver_messages;
namespace silver_messages.directorial namespace silver_messages.directorial
{ {
public class command_output : message public class command_output : message_directorial
{ {
//name of the command that was called //name of the command that was called
public string command { get; set; } public string command { get; set; }

View File

@ -7,7 +7,7 @@ namespace silver_messages.directorial
/* /*
* run it and send a check_complete * run it and send a check_complete
*/ */
public class execute_check : message public class execute_check : message_directorial
{ {
public string check { get; set; } public string check { get; set; }
public List<string> args { get; set; } = new List<string>(); public List<string> args { get; set; } = new List<string>();

View File

@ -12,7 +12,7 @@ namespace silver_messages.directorial
* command_error as you get them, and if it ends some other way, send command_completed * command_error as you get them, and if it ends some other way, send command_completed
* with empty stdout and stderr. * with empty stdout and stderr.
*/ */
public class execute_command : message public class execute_command : message_directorial
{ {
public string command { get; set; } public string command { get; set; }
public List<string> args { get; set; } = new List<string>(); public List<string> args { get; set; } = new List<string>();

View File

@ -0,0 +1,7 @@
namespace silver_messages
{
public abstract class message_directorial : silver_messages.message
{
public override string topic => "silver_messages.directorial";
}
}

View File

@ -0,0 +1,7 @@
namespace silver_messages
{
public class message_global : silver_messages.message
{
public override string topic => "silver_messages.global";
}
}

View File

@ -2,7 +2,7 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
namespace silver_messages.global namespace silver_messages.global
{ {
public class report : message public class report : message_global
{ {
//your name //your name
public string name { get; set; } public string name { get; set; }

View File

@ -4,7 +4,7 @@ using silver_messages;
namespace silver_messages.global namespace silver_messages.global
{ {
public class restart : message public class restart : message_global
{ {
//check if this is you (i.e., your handling group). If so, restart. Yourself, not the hardware. //check if this is you (i.e., your handling group). If so, restart. Yourself, not the hardware.
public string name { get; set; } public string name { get; set; }

View File

@ -5,6 +5,6 @@ using silver_messages;
namespace silver_messages.global namespace silver_messages.global
{ {
//if you receive this, respond with a report //if you receive this, respond with a report
public class sound_off : message { } public class sound_off : message_global { }
} }

View File

@ -4,7 +4,7 @@ using silver_messages;
namespace silver_messages.global namespace silver_messages.global
{ {
public class stop : message public class stop : message_global
{ {
//check if this is you (i.e., your handling group). If so, stop. Yourself, not the hardware. //check if this is you (i.e., your handling group). If so, stop. Yourself, not the hardware.
public string name { get; set; } public string name { get; set; }

View File

@ -7,6 +7,7 @@ namespace silver_messages
public abstract class message public abstract class message
{ {
public string type { get { return this.GetType().ToString(); } } public string type { get { return this.GetType().ToString(); } }
public abstract string topic { get; }
public override string ToString() public override string ToString()
{ {
return JsonConvert.SerializeObject(this); return JsonConvert.SerializeObject(this);

View File

@ -0,0 +1,7 @@
namespace silver_messages
{
public abstract class message_yt : silver_messages.message
{
public override string topic => "silver_messages.youtube";
}
}

View File

@ -7,7 +7,7 @@ namespace silver_messages.youtube
* for all the scheduled, not-yet released videos, what required metadata * for all the scheduled, not-yet released videos, what required metadata
* do they need * do they need
*/ */
public class metadata_needed : message public class metadata_needed : message_yt
{ {
//key is a yt id //key is a yt id
public Dictionary<string, yt_metadata> needed{get;set;} = new Dictionary<string, yt_metadata>(); public Dictionary<string, yt_metadata> needed{get;set;} = new Dictionary<string, yt_metadata>();

View File

@ -7,5 +7,5 @@ namespace silver_messages.youtube
* check all the scheduled, not-yet released videos for required metadata. * check all the scheduled, not-yet released videos for required metadata.
* respond with a metadata_needed - empty if none. * respond with a metadata_needed - empty if none.
*/ */
public class request_metadata_needed : message { } public class request_metadata_needed : message_yt { }
} }

View File

@ -6,7 +6,7 @@ namespace silver_messages.youtube
/* /*
* someone will send as much metadata as they have - set non-null fields to the provided values * someone will send as much metadata as they have - set non-null fields to the provided values
*/ */
public class update_metadata : message public class update_metadata : message_yt
{ {
public string yt_id { get; set; } public string yt_id { get; set; }
public yt_metadata metadata { get; set; } public yt_metadata metadata { get; set; }

View File

@ -6,7 +6,7 @@ namespace silver_messages.youtube
/** if you receive this, start the upload (of filename with metadata) /** if you receive this, start the upload (of filename with metadata)
* and then respond with an upload_started * and then respond with an upload_started
*/ */
public class upload : message public class upload : message_yt
{ {
public string filename { get; set; } public string filename { get; set; }
public yt_metadata metadata { get; set; } public yt_metadata metadata { get; set; }

View File

@ -3,7 +3,7 @@ using System.Collections.Generic;
using silver_messages; using silver_messages;
namespace silver_messages.youtube namespace silver_messages.youtube
{ {
public class upload_started : message public class upload_started : message_yt
{ {
public string filename { get; set; } public string filename { get; set; }
public string yt_id { get; set; } public string yt_id { get; set; }