using DotNet.Testcontainers.Builders;
using Testcontainers.Kafka;
using Testcontainers.MongoDb;
using Publisher;
using Subscriber;
using Repository;
using Model;
using MongoDB.Bson;

namespace Integration;

/// <summary>
/// End-to-end test of the publish → subscribe → persist pipeline, run against
/// throwaway MongoDB and Kafka containers started through Testcontainers.
/// </summary>
public sealed class IntegrationTests : IAsyncLifetime
{
    /// <summary>Maximum number of repository lookups before the test gives up.</summary>
    private const int MaxPollAttempts = 60;

    /// <summary>Delay between two repository lookups, in milliseconds.</summary>
    private const int PollIntervalMilliseconds = 500;

    private readonly MongoDbContainer _mongoDbContainer = new MongoDbBuilder()
        .Build();

    private readonly KafkaContainer _kafkaContainer = new KafkaBuilder()
        .WithImage("confluentinc/cp-kafka:latest")
        .Build();

    // Assigned by SetUpAsync once the containers are running.
    private DataPublisher _dataPublisher;
    private DataSubscriber _dataSubscriber;
    private DataRepository _dataRepository;

    /// <summary>Disposes both containers concurrently.</summary>
    /// <returns>A task that completes when both containers have been disposed.</returns>
    public Task DisposeAsync()
    {
        Task[] tasks = new Task[]
        {
            _mongoDbContainer.DisposeAsync().AsTask(),
            _kafkaContainer.DisposeAsync().AsTask()
        };

        return Task.WhenAll(tasks);
    }

    /// <summary>Starts both containers concurrently.</summary>
    /// <returns>A task that completes when both containers have started.</returns>
    public Task InitializeAsync()
    {
        Task[] tasks = new Task[]
        {
            _mongoDbContainer.StartAsync(),
            _kafkaContainer.StartAsync()
        };

        return Task.WhenAll(tasks);
    }

    /// <summary>
    /// Publishes a message and verifies that a record with the same id and
    /// message text can afterwards be read back from the repository.
    /// </summary>
    /// <param name="msg">Message payload to send through the pipeline.</param>
    /// <returns>A task the test runner awaits, so failures are attributed to this test.</returns>
    [Theory]
    [InlineData("Test Message")]
    [InlineData("12345")]
    [InlineData("Hello\n")]
    [InlineData("")]
    public async Task ShouldSendAndProcessMessage(string msg)
    {
        // Arrange
        await SetUpAsync();
        var id = ObjectId.GenerateNewId().ToString();
        DataModel data = new DataModel(id) { message = msg };

        // Act
        await _dataPublisher.Publish(data);

        // Assert
        // The subscriber is started without being awaited, so the record may not
        // be stored yet when Publish returns. Poll for a bounded time rather than
        // reading once.
        // NOTE(review): assumes FindById yields null while no record matches - confirm.
        var storedData = await _dataRepository.FindById(id);
        for (var attempt = 1; storedData is null && attempt < MaxPollAttempts; attempt++)
        {
            await Task.Delay(PollIntervalMilliseconds);
            storedData = await _dataRepository.FindById(id);
        }

        Assert.NotNull(storedData);
        Assert.Equal(id, storedData.id);
        Assert.Equal(msg, storedData.message);
    }

    /// <summary>
    /// Creates the repository, publisher and subscriber against the running
    /// containers and starts the subscriber.
    /// </summary>
    /// <returns>A task that completes once the subscriber has been started.</returns>
    private async Task SetUpAsync()
    {
        _dataRepository = new DataRepository(_mongoDbContainer.GetConnectionString(), "mongo");

        var topic = "test-topic";
        _dataPublisher = new DataPublisher(topic, _kafkaContainer.GetBootstrapAddress());

        // Publish one message up front so the topic is set up before the subscriber attaches.
        await _dataPublisher.Publish(new DataModel(ObjectId.GenerateNewId().ToString())
        {
            message = "Test message to setup topic"
        });

        _dataSubscriber = new DataSubscriber(topic, _kafkaContainer.GetBootstrapAddress(), _dataRepository);

        // Not awaited to allow test to end
        _dataSubscriber.Subscribe();
    }
}