前面两篇文章讲解了如何配置认证。本文接着说明如何做client验证ACL是否生效,我们之前开启了无acl信息不允许访问的配置。涉及的client有以下几个场景:shell脚本、python脚本、java应用、flink流。
kafka shell script验证
核心逻辑是,连zk的,使用zk的jaas,连kafka broker的,使用kafka 的jaas配置。
- 新建topic, kafka-run-class.sh已经配置了zk acl
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-1
- 添加admin shell认证配置文件 vim kafka/config/adminclient-configs.conf
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule \
required username="admin" password="password";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
- 查看所有用户 对应zk目录在config/users下
./bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users
- 查看topic 列表,若未指定jaas配置文件,则会hang不响应
bin/kafka-topics.sh --bootstrap-server 127.0.0.1:9093 --list --command-config config/adminclient-configs.conf
- 列出topic test的acl列表
bin/kafka-acls.sh --bootstrap-server 127.0.0.1:9093 --list --topic "test" --command-config config/adminclient-configs.conf
- 尝试不指定用户 消费test topic
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --from-beginning
报错
[2022-02-17 09:39:58,625] WARN [Consumer clientId=consumer-console-consumer-14835-1, groupId=console-consumer-14835] Bootstrap broker localhost:9093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
- 消费与生产配置,添加用户jaas vim config/consumer.properties vim config/producer.properties
# sasl
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="admin" password="password";
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
- 验证消费与生产
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --from-beginning --consumer.config config/consumer.properties
./bin/kafka-console-producer.sh --bootstrap-server localhost:9093 --topic test --producer.config config/producer.properties
python验证
admin用户,验证通过。其他用户需添加用户,并添加acl分别验证
#!/user/bin/env python3
# -*- coding: utf-8 -*-
from kafka import KafkaConsumer
from datetime import datetime
if __name__ == '__main__':
topic = 'test'
bootstrap_server = ['192.168.91.226:9093']
consumer = KafkaConsumer(topic, group_id="py_group_202202", bootstrap_servers=bootstrap_server,
auto_offset_reset='earliest',
security_protocol='SASL_PLAINTEXT', sasl_mechanism='SCRAM-SHA-256',
sasl_plain_username='test', sasl_plain_password='')
file_dict = {}
for msg in consumer:
timestamp = msg.timestamp
msg_topic = msg.topic
date_str = datetime.fromtimestamp(timestamp / 1000).strftime("%Y-%m-%d")
decode = msg.value.decode('utf-8') + "\n"
print(decode)
如果认证方式不对,报错信息是kafka.errors.NoBrokersAvailable: NoBrokersAvailable。因为client先获取了kafka version,获取失败就报了。
如果用户信息不对,结果是进程hang挂起。
java
如果用户信息不对,会hang挂起
package com.anta.data.kafka;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.junit.Test;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
/**
* kafka sasl认证例子
*/
public class KafkaConsumerSaslTest {
@Test
public void saslTest() {
Properties props = new Properties();
String username = "admin";
String password = "";
String topic = "test";
props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "192.168.91.226:9093");
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + username + "\" password=\"" + password + "\";");
props.put(CommonClientConfigs.GROUP_ID_CONFIG, "java-consumer-202202");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
while (true) {
//poll records
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(30));
//do business logic
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
}
//commit
consumer.commitSync();
}
}
}
flink
scala code
val stringSerializer = new SimpleVersionedSerializer[String] {
val VERSION = 77
override def getVersion = 77
@throws[IOException]
override def serialize(checkpointData: String): Array[Byte] = checkpointData.getBytes(StandardCharsets.UTF_8)
@throws[IOException]
override def deserialize(version: Int, serialized: Array[Byte]): String = if (version != 77) throw new IOException("version mismatch")
else new String(serialized, StandardCharsets.UTF_8)
}
override def runJob(parameterTool: ParameterTool, env: StreamExecutionEnvironment): Unit = {
val username = "admin"
val password = ""
val kafkaProp = new Properties
kafkaProp.put("auto.offset.reset", "earliest")
kafkaProp.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
kafkaProp.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
kafkaProp.put("max.partition.fetch.bytes", "11000000")
kafkaProp.put("fetch.message.max.bytes", "52428800")
kafkaProp.put("receive.buffer.bytes", "65536")
kafkaProp.put("max.poll.interval.ms", "300000")
kafkaProp.put("session.timeout.ms", "90000")
kafkaProp.put("request.timeout.ms", "100000")
kafkaProp.put(CommonClientConfigs.GROUP_ID_CONFIG, "flink-consumer-202202")
// sasl 部分
kafkaProp.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "192.168.91.226:9093")
kafkaProp.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT")
kafkaProp.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256")
kafkaProp.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"" + username + "\" password=\"" + password + "\";")
val topic = "test"
val subscriptionPattern = Pattern.compile(String.format("^%s.*", topic));
val kfkConsumer = new FlinkKafkaConsumer[(String, String)](subscriptionPattern, deserializationWithTopicNameSchema, kafkaProp)
kfkConsumer.setCommitOffsetsOnCheckpoints(true)
val dataStream: DataStream[(String, String)] = env.addSource(kfkConsumer).name("kfk_source")
.filter(_._2 != null).name("null filter")
dataStream.print("output: ")
}
输出 :
output: :11> (test,hello,world)
若用户信息出错,flink不断重试,报错
2022-02-17 11:16:23,078 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: kfk_source -> null filter -> Sink: Print to Std. Out (6/12)#4 (cef5d8a9796c4b405d44f145e52ae010) switched from INITIALIZING to FAILED with failure cause: org.apache.kafka.common.errors.SaslAuthenticationException: Authentication failed during authentication due to invalid credentials with SASL mechanism SCRAM-SHA-256
No Comments
Leave a comment Cancel