This documentation is not maintained. Please refer to doc.castsoftware.com/technologies to find the latest updates.

Extension ID

com.castsoftware.dotnet.kafka

What's new?

See Apache Kafka for .NET - 1.0 - Release Notes for more information.

Description

This extension provides support for the Apache Kafka .NET APIs (see Technology Libraries below) that handle producer and consumer operations on a Kafka cluster.

In what situation should you install this extension?

You should install this extension if your C# application uses an Apache Kafka cluster for messaging, its producer and consumer operations are implemented in C#, and you want those producers and consumers modeled with the appropriate objects and links.

Technology Libraries

Library              | Version       | Supported
---------------------|---------------|----------
Confluent.Kafka      | up to 2.1.1   | Yes
DotNetCore.CAP.Kafka | up to 7.1.4   | Yes
MassTransit.Kafka    | up to 8.0.16  | Yes
Streamiz.Kafka.Net   | up to 1.4.2   | Yes

AIP Core compatibility

AIP Core release | Supported
-----------------|----------
8.3.x            | Yes

Supported DBMS servers

DBMS           | Supported
---------------|----------
CSS/PostgreSQL | Yes

Dependencies with other extensions

  • CAST AIP Internal extension (internal technical extension)
  • Web Services Linker

Download and installation instructions

The extension is not automatically downloaded and installed by CAST Console. If you need to use it, install it manually via the Application - Extensions interface.

What results can you expect?

Once the analysis/snapshot generation has been completed, you can view the results in the normal manner. The following objects and links will be displayed in CAST Imaging:

Objects

The following objects are generated:

  • DotNet Kafka Topic Publisher
  • DotNet Kafka Topic Receiver
  • DotNet Kafka Unknown Topic Publisher
  • DotNet Kafka Unknown Topic Receiver

Links

The following links are generated:

Link type: callLink
Source and destination: between the caller C# method and the DotNet Kafka Topic Publisher object
Supported producer APIs for .NET Kafka:
  • Confluent.Kafka.IProducer.Produce
  • Confluent.Kafka.IProducer.ProduceAsync
  • DotNetCore.CAP.ICapPublisher.Publish
  • DotNetCore.CAP.ICapPublisher.PublishAsync
  • DotNetCore.CAP.ICapPublisher.PublishDelay
  • DotNetCore.CAP.ICapPublisher.PublishDelayAsync
  • MassTransit.KafkaProducerRegistrationExtensions.AddProducer
  • MassTransit.KafkaConfiguratorExtensions.TopicProducer
  • Streamiz.Kafka.Net.Stream.IKStream.To

Link type: callLink
Source and destination: between the DotNet Kafka Topic Receiver object and the caller C# method
Supported consumer APIs for .NET Kafka:
  • Confluent.Kafka.IConsumer.Consume
  • MassTransit.KafkaConfiguratorExtensions.TopicEndpoint
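
The Confluent.Kafka and Streamiz.Kafka.Net APIs are illustrated in the example code scenarios below. For the DotNetCore.CAP and MassTransit APIs, the following minimal sketches show the call patterns involved; the class names, topic names, and message types used here (OrderPublisher, "orders-topic", OrderSubmitted) are hypothetical.

DotNetCore.CAP Publisher
using System.Threading.Tasks;
using DotNetCore.CAP;

public class OrderPublisher
{
    private readonly ICapPublisher _publisher;

    public OrderPublisher(ICapPublisher publisher) => _publisher = publisher;

    public async Task PublishOrder()
    {
        // Publishing to a literal topic name; this call pattern corresponds
        // to a DotNet Kafka Topic Publisher object named "orders-topic".
        await _publisher.PublishAsync("orders-topic", new { OrderId = 42 });
    }
}

MassTransit Producer and Consumer
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public record OrderSubmitted(int OrderId);

public class OrderSubmittedConsumer : IConsumer<OrderSubmitted>
{
    public Task Consume(ConsumeContext<OrderSubmitted> context) => Task.CompletedTask;
}

public static class KafkaBusSetup
{
    public static void Configure(IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
            x.AddRider(rider =>
            {
                // AddProducer registers a producer for "orders-topic"
                // (DotNet Kafka Topic Publisher).
                rider.AddProducer<OrderSubmitted>("orders-topic");
                rider.AddConsumer<OrderSubmittedConsumer>();
                rider.UsingKafka((ctx, k) =>
                {
                    k.Host("localhost:9092");
                    // TopicEndpoint binds the consumer to the same topic
                    // (DotNet Kafka Topic Receiver).
                    k.TopicEndpoint<OrderSubmitted>("orders-topic", "consumer-group", e =>
                    {
                        e.ConfigureConsumer<OrderSubmittedConsumer>(ctx);
                    });
                });
            });
        });
    }
}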

Example code scenarios

Publisher APIs

Producer
.NET Producer
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class Program
{
    static void ProducerServer(string[] args)
    {
        Producer2();
    }

    public static void Producer2()
    {
        var config = new ProducerConfig { BootstrapServers = "10.0.3.15:9092" };

        using (var p = new ProducerBuilder<Null, string>(config).Build())
        {
            try
            {
                // Produce to the literal topic name "Topic-2".
                p.Produce("Topic-2", new Message<Null, string> { Value = "test" });
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }

            // Wait for any in-flight messages before the producer is disposed.
            p.Flush(TimeSpan.FromSeconds(10));
        }
    }
}

AsyncProducer
.NET Producer
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class Program
{

    static async Task ProducerServer1(string[] args)
    {
        await Producer1("Topic-1");
    }
    public static async Task Producer1(string topic)
    {
        var config = new ProducerConfig { BootstrapServers = "10.0.3.15:9092" };

        // If serializers are not specified, default serializers from
        // `Confluent.Kafka.Serializers` will be automatically used where
        // available. Note: by default strings are encoded as UTF8.
        using (var p = new ProducerBuilder<Null, string>(config).Build())
        {
            try
            {
                var dr = await p.ProduceAsync(topic, new Message<Null, string> { Value = "test" });
                Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }
        }
    }

}

Unknown Producer
.NET Producer
using System;
using Confluent.Kafka;

class producer
{
    public void KafkaProducer(string topic)
    {
        var config = new ProducerConfig { BootstrapServers = "192.168.1.9:9092" };

        using (var p = new ProducerBuilder<Null, string>(config).Build())
        {
            try
            {
                // The topic name is an unresolved method parameter, so this
                // call is modeled as a DotNet Kafka Unknown Topic Publisher.
                p.Produce(topic, new Message<Null, string> { Value = "test" });
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }

            // Wait for any in-flight messages before the producer is disposed.
            p.Flush(TimeSpan.FromSeconds(10));
        }
    }
}

Reading from JSON files
Producer Code
using Confluent.Kafka;
using Microsoft.Extensions.Logging;

public async Task PublishMessage(EvenementAthena evenement)
{
    // The topic name is assembled from values read from appsettings.json.
    TopicPrefix = _configuration["KafkaData:TopicPrefix"];
    TopicSuffix = _configuration["KafkaData:TopicSuffix"];
    string topicName = $"{TopicPrefix}.Dataset.{TopicSuffix}";
    string key = $"{evenement.Id}-{evenement.NumeroVersion}";

    using (var producer = new ProducerBuilder<string, string>(KafkaProviderConfiguration.BrokerClientConfig).Build())
    {
        try
        {
            var dr = await producer.ProduceAsync(topicName, new Message<string, string> { Key = key, Value = evenement.Data.ToString() });
            Log.Info($"produced to: {dr.TopicPartitionOffset}");
        }
        catch (ProduceException<string, string> ex)
        {
            Log.Error($"error producing message: {ex}");
            throw new KafkaRetriableException(ex.Error);
        }
    }
}
  
appsettings.json
{
  "KafkaData": {
    "TopicPrefix": "TechProduct",
    "TopicSuffix": "Devices"
  }
}

Receiver APIs

Consumer
.NET Consumer
using System;
using System.Threading;
using Confluent.Kafka;

class consumer1
{

    static void consumerServer1(string[] args)
    {
        KafkaConsumer1();

    }
    public static void KafkaConsumer1()
    {
        var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.1.9:9092", AutoOffsetReset = AutoOffsetReset.Earliest };

        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
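            // Subscribing to the literal topic name "Topic-1"; the Consume
            // loop below is modeled as a DotNet Kafka Topic Receiver.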
            c.Subscribe("Topic-1");

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
        }
    }

    
}

Unknown Consumer
.NET Consumer
using System;
using System.Threading;
using Confluent.Kafka;

class consumer
{
    public void KafkaConsumer(string topic)
    {
        var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.1.9:9092", AutoOffsetReset = AutoOffsetReset.Earliest };

        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
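            // The topic name is an unresolved method parameter, so this
            // consumer is modeled as a DotNet Kafka Unknown Topic Receiver.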
            c.Subscribe(topic);

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
        }
    }
}

.NET Publisher and Receiver

.NET Producer Consumer
Producer
using System;
using System.Threading.Tasks;
using Confluent.Kafka;

class Program
{

    static async Task Mainmethod(string[] args)
    {
        await AsyncProduceDotNet("Topic2");
    }

    public static async Task AsyncProduceDotNet(string topic)
    {
        var config = new ProducerConfig { BootstrapServers = "10.0.3.15:9092" };
        using (var p = new ProducerBuilder<Null, string>(config).Build())
        {
            try
            {
                var dr = await p.ProduceAsync(topic, new Message<Null, string> { Value = "test" });
                Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }
        }
    }
}
Consumer
using System;
using System.Threading;
using Confluent.Kafka;

class consumer
{
    public void KafkaConsumer()
    {
        var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.1.9:9092", AutoOffsetReset = AutoOffsetReset.Earliest };

        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
            c.Subscribe("Topic2");

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
        }
    }
}

Cross Technology Java/.NET

Cross Technology
Java Producer
package com.springcore.com.kafka.producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.BasicConfigurator;

public class JavaProducer {
	
	public void produce() {
		BasicConfigurator.configure();
		
		Properties properties=new Properties();
		
		properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.10:9092");
		properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		
		KafkaProducer<String, String> firstProducer = new KafkaProducer<String, String>(properties);
		
		ProducerRecord<String, String> record = new ProducerRecord<String, String>("Topic-CrossTech-java-dotNet", "Hey Kafka");
		
		firstProducer.send(record);
		firstProducer.flush();
		firstProducer.close();
	}

}
.NET Consumer
using System;
using System.Threading;
using Confluent.Kafka;

class consumer
{
    public void KafkaConsumer()
    {
        var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.1.9:9092", AutoOffsetReset = AutoOffsetReset.Earliest };

        using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
        {
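            // Subscribing to the same topic name as the Java producer above
            // allows a cross-technology link between the Java publisher and
            // this .NET receiver.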
            c.Subscribe("Topic-CrossTech-java-dotNet");

            CancellationTokenSource cts = new CancellationTokenSource();
            Console.CancelKeyPress += (_, e) => {
                e.Cancel = true; // prevent the process from terminating.
                cts.Cancel();
            };

            try
            {
                while (true)
                {
                    try
                    {
                        var cr = c.Consume(cts.Token);
                        Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                    }
                    catch (ConsumeException e)
                    {
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                    }
                }
            }
            catch (OperationCanceledException)
            {
                // Ensure the consumer leaves the group cleanly and final offsets are committed.
                c.Close();
            }
        }
    }
}

Stream APIs

Kafka Streaming
.NET Streaming
using System;
using System.Threading;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.Metrics;
using Streamiz.Kafka.Net.Metrics.Prometheus;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.Stream;

namespace sample_stream_demo
{
    class Program
    {
        static async Task Main(string[] args)
        {
            string inputTopic = "input-stream";
            string outputTopic = "output-stream";
            int numberPartitions = 4;

            await CreateTopics(inputTopic, outputTopic, numberPartitions);

            var config = new StreamConfig<StringSerDes, StringSerDes>();
            config.ApplicationId = "sample-streamiz-demo";
            config.BootstrapServers = "localhost:9092";
            config.AutoOffsetReset = AutoOffsetReset.Earliest;
            config.StateDir = Path.Combine(".");
            config.CommitIntervalMs = 5000;
            config.Guarantee = ProcessingGuarantee.AT_LEAST_ONCE;
            config.MetricsRecording = MetricsRecordingLevel.DEBUG;
            config.UsePrometheusReporter(9090, true);

            StreamBuilder builder = new StreamBuilder();

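            // The terminal To(outputTopic) call is one of the supported
            // producer APIs (Streamiz.Kafka.Net.Stream.IKStream.To), so a
            // DotNet Kafka Topic Publisher is created for "output-stream".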
            builder.Stream<string, string>(inputTopic)
                .FlatMapValues((v) => v.Split(" ").AsEnumerable())
                .GroupBy((k, v) => v)
                .Count()
                .ToStream()
                .MapValues((v) => v.ToString())
                .To(outputTopic);

            Topology t = builder.Build();
            KafkaStream stream = new KafkaStream(t, config);

            Console.CancelKeyPress += (o, e) => stream.Dispose();

            await stream.StartAsync();
        }

        private static async Task CreateTopics(string input, string output, int numberPartitions)
        {
            AdminClientConfig config = new AdminClientConfig();
            config.BootstrapServers = "localhost:9092";

            AdminClientBuilder builder = new AdminClientBuilder(config);
            var client = builder.Build();
            try
            {
                await client.CreateTopicsAsync(new List<TopicSpecification>
                {
                    new TopicSpecification() {Name = input, NumPartitions = numberPartitions},
                    new TopicSpecification() {Name = output, NumPartitions = numberPartitions},
                });
            }
            catch (Exception)
            {
                // Ignore the error if the topics already exist.
            }
            finally
            {
                client.Dispose();
            }
        }
    }
}

Limitations

  • Objects will not be created if the evaluation fails to resolve the necessary parameter.
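
For example, in the following hypothetical sketch the topic name is only known at runtime, so the evaluation may fail to resolve it (the class name and environment variable are illustrative):

Unresolvable topic parameter
using System;
using Confluent.Kafka;

class UnresolvedTopicExample
{
    public void Produce(IProducer<Null, string> p)
    {
        // The topic name comes from the environment at runtime; static
        // evaluation cannot resolve it to a literal value.
        string topic = Environment.GetEnvironmentVariable("KAFKA_TOPIC");
        p.Produce(topic, new Message<Null, string> { Value = "test" });
    }
}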