// automated-testing-demo/producer/Producer.cs
//
// 65 lines
// 1.9 KiB
// C#

using System;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;
using Serializers;
using Model;
/// <summary>
/// Interactive console producer: reads lines from standard input and publishes each
/// non-empty line to the Kafka topic <see cref="TOPIC_NAME"/> as a <c>DataModel</c>
/// value, serialized with the project's <c>JsonSerializer&lt;DataModel&gt;</c>.
/// </summary>
class Producer
{
    /// <summary>Name of the Kafka topic every message is produced to.</summary>
    public static string TOPIC_NAME = "test-topic";

    /// <summary>
    /// Entry point. Builds a producer against <c>localhost:29092</c>, then loops:
    /// read a line, send it if it is not empty, and stop once a line containing
    /// '+' has been processed or standard input reaches end-of-stream.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    /// <returns>A task that completes when the input loop has ended and the producer is disposed.</returns>
    public static async Task Main (string[] args)
    {
        var config = new ProducerConfig { BootstrapServers = "localhost:29092" };
        string introText = "\nType a string of text then press Enter. Type '+' anywhere in the text to quit:\n";

        using (var producer = new ProducerBuilder<string, DataModel>(config)
            .SetValueSerializer(new JsonSerializer<DataModel>())
            .Build())
        {
            Console.WriteLine(introText);

            while (true)
            {
                string input = Console.ReadLine();

                // Console.ReadLine returns null when stdin is closed or redirected input
                // is exhausted; without this check the loop would spin forever.
                if (input == null)
                {
                    break;
                }

                if (input.Length != 0)
                {
                    try
                    {
                        // Await the delivery so the producer cannot be disposed while a
                        // message is still in flight and so failures surface here.
                        await SendMessage(producer, input);
                    }
                    catch (Exception e)
                    {
                        // Top-level interactive loop: report the failure and keep prompting.
                        Console.WriteLine($"An error has occurred: {e.Message} (input: '{input}')");
                        Console.WriteLine(introText);
                    }
                }

                // The prompt promises that '+' anywhere in the text quits. The line
                // itself has already been sent above, as a lone "+" always was.
                if (input.Contains("+"))
                {
                    break;
                }
            }
        }
    }

    /// <summary>
    /// Produces one message to <see cref="TOPIC_NAME"/> and prints the delivery outcome.
    /// The key is a fresh GUID string; the value is a <c>DataModel</c> constructed with
    /// another fresh GUID string and with <c>message</c> set to <paramref name="str"/>.
    /// </summary>
    /// <param name="producer">Producer used to send the message.</param>
    /// <param name="str">Text to carry in the value's <c>message</c> member.</param>
    /// <returns>A task that completes once the delivery has succeeded or its failure has been reported.</returns>
    private static async Task SendMessage(IProducer<String, DataModel> producer, string str)
    {
        try
        {
            var msg = new Message<string, DataModel>
            {
                Key = System.Guid.NewGuid().ToString(),
                Value = new DataModel(System.Guid.NewGuid().ToString())
                {
                    message = str
                }
            };
            var dr = await producer.ProduceAsync(TOPIC_NAME, msg);
            Console.WriteLine($"Delivered message with id '{dr.Value.id}'");
        }
        catch (ProduceException<string, DataModel> e)
        {
            // Delivery failures are reported rather than rethrown so the input loop continues.
            Console.WriteLine($"Delivery failed: {e.Error.Reason}");
        }
    }
}