telefranz is a singleton

kind of wrecked my ability to use telefranz.tests, but I'm not going to ruin my entire design just for the fun of writing twice as much code for no benefit
This commit is contained in:
Adam R. Grey 2021-08-21 06:14:51 -04:00
parent 1e3ea299df
commit 07a5f0a5f4
3 changed files with 55 additions and 38 deletions

View File

@ -8,9 +8,9 @@ namespace franz.tests
static async Task Main(string[] args)
{
Console.WriteLine("Hello World!");
var f1 = new Telefranz("libfranztest1", "focalor:9092");
Telefranz.Configure("libfranztest1", "focalor:9092");
var f1Reported = false;
f1.addHandler<silver_messages.global.report>((r) => {
Telefranz.Instance.addHandler<silver_messages.global.report>((r) => {
Console.WriteLine($"someone reporting. {r}");
if(r.name =="libfranztest1")
{
@ -19,24 +19,6 @@ namespace franz.tests
}
});
await Task.Delay(1000);
f1.ProduceMessage(new silver_messages.global.sound_off());
Task.WaitAny(
Task.Run(async () => {
while(f1Reported == false)
{
Console.WriteLine("not ready, giving another 100ms");
await Task.Delay(100);
}
Console.WriteLine("done, ready, green");
Environment.Exit(0);
}),
Task.Run(async () => {
await Task.Delay(30000);
Console.WriteLine("time up");
await Task.Delay(500);
Environment.Exit(-1);
})
);
Console.WriteLine("done I guess?");
}
}

View File

@ -12,35 +12,38 @@ namespace franz
{
public class Telefranz
{
static class _TelefranzConsumers<T> where T : silver_messages.message {
internal static readonly Dictionary<Action<T>, Action<KafkaRecord<string, string>>> wrappings
private static Telefranz instance = null;
private static readonly object createLock = new object();
static class _TelefranzConsumers<T> where T : silver_messages.message
{
internal static readonly Dictionary<Action<T>, Action<KafkaRecord<string, string>>> wrappings
= new Dictionary<Action<T>, Action<KafkaRecord<string, string>>>();
}
private string handling_group { get; set; } = "Liszt";
private silver_messages.global.report howToReport { get; set; }
private ClusterClient clusterClient;
private Dictionary<string, KafkaConsumer<string, string>> topicConsumers {get;set;}
private Dictionary<string, KafkaConsumer<string, string>> topicConsumers { get; set; }
= new Dictionary<string, KafkaConsumer<string, string>>();
private Dictionary<string, int> topicSubscribers {get;set;} = new Dictionary<string, int>();
private Dictionary<string, int> topicSubscribers { get; set; } = new Dictionary<string, int>();
private Kafka.Public.Configuration clusterClientConfig = null;
private static ConsoleLogger consoleLogger = new ConsoleLogger();
public Telefranz(string name, string bootstrap_servers,
private Telefranz(string name, string bootstrap_servers,
List<string> commands = null, List<string> checks = null,
List<string> errors = null, List<string> warnings = null)
{
var serializer = new StringSerializer();
var deserializer = new StringDeserializer();
var serializationConfig = new SerializationConfig(){SerializeOnProduce = true};
var serializationConfig = new SerializationConfig() { SerializeOnProduce = true };
serializationConfig.SetDefaultSerializers(serializer, serializer);
serializationConfig.SetDefaultDeserializers(deserializer, deserializer);
clusterClientConfig = new Kafka.Public.Configuration {
Seeds = bootstrap_servers,
this.clusterClientConfig = new Kafka.Public.Configuration
{
Seeds = bootstrap_servers,
ClientId = name,
SerializationConfig = serializationConfig
};
clusterClient = new ClusterClient(clusterClientConfig, consoleLogger);
this.clusterClient = new ClusterClient(clusterClientConfig, consoleLogger);
handling_group = name;
this.howToReport = new silver_messages.global.report()
@ -77,28 +80,56 @@ namespace franz
}
});
}
public static void Configure(string name, string bootstrap_servers,
List<string> commands = null, List<string> checks = null,
List<string> errors = null, List<string> warnings = null)
{
lock (createLock)
{
if (instance == null)
{
instance = new Telefranz(name, bootstrap_servers, commands, checks, errors, warnings);
}
}
}
public static Telefranz Instance
{
get
{
lock (createLock)
{
if (instance == null)
{
throw new NotInitializedException("Configure me first");
}
}
return instance;
}
}
public void addHandler<T>(Action<T> theAction) where T : silver_messages.message
{
var topic = typeof(T).ToString();
Action<KafkaRecord<string, string>> wrapped = (kr) =>
{
var deserialized = JsonConvert.DeserializeObject<T>(kr.Value);
if(deserialized != null && deserialized.type == deserialized.GetType().ToString())
if (deserialized != null && deserialized.type == deserialized.GetType().ToString())
{
try
{
theAction(deserialized);
}
catch(Exception e)
catch (Exception e)
{
consoleLogger.LogError("if I don't catch this error the whole thing dies so...");
consoleLogger.LogError(JsonConvert.SerializeObject(e));
}
}
};
if(!topicConsumers.ContainsKey(typeof(T).ToString())){
if (!topicConsumers.ContainsKey(typeof(T).ToString()))
{
topicConsumers[topic] = new KafkaConsumer<string, string>(topic, clusterClient);
topicConsumers[topic].ConsumeFromLatest();
topicSubscribers.Add(topic, 0);
@ -112,7 +143,7 @@ namespace franz
topicConsumers[typeof(T).ToString()].MessageReceived -= _TelefranzConsumers<T>.wrappings[theAction];
_TelefranzConsumers<T>.wrappings.Remove(theAction);
topicSubscribers[typeof(T).ToString()]--;
if(topicSubscribers[typeof(T).ToString()] == 0)
if (topicSubscribers[typeof(T).ToString()] == 0)
{
topicConsumers[typeof(T).ToString()].StopConsume();
}
@ -122,6 +153,10 @@ namespace franz
{
clusterClient.Produce(typeof(T).ToString(), message.ToString());
}
public void HiImAFunction(){}
public void HiImAFunction() { }
}
public class NotInitializedException : Exception
{
public NotInitializedException(string message) : base(message) { }
}
}

View File

@ -3,7 +3,7 @@
<PropertyGroup>
<TargetFramework>net5.0</TargetFramework>
<PackageId>silvermeddlists.franz</PackageId>
<Version>0.0.3</Version>
<Version>0.0.4</Version>
<Authors>adam</Authors>
<Company>Silver Meddlists</Company>
</PropertyGroup>