Windows 版 Kafka 添加用户名密码认证(SASL/PLAIN)


./config 目录下创建配置文件

kafka_server_jaas.conf

KafkaServer {
	org.apache.kafka.common.security.plain.PlainLoginModule required
	username="admin"
	password="123456"
	user_admin="123456"
	user_producer="producer"
	user_consumer="consumer";
};
Client {
 org.apache.kafka.common.security.plain.PlainLoginModule required  
 username="admin"
 password="123456";
};

 kafka_zookeeper_jaas.conf

Server {
	org.apache.kafka.common.security.plain.PlainLoginModule required
	username="admin"
	password="123456"
	user_admin="123456";
};

./config 目录下修改配置文件

zookeeper.properties 添加

# 安全身份认证
authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
requireClientAuthScheme=sasl
jaasLoginRenew=3600000

 server.properties 添加

#身份认证配置
#使用的认证协议
security.inter.broker.protocol=SASL_PLAINTEXT
#SASL机制 
sasl.enabled.mechanisms=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
# 完成授权(ACL)校验的类;下一行保持注释时不启用ACL,此时 allow.everyone.if.no.acl.found 与 super.users 不会生效
#authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
# 如果没有找到ACL(访问控制列表)配置,则允许任何操作
allow.everyone.if.no.acl.found=true
#超级管理员权限用户
super.users=User:admin

 server.properties 修改

listeners=SASL_PLAINTEXT://127.0.0.1:9092

./bin/windows 目录下修改启动脚本

zookeeper-server-start.bat 添加

set KAFKA_OPTS=-Djava.security.auth.login.config=file:%~dp0../../config/kafka_zookeeper_jaas.conf

 kafka-server-start.bat 添加

set KAFKA_OPTS=-Djava.security.auth.login.config=file:%~dp0../../config/kafka_server_jaas.conf

Java 客户端(生产者)配置示例:

/**
 * Builds the configuration map for a Kafka producer.
 *
 * <p>The map always contains the bootstrap servers, batch size, linger time,
 * acks mode, buffer memory and {@code StringSerializer} for both key and value.
 * When {@code isAuthorization} equals {@code ServerConstants.IS_AUTHORIZATION_YES},
 * the SASL_PLAINTEXT protocol, the PLAIN mechanism and an inline JAAS entry are
 * added as well.
 *
 * @param brokers         value stored under {@code bootstrap.servers}
 * @param isAuthorization flag compared against
 *                        {@code ServerConstants.IS_AUTHORIZATION_YES};
 *                        {@code null} is treated as "no authentication"
 * @param userName        user name placed into the JAAS entry (only read when
 *                        authentication is enabled)
 * @param password        value passed through {@code EncryptUtils.decrypt}
 *                        before being placed into the JAAS entry (only read
 *                        when authentication is enabled)
 * @return a new mutable map holding the producer settings
 */
private Map producerProps(String brokers, Byte isAuthorization, String userName, String password) {
    Map<String, Object> props = new HashMap<>();
    // Address list of the Kafka cluster.
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
    // Batch size for batched sends.
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    // Linger time before a batch is sent.
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    // Acknowledgement mode.
    props.put(ProducerConfig.ACKS_CONFIG, "all");
    // Size of the producer's send buffer.
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
    // Serializers for record key and value.
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

    // Null-check and unbox explicitly: using == directly on the boxed Byte is
    // either a reference comparison or an implicit unboxing that throws
    // NullPointerException when the flag is null.
    boolean saslEnabled = isAuthorization != null
            && isAuthorization.byteValue() == ServerConstants.IS_AUTHORIZATION_YES;

    if (saslEnabled) {
        // SASL connection settings.
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        // NOTE(review): userName and the decrypted password are embedded between
        // double quotes without escaping; a value containing '"' would presumably
        // produce an invalid JAAS entry - confirm the allowed character set.
        String jaasEntry = "org.apache.kafka.common.security.plain.PlainLoginModule required username=\""
                + userName + "\" password=\"" + EncryptUtils.decrypt(password) + "\";";
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasEntry);
    }

    return props;
}
MQ