[wip] alright I think I figured out the issue

that dummy that gets created to grab the topic off? I think somehow kafka.net is getting confused and that's why the name is getting clobbered.

NEW PLAN: every message type is a topic.
This commit is contained in:
Adam R. Grey 2021-06-29 08:15:10 -04:00
parent 0f1868d85e
commit 0636157a02
6 changed files with 133 additions and 88 deletions

26
franz.tests/.vscode/launch.json vendored Normal file
View File

@ -0,0 +1,26 @@
{
"version": "0.2.0",
"configurations": [
{
// Use IntelliSense to find out which attributes exist for C# debugging
// Use hover for the description of the existing attributes
// For further information visit https://github.com/OmniSharp/omnisharp-vscode/blob/master/debugger-launchjson.md
"name": ".NET Core Launch (console)",
"type": "coreclr",
"request": "launch",
"preLaunchTask": "build",
// If you have changed target frameworks, make sure to update the program path.
"program": "${workspaceFolder}/bin/Debug/net5.0/franz.tests.dll",
"args": [],
"cwd": "${workspaceFolder}",
// For more information about the 'console' field, see https://aka.ms/VSCode-CS-LaunchJson-Console
"console": "internalConsole",
"stopAtEntry": false
},
{
"name": ".NET Core Attach",
"type": "coreclr",
"request": "attach"
}
]
}

42
franz.tests/.vscode/tasks.json vendored Normal file
View File

@ -0,0 +1,42 @@
{
"version": "2.0.0",
"tasks": [
{
"label": "build",
"command": "dotnet",
"type": "process",
"args": [
"build",
"${workspaceFolder}/franz.tests.csproj",
"/property:GenerateFullPaths=true",
"/consoleloggerparameters:NoSummary"
],
"problemMatcher": "$msCompile"
},
{
"label": "publish",
"command": "dotnet",
"type": "process",
"args": [
"publish",
"${workspaceFolder}/franz.tests.csproj",
"/property:GenerateFullPaths=true",
"/consoleloggerparameters:NoSummary"
],
"problemMatcher": "$msCompile"
},
{
"label": "watch",
"command": "dotnet",
"type": "process",
"args": [
"watch",
"run",
"${workspaceFolder}/franz.tests.csproj",
"/property:GenerateFullPaths=true",
"/consoleloggerparameters:NoSummary"
],
"problemMatcher": "$msCompile"
}
]
}

43
franz.tests/Program.cs Normal file
View File

@ -0,0 +1,43 @@
using System;
using System.Threading.Tasks;
namespace franz.tests
{
class Program
{
static async Task Main(string[] args)
{
Console.WriteLine("Hello World!");
var f1 = new Telefranz("libfranztest1", "focalor:9092");
var f1Reported = false;
f1.addHandler<silver_messages.global.report>((r) => {
Console.WriteLine($"someone reporting. {r}");
if(r.name =="libfranztest1")
{
f1Reported = true;
Console.WriteLine("f1 reported");
}
});
await Task.Delay(1000);
f1.ProduceMessage(new silver_messages.global.sound_off());
Task.WaitAny(
Task.Run(async () => {
while(f1Reported == false)
{
Console.WriteLine("not ready, giving another 100ms");
await Task.Delay(100);
}
Console.WriteLine("done, ready, green");
Environment.Exit(0);
}),
Task.Run(async () => {
await Task.Delay(30000);
Console.WriteLine("time up");
await Task.Delay(500);
Environment.Exit(-1);
})
);
Console.WriteLine("done I guess?");
}
}
}

View File

@ -1,60 +0,0 @@
using System;
using System.Threading.Tasks;
using franz;
using NUnit.Framework;
namespace franz.tests
{
[TestFixture]
public class Tests
{
Telefranz f1;
[SetUp]
public void Setup()
{
f1 = new Telefranz("libfranztest1", "focalor:9092");
}
[Test]
public async Task reportsToSoundoff()
{
var f1Reported = false;
f1.addHandler((silver_messages.global.report r) => {
switch(r.name)
{
case "libfranztest1":
{
f1Reported = true;
TestContext.Out.WriteLine("f1 reported");
break;
}
//lol you can't have more than one kafka interface, right? so you can't do any (useful) tests on the sending of messages :P
// case "libfranztest2":
// {
// f2Reported = true;
// TestContext.Out.WriteLine("f2 reported");
// break;
// }
}
});
await Task.Delay(2000);
f1.ProduceMessage(new silver_messages.global.sound_off());
Task.WaitAny(
Task.Run(async () => {
while(f1Reported == false)
{
TestContext.Out.WriteLine("not ready, giving another 100ms");
await Task.Delay(100);
}
TestContext.Out.WriteLine("done, ready, green");
}),
Task.Run(async () => {
await Task.Delay(30000);
TestContext.Out.WriteLine("time up");
Assert.Fail();
})
);
}
}
}

View File

@ -1,19 +1,11 @@
<Project Sdk="Microsoft.NET.Sdk"> <Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup> <PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework> <TargetFramework>net5.0</TargetFramework>
<IsPackable>false</IsPackable>
</PropertyGroup> </PropertyGroup>
<ItemGroup>
<PackageReference Include="NUnit" Version="3.12.0" />
<PackageReference Include="NUnit3TestAdapter" Version="3.16.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.5.0" />
</ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\franz\franz.csproj" /> <ProjectReference Include="..\franz\franz.csproj" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -82,30 +82,32 @@ namespace franz
{ {
var dummy = new T(); var dummy = new T();
Console.WriteLine($"created dummy, it reports as {dummy}. typeof(T) reports as {typeof(T)}. Topic is {dummy.topic}"); Console.WriteLine($"created dummy, it reports as {dummy}. typeof(T) reports as {typeof(T)}. Topic is {dummy.topic}");
var topic = dummy.topic;
Action<KafkaRecord<string, string>> wrapped = (kr) => Action<KafkaRecord<string, string>> wrapped = (kr) =>
{ {
Console.WriteLine("wrapped receives kafkarecord, is ready"); Console.WriteLine($"wrapped receives kafkarecord, is ready to process: {kr.Value}");
var deserialized = JsonConvert.DeserializeObject<T>(kr.Value); var deserialized = JsonConvert.DeserializeObject<T>(kr.Value);
if(deserialized != null) if(deserialized != null && deserialized.type == deserialized.GetType().ToString())
{ {
Console.WriteLine("message deserialized for this handler :)"); Console.WriteLine("message deserialized for this handler");
theAction(deserialized); theAction(deserialized);
Console.WriteLine("action survived");
} }
else else
{ {
Console.WriteLine("message didn't deserialize for this handler :("); Console.WriteLine("message didn't deserialize for this handler");
} }
}; };
if(!topicConsumers.ContainsKey(dummy.topic)){ if(!topicConsumers.ContainsKey(topic)){
Console.WriteLine($"making consumer for {handling_group}, topic {dummy.topic}"); Console.WriteLine($"making consumer for {handling_group}, topic {topic}");
topicConsumers[dummy.topic] = new KafkaConsumer<string, string>(dummy.topic, clusterClient); topicConsumers[topic] = new KafkaConsumer<string, string>(topic, clusterClient);
topicConsumers[dummy.topic].ConsumeFromLatest(); topicConsumers[topic].ConsumeFromLatest();
topicSubscribers.Add(dummy.topic, 0); topicSubscribers.Add(topic, 0);
} }
topicConsumers[dummy.topic].MessageReceived += wrapped; topicConsumers[topic].MessageReceived += wrapped;
_TelefranzConsumers<T>.wrappings[theAction] = wrapped; _TelefranzConsumers<T>.wrappings[theAction] = wrapped;
topicSubscribers[dummy.topic]++; topicSubscribers[topic]++;
} }
public void removeHandler<T>(Action<T> theAction) where T : silver_messages.message, new() public void removeHandler<T>(Action<T> theAction) where T : silver_messages.message, new()
{ {