This commit is contained in:
Brandon Watson 2024-06-14 16:06:24 -05:00
parent e75c10d6f0
commit 5e3f417c55
8 changed files with 215 additions and 90 deletions

View File

@ -4,53 +4,17 @@ using Confluent.Kafka;
using Model;
using Repository;
using Serializers;
using Subscriber;
class Consumer
{
public static void Main(string[] args)
public static async Task Main(string[] args)
{
var conf = new ConsumerConfig{
GroupId = "test-consumer-group",
BootstrapServers = "localhost:29092",
AutoOffsetReset = AutoOffsetReset.Latest
};
DataRepository dataRepository = new DataRepository("mongodb://mongo:mongo@localhost:27017", "mongo");
using (var consumer = new ConsumerBuilder<Ignore, DataModel>(conf)
.SetValueDeserializer(new JsonSerializer<DataModel>())
.Build())
{
consumer.Subscribe("test-topic");
DataSubscriber dataSubscriber = new DataSubscriber("test-topic", "localhost:29092", dataRepository);
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true;
cts.Cancel();
};
try
{
while(true)
{
try
{
var cr = consumer.Consume(cts.Token);
dataRepository.Save(cr.Message.Value);
Console.WriteLine($"Consumed message with id '{cr.Message.Value.id}'. Saving value to database");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
consumer.Close();
}
}
await dataSubscriber.Subscribe();
}
}

View File

@ -1,4 +1,5 @@
using Model;
using MongoDB.Bson;
using MongoDB.Driver;
namespace Repository
@ -29,9 +30,15 @@ namespace Repository
this._dataCollection = db.GetCollection<DataModel>(COLLECTION_NAME);
}
public async void Save(DataModel data)
public async Task Save(DataModel data)
{
await this._dataCollection.InsertOneAsync(data);
}
public async Task<DataModel> FindById(string id)
{
var idFilter = Builders<DataModel>.Filter.Eq(data => data.id, id);
return await this._dataCollection.Find(idFilter).FirstOrDefaultAsync();
}
}
}

View File

@ -0,0 +1,68 @@
using Confluent.Kafka;
using Model;
using Repository;
using Serializers;
namespace Subscriber
{
public class DataSubscriber
{
private string _topic;
private string _bootstrapServers;
private DataRepository _dataRepository;
private IConsumer<Ignore, DataModel> _consumer;
public DataSubscriber(string topic, string bootstrapServers, DataRepository dataRepository)
{
this._topic = topic;
this._bootstrapServers = bootstrapServers;
this._dataRepository = dataRepository;
var conf = new ConsumerConfig{
GroupId = "test-consumer-group",
BootstrapServers = this._bootstrapServers,
AutoOffsetReset = AutoOffsetReset.Earliest
};
this._consumer = new ConsumerBuilder<Ignore, DataModel>(conf)
.SetValueDeserializer(new JsonSerializer<DataModel>())
.Build();
}
public Task Subscribe()
{
this._consumer.Subscribe(_topic);
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true;
cts.Cancel();
};
return Task.Run(async () =>
{
try
{
while(true)
{
try
{
var cr = this._consumer.Consume(cts.Token);
await this._dataRepository.Save(cr.Message.Value);
Console.WriteLine($"Consumed message with id '{cr.Message.Value.id}'. Saving value to database");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
this._consumer.Close();
}
});
}
}
}

View File

@ -1,16 +1,29 @@
using DotNet.Testcontainers.Builders;
using Testcontainers.Kafka;
using Testcontainers.MongoDb;
using Publisher;
using Subscriber;
using Repository;
using Model;
using MongoDB.Bson;
namespace integration_test;
namespace Integration;
public sealed class UnitTest1 : IAsyncLifetime
public sealed class IntegrationTests : IAsyncLifetime
{
private readonly MongoDbContainer _mongoDbContainer = new MongoDbBuilder().Build();
private readonly KafkaContainer _kafkaContainer = new KafkaBuilder().WithImage("confluentinc/cp-kafka:latest").Build();
private readonly MongoDbContainer _mongoDbContainer = new MongoDbBuilder()
.Build();
private readonly KafkaContainer _kafkaContainer = new KafkaBuilder()
.WithImage("confluentinc/cp-kafka:latest")
.Build();
private DataPublisher _dataPublisher;
private DataSubscriber _dataSubscriber;
private DataRepository _dataRepository;
public Task DisposeAsync()
{
Task[] tasks = new Task[]
{
_mongoDbContainer.DisposeAsync().AsTask(),
@ -26,13 +39,49 @@ public sealed class UnitTest1 : IAsyncLifetime
_mongoDbContainer.StartAsync(),
_kafkaContainer.StartAsync()
};
return Task.WhenAll(tasks);
}
[Fact]
public void ShouldProcessMessage()
[Theory]
[InlineData("Test Message")]
[InlineData("12345")]
[InlineData("Hello\n")]
[InlineData("<xml></xml>")]
public async void ShouldSendAndProcessMessage(string msg)
{
// Arrange
await setup();
var id = ObjectId.GenerateNewId().ToString();
DataModel data = new DataModel(id.ToString())
{
message = msg
};
// Act
await this._dataPublisher.Publish(data);
// Assert
var storedData = await this._dataRepository.FindById(id.ToString());
Assert.Equal(id.ToString(), storedData.id);
Assert.Equal(msg, storedData.message);
}
private async Task setup()
{
this._dataRepository = new DataRepository(_mongoDbContainer.GetConnectionString(), "mongo");
var topic = "test-topic";
this._dataPublisher = new DataPublisher(topic, _kafkaContainer.GetBootstrapAddress());
await this._dataPublisher.Publish(new DataModel(ObjectId.GenerateNewId().ToString())
{
message = "Test message to setup topic"
});
this._dataSubscriber = new DataSubscriber(topic, _kafkaContainer.GetBootstrapAddress(), this._dataRepository);
// Not awaited to allow test to end
this._dataSubscriber.Subscribe();
}
}

View File

@ -4,62 +4,43 @@ using System.Threading.Tasks;
using Confluent.Kafka;
using Serializers;
using Model;
using Publisher;
class Producer
{
public static string TOPIC_NAME = "test-topic";
public static async Task Main (string[] args)
{
var config = new ProducerConfig { BootstrapServers = "localhost:29092" };
string bootstrapServers = "localhost:29092";
var DataPublisher = new DataPublisher(TOPIC_NAME, bootstrapServers);
string introText = "\nType a string of text then press Enter. Type '+' anywhere in the text to quit:\n";
var input = "";
using (var producer = new ProducerBuilder<string, DataModel>(config)
.SetValueSerializer(new JsonSerializer<DataModel>())
.Build())
Console.WriteLine(introText);
do
{
Console.WriteLine(introText);
do
{
input = Console.ReadLine();
try
{
if (input != String.Empty)
{
SendMessage(producer, input);
}
}
catch (OverflowException e)
{
Console.WriteLine("An error has occurred", e.Message, input);
Console.WriteLine(introText);
}
} while (input != "+");
}
}
private static async void SendMessage(IProducer<String, DataModel> producer, string str)
{
try
{
var msg = new Message<string, DataModel>
input = Console.ReadLine();
try
{
Key = System.Guid.NewGuid().ToString(),
Value = new DataModel(System.Guid.NewGuid().ToString())
if (input != String.Empty)
{
message = str
}
};
var data = new DataModel()
{
message = input
};
var dr = await producer.ProduceAsync(TOPIC_NAME, msg);
Console.WriteLine($"Delivered message with id '{dr.Value.id}'");
}
catch (ProduceException<string, DataModel> e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
}
await DataPublisher.Publish(data);
}
}
catch (OverflowException e)
{
Console.WriteLine("An error has occurred", e.Message, input);
Console.WriteLine(introText);
}
}
while (input != "+");
}
}

View File

@ -0,0 +1,45 @@
using System.Text.Json;
using Confluent.Kafka;
using Model;
using Serializers;
namespace Publisher
{
public class DataPublisher
{
private string _topic;
private string _bootstrapServers;
private IProducer<string, DataModel> _producer;
public DataPublisher(string topic, string bootstrapServers)
{
this._topic = topic;
this._bootstrapServers = bootstrapServers;
var config = new ProducerConfig { BootstrapServers = this._bootstrapServers };
this._producer = new ProducerBuilder<string, DataModel>(config)
.SetValueSerializer(new JsonSerializer<DataModel>())
.Build();
}
public async Task Publish(DataModel data)
{
try
{
var msg = new Message<string, DataModel>
{
Key = System.Guid.NewGuid().ToString(),
Value = data
};
var dr = await this._producer.ProduceAsync(this._topic, msg);
Console.WriteLine($"Delivered message with id '{dr.Value.id}'");
}
catch (ProduceException<string, DataModel> e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
}
}
}
}

View File

@ -1,3 +1,6 @@
using MongoDB.Bson;
using MongoDB.Bson.Serialization.Attributes;
namespace Model
{
public class DataModel
@ -7,8 +10,15 @@ namespace Model
this.id = id;
}
public DataModel()
{
this.id = ObjectId.GenerateNewId().ToString();
}
[BsonId]
public string id { get; set; }
[BsonElement("message")]
public string? message { get; set; }
}
}

View File

@ -8,6 +8,7 @@
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="2.4.0" />
<PackageReference Include="MongoDB.Driver" Version="2.26.0" />
</ItemGroup>
</Project>