automated-testing-demo/producer/Producer.cs

65 lines
1.9 KiB
C#
Raw Normal View History

2024-06-14 13:42:51 +00:00
using System;
2024-06-14 14:48:52 +00:00
using System.Text.Json;
2024-06-14 13:42:51 +00:00
using System.Threading.Tasks;
using Confluent.Kafka;
2024-06-14 14:48:52 +00:00
using Serializers;
2024-06-14 13:42:51 +00:00
using Model;
class Producer
{
public static string TOPIC_NAME = "test-topic";
2024-06-14 13:42:51 +00:00
public static async Task Main (string[] args)
{
2024-06-14 14:48:52 +00:00
var config = new ProducerConfig { BootstrapServers = "localhost:29092" };
2024-06-14 13:42:51 +00:00
string introText = "\nType a string of text then press Enter. Type '+' anywhere in the text to quit:\n";
var input = "";
using (var producer = new ProducerBuilder<string, DataModel>(config)
2024-06-14 14:48:52 +00:00
.SetValueSerializer(new JsonSerializer<DataModel>())
.Build())
2024-06-14 13:42:51 +00:00
{
Console.WriteLine(introText);
do
{
input = Console.ReadLine();
try
2024-06-14 13:42:51 +00:00
{
if (input != String.Empty)
2024-06-14 13:42:51 +00:00
{
SendMessage(producer, input);
2024-06-14 13:42:51 +00:00
}
}
catch (OverflowException e)
{
Console.WriteLine("An error has occurred", e.Message, input);
Console.WriteLine(introText);
}
} while (input != "+");
}
}
2024-06-14 13:42:51 +00:00
private static async void SendMessage(IProducer<String, DataModel> producer, string str)
{
try
{
var msg = new Message<string, DataModel>
2024-06-14 13:42:51 +00:00
{
Key = System.Guid.NewGuid().ToString(),
Value = new DataModel(System.Guid.NewGuid().ToString())
{
message = str
}
};
var dr = await producer.ProduceAsync(TOPIC_NAME, msg);
Console.WriteLine($"Delivered message with id '{dr.Value.id}'");
}
catch (ProduceException<string, DataModel> e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
2024-06-14 13:42:51 +00:00
}
}
}