using System.Text.Json; using Confluent.Kafka; using Model; using Serializers; namespace Publisher { public class DataPublisher { private string _topic; private string _bootstrapServers; private IProducer _producer; public DataPublisher(string topic, string bootstrapServers) { this._topic = topic; this._bootstrapServers = bootstrapServers; var config = new ProducerConfig { BootstrapServers = this._bootstrapServers }; this._producer = new ProducerBuilder(config) .SetValueSerializer(new JsonSerializer()) .Build(); } public async Task Publish(DataModel data) { try { var msg = new Message { Key = System.Guid.NewGuid().ToString(), Value = data }; var dr = await this._producer.ProduceAsync(this._topic, msg); Console.WriteLine($"Delivered message with id '{dr.Value.id}'"); } catch (ProduceException e) { Console.WriteLine($"Delivery failed: {e.Error.Reason}"); } } } }