1. kafka

Kafka 开启SASL/SCRAM认证 及 ACL授权(一)认证

kafka安全涉及3部份:传输加密,用户认证与授权,ZK开启ACL(Zookeeper存储了kafka的元数据以及用户信息,默认不开启acl所有用户可改,内网环境机器不对外开放可考虑使用默认不开启ZK ACL)。

词汇说明:

认证,即用户登陆。

授权,即管理用户可见的资源。

ACL,Access Control List 访问控制列表

SASL认证与Kerberos认证:SASL资料很多, java的见这里,Kerberos的资料,点这里

Kafka权限控制指引

支持的认证方式

Kafka支持的认证类别有kerberos(和hadoop一样,大多数公司应该没用kerberos)、ldap(在传统企业中比较普遍)与rbac(这两个是要企业license,基于Confluence的平台组件MDS,自建集群如果没用kafka connect之类的组件,没用conflunece的cli,也用不了)、sasl/plain(用户信息用文件进行管理,修改需重启Kafka,生产大概率不会接受并使用)和sasl/scram(用户信息用api或命令行进行管理,存储在zk上,不需重启)。考虑用哪一种取决于公司自身的情况,这里主要是用sasl/scram。

接入现有认证系统

如果对zk不放心或想对接已有的认证系统,接入kafka权限管控,可参考这个kip-86自定义认证方式,文中有场景列表及sample code。

开启认证主要的关注点

  1. 使用者的需求,生产者和消费者都有哪些,场景如何,需求怎样,支持以后需要先验证,对使用方提供技术上的指引文档等。
  2. 开启方式,是重启集群还是迁移集群。开启认证不是平滑的,需要短暂中断业务流程。
  3. 使用的认证方式:基于现有的环境及实际需要。
  4. 运维成本:考虑运维、监控的方式及成本(监控需要admin账号)
  5. kafka版本是否需要升级:公司现有的版本是0.10.1,1.1,2.4等,需要升级吗?看使用哪种认证方式,rbac需要2.4+,sasl/scram只要0.9+,本文描述的基于2.4.1版本。

本文主要描述用户认证:

参考:

一、zookeeper sasl开启(可选,内网环境,用户无机器访问权限时)

  • vim zookeeper/conf/zoo.cfg
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000
  • vim zookeeper/conf/zookeeper_jaas.conf Server是ZK SERVER之间, Client是zkclient与zk server之间
Client {
  org.apache.zookeeper.server.auth.DigestLoginModule required
    user_admin="zkPassword";
};
Server {
  org.apache.zookeeper.server.auth.DigestLoginModule required
    user_admin="zkPassword";
};
  • vim zookeeper/bin/zkEnv.sh zk启动引入jaas配置
export SERVER_JVMFLAGS="-Djava.security.auth.login.config=${ZOOBINDIR}/../conf/zookeeper_jaas.conf $SERVER_JVMFLAGS"
  • vim zookeeper/conf/adminclient_jaas.conf
Client {
  org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="zkPassword";
};
  • vim zookeeper/bin/zkCli.sh zkCli命令默认添加jaas
"$JAVA" "-Djava.security.auth.login.config=${ZOOBIN}/../conf/adminclient_jaas.conf" ${后面接上原来的参数}
  • 重启zk bin/zkServer.sh restart
  • 验证 bin/zkCli.sh 是否成功,调整密码验证

二、配置kafka sasl

  • vim kafka/config/server.properties
listeners=SASL_PLAINTEXT://:9093
zookeeper.set.acl=true
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=SCRAM-SHA-256
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-256
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
allow.everyone.if.no.acl.found=false
# 参数 allow.everyone.if.no.acl.found
# 设置为 true,ACL 机制改为黑名单机制,只有黑名单中的用户无法访问
super.users=User:admin
#不能用,zk开启sasl后,jaas需包含KafkaServer内容,故这里用不了,除非zk不开启sasl
#listener.name.sasl_ssl.scram-sha-256.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required
#   username="admin"
#   password="kXQ[5/BIKJwAhYU";
  • vim kafka/config/kafka_server_jaas.conf KafkaServer是broker之间验证, Client是与ZK验证
KafkaServer {
   org.apache.kafka.common.security.scram.ScramLoginModule required
   username="admin"
   password="kafkaPassword";
};
Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="zkPassword";
};
  • vim kafka/bin/kafka-server-start.sh server启动添加jaas
export KAFKA_OPTS="-Djava.security.auth.login.config=$base_dir/../config/kafka_server_jaas.conf"

三、新增scram 用户

密码与KafkaServer保持一致,否则会报sasl验证失败

bin/kafka-configs --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-256=[password=kafkaPassword],SCRAM-SHA-512=[password=kafkaPassword]' --entity-type users --entity-name admin

四、start server

bin/start-kafka-server.sh

[2022-02-16 18:45:28,713] INFO [ZooKeeperClient ACL authorizer] Initializing a new session to 192.168.91.226:2181. (kafka.zookeeper.ZooKeeperClient)
[2022-02-16 18:45:28,713] INFO Initiating client connection, connectString=192.168.91.226:2181 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@290b1b2e (org.apache.zookeeper.ZooKeeper)
[2022-02-16 18:45:28,713] INFO jute.maxbuffer value is 4194304 Bytes (org.apache.zookeeper.ClientCnxnSocket)
[2022-02-16 18:45:28,714] INFO zookeeper.request.timeout value is 0. feature enabled= (org.apache.zookeeper.ClientCnxn)
[2022-02-16 18:45:28,714] INFO [ZooKeeperClient ACL authorizer] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2022-02-16 18:45:28,715] INFO Client successfully logged in. (org.apache.zookeeper.Login)
[2022-02-16 18:45:28,715] INFO Client will use DIGEST-MD5 as SASL mechanism. (org.apache.zookeeper.client.ZooKeeperSaslClient)
[2022-02-16 18:45:28,715] INFO Opening socket connection to server ATHADOOPDQ06/192.168.91.226:2181. Will attempt to SASL-authenticate using Login Context section 'Client' (org.apache.zookeeper.ClientCnxn)
[2022-02-16 18:45:28,716] INFO Socket connection established, initiating session, client: /192.168.91.226:31564, server: ATHADOOPDQ06/192.168.91.226:2181 (org.apache.zookeeper.ClientCnxn)
[2022-02-16 18:45:28,719] INFO Session establishment complete on server ATHADOOPDQ06/192.168.91.226:2181, sessionid = 0x102165975e5000d, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2022-02-16 18:45:28,720] INFO [ZooKeeperClient ACL authorizer] Connected. (kafka.zookeeper.ZooKeeperClient)
[2022-02-16 18:45:28,806] INFO [/kafka-acl-changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2022-02-16 18:45:28,806] INFO [/kafka-acl-extended-changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2022-02-16 18:45:28,851] INFO [ExpirationReaper-226-AlterAcls]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2022-02-16 18:45:28,868] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2022-02-16 18:45:28,870] INFO Processing notification(s) to /config/changes (kafka.common.ZkNodeChangeNotificationListener)
[2022-02-16 18:45:28,877] INFO Processing override for entityPath: users/admin with config: Map(SCRAM-SHA-256 -> [hidden], SCRAM-SHA-512 -> [hidden]) (kafka.server.DynamicConfigManager)
[2022-02-16 18:45:28,882] INFO Removing PRODUCE quota for user admin (kafka.server.ClientQuotaManager)
[2022-02-16 18:45:28,886] INFO Removing FETCH quota for user admin (kafka.server.ClientQuotaManager)
[2022-02-16 18:45:28,886] INFO Removing PRODUCE quota for user admin (kafka.server.ClientQuotaManager)
[2022-02-16 18:45:28,886] INFO Removing REQUEST quota for user admin (kafka.server.ClientRequestQuotaManager)
[2022-02-16 18:45:28,886] INFO Removing FETCH quota for user admin (kafka.server.ClientQuotaManager)
[2022-02-16 18:45:28,886] INFO Removing REQUEST quota for user admin (kafka.server.ClientRequestQuotaManager)
[2022-02-16 18:45:28,898] INFO [SocketServer brokerId=226] Started data-plane processors for 1 acceptors (kafka.network.SocketServer)
[2022-02-16 18:45:28,899] INFO Kafka version: 2.4.1 (org.apache.kafka.common.utils.AppInfoParser)
[2022-02-16 18:45:28,899] INFO Kafka commitId: c57222ae8cd7866b (org.apache.kafka.common.utils.AppInfoParser)
[2022-02-16 18:45:28,899] INFO Kafka startTimeMs: 1645008328899 (org.apache.kafka.common.utils.AppInfoParser)
[2022-02-16 18:45:28,901] INFO [KafkaServer id=226] started (kafka.server.KafkaServer)

具体ACL配置、ZK ACL、以及验证,见下一篇文章。

Comments to: Kafka 开启SASL/SCRAM认证 及 ACL授权(一)认证

Your email address will not be published. Required fields are marked *