[wip] ought to work

but testing frameworks are counter productive
This commit is contained in:
Adam R. Grey 2021-06-29 07:39:11 -04:00
parent b9f8b4ec9f
commit 0f1868d85e
11 changed files with 80 additions and 50 deletions

View File

@ -9,20 +9,17 @@ namespace franz.tests
public class Tests
{
Telefranz f1;
Telefranz f2;
[SetUp]
public void Setup()
{
f1 = new Telefranz("libfranztest1", "focalor:9092");
f2 = new Telefranz("libfranztest2", "focalor:9092");
}
[Test]
public async Task reportsToSoundoff()
{
var f1Reported = false;
var f2Reported = false;
f1.addHandler((silver_messages.global.report r) => {
switch(r.name)
{
@ -32,27 +29,28 @@ namespace franz.tests
TestContext.Out.WriteLine("f1 reported");
break;
}
case "libfranztest2":
{
f2Reported = true;
TestContext.Out.WriteLine("f2 reported");
break;
}
//lol you can't have more than one kafka interface, right? so you can't do any (useful) tests on the sending of messages :P
// case "libfranztest2":
// {
// f2Reported = true;
// TestContext.Out.WriteLine("f2 reported");
// break;
// }
}
});
await Task.Delay(2000);
f1.ProduceMessage(new silver_messages.global.sound_off());
Task.WaitAny(
Task.Run(async () => {
while(f1Reported == false || f2Reported == false)
while(f1Reported == false)
{
//TestContext.Out.WriteLine("not ready, giving another 100ms");
TestContext.Out.WriteLine("not ready, giving another 100ms");
await Task.Delay(100);
}
//TestContext.Out.WriteLine("done, ready, green");
TestContext.Out.WriteLine("done, ready, green");
}),
Task.Run(async () => {
await Task.Delay(120000);
await Task.Delay(30000);
TestContext.Out.WriteLine("time up");
Assert.Fail();
})

View File

@ -13,26 +13,36 @@ namespace franz
public class Telefranz
{
static class _TelefranzConsumers<T> where T : silver_messages.message, new() {
internal static KafkaConsumer<string, T> consumer = null;
internal static readonly Dictionary<Action<T>, Action<KafkaRecord<string, T>>> wrappings
= new Dictionary<Action<T>, Action<KafkaRecord<string, T>>>();
internal static uint references = 0;
internal static readonly Dictionary<Action<T>, Action<KafkaRecord<string, string>>> wrappings
= new Dictionary<Action<T>, Action<KafkaRecord<string, string>>>();
}
private string handling_group { get; set; } = "Liszt";
const string namespace_prefix = "silver_messages";
private silver_messages.global.report howToReport { get; set; }
private bool shouldListen = false;
private ClusterClient clusterClient;
private Task listenTask = null;
private List<object> listeningConsumers = new List<object>();
private Dictionary<string, KafkaConsumer<string, string>> topicConsumers {get;set;}
= new Dictionary<string, KafkaConsumer<string, string>>();
private Dictionary<string, int> topicSubscribers {get;set;} = new Dictionary<string, int>();
private Kafka.Public.Configuration clusterClientConfig = null;
private static ConsoleLogger consoleLogger = new ConsoleLogger();
public Telefranz(string name, string bootstrap_servers,
List<string> commands = null, List<string> checks = null,
List<string> errors = null, List<string> warnings = null)
{
clusterClient = new ClusterClient(new Kafka.Public.Configuration { Seeds = bootstrap_servers, ClientId = name }, new ConsoleLogger());
this.handling_group = name;
var serializer = new StringSerializer();
var deserializer = new StringDeserializer();
var serializationConfig = new SerializationConfig(){SerializeOnProduce = true};
serializationConfig.SetDefaultSerializers(serializer, serializer);
serializationConfig.SetDefaultDeserializers(deserializer, deserializer);
clusterClientConfig = new Kafka.Public.Configuration {
Seeds = bootstrap_servers,
ClientId = name,
SerializationConfig = serializationConfig
};
clusterClient = new ClusterClient(clusterClientConfig, consoleLogger);
handling_group = name;
this.howToReport = new silver_messages.global.report()
{
name = name,
@ -54,14 +64,14 @@ namespace franz
});
addHandler<silver_messages.global.stop>((m) =>
{
if (m.name == this.handling_group)
if (m.name == handling_group)
{
Environment.Exit(0);
}
});
addHandler<silver_messages.global.restart>((m) =>
{
if (m.name == this.handling_group)
if (m.name == handling_group)
{
throw new NotImplementedException();//TODO
}
@ -71,41 +81,48 @@ namespace franz
public void addHandler<T>(Action<T> theAction) where T : silver_messages.message, new()
{
var dummy = new T();
if(_TelefranzConsumers<T>.consumer == null)
{
_TelefranzConsumers<T>.consumer = new KafkaConsumer<string, T>(dummy.topic, clusterClient);
}
Console.WriteLine($"created dummy, it reports as {dummy}. typeof(T) reports as {typeof(T)}. Topic is {dummy.topic}");
Action<KafkaRecord<string, T>> wrapped = (kr) =>
Action<KafkaRecord<string, string>> wrapped = (kr) =>
{
theAction(kr.Value);
};
_TelefranzConsumers<T>.consumer.MessageReceived += wrapped;
_TelefranzConsumers<T>.wrappings[theAction] = wrapped;
_TelefranzConsumers<T>.references++;
if(_TelefranzConsumers<T>.references == 1)
Console.WriteLine("wrapped receives kafkarecord, is ready");
var deserialized = JsonConvert.DeserializeObject<T>(kr.Value);
if(deserialized != null)
{
_TelefranzConsumers<T>.consumer.Subscribe(this.handling_group,
new List<string>(){dummy.topic},
new ConsumerGroupConfiguration());
listeningConsumers.Add(_TelefranzConsumers<T>.consumer);
_TelefranzConsumers<T>.consumer.ConsumeFromLatest();
Console.WriteLine("message deserialized for this handler :)");
theAction(deserialized);
}
else
{
Console.WriteLine("message didn't deserialize for this handler :(");
}
};
if(!topicConsumers.ContainsKey(dummy.topic)){
Console.WriteLine($"making consumer for {handling_group}, topic {dummy.topic}");
topicConsumers[dummy.topic] = new KafkaConsumer<string, string>(dummy.topic, clusterClient);
topicConsumers[dummy.topic].ConsumeFromLatest();
topicSubscribers.Add(dummy.topic, 0);
}
topicConsumers[dummy.topic].MessageReceived += wrapped;
_TelefranzConsumers<T>.wrappings[theAction] = wrapped;
topicSubscribers[dummy.topic]++;
}
public void removeHandler<T>(Action<T> theAction) where T : silver_messages.message, new()
{
_TelefranzConsumers<T>.consumer.MessageReceived -= _TelefranzConsumers<T>.wrappings[theAction];
var dummy = new T();
topicConsumers[dummy.topic].MessageReceived -= _TelefranzConsumers<T>.wrappings[theAction];
_TelefranzConsumers<T>.wrappings.Remove(theAction);
_TelefranzConsumers<T>.references--;
if(_TelefranzConsumers<T>.references == 0)
topicSubscribers[dummy.topic]--;
if(topicSubscribers[dummy.topic] == 0)
{
_TelefranzConsumers<T>.consumer.StopConsume();
topicConsumers[dummy.topic].StopConsume();
}
}
public void ProduceMessage<T>(T message) where T : silver_messages.message
{
clusterClient.Produce(message.topic, message);
Console.WriteLine($"producing message {message}");
clusterClient.Produce(message.topic, message.ToString());
}
}
}

View File

@ -2,6 +2,6 @@ namespace silver_messages
{
public abstract class message_directorial : silver_messages.message
{
public override string topic => "silver_messages.directorial";
public override string topic => "silver_messages_directorial";
}
}

View File

@ -2,6 +2,6 @@ namespace silver_messages
{
public class message_global : silver_messages.message
{
public override string topic => "silver_messages.global";
public override string topic => "silver_messages_global";
}
}

View File

@ -1,16 +1,26 @@
using System;
using System.Collections.Generic;
using System.IO;
using Kafka.Public;
using Newtonsoft.Json;
namespace silver_messages
{
public abstract class message
public abstract class message : IMemorySerializable
{
public string type { get { return this.GetType().ToString(); } }
public abstract string topic { get; }
public override string ToString()
{
return JsonConvert.SerializeObject(this);
}
public void Serialize(MemoryStream toStream)
{
var stringBytes = System.Text.Encoding.UTF8.GetBytes(this.ToString());
toStream.Write(stringBytes, 0, stringBytes.Length);
}
}
}

View File

@ -2,6 +2,6 @@ namespace silver_messages
{
public abstract class message_yt : silver_messages.message
{
public override string topic => "silver_messages.youtube";
public override string topic => "silver_messages_youtube";
}
}

View File

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using silver_messages;
namespace silver_messages.youtube
{

View File

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using silver_messages;
namespace silver_messages.youtube
{

View File

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using silver_messages;
namespace silver_messages.youtube
{

View File

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using silver_messages;
namespace silver_messages.youtube
{

View File

@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using silver_messages;
namespace silver_messages.youtube
{