Working communication via kafka

This commit is contained in:
Brandon Watson 2024-06-14 09:48:52 -05:00
parent d0f1e5bc22
commit 947bf35cf6
6 changed files with 120 additions and 4 deletions

52
consumer/Consumer.cs Normal file
View File

@ -0,0 +1,52 @@
using System;
using System.Threading;
using Confluent.Kafka;
using Model;
using Serializers;
class Consumer
{
public static void Main(string[] args)
{
var conf = new ConsumerConfig{
GroupId = "test-consumer-group",
BootstrapServers = "localhost:29092",
AutoOffsetReset = AutoOffsetReset.Latest
};
using (var consumer = new ConsumerBuilder<Ignore, DataModel>(conf)
.SetValueDeserializer(new JsonSerializer<DataModel>())
.Build())
{
consumer.Subscribe("test-topic");
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true;
cts.Cancel();
};
try
{
while(true)
{
try
{
var cr = consumer.Consume(cts.Token);
Console.WriteLine($"Consumed message with id '{cr.Message.Value.id}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
consumer.Close();
}
}
}
}

View File

@ -0,0 +1,18 @@
namespace Model
{
public class DataModel
{
public DataModel(string id)
{
this.id = id;
}
public string id { get; set; }
public string? name { get; set; }
public string? description { get; set; }
public string? notes { get; set; }
}
}

View File

@ -1,2 +0,0 @@
// See https://aka.ms/new-console-template for more information
Console.WriteLine("Hello, World!");

View File

@ -0,0 +1,22 @@
using Confluent.Kafka;
using System.Text.Json;
namespace Serializers
{
public class JsonSerializer<T> : ISerializer<T>, IDeserializer<T>
{
public byte[] Serialize(T data, SerializationContext context)
{
return JsonSerializer.SerializeToUtf8Bytes(data);
}
public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
if (!isNull)
{
return JsonSerializer.Deserialize<T>(data);
}
throw new Exception ("Data cannot be null");
}
}
}

View File

@ -1,15 +1,19 @@
using System;
using System.Text.Json;
using System.Threading.Tasks;
using Confluent.Kafka;
using Serializers;
using Model;
class Producer
{
public static async Task Main (string[] args)
{
var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
var config = new ProducerConfig { BootstrapServers = "localhost:29092" };
using (var producerBuilder = new ProducerBuilder<string, DataModel>(config).Build())
using (var producerBuilder = new ProducerBuilder<string, DataModel>(config)
.SetValueSerializer(new JsonSerializer<DataModel>())
.Build())
{
try
{

View File

@ -0,0 +1,22 @@
using Confluent.Kafka;
using System.Text.Json;
namespace Serializers
{
public class JsonSerializer<T> : ISerializer<T>, IDeserializer<T>
{
public byte[] Serialize(T data, SerializationContext context)
{
return JsonSerializer.SerializeToUtf8Bytes(data);
}
public T Deserialize(ReadOnlySpan<byte> data, bool isNull, SerializationContext context)
{
if (!isNull)
{
return JsonSerializer.Deserialize<T>(data);
}
throw new Exception ("Data cannot be null");
}
}
}