首先说明一点, 像 Confluent.Kafka 这种开源的组件, 三天两头的更新. 在搜索引擎搜索到的结果往往用不了, 浪费时间. 建议以后遇到类似的情况直接看官网给的 Demo.
因为搜索引擎搜到的文章, 作者基本上都没有说明用的是哪个版本的 dll. 所以你 nuget 安装了后, 不一定能使用.
截止目前, 我用的 Confluent.Kafka 是最新版本: 1.2.1.
GitHub 上源码地址: https://github.com/confluentinc/confluent-kafka-dotnet, 上面附有生产和消费的示例. 直接去看吧. 往下就不要看了, 是我自己用到的, 只是方便我自己查看.
生产:
- static async void Produce()
- {
- var config = new ProducerConfig { BootstrapServers = "192.168.3.250:9092" };
- using (var p = new ProducerBuilder<Null, string>(config).Build())
- {
- try
- {
- var dr = await p.ProduceAsync("mytopic", new Message<Null, string> { Value = "test" });
- Console.WriteLine($"Delivered'{dr.Value}'to'{dr.TopicPartitionOffset}'");
- }
- catch (ProduceException<Null, string> e)
- {
- Console.WriteLine($"Delivery failed: {e.Error.Reason}");
- }
- }
- }
消费:
- static async void Consume()
- {
- var conf = new ConsumerConfig
- {
- GroupId = "test-consumer-group",
- BootstrapServers = "192.168.3.250:9092",
- AutoOffsetReset = AutoOffsetReset.Earliest
- };
- using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
- {
- c.Subscribe("mytopic");
- CancellationTokenSource cts = new CancellationTokenSource();
- Console.CancelKeyPress += (_, e) => {
- e.Cancel = true; // prevent the process from terminating.
- cts.Cancel();
- };
- try
- {
- while (true)
- {
- try
- {
- var cr = c.Consume(cts.Token);
- Console.WriteLine($"Consumed message'{cr.Value}'at:'{cr.TopicPartitionOffset}'.");
- }
- catch (ConsumeException e)
- {
- Console.WriteLine($"Error occured: {e.Error.Reason}");
- }
- }
- }
- catch (OperationCanceledException)
- {
- c.Close();
- }
- }
- }
来源: http://www.bubuko.com/infodetail-3279767.html