45 lines
1.3 KiB
C#
45 lines
1.3 KiB
C#
|
using System.Text.Json;
|
||
|
using Confluent.Kafka;
|
||
|
using Model;
|
||
|
using Serializers;
|
||
|
|
||
|
namespace Publisher
|
||
|
{
|
||
|
/// <summary>
/// Publishes <see cref="DataModel"/> messages to a single Kafka topic.
/// </summary>
/// <remarks>
/// The instance owns the underlying Kafka producer. Call <see cref="Dispose()"/>
/// when finished so queued messages are flushed and the client is released.
/// </remarks>
public class DataPublisher : IDisposable
{
    // Upper bound on how long Dispose waits for still-queued messages to be delivered.
    private static readonly TimeSpan FlushTimeout = TimeSpan.FromSeconds(10);

    private readonly string _topic;
    private readonly string _bootstrapServers;
    private readonly IProducer<string, DataModel> _producer;
    private bool _disposed;

    /// <summary>
    /// Creates a publisher that sends to <paramref name="topic"/> through the
    /// brokers listed in <paramref name="bootstrapServers"/>.
    /// </summary>
    /// <param name="topic">Name of the topic every message is produced to.</param>
    /// <param name="bootstrapServers">Value assigned to <c>ProducerConfig.BootstrapServers</c>.</param>
    /// <exception cref="ArgumentException">
    /// <paramref name="topic"/> or <paramref name="bootstrapServers"/> is null, empty or whitespace.
    /// </exception>
    public DataPublisher(string topic, string bootstrapServers)
    {
        // Fail fast here instead of surfacing a less obvious error on the first publish.
        if (string.IsNullOrWhiteSpace(topic))
        {
            throw new ArgumentException("A topic name is required.", nameof(topic));
        }

        if (string.IsNullOrWhiteSpace(bootstrapServers))
        {
            throw new ArgumentException("Bootstrap servers are required.", nameof(bootstrapServers));
        }

        _topic = topic;
        _bootstrapServers = bootstrapServers;

        var config = new ProducerConfig { BootstrapServers = _bootstrapServers };
        _producer = new ProducerBuilder<string, DataModel>(config)
            .SetValueSerializer(new JsonSerializer<DataModel>())
            .Build();
    }

    /// <summary>
    /// Produces <paramref name="data"/> to the configured topic, keyed by a freshly
    /// generated GUID, and writes the outcome to the console.
    /// </summary>
    /// <param name="data">Payload used as the message value.</param>
    /// <returns>A task that completes once the produce call has finished.</returns>
    /// <remarks>
    /// A <c>ProduceException</c> is caught and logged to the console; it is not rethrown.
    /// </remarks>
    /// <exception cref="ObjectDisposedException">The publisher has already been disposed.</exception>
    public async Task Publish(DataModel data)
    {
        if (_disposed)
        {
            throw new ObjectDisposedException(nameof(DataPublisher));
        }

        var message = new Message<string, DataModel>
        {
            Key = System.Guid.NewGuid().ToString(),
            Value = data,
        };

        try
        {
            // Nothing after the await needs the caller's synchronization context.
            var result = await _producer.ProduceAsync(_topic, message).ConfigureAwait(false);
            Console.WriteLine($"Delivered message with id '{result.Value.id}'");
        }
        catch (ProduceException<string, DataModel> e)
        {
            Console.WriteLine($"Delivery failed: {e.Error.Reason}");
        }
    }

    /// <summary>
    /// Flushes pending messages (bounded by an internal timeout) and releases the
    /// underlying Kafka producer. Calling it more than once is a no-op.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Releases the resources held by this instance.
    /// </summary>
    /// <param name="disposing">
    /// <c>true</c> when called from <see cref="Dispose()"/>; the producer is only
    /// touched in that case.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (_disposed)
        {
            return;
        }

        // Mark first so a failure below cannot lead to a second flush/dispose attempt.
        _disposed = true;

        if (disposing)
        {
            try
            {
                // Give messages that are still queued a chance to reach the broker.
                _producer.Flush(FlushTimeout);
            }
            finally
            {
                _producer.Dispose();
            }
        }
    }
}
|
||
|
}
|