Working data repository for mongo

This commit is contained in:
Brandon Watson 2024-06-14 10:27:10 -05:00
parent 947bf35cf6
commit 1ce55a583a
2 changed files with 41 additions and 0 deletions

View File

@ -2,6 +2,7 @@
using System.Threading; using System.Threading;
using Confluent.Kafka; using Confluent.Kafka;
using Model; using Model;
using Repository;
using Serializers; using Serializers;
class Consumer class Consumer
@ -14,6 +15,8 @@ class Consumer
AutoOffsetReset = AutoOffsetReset.Latest AutoOffsetReset = AutoOffsetReset.Latest
}; };
DataRepository dataRepository = new DataRepository("mongodb://mongo:mongo@localhost:27017", "mongo");
using (var consumer = new ConsumerBuilder<Ignore, DataModel>(conf) using (var consumer = new ConsumerBuilder<Ignore, DataModel>(conf)
.SetValueDeserializer(new JsonSerializer<DataModel>()) .SetValueDeserializer(new JsonSerializer<DataModel>())
.Build()) .Build())
@ -33,6 +36,7 @@ class Consumer
try try
{ {
var cr = consumer.Consume(cts.Token); var cr = consumer.Consume(cts.Token);
dataRepository.Save(cr.Message.Value);
Console.WriteLine($"Consumed message with id '{cr.Message.Value.id}' at: '{cr.TopicPartitionOffset}'."); Console.WriteLine($"Consumed message with id '{cr.Message.Value.id}' at: '{cr.TopicPartitionOffset}'.");
} }

View File

@ -0,0 +1,37 @@
using Model;
using MongoDB.Driver;
namespace Repository
{
public class DataRepository
{
private static string COLLECTION_NAME = "data";
private string _connectionString = "mongodb://mongo:mongo@localhost:27017";
private string _databaseName = "mongo";
private IMongoCollection<DataModel> _dataCollection;
private MongoClient _mongoClient;
public DataRepository(string connectionString, string databaseName)
{
this._connectionString = connectionString;
this._databaseName = databaseName;
this._mongoClient = new MongoClient(this._connectionString);
var db = this._mongoClient.GetDatabase(this._databaseName);
this._dataCollection = db.GetCollection<DataModel>(COLLECTION_NAME);
}
public DataRepository()
{
this._mongoClient = new MongoClient(this._connectionString);
var db = this._mongoClient.GetDatabase(this._databaseName);
this._dataCollection = db.GetCollection<DataModel>(COLLECTION_NAME);
}
public async void Save(DataModel data)
{
await this._dataCollection.InsertOneAsync(data);
}
}
}