一 kafka 介绍
kafka 是基于 zookeeper 的一个分布式流平台, 既然是流, 那么大家都能猜到它的存储结构基本上就是线性的了. 硬盘大家都知道读写非常的慢, 那是因为在随机情况下, 线性下, 硬盘的读写非常快. kafka 官方文档, 一直拿传统的消息队列来和 kafka 对比, 这样大家会触类旁通更快了解 kafka 的特性. 最熟悉的消息队列框架有 ActiveMQ 和 RabbitMQ. 熟悉消息队列的, 最熟悉的特性就是队列和发布订阅功能, 因为这是大家最常用的, kafka 实现了一些特有的机制, 去规避传统的消息队列的一些瓶颈, 比如并发, rabbitMQ 在多个处理程序下, 并不能保证执行顺序, 还是必须自己去处理独占, 而 kafka 使用 consumer group 的方式, 实现了可以多个处理程序处理一个 topic 下的记录. 如图:
每个分区的记录保证能被每个组接受, 这样可以并发去处理一个 topic 的记录, 而且扩展组, 则可以随意根据应用需求去扩展你的应用程序, 但是每个组的消费者不能超过分区的数量.
kafka Distribution 提供了容错的功能, 每一个 partition 都有一个服务器叫 leader, 还有零个或者一个以上的服务器叫 follower, 当这些 follower 都在同步数据的时候, leader 扛起所有的写和读, 当 leader 挂掉, follower 会随机选取一个服务器当 leader, 当然必须有几个 follower 同步时 in-sync 的. 还有 kafka 虽然的那个记录具有原子性, 但是并不支持事务.
因为这一篇并不是专门讲解 kafka, 所以点到为止.
二 扩展服务 开发
以前讲过, netcore 的一个很重要的特性就是支持依赖注入, 在这里一切皆服务. 那么如果需要 kafka 作为日志服务的终端, 就首先需要 kafka 服务, 下面咱们就开发一个 kafka 服务.
首先, 服务就是需要构建, 这是 netcore 开发服务的第一步, 我们首先建立一个 IKafkaBuilder.cs 接口类, 如下:
- homusing Microsoft.Extensions.DependencyInjection;
- namespace Walt.Freamwork.Service
- {
- public interface IKafkaBuilder
- {
- /// <summary>
- /// Gets the <see cref="IServiceCollection"/> where Logging services are configured.
- /// </summary>
- IServiceCollection Services { get; }
- }
- }
再实现它, KafkaBuilder.cs
- using Microsoft.Extensions.DependencyInjection;
- namespace Walt.Freamwork.Service
- {
- public class KafkaBuilder : IKafkaBuilder
- {
- public IServiceCollection Services {get;}
- public KafkaBuilder(IServiceCollection services)
- {
- Services=services;
- }
- }
- }
再利用扩展方法为 serviceCollection 类加上扩展方法:
- using System;
- using Microsoft.Extensions.Configuration;
- using Microsoft.Extensions.DependencyInjection;
- using Microsoft.Extensions.DependencyInjection.Extensions;
- using Walt.Framework.Service.Kafka;
- namespace Walt.Framework.Service
- {
- public static class ServiceCollectionExtensions
- {
- /// <summary>
- /// Adds logging services to the specified <see cref="IServiceCollection" />.
- /// </summary>
- /// <param name="services">The <see cref="IServiceCollection" /> to add services to.</param>
- /// <returns>The <see cref="IServiceCollection"/> so that additional calls can be chained.</returns>
- public static IServiceCollection AddKafka(this IServiceCollection services)
- {
- return AddKafka(services, builder => { });
- }
- public static IServiceCollection AddKafka(this IServiceCollection services
- , Action<IKafkaBuilder> configure)
- {
- if (services == null)
- {
- throw new ArgumentNullException(nameof(services));
- }
- services.AddOptions();
- configure(new KafkaBuilder(services));
- services.TryAddSingleton<IKafkaService,KafkaService>(); //kafka 的服务类
- return services;
- }
- }
- }
KafkaService 的实现:
- using System;
- using System.Collections.Generic;
- using System.Threading.Tasks;
- using Confluent.Kafka;
- using Microsoft.Extensions.Options;
- namespace Walt.Framework.Service.Kafka
- {
- public class KafkaService : IKafkaService
- {
- private KafkaOptions _kafkaOptions;
- private Producer _producer;
- public KafkaService(IOptionsMonitor<KafkaOptions> kafkaOptions)
- {
- _kafkaOptions=kafkaOptions.CurrentValue;
- kafkaOptions.OnChange((kafkaOpt,s)=>{
- _kafkaOptions=kafkaOpt;
- System.Diagnostics.Debug
- .WriteLine(Newtonsoft.JSON.JsonConvert.SerializeObject(kafkaOpt)+"---"+s);
- });
- _producer=new Producer(_kafkaOptions.Properties);
- }
- private byte[] ConvertToByte(string str)
- {
- return System.Text.Encoding.Default.GetBytes(str);
- }
- public async Task<Message> Producer(string topic,string key,string value)
- {
- if(string.IsNullOrEmpty(topic)
- ||string.IsNullOrEmpty(value))
- {
- throw new ArgumentNullException("topic 或者 value 不能为 null.");
- }
- var task= await _producer.ProduceAsync(topic,ConvertToByte(key),ConvertToByte(value));
- return task;
- }
- }
- }
那么咱们是不是忘记什么了, 看上面的代码, 是不是那个配置类 KafkaOptions 还没有说明?
在
这个位置添加 kafka 的配置类 KafkaConfigurationOptions:
- using Microsoft.Extensions.Configuration;
- using Microsoft.Extensions.Options;
- using Walt.Freamwork.Service;
- namespace Walt.Freamwork.Configuration
- {
- public class KafkaConfigurationOptions : IConfigureOptions<KafkaOptions>
- {
- private readonly IConfiguration _configuration;
- public KafkaConfigurationOptions(IConfiguration configuration)
- {
- _configuration=configuration;
- }
- public void Configure(KafkaOptions options)
- {
- // 这里仅仅自定义一些你自己的代码, 使用上面 configuration 配置中的配置节, 处理程序没法自动绑定的
一些事情.
- }
- }
- }
然后, 将配置类添加进服务:
- using Microsoft.Extensions.Configuration;
- using Microsoft.Extensions.DependencyInjection;
- using Microsoft.Extensions.DependencyInjection.Extensions;
- using Microsoft.Extensions.Options;
- using Walt.Framework.Service;
- namespace Walt.Framework.Configuration
- {
- public static class KafkaConfigurationExtensioncs
- {
- public static IKafkaBuilder AddConfiguration(this IKafkaBuilder builder
- ,IConfiguration configuration)
- {
- InitService( builder,configuration);
- return builder;
- }
- public static void InitService(IKafkaBuilder builder,IConfiguration configuration)
- {
- builder.Services.TryAddSingleton<IConfigureOptions<KafkaOptions>>(
- new KafkaConfigurationOptions(configuration)); // 配置类和配置内容
- builder.Services.TryAddSingleton
- (ServiceDescriptor.Singleton<IOptionsChangeTokenSource<KafkaOptions>>(
- new ConfigurationChangeTokenSource<KafkaOptions>(configuration)) );// 这个是观察类, 如果更改, 会激发 onchange 方法
- builder.Services
- .TryAddEnumerable(ServiceDescriptor.Singleton<IConfigureOptions<KafkaOptions>>
- (new ConfigureFromConfigurationOptions<KafkaOptions>(configuration))); // 这个是 option 类, 没这个, 配置无法将类绑定
- builder.Services.AddSingleton(new KafkaConfiguration(configuration));
- }
- }
- }
ok, 推送 nuget, 业务部分调用.
三 kafka 服务调用
在 project 中引用然后 restore:
引入命名空间:
调用:
- using System;
- using System.Collections.Generic;
- using System.IO;
- using System.Linq;
- using System.Threading.Tasks;
- using Microsoft.AspNetCore;
- using Microsoft.AspNetCore.Hosting;
- using Microsoft.Extensions.Configuration;
- using Microsoft.Extensions.DependencyInjection;
- using Microsoft.Extensions.Logging;
- using Newtonsoft.JSON;
- using Walt.Framework.Log;
- using Walt.Framework.Configuration;
- using Walt.Framework.Service;
- namespace Walt.TestMcroServoces.webapi
- {
- public class Program
- {
- public static void Main(string[] args)
- {
- var host = new WebHostBuilder()
- .ConfigureAppConfiguration((hostingContext, configContext) =>{
- var en=hostingContext.HostingEnvironment;
- if(en.IsDevelopment())
- {
- configContext.AddJsonFile($"appsettings.{en.EnvironmentName}.json");
- }
- else
- {
- configContext.AddJsonFile("appsettings.json");
- }
- configContext.AddCommandLine(args)
- .AddEnvironmentVariables()
- .SetBasePath(Directory.GetCurrentDirectory()).Build();
- }).ConfigureServices((context,configureServices)=>{
- configureServices.AddKafka(KafkaBuilder=>{
- KafkaBuilder.AddConfiguration(context.Configuration.GetSection("KafkaService"));
- });
- }) //kafka 的调用.
- .ConfigureLogging((hostingContext, logging) => {
- logging.AddConfiguration(hostingContext.Configuration.GetSection("Logging"))
- .AddCustomizationLogger();
- }).UseKestrel(KestrelServerOption=>{
- KestrelServerOption.ListenAnyIP(801);
- })
- .UseStartup<Startup>().Build();
- host.Run();
- Console.ReadKey();
- }
- }
- }
然后提交 Git, 让 jenkins 构建 docker 发布运行:
jenkin 是是非常牛的一款构建工具, 不仅仅根据插件可以扩展不同环境, 还支持分布式构建.
这是我们用 jenikins 构建的的:
然
让它跑起来:
调用看看:
这个方法是输出 Properties 数组的, 这个配置结构只是演示, 后面的结构要变, 因为要放 kafka 的配置, 比如连接服务 ip 等,
改动也很简单, 在配置好 configuration 和 service 后, 改动这个类 KafkaOptions 和配置文件中 kafka 节点中的 JSON 结构就行.:
四 集成 kafka
kafka 的接口不多, 看看都有那些:
https://docs.confluent.io/current/clients/confluent-kafka-dotnet/api/Confluent.Kafka.Producer.html
Consumer 和 Producer 是咱们发布消息和消费消息的两个主类, 代码在上文已经实现的 service.
客户端代码:
使用 my-replicated-topic-morepart 这儿 topic, 还是希望多分区, 因为后面 consumer 使用分布式计算读取.
consumer 先在客户端监听:
product 端的调用代码:
执行这个接口后, 再看 consumer 接收到的消息:
最后一步, 将咱们 kafka 日志部分替换为真实的 kafka 环境, 看结果:
那么最后的配置是这样的:
- {
- "Logging": {
- "LogLevel": {
- "Default": "Debug",
- "System": "Debug",
- "Microsoft": "Debug"
- },
- "KafkaLog":{
- "Prix":"这是我的自定义日志提供程序"
- }
- },
- "KafkaService":{
- "Properties":{
- "bootstrap.servers":"192.168.249.106:9092"
- }
- }
- }
log 使用这个 kafka 服务就很简单了, 在前面文章中实现的 log 扩展类中, 直接构造函数注入这个 kafkaService, 就可以以使用了.
分布式日志到这里结束, 可能大家觉得后面还有日志索引和日志展现, 因为这个读 kafka 需要分布式去处理,
我下面刚好要写分布式计算的文章, 所以到时可以拿这个当例子, 承前继后.
来源: https://www.cnblogs.com/ck0074451665/p/10211725.html