From 0636157a02ac2174996746b72d55eadb953f2c94 Mon Sep 17 00:00:00 2001 From: "Adam R. Grey" Date: Tue, 29 Jun 2021 08:15:10 -0400 Subject: [PATCH] [wip] alright I think I figured out the issue that dummy that gets created to grab the topic off? I think somehow kafka.net is getting confused and that's why the name is getting clobbered. NEW PLAN: every message type is a topic. --- franz.tests/.vscode/launch.json | 26 ++++++++++++++ franz.tests/.vscode/tasks.json | 42 +++++++++++++++++++++++ franz.tests/Program.cs | 43 +++++++++++++++++++++++ franz.tests/UnitTest1.cs | 60 --------------------------------- franz.tests/franz.tests.csproj | 26 +++++--------- franz/Telefranz.cs | 24 +++++++------ 6 files changed, 133 insertions(+), 88 deletions(-) create mode 100644 franz.tests/.vscode/launch.json create mode 100644 franz.tests/.vscode/tasks.json create mode 100644 franz.tests/Program.cs delete mode 100644 franz.tests/UnitTest1.cs diff --git a/franz.tests/.vscode/launch.json b/franz.tests/.vscode/launch.json new file mode 100644 index 0000000..d02f69a --- /dev/null +++ b/franz.tests/.vscode/launch.json @@ -0,0 +1,26 @@ +{ + "version": "0.2.0", + "configurations": [ + { + // Use IntelliSense to find out which attributes exist for C# debugging + // Use hover for the description of the existing attributes + // For further information visit https://github.com/OmniSharp/omnisharp-vscode/blob/master/debugger-launchjson.md + "name": ".NET Core Launch (console)", + "type": "coreclr", + "request": "launch", + "preLaunchTask": "build", + // If you have changed target frameworks, make sure to update the program path. + "program": "${workspaceFolder}/bin/Debug/net5.0/franz.tests.dll", + "args": [], + "cwd": "${workspaceFolder}", + // For more information about the 'console' field, see https://aka.ms/VSCode-CS-LaunchJson-Console + "console": "internalConsole", + "stopAtEntry": false + }, + { + "name": ".NET Core Attach", + "type": "coreclr", + "request": "attach" + } + ] +} \ No newline at end of file diff --git a/franz.tests/.vscode/tasks.json b/franz.tests/.vscode/tasks.json new file mode 100644 index 0000000..1330609 --- /dev/null +++ b/franz.tests/.vscode/tasks.json @@ -0,0 +1,42 @@ +{ + "version": "2.0.0", + "tasks": [ + { + "label": "build", + "command": "dotnet", + "type": "process", + "args": [ + "build", + "${workspaceFolder}/franz.tests.csproj", + "/property:GenerateFullPaths=true", + "/consoleloggerparameters:NoSummary" + ], + "problemMatcher": "$msCompile" + }, + { + "label": "publish", + "command": "dotnet", + "type": "process", + "args": [ + "publish", + "${workspaceFolder}/franz.tests.csproj", + "/property:GenerateFullPaths=true", + "/consoleloggerparameters:NoSummary" + ], + "problemMatcher": "$msCompile" + }, + { + "label": "watch", + "command": "dotnet", + "type": "process", + "args": [ + "watch", + "run", + "${workspaceFolder}/franz.tests.csproj", + "/property:GenerateFullPaths=true", + "/consoleloggerparameters:NoSummary" + ], + "problemMatcher": "$msCompile" + } + ] +} \ No newline at end of file diff --git a/franz.tests/Program.cs b/franz.tests/Program.cs new file mode 100644 index 0000000..38afda8 --- /dev/null +++ b/franz.tests/Program.cs @@ -0,0 +1,43 @@ +using System; +using System.Threading.Tasks; + +namespace franz.tests +{ + class Program + { + static async Task Main(string[] args) + { + Console.WriteLine("Hello World!"); + var f1 = new Telefranz("libfranztest1", "focalor:9092"); + var f1Reported = false; + f1.addHandler((r) => { + Console.WriteLine($"someone reporting. {r}"); + if(r.name =="libfranztest1") + { + f1Reported = true; + Console.WriteLine("f1 reported"); + } + }); + await Task.Delay(1000); + f1.ProduceMessage(new silver_messages.global.sound_off()); + Task.WaitAny( + Task.Run(async () => { + while(f1Reported == false) + { + Console.WriteLine("not ready, giving another 100ms"); + await Task.Delay(100); + } + Console.WriteLine("done, ready, green"); + Environment.Exit(0); + }), + Task.Run(async () => { + await Task.Delay(30000); + Console.WriteLine("time up"); + await Task.Delay(500); + Environment.Exit(-1); + }) + ); + Console.WriteLine("done I guess?"); + } + } +} diff --git a/franz.tests/UnitTest1.cs b/franz.tests/UnitTest1.cs deleted file mode 100644 index 5d18402..0000000 --- a/franz.tests/UnitTest1.cs +++ /dev/null @@ -1,60 +0,0 @@ -using System; -using System.Threading.Tasks; -using franz; -using NUnit.Framework; - -namespace franz.tests -{ - [TestFixture] - public class Tests - { - Telefranz f1; - - [SetUp] - public void Setup() - { - f1 = new Telefranz("libfranztest1", "focalor:9092"); - } - - [Test] - public async Task reportsToSoundoff() - { - var f1Reported = false; - f1.addHandler((silver_messages.global.report r) => { - switch(r.name) - { - case "libfranztest1": - { - f1Reported = true; - TestContext.Out.WriteLine("f1 reported"); - break; - } - //lol you can't have more than one kafka interface, right? so you can't do any (useful) tests on the sending of messages :P - // case "libfranztest2": - // { - // f2Reported = true; - // TestContext.Out.WriteLine("f2 reported"); - // break; - // } - } - }); - await Task.Delay(2000); - f1.ProduceMessage(new silver_messages.global.sound_off()); - Task.WaitAny( - Task.Run(async () => { - while(f1Reported == false) - { - TestContext.Out.WriteLine("not ready, giving another 100ms"); - await Task.Delay(100); - } - TestContext.Out.WriteLine("done, ready, green"); - }), - Task.Run(async () => { - await Task.Delay(30000); - TestContext.Out.WriteLine("time up"); - Assert.Fail(); - }) - ); - } - } -} \ No newline at end of file diff --git a/franz.tests/franz.tests.csproj b/franz.tests/franz.tests.csproj index e1babdc..79d3280 100644 --- a/franz.tests/franz.tests.csproj +++ b/franz.tests/franz.tests.csproj @@ -1,19 +1,11 @@ - - - - net5.0 - - false - - - - - - - - + + + + Exe + net5.0 + + - - - + + diff --git a/franz/Telefranz.cs b/franz/Telefranz.cs index 0f60b61..f9317a0 100644 --- a/franz/Telefranz.cs +++ b/franz/Telefranz.cs @@ -82,30 +82,32 @@ namespace franz { var dummy = new T(); Console.WriteLine($"created dummy, it reports as {dummy}. typeof(T) reports as {typeof(T)}. Topic is {dummy.topic}"); + var topic = dummy.topic; Action> wrapped = (kr) => { - Console.WriteLine("wrapped receives kafkarecord, is ready"); + Console.WriteLine($"wrapped receives kafkarecord, is ready to process: {kr.Value}"); var deserialized = JsonConvert.DeserializeObject(kr.Value); - if(deserialized != null) + if(deserialized != null && deserialized.type == deserialized.GetType().ToString()) { - Console.WriteLine("message deserialized for this handler :)"); + Console.WriteLine("message deserialized for this handler"); theAction(deserialized); + Console.WriteLine("action survived"); } else { - Console.WriteLine("message didn't deserialize for this handler :("); + Console.WriteLine("message didn't deserialize for this handler"); } }; - if(!topicConsumers.ContainsKey(dummy.topic)){ - Console.WriteLine($"making consumer for {handling_group}, topic {dummy.topic}"); - topicConsumers[dummy.topic] = new KafkaConsumer(dummy.topic, clusterClient); - topicConsumers[dummy.topic].ConsumeFromLatest(); - topicSubscribers.Add(dummy.topic, 0); + if(!topicConsumers.ContainsKey(topic)){ + Console.WriteLine($"making consumer for {handling_group}, topic {topic}"); + topicConsumers[topic] = new KafkaConsumer(topic, clusterClient); + topicConsumers[topic].ConsumeFromLatest(); + topicSubscribers.Add(topic, 0); } - topicConsumers[dummy.topic].MessageReceived += wrapped; + topicConsumers[topic].MessageReceived += wrapped; _TelefranzConsumers.wrappings[theAction] = wrapped; - topicSubscribers[dummy.topic]++; + topicSubscribers[topic]++; } public void removeHandler(Action theAction) where T : silver_messages.message, new() {