diff --git a/consumer/Consumer.cs b/consumer/Consumer.cs index d714881..2398ca8 100644 --- a/consumer/Consumer.cs +++ b/consumer/Consumer.cs @@ -4,53 +4,17 @@ using Confluent.Kafka; using Model; using Repository; using Serializers; +using Subscriber; class Consumer { - public static void Main(string[] args) + public static async Task Main(string[] args) { - var conf = new ConsumerConfig{ - GroupId = "test-consumer-group", - BootstrapServers = "localhost:29092", - AutoOffsetReset = AutoOffsetReset.Latest - }; - DataRepository dataRepository = new DataRepository("mongodb://mongo:mongo@localhost:27017", "mongo"); - using (var consumer = new ConsumerBuilder(conf) - .SetValueDeserializer(new JsonSerializer()) - .Build()) - { - consumer.Subscribe("test-topic"); + DataSubscriber dataSubscriber = new DataSubscriber("test-topic", "localhost:29092", dataRepository); - CancellationTokenSource cts = new CancellationTokenSource(); - Console.CancelKeyPress += (_, e) => { - e.Cancel = true; - cts.Cancel(); - }; - try - { - while(true) - { - try - { - var cr = consumer.Consume(cts.Token); - dataRepository.Save(cr.Message.Value); - - Console.WriteLine($"Consumed message with id '{cr.Message.Value.id}'. Saving value to database"); - } - catch (ConsumeException e) - { - Console.WriteLine($"Error occured: {e.Error.Reason}"); - } - } - } - catch (OperationCanceledException) - { - // Ensure the consumer leaves the group cleanly and final offsets are committed. - consumer.Close(); - } - } + await dataSubscriber.Subscribe(); } } diff --git a/consumer/Repository/DataRepository.cs b/consumer/Repository/DataRepository.cs index 81692ba..a498a67 100644 --- a/consumer/Repository/DataRepository.cs +++ b/consumer/Repository/DataRepository.cs @@ -1,4 +1,5 @@ using Model; +using MongoDB.Bson; using MongoDB.Driver; namespace Repository @@ -29,9 +30,15 @@ namespace Repository this._dataCollection = db.GetCollection(COLLECTION_NAME); } - public async void Save(DataModel data) + public async Task Save(DataModel data) { await this._dataCollection.InsertOneAsync(data); } + + public async Task FindById(string id) + { + var idFilter = Builders.Filter.Eq(data => data.id, id); + return await this._dataCollection.Find(idFilter).FirstOrDefaultAsync(); + } } } \ No newline at end of file diff --git a/consumer/Subscriber/DataSubscriber.cs b/consumer/Subscriber/DataSubscriber.cs new file mode 100644 index 0000000..70931c5 --- /dev/null +++ b/consumer/Subscriber/DataSubscriber.cs @@ -0,0 +1,68 @@ +using Confluent.Kafka; +using Model; +using Repository; +using Serializers; + +namespace Subscriber +{ + public class DataSubscriber + { + private string _topic; + private string _bootstrapServers; + private DataRepository _dataRepository; + private IConsumer _consumer; + + public DataSubscriber(string topic, string bootstrapServers, DataRepository dataRepository) + { + this._topic = topic; + this._bootstrapServers = bootstrapServers; + this._dataRepository = dataRepository; + + var conf = new ConsumerConfig{ + GroupId = "test-consumer-group", + BootstrapServers = this._bootstrapServers, + AutoOffsetReset = AutoOffsetReset.Earliest + }; + + this._consumer = new ConsumerBuilder(conf) + .SetValueDeserializer(new JsonSerializer()) + .Build(); + } + + public Task Subscribe() + { + this._consumer.Subscribe(_topic); + CancellationTokenSource cts = new CancellationTokenSource(); + Console.CancelKeyPress += (_, e) => { + e.Cancel = true; + cts.Cancel(); + }; + return Task.Run(async () => + { + try + { + while(true) + { + try + { + var cr = this._consumer.Consume(cts.Token); + await this._dataRepository.Save(cr.Message.Value); + + Console.WriteLine($"Consumed message with id '{cr.Message.Value.id}'. Saving value to database"); + } + catch (ConsumeException e) + { + Console.WriteLine($"Error occured: {e.Error.Reason}"); + } + } + } + catch (OperationCanceledException) + { + // Ensure the consumer leaves the group cleanly and final offsets are committed. + this._consumer.Close(); + } + }); + + } + } +} \ No newline at end of file diff --git a/integration-test/IntegrationTests.cs b/integration-test/IntegrationTests.cs index 9ccad39..b856c8d 100644 --- a/integration-test/IntegrationTests.cs +++ b/integration-test/IntegrationTests.cs @@ -1,16 +1,29 @@ using DotNet.Testcontainers.Builders; using Testcontainers.Kafka; using Testcontainers.MongoDb; +using Publisher; +using Subscriber; +using Repository; +using Model; +using MongoDB.Bson; -namespace integration_test; +namespace Integration; -public sealed class UnitTest1 : IAsyncLifetime +public sealed class IntegrationTests : IAsyncLifetime { - private readonly MongoDbContainer _mongoDbContainer = new MongoDbBuilder().Build(); - private readonly KafkaContainer _kafkaContainer = new KafkaBuilder().WithImage("confluentinc/cp-kafka:latest").Build(); + private readonly MongoDbContainer _mongoDbContainer = new MongoDbBuilder() + .Build(); + private readonly KafkaContainer _kafkaContainer = new KafkaBuilder() + .WithImage("confluentinc/cp-kafka:latest") + .Build(); + + private DataPublisher _dataPublisher; + private DataSubscriber _dataSubscriber; + private DataRepository _dataRepository; public Task DisposeAsync() { + Task[] tasks = new Task[] { _mongoDbContainer.DisposeAsync().AsTask(), @@ -26,13 +39,49 @@ public sealed class UnitTest1 : IAsyncLifetime _mongoDbContainer.StartAsync(), _kafkaContainer.StartAsync() }; + return Task.WhenAll(tasks); } - [Fact] - public void ShouldProcessMessage() + [Theory] + [InlineData("Test Message")] + [InlineData("12345")] + [InlineData("Hello\n")] + [InlineData("")] + public async void ShouldSendAndProcessMessage(string msg) { + // Arrange + await setup(); + + var id = ObjectId.GenerateNewId().ToString(); + DataModel data = new DataModel(id.ToString()) + { + message = msg + }; + + // Act + await this._dataPublisher.Publish(data); + // Assert + var storedData = await this._dataRepository.FindById(id.ToString()); + + Assert.Equal(id.ToString(), storedData.id); + Assert.Equal(msg, storedData.message); } + private async Task setup() + { + this._dataRepository = new DataRepository(_mongoDbContainer.GetConnectionString(), "mongo"); + + var topic = "test-topic"; + this._dataPublisher = new DataPublisher(topic, _kafkaContainer.GetBootstrapAddress()); + await this._dataPublisher.Publish(new DataModel(ObjectId.GenerateNewId().ToString()) + { + message = "Test message to setup topic" + }); + this._dataSubscriber = new DataSubscriber(topic, _kafkaContainer.GetBootstrapAddress(), this._dataRepository); + + // Not awaited to allow test to end + this._dataSubscriber.Subscribe(); + } } \ No newline at end of file diff --git a/producer/Producer.cs b/producer/Producer.cs index a2df803..dbb0f17 100644 --- a/producer/Producer.cs +++ b/producer/Producer.cs @@ -4,62 +4,43 @@ using System.Threading.Tasks; using Confluent.Kafka; using Serializers; using Model; +using Publisher; class Producer { public static string TOPIC_NAME = "test-topic"; public static async Task Main (string[] args) { - var config = new ProducerConfig { BootstrapServers = "localhost:29092" }; + string bootstrapServers = "localhost:29092"; + var DataPublisher = new DataPublisher(TOPIC_NAME, bootstrapServers); string introText = "\nType a string of text then press Enter. Type '+' anywhere in the text to quit:\n"; var input = ""; - using (var producer = new ProducerBuilder(config) - .SetValueSerializer(new JsonSerializer()) - .Build()) + Console.WriteLine(introText); + do { - - Console.WriteLine(introText); - do - { - input = Console.ReadLine(); - try - { - if (input != String.Empty) - { - SendMessage(producer, input); - } - - } - catch (OverflowException e) - { - Console.WriteLine("An error has occurred", e.Message, input); - Console.WriteLine(introText); - } - } while (input != "+"); - } - } - - private static async void SendMessage(IProducer producer, string str) - { - try - { - var msg = new Message + input = Console.ReadLine(); + try { - Key = System.Guid.NewGuid().ToString(), - Value = new DataModel(System.Guid.NewGuid().ToString()) + if (input != String.Empty) { - message = str - } - }; + var data = new DataModel() + { + message = input + }; - var dr = await producer.ProduceAsync(TOPIC_NAME, msg); - Console.WriteLine($"Delivered message with id '{dr.Value.id}'"); - } - catch (ProduceException e) - { - Console.WriteLine($"Delivery failed: {e.Error.Reason}"); - } + await DataPublisher.Publish(data); + } + + } + catch (OverflowException e) + { + Console.WriteLine("An error has occurred", e.Message, input); + Console.WriteLine(introText); + } + } + while (input != "+"); + } } \ No newline at end of file diff --git a/producer/Publisher/DataPublisher.cs b/producer/Publisher/DataPublisher.cs new file mode 100644 index 0000000..1feba71 --- /dev/null +++ b/producer/Publisher/DataPublisher.cs @@ -0,0 +1,45 @@ +using System.Text.Json; +using Confluent.Kafka; +using Model; +using Serializers; + +namespace Publisher +{ + public class DataPublisher + { + private string _topic; + private string _bootstrapServers; + private IProducer _producer; + + public DataPublisher(string topic, string bootstrapServers) + { + this._topic = topic; + this._bootstrapServers = bootstrapServers; + + var config = new ProducerConfig { BootstrapServers = this._bootstrapServers }; + this._producer = new ProducerBuilder(config) + .SetValueSerializer(new JsonSerializer()) + .Build(); + + } + + public async Task Publish(DataModel data) + { + try + { + var msg = new Message + { + Key = System.Guid.NewGuid().ToString(), + Value = data + }; + + var dr = await this._producer.ProduceAsync(this._topic, msg); + Console.WriteLine($"Delivered message with id '{dr.Value.id}'"); + } + catch (ProduceException e) + { + Console.WriteLine($"Delivery failed: {e.Error.Reason}"); + } + } + } +} \ No newline at end of file diff --git a/shared/Model/DataModel.cs b/shared/Model/DataModel.cs index 0f93887..74b92b5 100644 --- a/shared/Model/DataModel.cs +++ b/shared/Model/DataModel.cs @@ -1,3 +1,6 @@ +using MongoDB.Bson; +using MongoDB.Bson.Serialization.Attributes; + namespace Model { public class DataModel @@ -7,8 +10,15 @@ namespace Model this.id = id; } + public DataModel() + { + this.id = ObjectId.GenerateNewId().ToString(); + } + + [BsonId] public string id { get; set; } + [BsonElement("message")] public string? message { get; set; } } } \ No newline at end of file diff --git a/shared/shared.csproj b/shared/shared.csproj index fdab8e5..c85023f 100644 --- a/shared/shared.csproj +++ b/shared/shared.csproj @@ -8,6 +8,7 @@ +