automated-testing-demo/producer/Publisher/DataPublisher.cs
2024-06-14 16:06:24 -05:00

45 lines
1.3 KiB
C#

using System.Text.Json;
using Confluent.Kafka;
using Model;
using Serializers;
namespace Publisher
{
    /// <summary>
    /// Publishes <see cref="DataModel"/> messages to a single Kafka topic using a
    /// Confluent.Kafka producer that is created once and reused for every call.
    /// </summary>
    /// <remarks>
    /// The instance owns the underlying producer; call <see cref="Dispose()"/>
    /// (or wrap the instance in a <c>using</c>) when publishing is finished so the
    /// producer's resources are released.
    /// </remarks>
    public class DataPublisher : IDisposable
    {
        // Upper bound on how long Dispose waits for queued messages to be delivered.
        private static readonly TimeSpan FlushTimeout = TimeSpan.FromSeconds(10);

        private readonly string _topic;
        private readonly string _bootstrapServers;
        private readonly IProducer<string, DataModel> _producer;
        private bool _disposed;

        /// <summary>
        /// Creates a publisher bound to <paramref name="topic"/> on the given cluster.
        /// </summary>
        /// <param name="topic">Name of the topic every message is produced to.</param>
        /// <param name="bootstrapServers">Value assigned to the producer's <c>BootstrapServers</c> setting.</param>
        /// <exception cref="ArgumentException">
        /// <paramref name="topic"/> or <paramref name="bootstrapServers"/> is null, empty or whitespace.
        /// </exception>
        public DataPublisher(string topic, string bootstrapServers)
        {
            // Fail fast: a producer built without a topic or broker list can never deliver.
            if (string.IsNullOrWhiteSpace(topic))
            {
                throw new ArgumentException("Topic must not be null or empty.", nameof(topic));
            }

            if (string.IsNullOrWhiteSpace(bootstrapServers))
            {
                throw new ArgumentException("Bootstrap servers must not be null or empty.", nameof(bootstrapServers));
            }

            _topic = topic;
            _bootstrapServers = bootstrapServers;

            var config = new ProducerConfig { BootstrapServers = _bootstrapServers };
            _producer = new ProducerBuilder<string, DataModel>(config)
                .SetValueSerializer(new JsonSerializer<DataModel>())
                .Build();
        }

        /// <summary>
        /// Produces <paramref name="data"/> to the configured topic, keyed by a freshly
        /// generated GUID, and writes the outcome to the console.
        /// </summary>
        /// <param name="data">The payload to publish.</param>
        /// <returns>A task that completes once the delivery result (or failure) has been logged.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
        /// <exception cref="ObjectDisposedException">The publisher has already been disposed.</exception>
        /// <remarks>
        /// A <see cref="ProduceException{TKey, TValue}"/> is logged and not rethrown, so a
        /// failed delivery does not fault the returned task.
        /// </remarks>
        public async Task Publish(DataModel data)
        {
            if (_disposed)
            {
                throw new ObjectDisposedException(GetType().Name);
            }

            // Reject null before sending: otherwise a null value would be produced and
            // the success log below would then fail dereferencing it.
            if (data is null)
            {
                throw new ArgumentNullException(nameof(data));
            }

            try
            {
                var msg = new Message<string, DataModel>
                {
                    Key = Guid.NewGuid().ToString(),
                    Value = data
                };
                var dr = await _producer.ProduceAsync(_topic, msg);
                Console.WriteLine($"Delivered message with id '{dr.Value.id}'");
            }
            catch (ProduceException<string, DataModel> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }
        }

        /// <summary>
        /// Flushes any queued messages (bounded by a timeout) and releases the producer.
        /// Safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases the resources held by this publisher.
        /// </summary>
        /// <param name="disposing">
        /// <c>true</c> when called from <see cref="Dispose()"/>; only then is the
        /// managed producer flushed and disposed.
        /// </param>
        protected virtual void Dispose(bool disposing)
        {
            if (_disposed)
            {
                return;
            }

            if (disposing)
            {
                // Give messages from un-awaited Publish calls a chance to go out
                // before the producer is torn down.
                _producer.Flush(FlushTimeout);
                _producer.Dispose();
            }

            _disposed = true;
        }
    }
}