一、ACL检查流程解析 一起看一下kafka server的启动与监听流程: Kafka -> KafkaServer -> SocketServer、KafkaRequestHandler 其中KafkaServer做相关的初始化,包括SocketServer 与 handler pool。 SocketServer的start up流程 看一下acceptor acceptor实际执行流程:监听网络事件,提交给processor处理 看一下processor,截取KafkaServer代码:这边的numIoThreads就是配置的具体处理线程数量。KafkaApis封装具体的各api的处理逻辑,handler pool则是工作线程池。 看看kafkaApis 接着看pool中每个线程的while循环,从request队列拿一个请求,调用apis处理,结果发送给对应channel。 request的入队,见SocketServer,这里也可以看到acl使用的principal和sasl认证是同一个。 之前我们看到,在selector做channel初始化时,已经做了sasl认证。那acl在哪里处理?KafkaApis。以offset commit为例 看一下,这边的authorize,各api共用的方法。构建resource pattern,即资源描述,以及对应的操作。然后使用authorizer实现类进行处理。 看一下鉴权实现类AclAuthorizer,初始化,主要是zk相关的监听,因为token,user,acl都存在zk。 遍历每个action 每个action的检查,区分超管,否则根据acl进行匹配。使用acl 缓存,结合zk的linstener更新缓存。不管是否开启acl,都会去取acl缓存列表。 二、总结 kafka以事件、线程池的方式,对接口请求做处理。 每个api请求,都会做相关的acl校验,不管是否开启acl,都会去取acl缓存列表。差别在于是否使用acl cache map对action进行匹配,匹配本身是用遍历的方式,action不多的话,对吞吐的影响可以忽略不计。 网上有一篇性能测试:见这里
Huh HUH is typically used as a slang word, with the meaning “I am Confused or Surprised” or “Do you Understand?”. 嗯哼? Duh DUH is an ironic response to a question or statement, implying that the speaker is stupid or that the reply is obvious. 显而易见~ 废话~
一、认证流程 在了解kafka网络模型的基础上,了解它的认证流程: ApiVersionsRequest->SaslHandshakeRequest->a series of SASL client and server tokens corresponding to the mechanism are sent->认证成功,继续处理后续的请求,否则关闭连接。 状态转换见SaslServerAuthenticator https://kafka.apache.org/protocol#sasl_handshake 具体逻辑chanel验证流程以sasl/plain为例: 分析这个包的内容: server factory创建sasl plain server单例。server负责按协议获取username 与 password,再最终调用plain server callback handler进行验证(handler的指定是在SaslChannelBuilder中根据具体配的协议类型进行指定)。 下面看一下关键类SaslChannelBuilder,创建sasl channel的实现: 初始化时,配置handler: 再看具体的channel,结合selector poll的过程KafkaChannel: java doc有说明具体的网络模型: 我们来看prepare方法: prepare触发的地方,调用频率在selector,事件通知触发: 也就是每次网络读事件且channle尚未验证成功都会触发认证,和官方文档描述一致。一旦验证通过就不再认证。所以开启认证对吞吐影响可以忽略不计。 校验不过,会直接抛异常捕获后,断开连接了。 二、总结 以上我们了解到kafka sasl的认证流程,现在,可以自己添加认证方式,例如加上自己公司平台的权限认证,注意尽量使用轻量级的方式即cache。 scram方式,除了以上流程,还会多一步,见ScramSaslServer。第一步获取credential,无效则报错,第二步计算校验。详见scram rfc定义:这里 kafka server这边的final message 截图: 自定义认证可以参考这篇:博文 三、token验证 kafka实现了基于token验证的sasl。详见这里:文档 流程是:admin client管理client(创建和刷新),存放在zk上,验证时会验token是否存在,以及用户信息,密码是否一致。token的生命周期内有效,过期无效。 弊端:在公司内一般不会频繁用到这种方式, […]
大家对开源项目有兴趣、想成为committer,或者工作需要,会从github上获取最新的开源项目源码。本文做一个示例,怎样搭建本地的源码阅读、开发、构建环境。 首先,在github上找到项目的链接,clone到本地自己的目录中,这步略过。 下面一起看一个示例:使用idea导入kafka 2.4.1版本的源码。 一、idea打开项目后,切换到指定的分支 获取最新分支列表(如果卡很久,shell加一下自己的网络代理,翻墙一下): git fetch –all –tags 获取2.4前缀分支列表: -n 加描述 -l <pattern> 加正则 从git tag创建分支,切换为指定的版本:git checkout tags/<tag> -b <branch> 二、下载依赖 gradle build项目 报错1:本地默认gradle版本过高,没有MavenDeployment这个插件。 为了解决gradle版本问题,项目使用gradle wrapper版本配置。调整idea使用项目的gradle,preference->Build,->Gradle->Use Gradle from,选择gradle/wrapper/gradle-wrapper.properties,重新reload。或者直接执行项目根目录的gradlew,会自动下载指定版本编译。 三、本地开发 熟悉代码内容,方式推荐两种: 1、结合文档看 2、结合jira看,熟悉kafka的代码模块、开发、提交pr的流程 kafka2.8之前需要zk,所以先把本地zk起起来,入口类:core模块的kafka.Kafka 运行前读一下kafka-server-start.sh,知道它的log配置是通过命令参数的方式: 所以,运行时带上参数,如下图: 此外,依然报错: 排查gradle脚本,compile阶段没有依赖slf4j实现库,而是在发布时从test compile拷贝这个依赖,所以可以加一下依赖到core项目的依赖配置中: Tips: 本地一般会有各版本的kafka,这边是开发调试用的,改一下zk地址,添加一下chroot zookeeper.connect=localhost:2181/kafka_dev 四、构建 使用命令行(README.md中有说明),或者直接在idea gradle窗口双击执行相关task。 查看输出的release包 五、延伸 kafka code指引:https://kafka.apache.org/coding-guide.html kafka bug jira:jira kafka contribute:https://kafka.apache.org/contributing […]
前面两篇文章讲解了如何配置认证。本文接着说明如何做client验证ACL是否生效,我们之前开启了无acl信息不允许访问的配置。涉及的client有以下几个场景:shell脚本、python脚本、java应用、flink流。 kafka shell script验证 核心逻辑是,连zk的,使用zk的jaas,连kafka broker的,使用kafka 的jaas配置。 新建topic, kafka-run-class.sh已经配置了zk acl 添加admin shell认证配置文件 vim kafka/config/adminclient-configs.conf 查看所有用户 对应zk目录在config/users下 查看topic 列表,若未指定jaas配置文件,则会hang不响应 列出topic test的acl列表 尝试不指定用户 消费test topic 报错 消费与生产配置,添加用户jaas vim config/consumer.properties vim config/producer.properties 验证消费与生产 python验证 admin用户,验证通过。其他用户需添加用户,并添加acl分别验证 如果认证方式不对,报错信息是kafka.errors.NoBrokersAvailable: NoBrokersAvailable。因为client先获取了kafka version,获取失败就报了。如果用户信息不对,结果是进程hang挂起。 java 如果用户信息不对,会hang挂起 flink scala code 输出 : 若用户信息出错,flink不断重试,报错
由上一篇博文,讲解了SASL认证的安装过程。本文主要围绕ACL设置进行说明。 参考: kafka authentorization:官方 一、开启ZK ACL(可选,内网环境,用户无机器访问权限时) 给kafka meta都加上zk的acl,默认是world,即任何人可访问。 zk acl详见:https://www.cnblogs.com/dalianpai/p/12748144.html 执行已有kafka meta的迁移 vim kafka/bin/zookeeper-security-migration.sh 执行迁移 到zk上查看生效 二、设置kafka acl 上篇文章已经在server.properties中添加了acl的配置。这里执行命令设置acl。 kafka的acl对象,即资源类型分为 cluster, topic, consumer group, transation id, token。实际应用中,主要以topic及consumer group为主进行运维管控。即控制”哪个用户的哪个消费组可以访问哪个topic“。 常用acl操作 列出 test 这个 topic 的 ACL 列表 对用户 test 授权,以访问test 这个 topic。 先添加scram用户 test 因为添加了zk的acl,所以需指定ZK的acl jaas账号信息。若不指定ZK的jaas账号信息,会报如下错误: 加acl认证信息的方式:先调整bin/kafka-run-class.sh,用于zk acl验证。这个文件被大多数的kafka bin目录下的script所引用,如添加topic的脚本等,所以改它最方便,添加一行。 再执行一次添加即可,输出如下提示表示添加用户test完成。 添加consumer 授权方式1,先加topic权限: 输出: 再添加消费组权限: 添加consumer 授权方法2,参考文档。一步到位: […]