Kafka with SASL/SCRAM Authentication and ACL Authorization (Part 3): Verification

The previous two articles covered how to set up authentication. This post shows how to verify from the client side that the ACLs are actually enforced; recall that we enabled the broker setting that denies access when no matching ACL exists (allow.everyone.if.no.acl.found=false). The clients covered are: shell scripts, a Python script, a Java application, and a Flink job.

Kafka shell script verification

The key point: commands that talk to ZooKeeper use the ZooKeeper JAAS configuration, while commands that talk to the Kafka brokers use the Kafka SASL/JAAS configuration.
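
For reference, a minimal sketch of a client-side JAAS file covering both cases (the file name, credentials, and the DigestLoginModule for ZooKeeper are placeholders; use whatever was actually configured in the previous two parts). The Client section is used for ZooKeeper connections, the KafkaClient section for broker connections:

Client {
    org.apache.zookeeper.server.auth.DigestLoginModule required
    username="admin"
    password="password";
};
KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="admin"
    password="password";
};

The shell tools pick this file up through the JVM option, e.g. export KAFKA_OPTS="-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf".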

  • Create a topic. kafka-run-class.sh has already been set up with the ZooKeeper ACL (JAAS) configuration
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-1
  • Add an admin client authentication config file: vim kafka/config/adminclient-configs.conf
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule \
    required username="admin" password="password";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
  • List all SCRAM users (they live under /config/users in ZooKeeper)
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users
  • List topics. If no JAAS config file is supplied, the command hangs without responding
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9093 --list --command-config config/adminclient-configs.conf
  • List the ACLs on topic test
bin/kafka-acls.sh --bootstrap-server 127.0.0.1:9093 --list --topic "test" --command-config config/adminclient-configs.conf
  • Try to consume from topic test without specifying a user
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --from-beginning

This fails with:

[2022-02-17 09:39:58,625] WARN [Consumer clientId=consumer-console-consumer-14835-1, groupId=console-consumer-14835] Bootstrap broker localhost:9093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
  • Consumer and producer configuration: add the user JAAS settings via vim config/consumer.properties and vim config/producer.properties
# sasl
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="admin" password="password";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
  • Verify consuming and producing
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --from-beginning --consumer.config config/consumer.properties
./bin/kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --producer.config config/producer.properties

Python verification

The admin user passes verification. For any other user, first create the user and add the corresponding ACLs, then verify it separately; a sketch of the commands follows.
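
For example, to create a SCRAM user test and grant it the consumer-side ACLs on topic test (the password below is a placeholder; the group matches the script that follows):

bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
    --add-config 'SCRAM-SHA-256=[password=test-secret]' \
    --entity-type users --entity-name test
bin/kafka-acls.sh --bootstrap-server 127.0.0.1:9093 --command-config config/adminclient-configs.conf \
    --add --allow-principal User:test --consumer --topic test --group py_group_202202

The consumer script used for verification: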

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from kafka import KafkaConsumer

if __name__ == '__main__':
    topic = 'test'
    bootstrap_server = ['192.168.91.226:9093']

    # SASL/SCRAM-authenticated consumer; the password is omitted here
    consumer = KafkaConsumer(topic, group_id="py_group_202202", bootstrap_servers=bootstrap_server,
                             auto_offset_reset='earliest',
                             security_protocol='SASL_PLAINTEXT', sasl_mechanism='SCRAM-SHA-256',
                             sasl_plain_username='test', sasl_plain_password='')
    for msg in consumer:
        # print each message value as UTF-8 text
        print(msg.value.decode('utf-8'))

If the authentication settings (protocol or mechanism) are wrong, the error is kafka.errors.NoBrokersAvailable: the client first probes the broker's API version, and raises this once the probe fails.
If the credentials are wrong, the process simply hangs.
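
To fail fast rather than end with a stack trace when the mechanism is wrong, the exception can be caught explicitly; a minimal sketch, again using kafka-python:

from kafka import KafkaConsumer
from kafka.errors import NoBrokersAvailable

try:
    consumer = KafkaConsumer('test', bootstrap_servers=['192.168.91.226:9093'],
                             security_protocol='SASL_PLAINTEXT', sasl_mechanism='SCRAM-SHA-256',
                             sasl_plain_username='test', sasl_plain_password='')
except NoBrokersAvailable:
    # raised when the initial version probe fails, e.g. because the security
    # protocol or SASL mechanism does not match what the broker expects
    print('check security_protocol / sasl_mechanism settings')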

Java

As with Python, if the credentials are wrong the consumer hangs.

package com.anta.data.kafka;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.junit.Test;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;


/**
 * Kafka SASL authentication consumer example.
 */
public class KafkaConsumerSaslTest {

    @Test
    public void saslTest() {
        Properties props = new Properties();
        String username = "admin";
        String password = "";
        String topic = "test";
        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "192.168.91.226:9093");
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + username + "\" password=\"" + password + "\";");
        props.put(CommonClientConfigs.GROUP_ID_CONFIG, "java-consumer-202202");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            //poll records
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(30));
            //do business logic
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
            }
            //commit
            consumer.commitSync();
        }
    }
}

Flink

Scala code

val stringSerializer = new SimpleVersionedSerializer[String] {
    val VERSION = 77

    override def getVersion = 77

    @throws[IOException]
    override def serialize(checkpointData: String): Array[Byte] = checkpointData.getBytes(StandardCharsets.UTF_8)

    @throws[IOException]
    override def deserialize(version: Int, serialized: Array[Byte]): String = if (version != 77) throw new IOException("version mismatch")
    else new String(serialized, StandardCharsets.UTF_8)
  }

override def runJob(parameterTool: ParameterTool, env: StreamExecutionEnvironment): Unit = {
    val username = "admin"
    val password = ""
    val kafkaProp = new Properties
    kafkaProp.put("auto.offset.reset", "earliest")
    kafkaProp.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProp.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    kafkaProp.put("max.partition.fetch.bytes", "11000000")
    kafkaProp.put("fetch.message.max.bytes", "52428800")
    kafkaProp.put("receive.buffer.bytes", "65536")
    kafkaProp.put("max.poll.interval.ms", "300000")
    kafkaProp.put("session.timeout.ms", "90000")
    kafkaProp.put("request.timeout.ms", "100000")
    kafkaProp.put(CommonClientConfigs.GROUP_ID_CONFIG, "flink-consumer-202202")
    // SASL settings
    kafkaProp.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "192.168.91.226:9093")
    kafkaProp.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
    kafkaProp.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256")
    kafkaProp.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + username + "\" password=\"" + password + "\";")

    val topic = "test"
    val subscriptionPattern = Pattern.compile(String.format("^%s.*", topic));
    val kfkConsumer = new FlinkKafkaConsumer[(String, String)](subscriptionPattern, deserializationWithTopicNameSchema, kafkaProp)
    kfkConsumer.setCommitOffsetsOnCheckpoints(true)

    val dataStream: DataStream[(String, String)] = env.addSource(kfkConsumer).name("kfk_source")
      .filter(_._2 != null).name("null filter")
    dataStream.print("output: ")
  }
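
runJob references deserializationWithTopicNameSchema, which is not shown in the post; a minimal sketch of what it might look like, assuming a KafkaDeserializationSchema that emits (topic, value) tuples (this implementation is a guess, not the author's original):

import java.nio.charset.StandardCharsets
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema
import org.apache.kafka.clients.consumer.ConsumerRecord

// emits (topic name, message value) so downstream operators can tell records apart by topic
val deserializationWithTopicNameSchema = new KafkaDeserializationSchema[(String, String)] {
    override def isEndOfStream(nextElement: (String, String)): Boolean = false

    override def deserialize(record: ConsumerRecord[Array[Byte], Array[Byte]]): (String, String) = {
      val value = if (record.value() == null) null else new String(record.value(), StandardCharsets.UTF_8)
      (record.topic(), value)
    }

    override def getProducedType: TypeInformation[(String, String)] =
      createTypeInformation[(String, String)]
  }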

Output:

output: :11> (test,hello,world)

If the credentials are wrong, Flink keeps retrying and logs:

2022-02-17 11:16:23,078 WARN  org.apache.flink.runtime.taskmanager.Task                    [] - Source: kfk_source -> null filter -> Sink: Print to Std. Out (6/12)#4 (cef5d8a9796c4b405d44f145e52ae010) switched from INITIALIZING to FAILED with failure cause: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-256
