directors-assistant/Program.cs

75 lines
2.5 KiB
C#
Raw Normal View History

2021-06-09 07:51:25 -04:00
using Confluent.Kafka;
2021-06-08 07:10:24 -04:00
using Microsoft.Extensions.Configuration;
2021-06-09 07:51:25 -04:00
using System;
using System.Threading;
namespace directors_assistant
{
class Program
{
static void Main(string[] args)
{
2021-06-09 07:51:25 -04:00
2021-06-08 07:10:24 -04:00
var config = new ConfigurationBuilder()
.AddJsonFile("appsettings.json", true, true)
.Build();
2021-06-09 07:51:25 -04:00
Library.Load(config["kafka location"]);
var producerConfig = new ProducerConfig
{
BootstrapServers = config["bootstrap servers"]
};
var consumerConfig = new ConsumerConfig
{
BootstrapServers = config["bootstrap servers"],
GroupId = config["handling group"],
};
using (var producer = new ProducerBuilder<Null, string>(producerConfig).Build())
{
try
{
producer.Produce(config["topic"], new Message<Null, string> { Value = "" + config["handling group"] });
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
}
using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build())
{
consumer.Subscribe(config["topic"]);
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
try
{
while (true)
{
try
{
var cr = consumer.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Message.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
consumer.Close();
}
}
}
}
}
}