Created by Shared Doc User on Sep 29, 2023
Extension ID
com.castsoftware.dotnet.kafka
What's new?
See Apache Kafka for .NET - 1.0 - Release Notes for more information.
Description
This extension supports the Apache Kafka .NET APIs (see Links) that handle consumer and publisher operations on the cluster.
In what situation should you install this extension?
Install this extension if your C# application uses an Apache Kafka cluster for messaging, its producers and consumers are written in C#, and you want the producer and consumer interactions modeled with the appropriate objects and links.
Technology Libraries
AIP Core compatibility
AIP Core release | Supported |
---|---|
8.3.x | |
Supported DBMS servers
DBMS | Supported |
---|---|
CSS/PostgreSQL | |
Dependencies with other extensions
Some CAST extensions require the presence of other CAST extensions in order to function correctly. The .NET Kafka extension requires that the following other CAST extensions are also installed (this will be managed automatically by CAST Console):
- CAST AIP Internal extension (internal technical extension)
- Web Services Linker
Download and installation instructions
The extension is not automatically downloaded and installed in CAST Console. If you need it, install it manually using the Application - Extensions interface.
What results can you expect?
Once the analysis/snapshot generation has been completed, you can view the results in the normal manner. The following objects and links will be displayed in CAST Imaging:
Objects
The following objects are generated:
Icon | Description |
---|---|
| DotNet Kafka Topic Publisher |
| DotNet Kafka Topic Receiver |
| DotNet Kafka Unknown Topic Publisher |
| DotNet Kafka Unknown Topic Receiver |
Links
Link Type | Source and Destination of link | Supported APIs |
---|---|---|
callLink | callLink between the caller C# method and the DotNet Kafka Topic Publisher object | Producer APIs supported by .NET Kafka: Confluent.Kafka.IProducer.Produce, Confluent.Kafka.IProducer.ProduceAsync, DotNetCore.CAP.ICapPublisher.Publish, DotNetCore.CAP.ICapPublisher.PublishAsync, DotNetCore.CAP.ICapPublisher.PublishDelay, DotNetCore.CAP.ICapPublisher.PublishDelayAsync, MassTransit.KafkaProducerRegistrationExtensions.AddProducer, MassTransit.KafkaConfiguratorExtensions.TopicProducer, Streamiz.Kafka.Net.Stream.IKStream.To |
callLink | callLink between the DotNet Kafka Topic Receiver object and the caller C# method | Consumer APIs supported by .NET Kafka: Confluent.Kafka.IConsumer.Consume, MassTransit.KafkaConfiguratorExtensions.TopicEndpoint |
Example code scenarios
Publisher APIs
Producer
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
class Program
{
static void ProducerServer(string[] args)
{
Producer2();
}
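public static void Producer2()
{
var config = new ProducerConfig { BootstrapServers = "10.0.3.15:9092" };
using (var p = new ProducerBuilder<Null, string>(config).Build())
{
try
{
// Produce queues the message and returns immediately.
p.Produce("Topic-2", new Message<Null, string> { Value = "test" });
// Wait for queued messages to be delivered before the producer is disposed.
p.Flush(TimeSpan.FromSeconds(10));
}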
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
}
}
}
}
AsyncProducer
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
class Program
{
static async Task ProducerServer1(string[] args)
{
// Await the task so that delivery completes and exceptions are observed.
await Producer1("Topic-1");
}
public static async Task Producer1(string topic)
{
var config = new ProducerConfig { BootstrapServers = "10.0.3.15:9092" };
// If serializers are not specified, default serializers from
// `Confluent.Kafka.Serializers` will be automatically used where
// available. Note: by default strings are encoded as UTF8.
using (var p = new ProducerBuilder<Null, string>(config).Build())
{
try
{
var dr = await p.ProduceAsync(topic, new Message<Null, string> { Value = "test" });
Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
}
}
}
}
Unknown Producer
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
class Producer
{
// The topic name is only known at run time: it arrives as a parameter
// and no caller in the analyzed code passes a literal value.
public static async Task KafkaProducer(string topic)
{
var config = new ProducerConfig { BootstrapServers = "192.168.1.9:9092" };
using (var p = new ProducerBuilder<Null, string>(config).Build())
{
try
{
var dr = await p.ProduceAsync(topic, new Message<Null, string> { Value = "test" });
Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
}
}
}
}
Reading from JSON files
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
public async Task PublishMessage(EvenementAthena evenement)
{
TopicPrefix = _configuration["KafkaData:TopicPrefix"];
TopicSuffix = _configuration["KafkaData:TopicSuffix"];
string topicName = $"{TopicPrefix}.Dataset.{TopicSuffix}";
string key = $"{evenement.Id}-{evenement.NumeroVersion}";
using (var producer = new ProducerBuilder<string, string>(KafkaProviderConfiguration.BrokerClientConfig).Build())
{
try
{
var dr = await producer.ProduceAsync(topicName, new Message<string, string> {Key= key, Value = evenement.Data.ToString() });
Log.Info($"produced to: {dr.TopicPartitionOffset}");
}
catch (ProduceException<string, string> ex)
{
Log.Error($"error producing message: {ex}");
throw new KafkaRetriableException(ex.Error);
}
}
}
The topic prefix and suffix are read from a JSON configuration file such as:
{
"KafkaData": {
"TopicPrefix": "TechProduct",
"TopicSuffix": "Devices"
}
}
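The supported APIs listed under Links include DotNetCore.CAP.ICapPublisher.PublishAsync, which none of the publisher scenarios above illustrates. The following is a minimal sketch of such a call, not taken from the extension's own test cases. The class name OrderNotifier, the method NotifyAsync and the topic "Topic-CAP" are hypothetical, and the sketch assumes CAP has been registered with its Kafka transport so that the message name is used as the topic name. How the extension models this exact code has not been verified here.

```csharp
using System.Threading.Tasks;
using DotNetCore.CAP;

// Hypothetical service used for illustration only.
public class OrderNotifier
{
    private readonly ICapPublisher _publisher;

    // ICapPublisher is normally supplied by dependency injection
    // once CAP has been registered in the application's services.
    public OrderNotifier(ICapPublisher publisher)
    {
        _publisher = publisher;
    }

    public Task NotifyAsync(string orderId)
    {
        // First argument: the message name (assumed here to map to the
        // Kafka topic). Second argument: the message content.
        return _publisher.PublishAsync("Topic-CAP", orderId);
    }
}
```

The topic is passed as a string literal at the call site, in the same way as "Topic-2" in the Producer scenario.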
Receiver APIs
Consumer
using System;
using System.Threading;
using Confluent.Kafka;
class consumer1
{
static void consumerServer1(string[] args)
{
KafkaConsumer1();
}
public static void KafkaConsumer1()
{
var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.1.9:9092", AutoOffsetReset = AutoOffsetReset.Earliest };
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe("Topic-1");
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
try
{
while (true)
{
try
{
var cr = c.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occurred: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
c.Close();
}
}
}
}
Unknown Consumer
using System;
using System.Threading;
using Confluent.Kafka;
class consumer
{
public void KafkaConsumer(string topic)
{
var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.1.9:9092", AutoOffsetReset = AutoOffsetReset.Earliest };
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe(topic);
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
try
{
while (true)
{
try
{
var cr = c.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occurred: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
c.Close();
}
}
}
}
.NET Publisher and Receiver
.NET Producer Consumer
using System;
using System.Threading.Tasks;
using Confluent.Kafka;
class Program
{
static async Task Mainmethod(string[] args)
{
// Await the task so that delivery completes and exceptions are observed.
await asyncProduceDOtNet("Topic2");
}
public static async Task asyncProduceDOtNet(string topic)
{
var config = new ProducerConfig { BootstrapServers = "10.0.3.15:9092" };
using (var p = new ProducerBuilder<Null, string>(config).Build())
{
try
{
var dr = await p.ProduceAsync(topic, new Message<Null, string> { Value = "test" });
Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}");
}
}
}
}
using System;
using System.Threading;
using Confluent.Kafka;
class consumer
{
public void KafkaConsumer()
{
var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.1.9:9092", AutoOffsetReset = AutoOffsetReset.Earliest };
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe("Topic2");
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
try
{
while (true)
{
try
{
var cr = c.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occurred: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
c.Close();
}
}
}
}
Cross Technology Java/.NET
Cross Technology
package com.springcore.com.kafka.producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.log4j.BasicConfigurator;
public class JavaProducer {
public void produce() {
BasicConfigurator.configure();
Properties properties=new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.10:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String,String> first_producer = new KafkaProducer<String, String>(properties);
ProducerRecord<String, String> record=new ProducerRecord<String, String>("Topic-CrossTech-java-dotNet", "Hye Kafka");
first_producer.send(record);
first_producer.flush();
first_producer.close();
}
}
using System;
using System.Threading;
using Confluent.Kafka;
class consumer
{
public void KafkaConsumer()
{
var conf = new ConsumerConfig { GroupId = "test-consumer-group", BootstrapServers = "192.168.1.9:9092", AutoOffsetReset = AutoOffsetReset.Earliest };
using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe("Topic-CrossTech-java-dotNet");
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
try
{
while (true)
{
try
{
var cr = c.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Error occurred: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
// Ensure the consumer leaves the group cleanly and final offsets are committed.
c.Close();
}
}
}
}
Stream APIs
Kafka Streaming
using System;
using System.Threading;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Logging;
using Streamiz.Kafka.Net;
using Streamiz.Kafka.Net.Metrics;
using Streamiz.Kafka.Net.Metrics.Prometheus;
using Streamiz.Kafka.Net.SerDes;
using Streamiz.Kafka.Net.Stream;
namespace sample_stream_demo
{
class Program
{
static async Task Main(string[] args)
{
string inputTopic = "input-stream";
string outputTopic = "output-stream";
int numberPartitions = 4;
await CreateTopics(inputTopic, outputTopic, numberPartitions);
var config = new StreamConfig<StringSerDes, StringSerDes>();
config.ApplicationId = "sample-streamiz-demo";
config.BootstrapServers = "localhost:9092";
config.AutoOffsetReset = AutoOffsetReset.Earliest;
config.StateDir = Path.Combine(".");
config.CommitIntervalMs = 5000;
config.Guarantee = ProcessingGuarantee.AT_LEAST_ONCE;
config.MetricsRecording = MetricsRecordingLevel.DEBUG;
config.UsePrometheusReporter(9090, true);
StreamBuilder builder = new StreamBuilder();
builder.Stream<string, string>(inputTopic)
.FlatMapValues((v) => v.Split(" ").AsEnumerable())
.GroupBy((k, v) => v)
.Count()
.ToStream()
.MapValues((v) => v.ToString())
.To(outputTopic);
Topology t = builder.Build();
KafkaStream stream = new KafkaStream(t, config);
Console.CancelKeyPress += (o, e) => stream.Dispose();
await stream.StartAsync();
}
private static async Task CreateTopics(string input, string output, int numberPartitions)
{
AdminClientConfig config = new AdminClientConfig();
config.BootstrapServers = "localhost:9092";
AdminClientBuilder builder = new AdminClientBuilder(config);
var client = builder.Build();
try
{
await client.CreateTopicsAsync(new List<TopicSpecification>
{
new TopicSpecification() {Name = input, NumPartitions = numberPartitions},
new TopicSpecification() {Name = output, NumPartitions = numberPartitions},
});
}
catch (Exception)
{
// Ignore the error: the topics may already exist.
}
finally
{
client.Dispose();
}
}
}
}
Limitations
- Objects will not be created if the evaluation fails to resolve the necessary parameter.