87 lines
2.5 KiB
C#
87 lines
2.5 KiB
C#
using DotNet.Testcontainers.Builders;
|
|
using Testcontainers.Kafka;
|
|
using Testcontainers.MongoDb;
|
|
using Publisher;
|
|
using Subscriber;
|
|
using Repository;
|
|
using Model;
|
|
using MongoDB.Bson;
|
|
|
|
namespace Integration;
|
|
|
|
/// <summary>
/// Integration test that publishes a <see cref="DataModel"/> through a
/// <see cref="DataPublisher"/> and then reads it back from a
/// <see cref="DataRepository"/>. MongoDB and Kafka are provided by
/// Testcontainers instances that are started before and disposed after each test.
/// </summary>
public sealed class IntegrationTests : IAsyncLifetime
{
    // NOTE(review): "latest" is a floating tag, so the image under test can change
    // between runs; consider pinning a version known to work with Testcontainers.Kafka.
    private const string KafkaImage = "confluentinc/cp-kafka:latest";

    // Topic shared by the publisher and the subscriber created in SetUpAsync.
    private const string Topic = "test-topic";

    private readonly MongoDbContainer _mongoDbContainer = new MongoDbBuilder()
        .Build();

    private readonly KafkaContainer _kafkaContainer = new KafkaBuilder()
        .WithImage(KafkaImage)
        .Build();

    // Created in SetUpAsync, once the containers are running and their addresses are known.
    private DataPublisher _dataPublisher;
    private DataSubscriber _dataSubscriber;
    private DataRepository _dataRepository;

    /// <summary>
    /// Disposes the MongoDB and Kafka containers concurrently.
    /// </summary>
    /// <returns>A task that completes when both containers have been disposed.</returns>
    public Task DisposeAsync()
    {
        return Task.WhenAll(
            _mongoDbContainer.DisposeAsync().AsTask(),
            _kafkaContainer.DisposeAsync().AsTask());
    }

    /// <summary>
    /// Starts the MongoDB and Kafka containers concurrently.
    /// </summary>
    /// <returns>A task that completes when both containers have started.</returns>
    public Task InitializeAsync()
    {
        return Task.WhenAll(
            _mongoDbContainer.StartAsync(),
            _kafkaContainer.StartAsync());
    }

    /// <summary>
    /// Publishes a message with a freshly generated id and asserts that the
    /// repository returns a record with the same id and message.
    /// </summary>
    /// <param name="msg">The message text to publish.</param>
    /// <returns>A task the test runner awaits, so failures are reported against this test.</returns>
    [Theory]
    [InlineData("Test Message")]
    [InlineData("12345")]
    [InlineData("Hello\n")]
    [InlineData("<xml></xml>")]
    public async Task ShouldSendAndProcessMessage(string msg)
    {
        // Arrange
        await SetUpAsync();

        string id = ObjectId.GenerateNewId().ToString();
        var data = new DataModel(id)
        {
            message = msg
        };

        // Act
        await _dataPublisher.Publish(data);

        // Assert
        // NOTE(review): the lookup runs immediately after the publish. If the subscriber
        // stores records asynchronously this may race; confirm whether FindById waits
        // for the record or whether a bounded retry is needed here.
        var storedData = await _dataRepository.FindById(id);

        Assert.Equal(id, storedData.id);
        Assert.Equal(msg, storedData.message);
    }

    /// <summary>
    /// Creates the repository, publisher and subscriber against the running
    /// containers, publishes one initial message, and starts the subscriber.
    /// </summary>
    private async Task SetUpAsync()
    {
        _dataRepository = new DataRepository(_mongoDbContainer.GetConnectionString(), "mongo");

        _dataPublisher = new DataPublisher(Topic, _kafkaContainer.GetBootstrapAddress());

        // Publish once before the subscriber is created so the topic is set up.
        await _dataPublisher.Publish(new DataModel(ObjectId.GenerateNewId().ToString())
        {
            message = "Test message to setup topic"
        });

        _dataSubscriber = new DataSubscriber(Topic, _kafkaContainer.GetBootstrapAddress(), _dataRepository);

        // Not awaited to allow test to end
        // NOTE(review): any exception raised inside Subscribe is presumably unobserved
        // because the call is fire-and-forget; confirm this is acceptable.
        _dataSubscriber.Subscribe();
    }
}