Apache Kafka for .NET - 1.0
Extension ID
com.castsoftware.dotnet.kafka
What’s new?
See Apache Kafka for .NET - 1.0 - Release Notes for more information.
Description
This extension provides support for the Apache Kafka .NET APIs (see Links) that handle producer and consumer operations on the cluster.
In what situation should you install this extension?
If your C# application uses an Apache Kafka cluster for messaging, the producer and consumer sides are implemented in C#, and you want to model the producer and consumer interactions with the appropriate objects and links, then you should install this extension.
Technology Libraries
Library | Version | Supported |
---|---|---|
Confluent.Kafka | up to: 2.1.1 | ✅ |
DotNetCore.CAP.Kafka | up to: 7.1.4 | ✅ |
MassTransit.Kafka | up to: 8.0.16 | ✅ |
Streamiz.Kafka.Net | up to: 1.4.2 | ✅ |
Compatibility
Core release | Operating System | Supported |
---|---|---|
8.4.x | Microsoft Windows / Linux | ✅ |
8.3.x | Microsoft Windows | ✅ |
Dependencies with other extensions
Some CAST extensions require the presence of other CAST extensions in order to function correctly. The .NET Kafka extension requires that the following CAST extensions are also installed (this is managed automatically by CAST Console):
- CAST AIP Internal extension (internal technical extension)
- Web Services Linker
Download and installation instructions
The extension is not automatically downloaded and installed by CAST Console. If you need to use it, you must install it manually using the Application - Extensions interface.
What results can you expect?
Objects
Icon | Description |
---|---|
 | DotNet Kafka Topic Producer |
 | DotNet Kafka Topic Consumer |
 | DotNet Kafka Unknown Topic Producer |
 | DotNet Kafka Unknown Topic Consumer |
Links
Link Type | Source and Destination of link | Supported APIs |
---|---|---|
callLink | From the caller C# method to the DotNet Kafka Topic Producer object | Producer APIs supported for .NET Kafka: Confluent.Kafka.IProducer.Produce, Confluent.Kafka.IProducer.ProduceAsync, DotNetCore.CAP.ICapPublisher.Publish, DotNetCore.CAP.ICapPublisher.PublishAsync, DotNetCore.CAP.ICapPublisher.PublishDelay, DotNetCore.CAP.ICapPublisher.PublishDelayAsync, MassTransit.KafkaProducerRegistrationExtensions.AddProducer, MassTransit.KafkaConfiguratorExtensions.TopicProducer, Streamiz.Kafka.Net.Stream.IKStream.To |
callLink | From the DotNet Kafka Topic Consumer object to the consuming C# method | Consumer APIs supported for .NET Kafka: Confluent.Kafka.IConsumer.Consume, MassTransit.KafkaConfiguratorExtensions.TopicEndpoint |
Example code scenarios
Producer APIs
Producer
.NET Producer
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
class Program
{
    static void ProducerServer(string[] args)
    {
        Producer2();
    }
    // Produce is fire-and-forget and nothing is awaited, so the method is synchronous.
    public static void Producer2()
{
var config = new ProducerConfig { BootstrapServers = "10.0.3.15:9092" };
using (var p = new ProducerBuilder<Null, string>(config).Build())
{
try
{
                p.Produce("Topic-2", new Message<Null, string> { Value = "test" });
                // Wait for delivery of in-flight messages before the producer is disposed.
                p.Flush(TimeSpan.FromSeconds(10));
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
}
}
}
}
AsyncProducer
.NET Producer
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
class Program
{
    static void ProducerServer1(string[] args)
    {
        // Block until the asynchronous produce completes so the process does not exit early.
        Producer1("Topic-1").GetAwaiter().GetResult();
    }
    public static async Task Producer1(string topic)
{
var config = new ProducerConfig { BootstrapServers = "10.0.3.15:9092" };
// If serializers are not specified, default serializers from
// `Confluent.Kafka.Serializers` will be automatically used where
// available. Note: by default strings are encoded as UTF8.
using (var p = new ProducerBuilder<Null, string>(config).Build())
{
try
{
var dr = await p.ProduceAsync(topic, new Message<Null, string> { Value = "test" });
Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
}
}
}
}
Unknown Producer
.NET Producer
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
class Producer
{
    // The topic name arrives as a parameter; when the analyzer cannot evaluate
    // it to a concrete value, a DotNet Kafka Unknown Topic Producer object is
    // created instead of a named Topic Producer.
    public static async Task KafkaProducer(string topic)
    {
        var config = new ProducerConfig { BootstrapServers = "10.0.3.15:9092" };
        using (var p = new ProducerBuilder<Null, string>(config).Build())
        {
            try
            {
                var dr = await p.ProduceAsync(topic, new Message<Null, string> { Value = "test" });
                Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
            }
            catch (ProduceException<Null, string> e)
            {
                Console.WriteLine($"Delivery failed: {e.Error.Reason}");
            }
        }
    }
}
Reading topic names from JSON configuration files
Producer Code
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
public async Task PublishMessage(EvenementAthena evenement)
{
TopicPrefix = _configuration["KafkaData:TopicPrefix"];
TopicSuffix = _configuration["KafkaData:TopicSuffix"];
string topicName = $"{TopicPrefix}.Dataset.{TopicSuffix}";
string key = $"{evenement.Id}-{evenement.NumeroVersion}";
using (var producer = new ProducerBuilder<string, string>(KafkaProviderConfiguration.BrokerClientConfig).Build())
{
try
{
            var dr = await producer.ProduceAsync(topicName, new Message<string, string> { Key = key, Value = evenement.Data.ToString() });
Log.Info($"produced to: {dr.TopicPartitionOffset}");
}
        catch (ProduceException<string, string> ex)
{
Log.Error($"error producing message: {ex}");
throw new KafkaRetriableException(ex.Error);
}
}
}
appsettings.json
{
  "KafkaData": {
    "TopicPrefix": "TechProduct",
    "TopicSuffix": "Devices"
  }
}
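The DotNetCore.CAP publisher calls listed in the Links table are also matched. The following is a minimal hypothetical sketch, assuming CAP is configured with its Kafka transport; the OrderService class, the "order.created" topic name, and the payload are illustrative assumptions, not taken from a real application.
CAP Producer
using System.Threading.Tasks;
using DotNetCore.CAP;
public class OrderService
{
    private readonly ICapPublisher _capPublisher;
    public OrderService(ICapPublisher capPublisher)
    {
        _capPublisher = capPublisher;
    }
    public async Task PlaceOrder()
    {
        // ICapPublisher.PublishAsync sends the payload to the "order.created"
        // Kafka topic (an assumed name); the extension creates a DotNet Kafka
        // Topic Producer object and a callLink from this method to it.
        await _capPublisher.PublishAsync("order.created", new { Id = 42 });
    }
}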
Consumer APIs
Consumer
.NET Consumer
using System;
using System.Threading;
using Confluent.Kafka;
class Consumer1
{
    static void ConsumerServer1(string[] args)
{
KafkaConsumer1();
}
public static void KafkaConsumer1()
{
var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.1.9:9092", AutoOffsetReset = AutoOffsetReset.Earliest };
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe("Topic-1");
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
try
{
while (true)
{
try
{
var cr = c.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
c.Close();
}
}
}
}
Unknown Consumer
.NET Consumer
using System;
using System.Threading;
using Confluent.Kafka;
class Consumer
{
    // Consumes messages from the topic received as a parameter.
    public void KafkaConsumer(string topic)
{
var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.1.9:9092", AutoOffsetReset = AutoOffsetReset.Earliest };
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe(topic);
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
try
{
while (true)
{
try
{
var cr = c.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
c.Close();
}
}
}
}
.NET Producer and Consumer
.NET Producer Consumer
Producer
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
class Program
{
    static void MainMethod(string[] args)
    {
        // Block until the asynchronous produce completes so the message is delivered.
        AsyncProduceDotNet("Topic2").GetAwaiter().GetResult();
    }
    public static async Task AsyncProduceDotNet(string topic)
{
var config = new ProducerConfig { BootstrapServers = "10.0.3.15:9092" };
using (var p = new ProducerBuilder<Null, string>(config).Build())
{
try
{
var dr = await p.ProduceAsync(topic, new Message<Null, string> { Value = "test" });
Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
}
}
}
}
Consumer
using System;
using System.Threading;
using Confluent.Kafka;
class Consumer
{
    // Consumes the messages produced to "Topic2" by the producer above.
    public void KafkaConsumer()
{
var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.1.9:9092", AutoOffsetReset = AutoOffsetReset.Earliest };
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe("Topic2");
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
try
{
while (true)
{
try
{
var cr = c.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
c.Close();
}
}
}
}
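MassTransit's Kafka rider APIs from the Links table (AddProducer, TopicProducer, TopicEndpoint) are wired up at registration time rather than at the call site. The sketch below is a hypothetical illustration, assuming MassTransit 8 with the Kafka rider; the DeviceEvent type, the "device-events" topic, the "device-group" consumer group, and the broker address are assumptions made for this example.
MassTransit Producer and Consumer
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
public record DeviceEvent(string Id);
public class DeviceEventConsumer : IConsumer<DeviceEvent>
{
    public Task Consume(ConsumeContext<DeviceEvent> context)
    {
        return Task.CompletedTask;
    }
}
public class DevicePublisher
{
    private readonly ITopicProducer<DeviceEvent> _producer;
    public DevicePublisher(ITopicProducer<DeviceEvent> producer)
    {
        _producer = producer;
    }
    // ITopicProducer<T>.Produce publishes to the topic bound by AddProducer.
    public Task Publish(string id)
    {
        return _producer.Produce(new DeviceEvent(id));
    }
}
public static class KafkaRiderSetup
{
    public static void Configure(IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));
            x.AddRider(rider =>
            {
                // AddProducer registers an ITopicProducer<DeviceEvent> bound to
                // the "device-events" topic (a Topic Producer in the model).
                rider.AddProducer<DeviceEvent>("device-events");
                rider.AddConsumer<DeviceEventConsumer>();
                rider.UsingKafka((context, k) =>
                {
                    k.Host("localhost:9092");
                    // TopicEndpoint attaches the consumer to "device-events"
                    // (a Topic Consumer in the model).
                    k.TopicEndpoint<DeviceEvent>("device-events", "device-group", e =>
                    {
                        e.ConfigureConsumer<DeviceEventConsumer>(context);
                    });
                });
            });
        });
    }
}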
Cross Technology Java/.NET
Cross Technology
Java Producer
package com.springcore.com.kafka.producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.BasicConfigurator;
public class JavaProducer {
public void produce() {
BasicConfigurator.configure();
Properties properties=new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.10:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String,String> first_producer = new KafkaProducer<String, String>(properties);
ProducerRecord<String, String> record=new ProducerRecord<String, String>("Topic-CrossTech-java-dotNet", "Hye Kafka");
first_producer.send(record);
first_producer.flush();
first_producer.close();
}
}
.NET Consumer
using System;
using System.Threading;
using Confluent.Kafka;
class Consumer
{
    // Consumes the messages published by the Java producer above.
    public void KafkaConsumer()
{
var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.1.9:9092", AutoOffsetReset = AutoOffsetReset.Earliest };
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe("Topic-CrossTech-java-dotNet");
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
try
{
while (true)
{
try
{
var cr = c.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occured: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
c.Close();
}
}
}
}
Stream APIs
Kafka Streaming
.NET Streaming
using System;
using System.Threading;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.Metrics;
using Streamiz.Kafka.Net.Metrics.Prometheus;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.Stream;
namespace sample_stream_demo
{
class Program
{
static async Task Main(string[] args)
{
string inputTopic = "input-stream";
string outputTopic = "output-stream";
int numberPartitions = 4;
await CreateTopics(inputTopic, outputTopic, numberPartitions);
var config = new StreamConfig<StringSerDes, StringSerDes>();
config.ApplicationId = "sample-streamiz-demo";
config.BootstrapServers = "localhost:9092";
config.AutoOffsetReset = AutoOffsetReset.Earliest;
config.StateDir = Path.Combine(".");
config.CommitIntervalMs = 5000;
config.Guarantee = ProcessingGuarantee.AT_LEAST_ONCE;
config.MetricsRecording = MetricsRecordingLevel.DEBUG;
config.UsePrometheusReporter(9090, true);
StreamBuilder builder = new StreamBuilder();
builder.Stream<string, string>(inputTopic)
.FlatMapValues((v) => v.Split(" ").AsEnumerable())
.GroupBy((k, v) => v)
.Count()
.ToStream()
.MapValues((v) => v.ToString())
.To(outputTopic);
Topology t = builder.Build();
KafkaStream stream = new KafkaStream(t, config);
Console.CancelKeyPress += (o, e) => stream.Dispose();
await stream.StartAsync();
}
private static async Task CreateTopics(string input, string output, int numberPartitions)
{
AdminClientConfig config = new AdminClientConfig();
config.BootstrapServers = "localhost:9092";
AdminClientBuilder builder = new AdminClientBuilder(config);
var client = builder.Build();
try
{
await client.CreateTopicsAsync(new List<TopicSpecification>
{
new TopicSpecification() {Name = input, NumPartitions = numberPartitions},
new TopicSpecification() {Name = output, NumPartitions = numberPartitions},
});
}
            catch (Exception)
            {
                // Ignore failures caused by the topics already existing.
            }
finally
{
client.Dispose();
}
}
}
}
Limitations
- Objects will not be created if evaluation fails to resolve the necessary parameter, for example a topic name that is only known at runtime, as in the sketch below.
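A hypothetical illustration (the environment variable name and the producer variable are assumptions made for this example):
// The topic name is read from the environment at runtime, so static
// evaluation cannot resolve it to a concrete string value.
string topic = Environment.GetEnvironmentVariable("KAFKA_TOPIC");
producer.Produce(topic, new Message<Null, string> { Value = "test" });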