using System;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Model;
using Repository;
using Serializers;

namespace Subscriber
{
    /// <summary>
    /// Subscribes to a Kafka topic and persists every consumed message value
    /// through a <see cref="DataRepository"/>.
    /// </summary>
    public class DataSubscriber
    {
        private readonly string _topic;
        private readonly string _bootstrapServers;
        private readonly DataRepository _dataRepository;
        // NOTE(review): Confluent.Kafka 1.x has no non-generic IConsumer/ConsumerBuilder,
        // so the consumer must be typed. Data (from Model) is assumed to be the message
        // value type here — confirm, and confirm that Serializers.JsonSerializer
        // implements IDeserializer<Data>.
        private readonly IConsumer<Ignore, Data> _consumer;

        /// <summary>
        /// Builds a consumer for <paramref name="topic"/> against
        /// <paramref name="bootstrapServers"/>, reading from the earliest offset
        /// when the group has no committed position.
        /// </summary>
        public DataSubscriber(string topic, string bootstrapServers, DataRepository dataRepository)
        {
            _topic = topic;
            _bootstrapServers = bootstrapServers;
            _dataRepository = dataRepository;

            var conf = new ConsumerConfig
            {
                GroupId = "test-consumer-group",
                BootstrapServers = _bootstrapServers,
                AutoOffsetReset = AutoOffsetReset.Earliest
            };

            _consumer = new ConsumerBuilder<Ignore, Data>(conf)
                .SetValueDeserializer(new JsonSerializer())
                .Build();
        }

        /// <summary>
        /// Starts the poll loop on a background task. The loop runs until Ctrl+C
        /// (or the optional <paramref name="cancellationToken"/>) cancels it, then
        /// closes the consumer so the group is left cleanly and final offsets are
        /// committed.
        /// </summary>
        /// <param name="cancellationToken">
        /// Optional external cancellation; defaults to none, preserving the original
        /// Ctrl+C-only behavior for existing callers.
        /// </param>
        /// <returns>The background task running the consume loop.</returns>
        public Task Subscribe(CancellationToken cancellationToken = default)
        {
            _consumer.Subscribe(_topic);

            // Link Ctrl+C to the loop's token. e.Cancel = true keeps the process
            // alive so the consumer can shut down cleanly instead of being killed
            // mid-poll.
            var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true;
                cts.Cancel();
            };

            return Task.Run(async () =>
            {
                try
                {
                    while (true)
                    {
                        try
                        {
                            var cr = _consumer.Consume(cts.Token);
                            await _dataRepository.Save(cr.Message.Value);
                            Console.WriteLine($"Consumed message with id '{cr.Message.Value.id}'. Saving value to database");
                        }
                        catch (ConsumeException e)
                        {
                            Console.WriteLine($"Error occured: {e.Error.Reason}");

                            // A fatal error will never succeed on retry; stop the
                            // loop instead of spinning on it forever.
                            if (e.Error.IsFatal)
                            {
                                break;
                            }
                        }
                    }
                }
                catch (OperationCanceledException)
                {
                    // Expected shutdown signal; clean-up happens in finally.
                }
                finally
                {
                    // Ensure the consumer leaves the group cleanly and final offsets
                    // are committed on EVERY exit path, not only on cancellation.
                    _consumer.Close();
                    cts.Dispose();
                }
            });
        }
    }
}