Compare commits

...

3 Commits

SHA1        Message                                                                    Date
5e3f417c55  WIP                                                                        2024-06-14 16:06:24 -05:00
e75c10d6f0  Adding test containers with kafka and mongo                                2024-06-14 12:04:51 -05:00
4570c43453  Changing data model | allowing multiple messages to be sent via producer  2024-06-14 11:08:48 -05:00
11 changed files with 288 additions and 63 deletions

View File

@@ -9,6 +9,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "consumer", "consumer\consum
 EndProject
 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "shared", "shared\shared.csproj", "{6958D7C2-23D8-4383-9F03-6395DE86FD63}"
 EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "integration-test", "integration-test\integration-test.csproj", "{5EE89BB0-1679-4301-817C-A22C5B400C2A}"
+EndProject
 Global
 	GlobalSection(SolutionConfigurationPlatforms) = preSolution
 		Debug|Any CPU = Debug|Any CPU
@@ -30,5 +32,9 @@ Global
 		{6958D7C2-23D8-4383-9F03-6395DE86FD63}.Debug|Any CPU.Build.0 = Debug|Any CPU
 		{6958D7C2-23D8-4383-9F03-6395DE86FD63}.Release|Any CPU.ActiveCfg = Release|Any CPU
 		{6958D7C2-23D8-4383-9F03-6395DE86FD63}.Release|Any CPU.Build.0 = Release|Any CPU
+		{5EE89BB0-1679-4301-817C-A22C5B400C2A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{5EE89BB0-1679-4301-817C-A22C5B400C2A}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{5EE89BB0-1679-4301-817C-A22C5B400C2A}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{5EE89BB0-1679-4301-817C-A22C5B400C2A}.Release|Any CPU.Build.0 = Release|Any CPU
 	EndGlobalSection
 EndGlobal

View File

@@ -4,53 +4,17 @@ using Confluent.Kafka;
 using Model;
 using Repository;
 using Serializers;
+using Subscriber;

 class Consumer
 {
-    public static void Main(string[] args)
+    public static async Task Main(string[] args)
     {
-        var conf = new ConsumerConfig{
-            GroupId = "test-consumer-group",
-            BootstrapServers = "localhost:29092",
-            AutoOffsetReset = AutoOffsetReset.Latest
-        };
         DataRepository dataRepository = new DataRepository("mongodb://mongo:mongo@localhost:27017", "mongo");
-        using (var consumer = new ConsumerBuilder<Ignore, DataModel>(conf)
-            .SetValueDeserializer(new JsonSerializer<DataModel>())
-            .Build())
-        {
-            consumer.Subscribe("test-topic");
+        DataSubscriber dataSubscriber = new DataSubscriber("test-topic", "localhost:29092", dataRepository);
-            CancellationTokenSource cts = new CancellationTokenSource();
-            Console.CancelKeyPress += (_, e) => {
-                e.Cancel = true;
-                cts.Cancel();
-            };
-            try
-            {
-                while(true)
-                {
-                    try
-                    {
-                        var cr = consumer.Consume(cts.Token);
-                        dataRepository.Save(cr.Message.Value);
-                        Console.WriteLine($"Consumed message with id '{cr.Message.Value.id}' at: '{cr.TopicPartitionOffset}'.");
-                    }
-                    catch (ConsumeException e)
-                    {
-                        Console.WriteLine($"Error occurred: {e.Error.Reason}");
-                    }
-                }
-            }
-            catch (OperationCanceledException)
-            {
-                // Ensure the consumer leaves the group cleanly and final offsets are committed.
-                consumer.Close();
-            }
-        }
+        await dataSubscriber.Subscribe();
     }
 }

View File

@@ -1,4 +1,5 @@
 using Model;
+using MongoDB.Bson;
 using MongoDB.Driver;

 namespace Repository
@@ -29,9 +30,15 @@ namespace Repository
             this._dataCollection = db.GetCollection<DataModel>(COLLECTION_NAME);
         }

-        public async void Save(DataModel data)
+        public async Task Save(DataModel data)
        {
             await this._dataCollection.InsertOneAsync(data);
         }
+
+        public async Task<DataModel> FindById(string id)
+        {
+            var idFilter = Builders<DataModel>.Filter.Eq(data => data.id, id);
+            return await this._dataCollection.Find(idFilter).FirstOrDefaultAsync();
+        }
     }
 }
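
The Save signature change is the substantive fix in this file: an async void method returns nothing a caller can await, so completion and failures of InsertOneAsync were unobservable. A minimal sketch of the difference from the caller's side (illustrative, not part of this diff):

    // Before (async void): fire-and-forget; exceptions surface on the scheduler.
    // repository.Save(data);

    // After (async Task): completion and errors are observable at the call site.
    try
    {
        await repository.Save(data);
    }
    catch (MongoWriteException e)
    {
        Console.WriteLine($"Insert failed: {e.Message}");
    }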

View File

@@ -0,0 +1,68 @@
+using Confluent.Kafka;
+using Model;
+using Repository;
+using Serializers;
+
+namespace Subscriber
+{
+    public class DataSubscriber
+    {
+        private string _topic;
+        private string _bootstrapServers;
+        private DataRepository _dataRepository;
+        private IConsumer<Ignore, DataModel> _consumer;
+
+        public DataSubscriber(string topic, string bootstrapServers, DataRepository dataRepository)
+        {
+            this._topic = topic;
+            this._bootstrapServers = bootstrapServers;
+            this._dataRepository = dataRepository;
+            var conf = new ConsumerConfig{
+                GroupId = "test-consumer-group",
+                BootstrapServers = this._bootstrapServers,
+                AutoOffsetReset = AutoOffsetReset.Earliest
+            };
+            this._consumer = new ConsumerBuilder<Ignore, DataModel>(conf)
+                .SetValueDeserializer(new JsonSerializer<DataModel>())
+                .Build();
+        }
+
+        public Task Subscribe()
+        {
+            this._consumer.Subscribe(_topic);
+            CancellationTokenSource cts = new CancellationTokenSource();
+            Console.CancelKeyPress += (_, e) => {
+                e.Cancel = true;
+                cts.Cancel();
+            };
+            return Task.Run(async () =>
+            {
+                try
+                {
+                    while(true)
+                    {
+                        try
+                        {
+                            var cr = this._consumer.Consume(cts.Token);
+                            await this._dataRepository.Save(cr.Message.Value);
+                            Console.WriteLine($"Consumed message with id '{cr.Message.Value.id}'. Saving value to database");
+                        }
+                        catch (ConsumeException e)
+                        {
+                            Console.WriteLine($"Error occurred: {e.Error.Reason}");
+                        }
+                    }
+                }
+                catch (OperationCanceledException)
+                {
+                    // Ensure the consumer leaves the group cleanly and final offsets are committed.
+                    this._consumer.Close();
+                }
+            });
+        }
+    }
+}
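
Subscribe() starts the poll loop inside Task.Run and hands the Task back, so the caller decides whether to block: the consumer's Main awaits it, while the integration test below deliberately drops it to keep polling in the background. One limitation is that shutdown is bound to Console.CancelKeyPress, which non-interactive callers cannot trigger. A sketch of a caller-supplied token (hypothetical variant, not in this diff):

    // Hypothetical overload: accept an external CancellationToken instead of
    // wiring cancellation to Ctrl+C only.
    public Task Subscribe(CancellationToken token)
    {
        this._consumer.Subscribe(_topic);
        return Task.Run(async () =>
        {
            try
            {
                while (true)
                {
                    var cr = this._consumer.Consume(token);   // throws when token is cancelled
                    await this._dataRepository.Save(cr.Message.Value);
                }
            }
            catch (OperationCanceledException)
            {
                this._consumer.Close();   // leave the group cleanly
            }
        });
    }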

View File

@@ -0,0 +1 @@
+global using Xunit;

View File

@@ -0,0 +1,87 @@
+using DotNet.Testcontainers.Builders;
+using Testcontainers.Kafka;
+using Testcontainers.MongoDb;
+using Publisher;
+using Subscriber;
+using Repository;
+using Model;
+using MongoDB.Bson;
+
+namespace Integration;
+
+public sealed class IntegrationTests : IAsyncLifetime
+{
+    private readonly MongoDbContainer _mongoDbContainer = new MongoDbBuilder()
+        .Build();
+
+    private readonly KafkaContainer _kafkaContainer = new KafkaBuilder()
+        .WithImage("confluentinc/cp-kafka:latest")
+        .Build();
+
+    private DataPublisher _dataPublisher;
+    private DataSubscriber _dataSubscriber;
+    private DataRepository _dataRepository;
+
+    public Task DisposeAsync()
+    {
+        Task[] tasks = new Task[]
+        {
+            _mongoDbContainer.DisposeAsync().AsTask(),
+            _kafkaContainer.DisposeAsync().AsTask()
+        };
+        return Task.WhenAll(tasks);
+    }
+
+    public Task InitializeAsync()
+    {
+        Task[] tasks = new Task[] {
+            _mongoDbContainer.StartAsync(),
+            _kafkaContainer.StartAsync()
+        };
+        return Task.WhenAll(tasks);
+    }
+
+    [Theory]
+    [InlineData("Test Message")]
+    [InlineData("12345")]
+    [InlineData("Hello\n")]
+    [InlineData("<xml></xml>")]
+    public async void ShouldSendAndProcessMessage(string msg)
+    {
+        // Arrange
+        await setup();
+        var id = ObjectId.GenerateNewId().ToString();
+        DataModel data = new DataModel(id.ToString())
+        {
+            message = msg
+        };
+
+        // Act
+        await this._dataPublisher.Publish(data);
+
+        // Assert
+        var storedData = await this._dataRepository.FindById(id.ToString());
+        Assert.Equal(id.ToString(), storedData.id);
+        Assert.Equal(msg, storedData.message);
+    }
+
+    private async Task setup()
+    {
+        this._dataRepository = new DataRepository(_mongoDbContainer.GetConnectionString(), "mongo");
+        var topic = "test-topic";
+        this._dataPublisher = new DataPublisher(topic, _kafkaContainer.GetBootstrapAddress());
+        await this._dataPublisher.Publish(new DataModel(ObjectId.GenerateNewId().ToString())
+        {
+            message = "Test message to setup topic"
+        });
+        this._dataSubscriber = new DataSubscriber(topic, _kafkaContainer.GetBootstrapAddress(), this._dataRepository);
+        // Not awaited to allow test to end
+        this._dataSubscriber.Subscribe();
+    }
+}
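
Two caveats in this test are worth flagging. First, xUnit will run an async void test method, but async Task is the idiomatic signature. Second, the assert runs immediately after Publish while the subscriber saves on a background task, so FindById can race the consumer and come back null. A polling wait (hypothetical helper, not in this diff) would make the assertion deterministic:

    // Hypothetical helper: poll until the document shows up or the timeout lapses.
    private static async Task<DataModel?> WaitForDocumentAsync(
        DataRepository repository, string id, TimeSpan timeout)
    {
        var deadline = DateTime.UtcNow + timeout;
        while (DateTime.UtcNow < deadline)
        {
            var found = await repository.FindById(id);
            if (found is not null) return found;
            await Task.Delay(250);   // back off between reads
        }
        return null;
    }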

View File

@ -0,0 +1,35 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<RootNamespace>integration_test</RootNamespace>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<IsPackable>false</IsPackable>
<IsTestProject>true</IsTestProject>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.6.0" />
<PackageReference Include="MongoDB.Driver" Version="2.26.0" />
<PackageReference Include="Testcontainers" Version="3.8.0" />
<PackageReference Include="Testcontainers.Kafka" Version="3.8.0" />
<PackageReference Include="Testcontainers.MongoDb" Version="3.8.0" />
<PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="coverlet.collector" Version="6.0.0">
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
<PrivateAssets>all</PrivateAssets>
</PackageReference>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\consumer\consumer.csproj" />
<ProjectReference Include="..\producer\producer.csproj" />
</ItemGroup>
</Project>

View File

@@ -4,38 +4,43 @@ using System.Threading.Tasks;
 using Confluent.Kafka;
 using Serializers;
 using Model;
+using Publisher;

 class Producer
 {
+    public static string TOPIC_NAME = "test-topic";
+
     public static async Task Main (string[] args)
     {
-        var config = new ProducerConfig { BootstrapServers = "localhost:29092" };
+        string bootstrapServers = "localhost:29092";
+        var DataPublisher = new DataPublisher(TOPIC_NAME, bootstrapServers);
-        using (var producerBuilder = new ProducerBuilder<string, DataModel>(config)
-            .SetValueSerializer(new JsonSerializer<DataModel>())
-            .Build())
-        {
+        string introText = "\nType a string of text then press Enter. Type '+' anywhere in the text to quit:\n";
+        var input = "";
+        Console.WriteLine(introText);
+        do
+        {
+            input = Console.ReadLine();
             try
             {
-                var msg = new Message<string, DataModel>
+                if (input != String.Empty)
                 {
-                    Key = System.Guid.NewGuid().ToString(),
-                    Value = new DataModel(System.Guid.NewGuid().ToString())
+                    var data = new DataModel()
                     {
-                        name = "Name",
-                        description = "Description",
-                        notes = "This is a test object"
-                    }
-                };
+                        message = input
+                    };
+                    await DataPublisher.Publish(data);
+                }
-                var dr = await producerBuilder.ProduceAsync("test-topic", msg);
-                Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
             }
-            catch (ProduceException<string, DataModel> e)
+            catch (OverflowException e)
             {
-                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
+                Console.WriteLine("An error has occurred", e.Message, input);
+                Console.WriteLine(introText);
             }
         }
+        while (input != "+");
     }
 }
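
A note on the new catch block: the Console.WriteLine(string, object, object) overload treats its first argument as a composite format string, and "An error has occurred" contains no {0}/{1} placeholders, so e.Message and input are silently discarded. An interpolated string would actually surface them (illustrative fix, not part of this diff):

    catch (OverflowException e)
    {
        Console.WriteLine($"An error has occurred: {e.Message} (input: '{input}')");
        Console.WriteLine(introText);
    }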

View File

@@ -0,0 +1,45 @@
+using System.Text.Json;
+using Confluent.Kafka;
+using Model;
+using Serializers;
+
+namespace Publisher
+{
+    public class DataPublisher
+    {
+        private string _topic;
+        private string _bootstrapServers;
+        private IProducer<string, DataModel> _producer;
+
+        public DataPublisher(string topic, string bootstrapServers)
+        {
+            this._topic = topic;
+            this._bootstrapServers = bootstrapServers;
+            var config = new ProducerConfig { BootstrapServers = this._bootstrapServers };
+            this._producer = new ProducerBuilder<string, DataModel>(config)
+                .SetValueSerializer(new JsonSerializer<DataModel>())
+                .Build();
+        }
+
+        public async Task Publish(DataModel data)
+        {
+            try
+            {
+                var msg = new Message<string, DataModel>
+                {
+                    Key = System.Guid.NewGuid().ToString(),
+                    Value = data
+                };
+                var dr = await this._producer.ProduceAsync(this._topic, msg);
+                Console.WriteLine($"Delivered message with id '{dr.Value.id}'");
+            }
+            catch (ProduceException<string, DataModel> e)
+            {
+                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
+            }
+        }
+    }
+}
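
DataPublisher never flushes or disposes the IProducer it wraps, so messages still sitting in the client's send buffer can be lost when the process exits. A minimal disposal hook (hypothetical addition, assuming the class is made IDisposable):

    public void Dispose()
    {
        // Wait up to 10s for in-flight deliveries before releasing the producer.
        this._producer.Flush(TimeSpan.FromSeconds(10));
        this._producer.Dispose();
    }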

View File

@@ -1,3 +1,6 @@
+using MongoDB.Bson;
+using MongoDB.Bson.Serialization.Attributes;
+
 namespace Model
 {
     public class DataModel
@@ -7,12 +10,15 @@ namespace Model
             this.id = id;
         }

+        public DataModel()
+        {
+            this.id = ObjectId.GenerateNewId().ToString();
+        }
+
+        [BsonId]
         public string id { get; set; }
         public string? name { get; set; }
         public string? description { get; set; }
         public string? notes { get; set; }
+
+        [BsonElement("message")]
+        public string? message { get; set; }
     }
 }
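
With these attributes the driver stores id as the document's _id and writes message under the field name "message", while the new parameterless constructor guarantees every instance carries a valid ObjectId string. Roughly, a saved document looks like this (illustrative shape):

    var data = new DataModel { message = "hello" };   // id auto-assigned by the new ctor
    // Persisted BSON (illustrative):
    // { "_id": "6668f0a1c2d3e4f5a6b7c8d9", "name": null, "description": null,
    //   "notes": null, "message": "hello" }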

View File

@@ -8,6 +8,7 @@
   <ItemGroup>
     <PackageReference Include="Confluent.Kafka" Version="2.4.0" />
+    <PackageReference Include="MongoDB.Driver" Version="2.26.0" />
   </ItemGroup>
 </Project>