diff --git a/consumer/Consumer.cs b/consumer/Consumer.cs new file mode 100644 index 0000000..c478b9b --- /dev/null +++ b/consumer/Consumer.cs @@ -0,0 +1,52 @@ +using System; +using System.Threading; +using Confluent.Kafka; +using Model; +using Serializers; + +class Consumer +{ + public static void Main(string[] args) + { + var conf = new ConsumerConfig{ + GroupId = "test-consumer-group", + BootstrapServers = "localhost:29092", + AutoOffsetReset = AutoOffsetReset.Latest + }; + + using (var consumer = new ConsumerBuilder(conf) + .SetValueDeserializer(new JsonSerializer()) + .Build()) + { + consumer.Subscribe("test-topic"); + + CancellationTokenSource cts = new CancellationTokenSource(); + Console.CancelKeyPress += (_, e) => { + e.Cancel = true; + cts.Cancel(); + }; + + try + { + while(true) + { + try + { + var cr = consumer.Consume(cts.Token); + + Console.WriteLine($"Consumed message with id '{cr.Message.Value.id}' at: '{cr.TopicPartitionOffset}'."); + } + catch (ConsumeException e) + { + Console.WriteLine($"Error occured: {e.Error.Reason}"); + } + } + } + catch (OperationCanceledException) + { + // Ensure the consumer leaves the group cleanly and final offsets are committed. + consumer.Close(); + } + } + } +} diff --git a/consumer/Model/DataModel.cs b/consumer/Model/DataModel.cs new file mode 100644 index 0000000..7e473f3 --- /dev/null +++ b/consumer/Model/DataModel.cs @@ -0,0 +1,18 @@ +namespace Model +{ + public class DataModel + { + public DataModel(string id) + { + this.id = id; + } + + public string id { get; set; } + + public string? name { get; set; } + + public string? description { get; set; } + + public string? notes { get; set; } + } +} \ No newline at end of file diff --git a/consumer/Program.cs b/consumer/Program.cs deleted file mode 100644 index 3751555..0000000 --- a/consumer/Program.cs +++ /dev/null @@ -1,2 +0,0 @@ -// See https://aka.ms/new-console-template for more information -Console.WriteLine("Hello, World!"); diff --git a/consumer/Serializer/JsonSerializer.cs b/consumer/Serializer/JsonSerializer.cs new file mode 100644 index 0000000..f2ac15c --- /dev/null +++ b/consumer/Serializer/JsonSerializer.cs @@ -0,0 +1,22 @@ +using Confluent.Kafka; +using System.Text.Json; + +namespace Serializers +{ + public class JsonSerializer : ISerializer, IDeserializer + { + public byte[] Serialize(T data, SerializationContext context) + { + return JsonSerializer.SerializeToUtf8Bytes(data); + } + + public T Deserialize(ReadOnlySpan data, bool isNull, SerializationContext context) + { + if (!isNull) + { + return JsonSerializer.Deserialize(data); + } + throw new Exception ("Data cannot be null"); + } + } +} \ No newline at end of file diff --git a/producer/Producer.cs b/producer/Producer.cs index 910efbd..ba4c9e8 100644 --- a/producer/Producer.cs +++ b/producer/Producer.cs @@ -1,15 +1,19 @@ using System; +using System.Text.Json; using System.Threading.Tasks; using Confluent.Kafka; +using Serializers; using Model; class Producer { public static async Task Main (string[] args) { - var config = new ProducerConfig { BootstrapServers = "localhost:9092" }; + var config = new ProducerConfig { BootstrapServers = "localhost:29092" }; - using (var producerBuilder = new ProducerBuilder(config).Build()) + using (var producerBuilder = new ProducerBuilder(config) + .SetValueSerializer(new JsonSerializer()) + .Build()) { try { diff --git a/producer/Serializer/JsonSerializer.cs b/producer/Serializer/JsonSerializer.cs new file mode 100644 index 0000000..f2ac15c --- /dev/null +++ b/producer/Serializer/JsonSerializer.cs @@ -0,0 +1,22 @@ +using Confluent.Kafka; +using System.Text.Json; + +namespace Serializers +{ + public class JsonSerializer : ISerializer, IDeserializer + { + public byte[] Serialize(T data, SerializationContext context) + { + return JsonSerializer.SerializeToUtf8Bytes(data); + } + + public T Deserialize(ReadOnlySpan data, bool isNull, SerializationContext context) + { + if (!isNull) + { + return JsonSerializer.Deserialize(data); + } + throw new Exception ("Data cannot be null"); + } + } +} \ No newline at end of file