Apache Kafka for .NET - 1.0

Extension ID

com.castsoftware.dotnet.kafka

What’s new?

See Apache Kafka for .NET - 1.0 - Release Notes  for more information.

Description

This extension provides support for Apache Kafka .NET APIs (see Links) which are responsible for consumer and publisher operations on the cluster.

In what situation should you install this extension?

If your C# application utilizes Apache Kafka cluster for messaging-related operations and the producer and consumer are handled using the C# language, and you want to modelize the producer and consumer links with appropriate objects and links, then you should install this extension.

Technology Libraries

Library Version Supported
Confluent.Kafka up to: 2.1.1 (tick)
DotNetCore.CAP.Kafka up to: 7.1.4 (tick)
MassTransit.Kafka up to: 8.0.16 (tick)
Streamiz.Kafka.Net up to: 1.4.2 (tick)

AIP Core compatibility

AIP Core release Supported
8.3.x (tick)

Dependencies with other extensions

Some CAST extensions require the presence of other CAST extensions in order to function correctly. The .NET Kafka extension requires that the following other CAST extensions are also installed (this will be managed automatically by CAST Console):

  • CAST AIP Internal extension (internal technical extension)
  • Web Services Linker

Download and installation instructions

The extension will not be automatically downloaded and installed in CAST Console. If you need to use it, you should manually install the extension using the Application - Extensions interface:

What results can you expect?

Once the analysis/snapshot generation has been completed, you can view the results in the normal manner. The following objects and  links will be displayed in CAST Imaging:

Objects

Icon Description
DotNet Kafka Topic Publisher
DotNet Kafka Topic Receiver
DotNet Kafka Unknown Topic Publisher
DotNet Kafka Unknown Topic Receiver
Link Type Source and Destination of link Supported APIs
callLink callLink between the caller C# method and the DotNet Kafka Topic Publisher object Producer API’s support by .Net Kafka:

Confluent.Kafka.IProducer.Produce
Confluent.Kafka.IProducer.ProduceAsync
DotNetCore.CAP.ICapPublisher.Publish
DotNetCore.CAP.ICapPublisher.PublishAsync
DotNetCore.CAP.ICapPublisher.PublishDelay
DotNetCore.CAP.ICapPublisher.PublishDelayAsync
MassTransit.KafkaProducerRegistrationExtensions.AddProducer
MassTransit.KafkaConfiguratorExtensions.TopicProducer
Streamiz.Kafka.Net.Stream.IKStream.To
callLink callLink between the DotNet Kafka Topic Receiver object and the caller C# method Consumer API’s support by .Net Kafka:

Confluent.Kafka.IConsumer.Consume
MassTransit.KafkaConfiguratorExtensions.TopicEndpoint

Example code scenarios

Publisher APIs

Producer

.NET Producer

using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class Program
{

    static void ProducerServer(string[] args)
    {
       
        Producer2();


    }
    
    public static async Task Producer2()
    {
        var config = new ProducerConfig { BootstrapServers = "10.0.3.15:9092" };

        using (var p = new ProducerBuilder<Null, string>(config).Build())
        {
            try
            {
               p.Produce("Topic-2", new Message<Null, string> { Value = "test" });
               
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }
        }
    }

    
}

AsyncProducer

.NET Producer

using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class Program
{

    static void ProducerServer1(string[] args)
    {
        Producer1("Topic-1");

    }
    public static async Task Producer1(string topic)
    {
        var config = new ProducerConfig { BootstrapServers = "10.0.3.15:9092" };

        // If serializers are not specified, default serializers from
        // `Confluent.Kafka.Serializers` will be automatically used where
        // available. Note: by default strings are encoded as UTF8.
        using (var p = new ProducerBuilder<Null, string>(config).Build())
        {
            try
            {
                var dr = await p.ProduceAsync(topic, new Message<Null, string> { Value = "test" });
                Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }
        }
    }

}

Unknown Producer

.NET Producer

using System;
using System.Threading;
using Confluent.Kafka;

class consumer
{
    public void KafkaProducer(string topic)
    {
        var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.1.9:9092", AutoOffsetReset = AutoOffsetReset.Earliest };

        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            c.Subscribe(topic);

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
        }
    }
}

Reading from JSON files

Producer Code

using Confluent.Kafka;
using Microsoft.Extensions.Logging;

public async Task PublishMessage(EvenementAthena evenement)
        {
            TopicPrefix = _configuration["KafkaData:TopicPrefix"];
            TopicSuffix = _configuration["KafkaData:TopicSuffix"];
            string topicName = $"{TopicPrefix}.Dataset.{TopicSuffix}";
            string key = $"{evenement.Id}-{evenement.NumeroVersion}";
   
            using (var producer = new ProducerBuilder<string, string>(KafkaProviderConfiguration.BrokerClientConfig).Build())
            {
                try
                {
                    var dr = await producer.ProduceAsync(topicName, new Message<string, string> {Key= key, Value = evenement.Data.ToString() });
                    Log.Info($"produced to: {dr.TopicPartitionOffset}");
                }
                catch (ProduceException<string, GenericRecord> ex)
                {
                    Log.Error($"error producing message: {ex}");
                    throw new KafkaRetriableException(ex.Error);
                }         
            }
         
        }
  

appsettings.json

{
 "KafkaData": {
    "TopicPrefix": "TechProduct",
    "TopicSuffix": "Devices",
  },
} 

Receiver APIs

Consumer

.NET Consumer

using System;
using System.Threading;
using Confluent.Kafka;

class consumer1
{

    static void consumerServer1(string[] args)
    {
        KafkaConsumer1();

    }
    public static void KafkaConsumer1()
    {
        var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.1.9:9092", AutoOffsetReset = AutoOffsetReset.Earliest };

        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            c.Subscribe("Topic-1");

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
        }
    }

    
}

Unknown Consumer

.NET Consumer

using System;
using System.Threading;
using Confluent.Kafka;

class consumer
{
    public void KafkaProducer(string topic)
    {
        var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.1.9:9092", AutoOffsetReset = AutoOffsetReset.Earliest };

        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            c.Subscribe(topic);

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
        }
    }
}

.NET Publisher and Receiver

.NET Producer Consumer

Producer

using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class Program
{

    static void Mainmethod(string[] args)
    {
        asyncProduceDOtNet("Topic2");

    }
    public static async Task asyncProduceDOtNet(string topic)
    {
        var config = new ProducerConfig { BootstrapServers = "10.0.3.15:9092" };
        using (var p = new ProducerBuilder<Null, string>(config).Build())
        {
            try
            {
                var dr = await p.ProduceAsync(topic, new Message<Null, string> { Value = "test" });
                Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }
        }
    }
}

Consumer

using System;
using System.Threading;
using Confluent.Kafka;

class consumer
{
    public void KafkaProducer()
    {
        var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.1.9:9092", AutoOffsetReset = AutoOffsetReset.Earliest };

        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            c.Subscribe("Topic2");

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
        }
    }
}

Cross Technology Java/.NET

Cross Technology

Java Producer

package com.springcore.com.kafka.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.BasicConfigurator;

public class JavaProducer {
    
    public void produce() {
        BasicConfigurator.configure();
        
        Properties properties=new Properties();
        
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.10:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        
        KafkaProducer<String,String> first_producer = new KafkaProducer<String, String>(properties);  
        
        ProducerRecord<String, String> record=new ProducerRecord<String, String>("Topic-CrossTech-java-dotNet", "Hye Kafka");  
        
        first_producer.send(record);  
        first_producer.flush();
        first_producer.close();
    }

}

.NET Consumer

using System;
using System.Threading;
using Confluent.Kafka;

class consumer
{
    public void KafkaProducer()
    {
        var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.1.9:9092", AutoOffsetReset = AutoOffsetReset.Earliest };

        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            c.Subscribe("Topic-CrossTech-java-dotNet");

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
        }
    }
}

Stream APIs

Kafka Streaming

.NET Streaming

using System;
using System.Threading;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.Metrics;
using Streamiz.Kafka.Net.Metrics.Prometheus;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.Stream;

namespace sample_stream_demo
{
    class Program
    {
        static async Task Main(string[] args)
        {
            string inputTopic = "input-stream";
            string outputTopic = "output-stream";
            int numberPartitions = 4;

            await CreateTopics(inputTopic, outputTopic, numberPartitions);

            var config = new StreamConfig<StringSerDes, StringSerDes>();
            config.ApplicationId = "sample-streamiz-demo";
            config.BootstrapServers = "localhost:9092";
            config.AutoOffsetReset = AutoOffsetReset.Earliest;
            config.StateDir = Path.Combine(".");
            config.CommitIntervalMs = 5000;
            config.Guarantee = ProcessingGuarantee.AT_LEAST_ONCE;
            config.MetricsRecording = MetricsRecordingLevel.DEBUG;
            config.UsePrometheusReporter(9090, true);

            StreamBuilder builder = new StreamBuilder();

            builder.Stream<string, string>(inputTopic)
                .FlatMapValues((v) => v.Split(" ").AsEnumerable())
                .GroupBy((k, v) => v)
                .Count()
                .ToStream()
                .MapValues((v) => v.ToString())
                .To(outputTopic);

            Topology t = builder.Build();
            KafkaStream stream = new KafkaStream(t, config);

            Console.CancelKeyPress += (o, e) => stream.Dispose();

            await stream.StartAsync();
        }

        private static async Task CreateTopics(string input, string output, int numberPartitions)
        {
            AdminClientConfig config = new AdminClientConfig();
            config.BootstrapServers = "localhost:9092";

            AdminClientBuilder builder = new AdminClientBuilder(config);
            var client = builder.Build();
            try
            {
                await client.CreateTopicsAsync(new List<TopicSpecification>
                {
                    new TopicSpecification() {Name = input, NumPartitions = numberPartitions},
                    new TopicSpecification() {Name = output, NumPartitions = numberPartitions},
                });
            }
            catch (Exception e)
            {
                // do nothing in case of topic already exist
            }
            finally
            {
                client.Dispose();
            }
        }
    }
}
  

Limitations

  • Objects will not be created if the evaluation fails to resolve the necessary parameter.