using System;
using System.Threading;
using Confluent.Kafka;
using Model;
using Repository;
using Serializers;

class Consumer
{
    public static void Main(string[] args)
    {
        var conf = new ConsumerConfig
        {
            GroupId = "test-consumer-group",
            BootstrapServers = "localhost:29092",
            AutoOffsetReset = AutoOffsetReset.Latest
        };

        DataRepository dataRepository = new DataRepository("mongodb://mongo:mongo@localhost:27017", "mongo");

        // ConsumerBuilder requires explicit key/value type parameters; the key is ignored
        // and Data is the message model from the Model namespace. The custom JsonSerializer
        // from the Serializers namespace must implement IDeserializer<Data>.
        using (var consumer = new ConsumerBuilder<Ignore, Data>(conf)
            .SetValueDeserializer(new JsonSerializer())
            .Build())
        {
            consumer.Subscribe("test-topic");

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) =>
            {
                e.Cancel = true; // keep the process alive so the consumer can shut down cleanly
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = consumer.Consume(cts.Token);
                        dataRepository.Save(cr.Message.Value);
                        Console.WriteLine($"Consumed message with id '{cr.Message.Value.id}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occurred: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                consumer.Close();
            }
        }
    }
}
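
// --- Supporting types (assumed, not shown in the original) -----------------
// The Model, Serializers, and Repository namespaces referenced above are not
// part of this listing. The sketches below are one possible minimal
// implementation, assuming that:
//   - Data is a plain model class whose "id" property matches the usage
//     cr.Message.Value.id in the consumer loop;
//   - JsonSerializer deserializes UTF-8 JSON message values via System.Text.Json;
//   - DataRepository writes to a MongoDB collection named "data" (the collection
//     name and the "payload" field are assumptions) using the official
//     MongoDB.Driver package.
// Adjust property names, the collection name, and error handling to match the
// real project.

namespace Model
{
    // Minimal message model; "id" matches the property logged by the consumer.
    public class Data
    {
        public string id { get; set; }
        public string payload { get; set; } // placeholder field, assumed
    }
}

namespace Serializers
{
    using Confluent.Kafka;
    using Model;

    // Turns the raw Kafka message value (UTF-8 JSON bytes) into a Data instance.
    public class JsonSerializer : IDeserializer<Data>
    {
        public Data Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
        {
            if (isNull)
            {
                return null;
            }
            return System.Text.Json.JsonSerializer.Deserialize<Data>(data);
        }
    }
}

namespace Repository
{
    using Model;
    using MongoDB.Driver;

    // Persists consumed messages into MongoDB.
    public class DataRepository
    {
        private readonly IMongoCollection<Data> _collection;

        public DataRepository(string connectionString, string databaseName)
        {
            var client = new MongoClient(connectionString);
            var database = client.GetDatabase(databaseName);
            _collection = database.GetCollection<Data>("data"); // collection name is an assumption
        }

        public void Save(Data item)
        {
            _collection.InsertOne(item);
        }
    }
}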