kafka-server-start.sh -daemon ../config/server.properties
3..net core 操作
producer 端, 引入 Confluent.Kafka
- Install-Package Confluent.Kafka -Version 1.0-beta2
- using Confluent.Kafka;
- using System;
- using System.Collections.Generic;
- using System.Text;
- using System.Threading.Tasks;
- namespace KafkaTest
- {
- class Program
- {
- static void Main(string[] args)
- {
- Test().Wait();
- }
- static async Task Test()
- {
- var conf = new ProducerConfig { BootstrapServers = "39.**.**.**:9092" };
- Action<DeliveryReportResult<Null, string>> handler = r =>
- Console.WriteLine(!r.Error.IsError
- ? $"Delivered message to {r.TopicPartitionOffset}"
- : $"Delivery Error: {r.Error.Reason}");
- using (var p = new Producer<Null, string>(conf))
- {
- for (int i = 0; i <100000; ++i)
- {
- p.BeginProduce("my-topic", new Message<Null, string> { Value = i.ToString() }, handler);
- }
- // wait for up to 10 seconds for any inflight messages to be delivered.
- p.Flush(TimeSpan.FromSeconds(10));
- }
- }
- }
- }
consumer 端, 引入 Confluent.Kafka
- Install-Package Confluent.Kafka -Version 1.0-beta2
- using Confluent.Kafka;
- using System;
- using System.Linq;
- using System.Text;
- namespace KafkaClient
- {
- class Program
- {
- static void Main(string[] args)
- {
- var conf = new ConsumerConfig
- {
- GroupId = "test-consumer-group4",
- BootstrapServers = "39.**.**.**:9092",
- // Note: The AutoOffsetReset property determines the start offset in the event
- // there are not yet any committed offsets for the consumer group for the
- // topic/partitions of interest. By default, offsets are committed
- // automatically, so in this example, consumption will only start from the
- // earliest message in the topic 'my-topic' the first time you run the program.
- AutoOffsetReset = AutoOffsetResetType.Earliest
- };
- using (var c = new Consumer<Ignore, string>(conf))
- {
- c.Subscribe("my-topic");
- bool consuming = true;
- // The client will automatically recover from non-fatal errors. You typically
- // don't need to take any action unless an error is marked as fatal.
- c.OnError += (_, e) => consuming = !e.IsFatal;
- while (consuming)
- {
- try
- {
- var cr = c.Consume();
- Console.WriteLine($"Consumed message'{cr.Value}'at:'{cr.TopicPartitionOffset}'.");
- }
- catch (ConsumeException e)
- {
- Console.WriteLine($"Error occured: {e.Error.Reason}");
- }
- }
- // Ensure the consumer leaves the group cleanly and final offsets are committed.
- c.Close();
- }
- }
- }
- }
来源: https://www.cnblogs.com/chenyishi/p/10250768.html