Apache Kafka for .NET - 1.0
Description
This extension provides support for the Apache Kafka .NET APIs (see Links), which handle producer and consumer operations on the cluster.
In what situation should you install this extension?
Install this extension if your C# application uses an Apache Kafka cluster for messaging, the producers and consumers are written in C#, and you want them modeled with the appropriate objects and links.
Technology Libraries
| Library | Version | Supported |
|---|---|---|
| Confluent.Kafka | 1.0.0 to 2.11.1 | ✅ |
| DotNetCore.CAP.Kafka | 2.0.0 to 8.4.0 | ✅ |
| MassTransit.Kafka | 7.0.0 to 8.5.2 | ✅ |
| Streamiz.Kafka.Net | 1.0.0 to 1.4.2 | ✅ |
Function Point, Quality and Sizing support
- Function Points (transactions): a green tick indicates that OMG Function Point counting and Transaction Risk Index are supported
- Quality and Sizing: a green tick indicates that CAST can measure size and that a minimum set of Quality Rules exists
| Function Points (transactions) | Quality and Sizing |
|---|---|
| ✅ | ❌ |
Dependencies with other extensions
Some CAST extensions require the presence of other CAST extensions in order to function correctly. The .NET Kafka extension requires that the following other CAST extensions are also installed (this will be managed automatically by CAST Console):
- CAST AIP Internal extension (internal technical extension)
- Web Services Linker
Download and installation instructions
The extension will not be automatically downloaded and installed in CAST Console. If you need to use it, you should manually install the extension using the Application - Extensions interface.
What results can you expect?
Objects
| Icon | Description | Comment |
|---|---|---|
| ![]() | DotNet Kafka Topic Producer | An object is created when a message is sent to the Kafka broker |
| ![]() | DotNet Kafka Topic Consumer | An object is created when a message is received from the Kafka broker |
| ![]() | DotNet Kafka Unknown Topic Producer | For each C# method, an object is created when the topic name cannot be resolved while sending a message to the Kafka broker |
| ![]() | DotNet Kafka Unknown Topic Consumer | For each C# method, an object is created when the topic name cannot be resolved while receiving a message from the Kafka broker |
Links
| Link Type | Source and Destination of link | Supported APIs |
|---|---|---|
| callLink | callLink between the caller C# method and the DotNet Kafka Topic Producer / DotNet Kafka Unknown Topic Producer object | Producer APIs supported by the .NET Kafka extension: Confluent.Kafka.IProducer.Produce, Confluent.Kafka.IProducer.ProduceAsync, DotNetCore.CAP.ICapPublisher.Publish, DotNetCore.CAP.ICapPublisher.PublishAsync, DotNetCore.CAP.ICapPublisher.PublishDelay, DotNetCore.CAP.ICapPublisher.PublishDelayAsync, MassTransit.KafkaProducerRegistrationExtensions.AddProducer, MassTransit.KafkaConfiguratorExtensions.TopicProducer, Streamiz.Kafka.Net.Stream.IKStream.To |
| callLink | callLink between the DotNet Kafka Topic Consumer / DotNet Kafka Unknown Topic Consumer object and the caller C# method | Consumer APIs supported by the .NET Kafka extension: Confluent.Kafka.IConsumer.Consume, MassTransit.KafkaConfiguratorExtensions.TopicEndpoint |
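The example scenarios on this page use Confluent.Kafka and Streamiz.Kafka.Net. For the DotNetCore.CAP and MassTransit entries in the table above, the following is a minimal sketch of what calls to those APIs typically look like. The message type `OrderPlaced`, the classes `CapOrderPublisher`, `OrderPlacedConsumer` and `KafkaRiderSetup`, the topic name `Topic-Orders`, the consumer group and the broker address are all hypothetical names chosen for illustration. Which objects and links the extension produces for this exact code is an assumption based on the API list, not something this page confirms.

```csharp
using System;
using System.Threading.Tasks;
using DotNetCore.CAP;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical message contract, used only for this illustration.
public record OrderPlaced(string OrderId);

// DotNetCore.CAP: the first argument of PublishAsync is the message name,
// which the CAP Kafka transport uses as the topic name.
public class CapOrderPublisher
{
    private readonly ICapPublisher _publisher;

    public CapOrderPublisher(ICapPublisher publisher) => _publisher = publisher;

    public Task PublishAsync(OrderPlaced order) =>
        _publisher.PublishAsync("Topic-Orders", order);
}

// MassTransit consumer that handles messages read from the topic endpoint.
public class OrderPlacedConsumer : IConsumer<OrderPlaced>
{
    public Task Consume(ConsumeContext<OrderPlaced> context)
    {
        Console.WriteLine($"Received order {context.Message.OrderId}");
        return Task.CompletedTask;
    }
}

public static class KafkaRiderSetup
{
    // Registers a Kafka rider with one producer and one topic endpoint.
    public static IServiceCollection AddKafkaRider(this IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            // A rider is attached to a bus; the in-memory bus is enough here.
            x.UsingInMemory((context, cfg) => cfg.ConfigureEndpoints(context));

            x.AddRider(rider =>
            {
                rider.AddConsumer<OrderPlacedConsumer>();

                // Producer side: AddProducer binds the message type to a topic.
                rider.AddProducer<OrderPlaced>("Topic-Orders");

                rider.UsingKafka((context, k) =>
                {
                    k.Host("localhost:9092"); // assumed broker address

                    // Consumer side: TopicEndpoint binds a topic and group to a consumer.
                    k.TopicEndpoint<OrderPlaced>("Topic-Orders", "orders-consumer-group", e =>
                    {
                        e.ConfigureConsumer<OrderPlacedConsumer>(context);
                    });
                });
            });
        });
        return services;
    }
}
```

In both libraries the topic name is passed as a string literal, which is the same situation as the Confluent.Kafka scenarios that follow.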
Example code scenarios
Producer APIs
Producer
.NET Producer
class Program
{
static void ProducerServer(string[] args)
{
Producer2();
}
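public static void Producer2()
{
var config = new ProducerConfig { BootstrapServers = "10.0.3.15:9092" };
using (var p = new ProducerBuilder<Null, string>(config).Build())
{
// Produce only queues the message; delivery happens in the background.
p.Produce("Topic-2", new Message<Null, string> { Value = "test" });
// Wait for queued messages to be delivered before the producer is disposed.
p.Flush(TimeSpan.FromSeconds(10));
}
}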
}

AsyncProducer
.NET Producer
class Program
{
static void ProducerServer1(string[] args)
{
Producer1("Topic-1").GetAwaiter().GetResult(); // block until the message is delivered
}
public static async Task Producer1(string topic)
{
var config = new ProducerConfig { BootstrapServers = "10.0.3.15:9092" };
using (var p = new ProducerBuilder<Null, string>(config).Build())
{
var dr = await p.ProduceAsync(topic, new Message<Null, string> { Value = "test" });
Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
}
}
}

Unknown Producer
.NET Producer
class Program
{
// 'topic' is a parameter whose value is not known in this code,
// so the topic name cannot be resolved.
public static async Task UnknownProducer(string topic)
{
var config = new ProducerConfig { BootstrapServers = "10.0.3.15:9092" };
using (var p = new ProducerBuilder<Null, string>(config).Build())
{
var dr = await p.ProduceAsync(topic, new Message<Null, string> { Value = "test" });
Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
}
}
}

Reading from JSON files
Producer Code
public async Task PublishMessage(EvenementAthena evenement)
{
TopicPrefix = _configuration["KafkaData:TopicPrefix"];
TopicSuffix = _configuration["KafkaData:TopicSuffix"];
string topicName = $"{TopicPrefix}.Dataset.{TopicSuffix}";
string key = $"{evenement.Id}-{evenement.NumeroVersion}";
using (var producer = new ProducerBuilder<string, string>(KafkaProviderConfiguration.BrokerClientConfig).Build())
{
var dr = await producer.ProduceAsync(topicName, new Message<string, string> {Key= key, Value = evenement.Data.ToString() });
Log.Info($"produced to: {dr.TopicPartitionOffset}");
}
}
appsettings.json
{
"KafkaData": {
"TopicPrefix": "TechProduct",
"TopicSuffix": "Devices"
}
}
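To make the link between the two files explicit, here is a small sketch that reads the same two keys from a JSON string and builds the topic name with the same format string as `PublishMessage`. It uses `System.Text.Json` rather than the `IConfiguration` object from the producer code, and the class `TopicNameResolver` and its members are hypothetical helpers introduced only for this illustration.

```csharp
using System;
using System.Text.Json;

public static class TopicNameResolver
{
    // Same keys and values as the appsettings.json fragment above.
    public const string SampleSettings =
        "{ \"KafkaData\": { \"TopicPrefix\": \"TechProduct\", \"TopicSuffix\": \"Devices\" } }";

    // Reads KafkaData:TopicPrefix and KafkaData:TopicSuffix and joins them
    // with the same format string used in PublishMessage.
    public static string Resolve(string json)
    {
        using JsonDocument doc = JsonDocument.Parse(json);
        JsonElement kafka = doc.RootElement.GetProperty("KafkaData");
        string prefix = kafka.GetProperty("TopicPrefix").GetString();
        string suffix = kafka.GetProperty("TopicSuffix").GetString();
        return $"{prefix}.Dataset.{suffix}";
    }
}
```

Calling `TopicNameResolver.Resolve(TopicNameResolver.SampleSettings)` returns `TechProduct.Dataset.Devices`, which is the topic name the producer code above passes to `ProduceAsync` with these settings.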

Consumer APIs
Consumer
.NET Consumer
class consumer1
{
static void consumerServer1(string[] args)
{
KafkaConsumer1();
}
public static void KafkaConsumer1()
{
var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.1.9:9092", AutoOffsetReset = AutoOffsetReset.Earliest };
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe("Topic-1");
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
while (true)
{
var cr = c.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
}
}
}

Unknown Consumer
.NET Consumer
class consumer
{
public void KafkaProducer(string topic)
{
var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.1.9:9092", AutoOffsetReset = AutoOffsetReset.Earliest };
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe(topic);
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
while (true)
{
var cr = c.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
}
}
}

.NET Producer and Consumer
.NET Producer Consumer
Producer
class Program
{
static void Mainmethod(string[] args)
{
asyncProduceDOtNet("Topic2").GetAwaiter().GetResult(); // block until the message is delivered
}
public static async Task asyncProduceDOtNet(string topic)
{
var config = new ProducerConfig { BootstrapServers = "10.0.3.15:9092" };
using (var p = new ProducerBuilder<Null, string>(config).Build())
{
var dr = await p.ProduceAsync(topic, new Message<Null, string> { Value = "test" });
Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
}
}
}
Consumer
class consumer
{
public void KafkaProducer()
{
var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.1.9:9092", AutoOffsetReset = AutoOffsetReset.Earliest };
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe("Topic2");
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
while (true)
{
var cr = c.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
}
}
}

Cross Technology Java/.NET
Cross Technology
Java Producer
public class JavaProducer {
public void produce() {
BasicConfigurator.configure();
Properties properties=new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.10:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String,String> first_producer = new KafkaProducer<String, String>(properties);
ProducerRecord<String, String> record=new ProducerRecord<String, String>("Topic-CrossTech-java-dotNet", "Hye Kafka");
first_producer.send(record);
first_producer.flush();
first_producer.close();
}
}
.NET Consumer
class consumer
{
public void KafkaProducer()
{
var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.1.9:9092", AutoOffsetReset = AutoOffsetReset.Earliest };
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe("Topic-CrossTech-java-dotNet");
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
while (true)
{
var cr = c.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
}
}
}

Stream APIs
Kafka Streaming
.NET Streaming
namespace sample_stream_demo
{
class Program
{
static async Task Main(string[] args)
{
string inputTopic = "input-stream";
string outputTopic = "output-stream";
int numberPartitions = 4;
await CreateTopics(inputTopic, outputTopic, numberPartitions);
var config = new StreamConfig<StringSerDes, StringSerDes>();
config.ApplicationId = "sample-streamiz-demo";
config.BootstrapServers = "localhost:9092";
config.AutoOffsetReset = AutoOffsetReset.Earliest;
StreamBuilder builder = new StreamBuilder();
builder.Stream<string, string>(inputTopic)
.FlatMapValues((v) => v.Split(" ").AsEnumerable())
.GroupBy((k, v) => v)
.Count()
.ToStream()
.MapValues((v) => v.ToString())
.To(outputTopic);
Topology t = builder.Build();
KafkaStream stream = new KafkaStream(t, config);
Console.CancelKeyPress += (o, e) => stream.Dispose();
await stream.StartAsync();
}
private static async Task CreateTopics(string input, string output, int numberPartitions)
{
AdminClientConfig config = new AdminClientConfig();
config.BootstrapServers = "localhost:9092";
AdminClientBuilder builder = new AdminClientBuilder(config);
var client = builder.Build();
await client.CreateTopicsAsync(new List<TopicSpecification>
{
new TopicSpecification() {Name = input, NumPartitions = numberPartitions},
new TopicSpecification() {Name = output, NumPartitions = numberPartitions},
});
}
}
}


Limitations
- Objects will not be created if the evaluation fails to resolve the necessary parameter.



