From c1c3890211a669fce52ee106b85162247fd94095 Mon Sep 17 00:00:00 2001 From: "Adam R. Grey" Date: Wed, 9 Jun 2021 07:51:25 -0400 Subject: [PATCH] kafka woooooo --- Program.cs | 63 ++++++++++++++++++++++++++++++++++++-- README.md | 6 +++- appsettings.example.json | 6 +++- directors-assistant.csproj | 1 + 4 files changed, 71 insertions(+), 5 deletions(-) diff --git a/Program.cs b/Program.cs index ddbec3c..7a29d49 100644 --- a/Program.cs +++ b/Program.cs @@ -1,5 +1,7 @@ -using System; +using Confluent.Kafka; using Microsoft.Extensions.Configuration; +using System; +using System.Threading; namespace directors_assistant { @@ -7,11 +9,66 @@ namespace directors_assistant { static void Main(string[] args) { + var config = new ConfigurationBuilder() .AddJsonFile("appsettings.json", true, true) .Build(); - var name = config["name"]; - Console.WriteLine($"Hello {name}!"); + + Library.Load(config["kafka location"]); + var producerConfig = new ProducerConfig + { + BootstrapServers = config["bootstrap servers"] + }; + var consumerConfig = new ConsumerConfig + { + BootstrapServers = config["bootstrap servers"], + GroupId = config["handling group"], + }; + + using (var producer = new ProducerBuilder(producerConfig).Build()) + { + try + { + producer.Produce(config["topic"], new Message { Value = "" + config["handling group"] }); + } + catch (ProduceException e) + { + Console.WriteLine($"Delivery failed: {e.Error.Reason}"); + } + + + + using (var consumer = new ConsumerBuilder(consumerConfig).Build()) + { + consumer.Subscribe(config["topic"]); + + CancellationTokenSource cts = new CancellationTokenSource(); + Console.CancelKeyPress += (_, e) => + { + e.Cancel = true; // prevent the process from terminating. + cts.Cancel(); + }; + try + { + while (true) + { + try + { + var cr = consumer.Consume(cts.Token); + Console.WriteLine($"Consumed message '{cr.Message.Value}' at: '{cr.TopicPartitionOffset}'."); + } + catch (ConsumeException e) + { + Console.WriteLine($"Error occured: {e.Error.Reason}"); + } + } + } + catch (OperationCanceledException) + { + consumer.Close(); + } + } + } } } } diff --git a/README.md b/README.md index eba0665..91c2b24 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,7 @@ # directors-assistant -listen on kafka for commands, execute without question \ No newline at end of file +listen on kafka for commands, execute without question + + +Depends on librdkafka.redist, but confluent's nuget package doesn't know that. +and worse, it sounds like the point where it just mysteriously loses an exception is where it would try to load it itself \ No newline at end of file diff --git a/appsettings.example.json b/appsettings.example.json index cef6d5d..740cc6c 100644 --- a/appsettings.example.json +++ b/appsettings.example.json @@ -1,3 +1,7 @@ { - "name": "guy who didn't configure" + "name": "guy who didn't configure", + "bootstrap servers": "localhost:9092", + "handling group": "foo", + "kafka location": "/usr/lib/librdkafka.so", + "topic": "SM" } \ No newline at end of file diff --git a/directors-assistant.csproj b/directors-assistant.csproj index b09ab0a..814fed8 100644 --- a/directors-assistant.csproj +++ b/directors-assistant.csproj @@ -7,6 +7,7 @@ +