diff --git a/consumer/Consumer.cs b/consumer/Consumer.cs index c478b9b..fb4b3ff 100644 --- a/consumer/Consumer.cs +++ b/consumer/Consumer.cs @@ -2,6 +2,7 @@ using System.Threading; using Confluent.Kafka; using Model; +using Repository; using Serializers; class Consumer @@ -14,6 +15,8 @@ class Consumer AutoOffsetReset = AutoOffsetReset.Latest }; + DataRepository dataRepository = new DataRepository("mongodb://mongo:mongo@localhost:27017", "mongo"); + using (var consumer = new ConsumerBuilder(conf) .SetValueDeserializer(new JsonSerializer()) .Build()) @@ -33,6 +36,7 @@ class Consumer try { var cr = consumer.Consume(cts.Token); + dataRepository.Save(cr.Message.Value); Console.WriteLine($"Consumed message with id '{cr.Message.Value.id}' at: '{cr.TopicPartitionOffset}'."); } diff --git a/consumer/Repository/DataRepository.cs b/consumer/Repository/DataRepository.cs new file mode 100644 index 0000000..81692ba --- /dev/null +++ b/consumer/Repository/DataRepository.cs @@ -0,0 +1,37 @@ +using Model; +using MongoDB.Driver; + +namespace Repository +{ + public class DataRepository + { + private static string COLLECTION_NAME = "data"; + private string _connectionString = "mongodb://mongo:mongo@localhost:27017"; + private string _databaseName = "mongo"; + private IMongoCollection _dataCollection; + private MongoClient _mongoClient; + + public DataRepository(string connectionString, string databaseName) + { + this._connectionString = connectionString; + this._databaseName = databaseName; + this._mongoClient = new MongoClient(this._connectionString); + + var db = this._mongoClient.GetDatabase(this._databaseName); + this._dataCollection = db.GetCollection(COLLECTION_NAME); + } + + public DataRepository() + { + this._mongoClient = new MongoClient(this._connectionString); + + var db = this._mongoClient.GetDatabase(this._databaseName); + this._dataCollection = db.GetCollection(COLLECTION_NAME); + } + + public async void Save(DataModel data) + { + await this._dataCollection.InsertOneAsync(data); + } + } +} \ No newline at end of file