Enabling SSL and ACL for Kafka

Local configuration (Docker)

This configuration runs a cluster of three Apache Kafka brokers with a single Apache ZooKeeper instance. Each broker mounts two folders under /tmp: jks, which contains the server keystores and the truststore, and client, which contains the client properties file and the client keystore/truststore.

version: "3.9"
services:
  kafka-0:
    image: wurstmeister/kafka:2.13-2.7.0
    container_name: kafka-0
    hostname: kafka-0
    depends_on:
      - zookeeper
    volumes:
      - ./jks:/tmp/jks
      - ./client:/tmp/client
    ports:
      - 9092:9092
      - 9091:9091
      - 10000:10000
    env_file:
      - kafka-env.env
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://:9091,EXTERNAL://kafka-0:9092
      KAFKA_LISTENERS: INTERNAL://:9091,EXTERNAL://:9092
      KAFKA_SSL_KEYSTORE_LOCATION: /tmp/jks/kafka-0.keystore.jks
  kafka-1:
    image: wurstmeister/kafka:2.13-2.7.0
    container_name: kafka-1
    hostname: kafka-1
    depends_on:
      - zookeeper
    volumes:
      - ./jks:/tmp/jks
      - ./client:/tmp/client
    ports:
      - 19092:19092
    env_file:
      - kafka-env.env
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://:19091,EXTERNAL://kafka-1:19092
      KAFKA_LISTENERS: INTERNAL://:19091,EXTERNAL://:19092
      KAFKA_ADVERTISED_HOST_NAME: kafka-1
      KAFKA_SSL_KEYSTORE_LOCATION: /tmp/jks/kafka-1.keystore.jks
  kafka-2:
    image: wurstmeister/kafka:2.13-2.7.0
    container_name: kafka-2
    hostname: kafka-2
    depends_on:
      - zookeeper
    volumes:
      - ./jks:/tmp/jks
      - ./client:/tmp/client
    ports:
      - 29092:29092
    env_file:
      - kafka-env.env
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://:29091,EXTERNAL://kafka-2:29092
      KAFKA_LISTENERS: INTERNAL://:29091,EXTERNAL://:29092
      KAFKA_SSL_KEYSTORE_LOCATION: /tmp/jks/kafka-2.keystore.jks
  zookeeper:
    image: zookeeper:3.6.2
    container_name: zookeeper
    ports:
      - 2181:2181
      - 8080:8080
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOO_PORT_NUMBER: 2181

The names of the Kafka environment variables depend on the Kafka Docker image; for example, bitnami/kafka uses the prefix KAFKA_CFG_ for configuration parameters. For wurstmeister/kafka the following values are provided in kafka-env.env (they end up in the server.properties file of the Kafka instance):

KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=2
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SSL,EXTERNAL:SSL
KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
KAFKA_SSL_TRUSTSTORE_TYPE=jks
KAFKA_SSL_TRUSTSTORE_LOCATION=/tmp/jks/kafka.truststore.jks
KAFKA_SSL_TRUSTSTORE_PASSWORD=secret
KAFKA_SSL_KEYSTORE_PASSWORD=secret
KAFKA_SSL_KEY_PASSWORD=secret
KAFKA_SSL_CLIENT_AUTH=required
KAFKA_SSL_KEYSTORE_TYPE=jks
KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=HTTPS
KAFKA_AUTHORIZER_CLASS_NAME=kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND=false
KAFKA_SUPER_USERS=User:kafka
KAFKA_MIN_INSYNC_REPLICAS=2
KAFKA_MESSAGE_MAX_BYTES=1000000
KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
KAFKA_SSL_PRINCIPAL_MAPPING_RULES='RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,DEFAULT'
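The relation between these variable names and the server.properties keys used later in this document can be sketched as follows. This is only an illustration of the naming convention (drop the KAFKA_ prefix, lowercase, replace underscores with dots); the helper name env_to_property is hypothetical, and the image's own startup script is not reproduced here.

```shell
#!/bin/sh
# Illustration only: derive the server.properties key that corresponds to a
# KAFKA_* environment variable name (strip prefix, lowercase, "_" -> ".").
# env_to_property is a hypothetical helper, not part of any Kafka image.
env_to_property() {
  printf '%s\n' "${1#KAFKA_}" | tr '[:upper:]' '[:lower:]' | tr '_' '.'
}

env_to_property KAFKA_SSL_CLIENT_AUTH             # prints ssl.client.auth
env_to_property KAFKA_INTER_BROKER_LISTENER_NAME  # prints inter.broker.listener.name
```

Reading the env file with this mapping in mind makes it easier to compare it against the server.properties fragments in the SSL and ACL sections.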

SSL Configuration

Generating SSL certificates

The certificates are generated with a script slightly modified from https://github.com/confluentinc/confluent-platform-security-tools/blob/master/kafka-generate-ssl-automatic.sh. The script reads the broker host names, one per line, from the file kafka-hosts.txt:

kafka-hosts.txt
kafka-0
kafka-1
kafka-2

Each Apache Kafka broker should have its own keystore with CN set to the broker's FQDN (Fully Qualified Domain Name). The CN is used for endpoint identification between brokers and clients. In the Docker environment the names are kafka-0, kafka-1 and kafka-2.

#!/usr/bin/env bash
set -eu

export COUNTRY=RU
export STATE=
export ORGANIZATION_UNIT=ServiceUsers
export CITY=
export PASSWORD=secret
export CN=kafka-admin

validity=3650
defaultTruststoreName="kafka.truststore.jks"
truststoreWorkDir="truststore"
keystoreWorkDir="keystore"
caKey="ca-cert"
keystoreSignInRequest="cert-file"
keystoreSignRequestSrl="ca-cert.srl"
keystoreSignedCert="cert-signed"
kafkaHostsFile="kafka-hosts.txt"

country=$COUNTRY
state=$STATE
OU=$ORGANIZATION_UNIT
CN=$CN
location=$CITY
password=$PASSWORD

function file_exists_and_exit() {
  echo "'$1' cannot exist. Move or delete it before"
  echo "re-running this script."
  exit 1
}

# Refuse to run if leftovers from a previous run are present.
if [ -e "$keystoreWorkDir" ]; then
  file_exists_and_exit $keystoreWorkDir
fi

if [ -e "$caKey" ]; then
  file_exists_and_exit $caKey
fi

if [ -e "$keystoreSignInRequest" ]; then
  file_exists_and_exit $keystoreSignInRequest
fi

if [ -e "$keystoreSignRequestSrl" ]; then
  file_exists_and_exit $keystoreSignRequestSrl
fi

if [ -e "$keystoreSignedCert" ]; then
  file_exists_and_exit $keystoreSignedCert
fi

# The list of broker host names is required.
if [ ! -e "$kafkaHostsFile" ]; then
  echo "'$kafkaHostsFile' does not exist. Create this file"
  exit 1
fi

echo "Welcome to the Kafka SSL keystore and trust store generator script."

truststoreFile=""
truststorePrivateKey=""

if [ ! -e "$truststoreWorkDir" ]; then
  mkdir $truststoreWorkDir
  echo
  echo "OK, we'll generate a trust store and associated private key."
  echo
  echo "First, the private key."
  echo

  openssl req -new -x509 -keyout $truststoreWorkDir/ca-key \
    -out $truststoreWorkDir/ca-cert -days $validity -nodes \
    -subj "/C=$country/ST=$state/L=$location/O=$OU/CN=$CN"

  truststorePrivateKey="$truststoreWorkDir/ca-key"

  echo
  echo "Two files were created:"
  echo " - $truststoreWorkDir/ca-key -- the private key used later to"
  echo "   sign certificates"
  echo " - $truststoreWorkDir/ca-cert -- the certificate that will be"
  echo "   stored in the trust store in a moment and serve as the certificate"
  echo "   authority (CA). Once this certificate has been stored in the trust"
  echo "   store, it will be deleted. It can be retrieved from the trust store via:"
  echo "   $ keytool -keystore <trust-store-file> -export -alias CARoot -rfc"
  echo
  echo "Now the trust store will be generated from the certificate."
  echo

  keytool -keystore $truststoreWorkDir/$defaultTruststoreName \
    -alias CARoot -import -file $truststoreWorkDir/ca-cert \
    -noprompt -dname "C=$country, ST=$state, L=$location, O=$OU, CN=$CN" -keypass $password -storepass $password

  truststoreFile="$truststoreWorkDir/$defaultTruststoreName"

  echo
  echo "$truststoreWorkDir/$defaultTruststoreName was created."

  # don't need the cert because it's in the trust store.
  rm $truststoreWorkDir/$caKey

  echo
  echo "Continuing with:"
  echo " - trust store file: $truststoreFile"
  echo " - trust store private key: $truststorePrivateKey"
else
  # Reuse the trust store and CA key from a previous run.
  truststorePrivateKey="$truststoreWorkDir/ca-key"
  truststoreFile="$truststoreWorkDir/$defaultTruststoreName"
fi

mkdir $keystoreWorkDir

# One keystore per host listed in kafka-hosts.txt (the last line may lack a newline).
while read -r kafkaHost || [ -n "$kafkaHost" ]; do
  echo
  echo "Now, a keystore will be generated. Each broker and logical client needs its own"
  echo "keystore. This script will create only one keystore. Run this script multiple"
  echo "times for multiple keystores."
  echo
  echo " NOTE: currently in Kafka, the Common Name (CN) does not need to be the FQDN of"
  echo " this host. However, at some point, this may change. As such, make the CN"
  echo " the FQDN. Some operating systems call the CN prompt 'first / last name'"

  # To learn more about CNs and FQDNs, read:
  # https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/X509ExtendedTrustManager.html

  keystoreFileName="$kafkaHost.server.keystore.jks"

  keytool -keystore $keystoreWorkDir/"$keystoreFileName" \
    -alias localhost -validity $validity -genkey -keyalg RSA \
    -noprompt -dname "C=$country, ST=$state, L=$location, O=$OU, CN=$kafkaHost" \
    -keypass $password -storepass $password

  echo
  echo "'$keystoreWorkDir/$keystoreFileName' now contains a key pair and a"
  echo "self-signed certificate. Again, this keystore can only be used for one broker or"
  echo "one logical client. Other brokers or clients need to generate their own keystores."
  echo
  echo "Fetching the certificate from the trust store and storing in $caKey."
  echo

  keytool -keystore $truststoreFile -export -alias CARoot -rfc -file $caKey -keypass $password -storepass $password

  echo
  echo "Now a certificate signing request will be made to the keystore."
  echo

  keytool -keystore $keystoreWorkDir/"$keystoreFileName" -alias localhost \
    -certreq -file $keystoreSignInRequest -keypass $password -storepass $password

  echo
  echo "Now the trust store's private key (CA) will sign the keystore's certificate."
  echo

  openssl x509 -req -CA $caKey -CAkey $truststorePrivateKey \
    -in $keystoreSignInRequest -out $keystoreSignedCert \
    -days $validity -CAcreateserial
  # creates $keystoreSignRequestSrl which is never used or needed.

  echo
  echo "Now the CA will be imported into the keystore."
  echo

  keytool -keystore $keystoreWorkDir/"$keystoreFileName" -alias CARoot \
    -import -file $caKey -keypass $password -storepass $password -noprompt
  rm $caKey # delete the trust store cert because it's stored in the trust store.

  echo
  echo "Now the keystore's signed certificate will be imported back into the keystore."
  echo

  keytool -keystore $keystoreWorkDir/"$keystoreFileName" -alias localhost -import \
    -file $keystoreSignedCert -keypass $password -storepass $password

  echo
  echo "All done!"
  echo
  echo "Deleting intermediate files. They are:"
  echo " - '$keystoreSignRequestSrl': CA serial number"
  echo " - '$keystoreSignInRequest': the keystore's certificate signing request"
  echo "   (that was fulfilled)"
  echo " - '$keystoreSignedCert': the keystore's certificate, signed by the CA, and stored back"
  echo "   into the keystore"

  rm $keystoreSignRequestSrl
  rm $keystoreSignInRequest
  rm $keystoreSignedCert
done < "$kafkaHostsFile"
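The script writes its results to keystore/<host>.server.keystore.jks and truststore/kafka.truststore.jks, while the Docker configuration above mounts ./jks and refers to /tmp/jks/<host>.keystore.jks and /tmp/jks/kafka.truststore.jks. The source does not say how the files get from one place to the other; the sketch below shows one possible way, assuming a plain copy with a rename is all that is needed. The function name stage_keystores and its argument order are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper (not part of the original script): copy the generated
# stores into the directory mounted by docker-compose, renaming
# <host>.server.keystore.jks to <host>.keystore.jks on the way.
# Arguments: hosts file, keystore dir, truststore dir, target dir.
stage_keystores() {
  hosts_file=$1; keystore_dir=$2; truststore_dir=$3; target_dir=$4
  mkdir -p "$target_dir" || return 1
  cp "$truststore_dir/kafka.truststore.jks" "$target_dir/" || return 1
  # Same read idiom as the generator script: tolerate a missing final newline.
  while read -r host || [ -n "$host" ]; do
    [ -n "$host" ] || continue   # skip blank lines
    cp "$keystore_dir/$host.server.keystore.jks" \
       "$target_dir/$host.keystore.jks" || return 1
  done < "$hosts_file"
}

# Example, run from the directory where the generator script was executed:
# stage_keystores kafka-hosts.txt keystore truststore ./jks
```

The truststore/ca-key file is deliberately left where it is, since the brokers only need the truststore and their own keystore.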

Enabling SSL

To enable SSL communication between brokers and with Kafka clients, the following settings should be added to the Apache Kafka server.properties file:

# Enables SSL for inter-broker communication and for Kafka clients
inter.broker.listener.name=INTERNAL
listeners=INTERNAL://:9091,EXTERNAL://:9092
advertised.listeners=INTERNAL://:9091,EXTERNAL://kafka-0:9092
listener.security.protocol.map=INTERNAL:SSL,EXTERNAL:SSL

# SSL broker settings
ssl.protocol=SSL
ssl.key.password=${private key password}
ssl.endpoint.identification.algorithm=HTTPS
ssl.keystore.type=jks
ssl.keystore.password=${keystore password}
ssl.keystore.location=${path to the server.keystore.jks}
ssl.truststore.type=jks
ssl.truststore.password=${truststore password}
ssl.truststore.location=${path to the server.truststore.jks}

Each Kafka client should have its own keystore/truststore pair, which must be referenced in the client configuration:

Spring Boot

spring:
  kafka:
    security:
      protocol: SSL
    ssl:
      key-store-location: ${path to client.keystore.jks}
      key-store-password: ${keystorePassword}
      trust-store-location: ${path to the client.truststore.jks}
      trust-store-password: ${truststorePassword}

VertX

Producer/consumer settings include the following values from environment variables:

security.protocol: SSL
ssl.keystore.password: ${keystore password}
ssl.keystore.location: ${path to the client.keystore.jks}
ssl.truststore.password: ${truststorePassword}
ssl.truststore.location: ${path to the client.truststore.jks}
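The same keys can also be written as a plain properties file, which is the form the Kafka command-line tools accept through --command-config (as used by kafka-acls.sh in the ACL section). A minimal sketch, assuming one password protects the keystore, the key and the truststore (as in the generator script); the function name write_client_properties and the example paths are hypothetical.

```shell
#!/bin/sh
# Hypothetical helper: write an SSL client properties file for Kafka CLI tools.
# Arguments: output file, keystore path, truststore path, password.
# Assumption: a single password is used for keystore, key and truststore.
write_client_properties() {
  cat > "$1" <<EOF
security.protocol=SSL
ssl.keystore.location=$2
ssl.keystore.password=$4
ssl.key.password=$4
ssl.truststore.location=$3
ssl.truststore.password=$4
EOF
}

# Example (paths are placeholders for the mounted client folder):
# write_client_properties client.properties \
#   /tmp/client/client.keystore.jks /tmp/client/client.truststore.jks secret
```

Because the file contains passwords in clear text, it is usually worth restricting its permissions (for example with chmod 600).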

ACL

Enabling ACL

To enable ACLs, the following values should be added to the Kafka server.properties file:

ssl.client.auth=required
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:kafka

# These rules translate the DN as follows:
#   CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown to serviceuser
#   CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown to adminuser@admin
ssl.principal.mapping.rules=RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,DEFAULT
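To see what the mapping rules do to a given DN, they can be approximated with sed. This is a rough emulation for experimenting, not Kafka's own implementation: the lazy groups (.*?) are replaced with ([^,]*) because POSIX extended regular expressions have no lazy quantifier, the /L suffix is emulated with tr, and the function name map_principal is hypothetical.

```shell
#!/bin/sh
# Rough emulation of the three mapping rules with sed; not Kafka's own code.
# "(.*?)" is replaced by "([^,]*)" because POSIX ERE has no lazy quantifier.
# An empty capture is treated as "no match" here, which is a simplification.
map_principal() {
  dn=$1
  # Rule 1: DNs with OU=ServiceUsers map to the CN, case unchanged.
  out=$(printf '%s\n' "$dn" | sed -n -E 's/^CN=([^,]*),OU=ServiceUsers.*$/\1/p')
  if [ -n "$out" ]; then printf '%s\n' "$out"; return 0; fi
  # Rule 2: a full six-part DN maps to cn@ou, lowercased (the trailing /L).
  out=$(printf '%s\n' "$dn" | sed -n -E \
    's/^CN=([^,]*),OU=([^,]*),O=([^,]*),L=([^,]*),ST=([^,]*),C=([^,]*)$/\1@\2/p')
  if [ -n "$out" ]; then
    printf '%s\n' "$out" | tr '[:upper:]' '[:lower:]'; return 0
  fi
  # Rule 3: any other DN with a CN maps to the leading [a-zA-Z0-9.] run of
  # the CN, lowercased.
  out=$(printf '%s\n' "$dn" | sed -n -E 's/^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/\1/p')
  if [ -n "$out" ]; then
    printf '%s\n' "$out" | tr '[:upper:]' '[:lower:]'; return 0
  fi
  # DEFAULT: the DN is used as is.
  printf '%s\n' "$dn"
}

map_principal 'CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown'  # prints serviceuser
map_principal 'CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown'           # prints adminuser@admin
```

Under this emulation a DN without an OU, such as CN=kafka-0,O=ServiceUsers,C=RU, falls through to the third rule and yields kafka, because the hyphen is not part of the [a-zA-Z0-9.] character class.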

Enabling access for topics with a prefix

Script to grant ${username} producer/consumer access to all topics whose names start with ${prefix}:

kafka-acls.sh --bootstrap-server ${kafkaHost} --command-config ${configPath} \
  --add --allow-principal User:${username} \
  --producer --topic ${prefix} --resource-pattern-type prefixed

kafka-acls.sh --bootstrap-server ${kafkaHost} --command-config ${configPath} \
  --add --allow-principal User:${username} \
  --consumer --topic ${prefix} --group ${consumerGroup} --resource-pattern-type prefixed

Script to revoke producer/consumer access:

kafka-acls.sh --bootstrap-server ${kafkaHost} --command-config ${configPath} \
  --remove --allow-principal User:${username} \
  --producer --topic ${prefix} --resource-pattern-type prefixed

kafka-acls.sh --bootstrap-server ${kafkaHost} --command-config ${configPath} \
  --remove --allow-principal User:${username} \
  --consumer --topic ${prefix} --group ${consumerGroup} --resource-pattern-type prefixed
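After granting or revoking access it can be useful to list the ACLs that currently apply to the prefix. The sketch below wraps kafka-acls.sh --list in a small function; the function name list_prefixed_acls, the DRY_RUN switch and the example values (kafka-0:9092, /tmp/client/client.properties, the prefix orders.) are assumptions made for illustration.

```shell
#!/bin/sh
# Hypothetical wrapper: list ACLs bound to a prefixed topic pattern.
# Arguments: bootstrap server, command-config path, topic prefix.
# With DRY_RUN=1 the command line is printed instead of executed.
list_prefixed_acls() {
  # Rebuild the positional parameters as the full command line.
  set -- kafka-acls.sh --bootstrap-server "$1" --command-config "$2" \
    --list --topic "$3" --resource-pattern-type prefixed
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# Print the command without contacting a broker (example values only):
DRY_RUN=1 list_prefixed_acls kafka-0:9092 /tmp/client/client.properties orders.
```

Without DRY_RUN the function runs kafka-acls.sh directly, so it needs the tool on the PATH and a reachable broker.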