Enabling SSL and ACL for Kafka

Local configuration (Docker)

This configuration runs a cluster of 3 Apache Kafka brokers with a single Apache ZooKeeper instance. Each broker mounts two folders under /tmp: jks contains the server keystores and the truststore; client contains the client properties file and the client keystore/truststore.

docker-compose.yaml
version: "3.9"

services:
  kafka-0:
    image: wurstmeister/kafka:2.13-2.7.0
    container_name: kafka-0
    hostname: kafka-0
    depends_on:
      - zookeeper
    volumes:
      - ./jks:/tmp/jks
      - ./client:/tmp/client
    ports:
      - 9092:9092
      - 9091:9091
      - 10000:10000
    env_file:
      - kafka-env.env
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://:9091,EXTERNAL://kafka-0:9092
      KAFKA_LISTENERS: INTERNAL://:9091,EXTERNAL://:9092
      KAFKA_SSL_KEYSTORE_LOCATION: /tmp/jks/kafka-0.keystore.jks

  kafka-1:
    image: wurstmeister/kafka:2.13-2.7.0
    container_name: kafka-1
    hostname: kafka-1
    depends_on:
      - zookeeper
    volumes:
      - ./jks:/tmp/jks
      - ./client:/tmp/client
    ports:
      - 19092:19092
    env_file:
      - kafka-env.env
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://:19091,EXTERNAL://kafka-1:19092
      KAFKA_LISTENERS: INTERNAL://:19091,EXTERNAL://:19092
      KAFKA_ADVERTISED_HOST_NAME: kafka-1
      KAFKA_SSL_KEYSTORE_LOCATION: /tmp/jks/kafka-1.keystore.jks

  kafka-2:
    image: wurstmeister/kafka:2.13-2.7.0
    container_name: kafka-2
    hostname: kafka-2
    depends_on:
      - zookeeper
    volumes:
      - ./jks:/tmp/jks
      - ./client:/tmp/client
    ports:
      - 29092:29092
    env_file:
      - kafka-env.env
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://:29091,EXTERNAL://kafka-2:29092
      KAFKA_LISTENERS: INTERNAL://:29091,EXTERNAL://:29092
      KAFKA_SSL_KEYSTORE_LOCATION: /tmp/jks/kafka-2.keystore.jks

  zookeeper:
    image: zookeeper:3.6.2
    container_name: zookeeper
    ports:
      - 2181:2181
      - 8080:8080
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOO_PORT_NUMBER: 2181
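
Each broker reads its keystore and the shared truststore from the mounted jks folder, so a missing file would likely only surface when the broker starts. A minimal pre-flight sketch follows. The function name missing_store_files is hypothetical; the file names are the ones referenced in the compose file above.

```shell
# Hypothetical helper: prints every store file that the compose file
# references but that is absent from the given directory.
missing_store_files() {
  dir=$1
  shift
  [ -f "$dir/kafka.truststore.jks" ] || echo "$dir/kafka.truststore.jks"
  for broker in "$@"; do
    [ -f "$dir/$broker.keystore.jks" ] || echo "$dir/$broker.keystore.jks"
  done
}

# Example call (prints nothing when all files are present):
#   missing_store_files ./jks kafka-0 kafka-1 kafka-2
```

Running the check before starting the containers keeps the feedback loop short; it does not validate the content of the stores.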

Kafka environment variable names depend on the Kafka Docker image; for example, bitnami/kafka uses the prefix KAFKA_CFG_ for configuration parameters. For wurstmeister/kafka, the following values are provided (they are eventually written to the server.properties file of the Kafka instance):

kafka-env.env
KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR=2
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=2
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR=3

KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SSL,EXTERNAL:SSL
KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL

KAFKA_SSL_TRUSTSTORE_TYPE=jks
KAFKA_SSL_TRUSTSTORE_LOCATION=/tmp/jks/kafka.truststore.jks
KAFKA_SSL_TRUSTSTORE_PASSWORD=secret
KAFKA_SSL_KEYSTORE_PASSWORD=secret
KAFKA_SSL_KEY_PASSWORD=secret
KAFKA_SSL_CLIENT_AUTH=required

KAFKA_SSL_KEYSTORE_TYPE=jks
KAFKA_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=HTTPS

KAFKA_AUTHORIZER_CLASS_NAME=kafka.security.authorizer.AclAuthorizer
KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND=false
KAFKA_SUPER_USERS=User:kafka

KAFKA_MIN_INSYNC_REPLICAS=2
KAFKA_MESSAGE_MAX_BYTES=1000000
KAFKA_AUTO_CREATE_TOPICS_ENABLE=false
KAFKA_SSL_PRINCIPAL_MAPPING_RULES='RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,DEFAULT'
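
The relationship between these variables and server.properties keys can be illustrated with a small sketch. It assumes the image follows the usual convention of dropping the KAFKA_ prefix, lower-casing the rest, and replacing underscores with dots; the function env_to_property is hypothetical and only mirrors the naming, it is not the image's own start script.

```shell
# Hypothetical helper: derives the server.properties key from a
# wurstmeister-style environment variable name.
env_to_property() {
  # Drop the KAFKA_ prefix, lower-case the rest, turn "_" into ".".
  printf '%s\n' "${1#KAFKA_}" | tr '[:upper:]' '[:lower:]' | tr '_' '.'
}

env_to_property KAFKA_SSL_CLIENT_AUTH             # prints ssl.client.auth
env_to_property KAFKA_INTER_BROKER_LISTENER_NAME  # prints inter.broker.listener.name
```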

SSL Configuration

Generating SSL certificates

The script below generates the certificates (slightly modified from https://github.com/confluentinc/confluent-platform-security-tools/blob/master/kafka-generate-ssl-automatic.sh). It reads the broker host names from the file kafka-hosts.txt:

kafka-hosts.txt
kafka-0
kafka-1
kafka-2

Each Apache Kafka broker should have its own keystore with CN=FQDN (Fully Qualified Domain Name). The CN is used for endpoint identification between brokers and clients. In the Docker environment the host names are kafka-0, kafka-1, and kafka-2.

generate-ssl.sh
#!/usr/bin/env bash

set -eu

export COUNTRY=RU
export STATE=
export ORGANIZATION_UNIT=ServiceUsers
export CITY=
export PASSWORD=secret
export CN=kafka-admin

validity=3650
defaultTruststoreName="kafka.truststore.jks"
truststoreWorkDir="truststore"
keystoreWorkDir="keystore"
caKey="ca-cert"
keystoreSignInRequest="cert-file"
keystoreSignRequestSrl="ca-cert.srl"
keystoreSignedCert="cert-signed"
kafkaHostsFile="kafka-hosts.txt"

country=$COUNTRY
state=$STATE
OU=$ORGANIZATION_UNIT
CN=$CN
location=$CITY
password=$PASSWORD

function file_exists_and_exit() {
  echo "'$1' cannot exist. Move or delete it before"
  echo "re-running this script."
  exit 1
}

if [ -e "$keystoreWorkDir" ]; then
  file_exists_and_exit $keystoreWorkDir
fi

if [ -e "$caKey" ]; then
  file_exists_and_exit $caKey
fi

if [ -e "$keystoreSignInRequest" ]; then
  file_exists_and_exit $keystoreSignInRequest
fi

if [ -e "$keystoreSignRequestSrl" ]; then
  file_exists_and_exit $keystoreSignRequestSrl
fi

if [ -e "$keystoreSignedCert" ]; then
  file_exists_and_exit $keystoreSignedCert
fi

# The hosts file is required: abort when it is missing.
if [ ! -e "$kafkaHostsFile" ]; then
  echo "'$kafkaHostsFile' does not exist. Create this file before re-running this script."
  exit 1
fi

echo "Welcome to the Kafka SSL keystore and trust store generator script."

truststoreFile=""
truststorePrivateKey=""

if [ ! -e "$truststoreWorkDir" ]; then
  mkdir $truststoreWorkDir
  echo
  echo "OK, we'll generate a trust store and associated private key."
  echo
  echo "First, the private key."
  echo

  openssl req -new -x509 -keyout $truststoreWorkDir/ca-key \
    -out $truststoreWorkDir/ca-cert -days $validity -nodes \
    -subj "/C=$country/ST=$state/L=$location/O=$OU/CN=$CN"

  truststorePrivateKey="$truststoreWorkDir/ca-key"

  echo
  echo "Two files were created:"
  echo " - $truststoreWorkDir/ca-key -- the private key used later to"
  echo " sign certificates"
  echo " - $truststoreWorkDir/ca-cert -- the certificate that will be"
  echo " stored in the trust store in a moment and serve as the certificate"
  echo " authority (CA). Once this certificate has been stored in the trust"
  echo " store, it will be deleted. It can be retrieved from the trust store via:"
  echo " $ keytool -keystore <trust-store-file> -export -alias CARoot -rfc"

  echo
  echo "Now the trust store will be generated from the certificate."
  echo

  keytool -keystore $truststoreWorkDir/$defaultTruststoreName \
    -alias CARoot -import -file $truststoreWorkDir/ca-cert \
    -noprompt -dname "C=$country, ST=$state, L=$location, O=$OU, CN=$CN" -keypass $password -storepass $password

  truststoreFile="$truststoreWorkDir/$defaultTruststoreName"

  echo
  echo "$truststoreWorkDir/$defaultTruststoreName was created."

  # don't need the cert because it's in the trust store.
  rm $truststoreWorkDir/$caKey

  echo
  echo "Continuing with:"
  echo " - trust store file: $truststoreFile"
  echo " - trust store private key: $truststorePrivateKey"

else
  truststorePrivateKey="$truststoreWorkDir/ca-key"
  truststoreFile="$truststoreWorkDir/$defaultTruststoreName"
fi

mkdir $keystoreWorkDir

while read -r kafkaHost || [ -n "$kafkaHost" ]; do
  echo
  echo "Now, a keystore will be generated for '$kafkaHost'. Each broker and logical client"
  echo "needs its own keystore. This script creates one keystore per host listed in"
  echo "'$kafkaHostsFile'."
  echo
  echo " NOTE: currently in Kafka, the Common Name (CN) does not need to be the FQDN of"
  echo " this host. However, at some point, this may change. As such, make the CN"
  echo " the FQDN. Some operating systems call the CN prompt 'first / last name'"

  # To learn more about CNs and FQDNs, read:
  # https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/X509ExtendedTrustManager.html

  keystoreFileName="$kafkaHost.server.keystore.jks"

  keytool -keystore $keystoreWorkDir/"$keystoreFileName" \
    -alias localhost -validity $validity -genkey -keyalg RSA \
    -noprompt -dname "C=$country, ST=$state, L=$location, O=$OU, CN=$kafkaHost" \
    -keypass $password -storepass $password

  echo
  echo "'$keystoreWorkDir/$keystoreFileName' now contains a key pair and a"
  echo "self-signed certificate. Again, this keystore can only be used for one broker or"
  echo "one logical client. Other brokers or clients need to generate their own keystores."

  echo
  echo "Fetching the certificate from the trust store and storing in $caKey."
  echo

  keytool -keystore $truststoreFile -export -alias CARoot -rfc -file $caKey -keypass $password -storepass $password

  echo
  echo "Now a certificate signing request will be made to the keystore."
  echo
  keytool -keystore $keystoreWorkDir/"$keystoreFileName" -alias localhost \
    -certreq -file $keystoreSignInRequest -keypass $password -storepass $password

  echo
  echo "Now the trust store's private key (CA) will sign the keystore's certificate."
  echo
  openssl x509 -req -CA $caKey -CAkey $truststorePrivateKey \
    -in $keystoreSignInRequest -out $keystoreSignedCert \
    -days $validity -CAcreateserial
  # creates $keystoreSignRequestSrl which is never used or needed.

  echo
  echo "Now the CA will be imported into the keystore."
  echo
  keytool -keystore $keystoreWorkDir/"$keystoreFileName" -alias CARoot \
    -import -file $caKey -keypass $password -storepass $password -noprompt
  rm $caKey # delete the exported CA cert because it's stored in the trust store.

  echo
  echo "Now the keystore's signed certificate will be imported back into the keystore."
  echo
  keytool -keystore $keystoreWorkDir/"$keystoreFileName" -alias localhost -import \
    -file $keystoreSignedCert -keypass $password -storepass $password

  echo
  echo "All done!"
  echo
  echo "Deleting intermediate files. They are:"
  echo " - '$keystoreSignRequestSrl': CA serial number"
  echo " - '$keystoreSignInRequest': the keystore's certificate signing request"
  echo " (that was fulfilled)"
  echo " - '$keystoreSignedCert': the keystore's certificate, signed by the CA, and stored back"
  echo " into the keystore"

  rm $keystoreSignRequestSrl
  rm $keystoreSignInRequest
  rm $keystoreSignedCert
done < "$kafkaHostsFile"
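
The script writes keystores to keystore/<host>.server.keystore.jks and the truststore to truststore/kafka.truststore.jks, while the compose file expects jks/<host>.keystore.jks and jks/kafka.truststore.jks. One way to bridge the two is sketched below, assuming the files are simply copied and renamed. The helper stage_stores is hypothetical and not part of the original script.

```shell
# Hypothetical helper: copies generated stores into the folder mounted by
# docker-compose, renaming keystores to <host>.keystore.jks.
stage_stores() {
  src_keystores=$1    # e.g. keystore
  src_truststore=$2   # e.g. truststore/kafka.truststore.jks
  dest=$3             # e.g. jks
  hosts_file=$4       # e.g. kafka-hosts.txt
  mkdir -p "$dest"
  cp "$src_truststore" "$dest/kafka.truststore.jks"
  # Also handle a last line without a trailing newline.
  while read -r host || [ -n "$host" ]; do
    [ -n "$host" ] || continue   # skip blank lines
    cp "$src_keystores/$host.server.keystore.jks" "$dest/$host.keystore.jks"
  done < "$hosts_file"
}

# Example call from the directory where generate-ssl.sh was run:
#   stage_stores keystore truststore/kafka.truststore.jks jks kafka-hosts.txt
```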

Enabling SSL

To enable SSL communication between brokers and Kafka clients, the following settings should be added to the Apache Kafka server.properties file (values shown for kafka-0):

# Enables SSL for inter broker communication and for Kafka clients
inter.broker.listener.name=INTERNAL
listeners=INTERNAL://:9091,EXTERNAL://:9092
advertised.listeners=INTERNAL://:9091,EXTERNAL://kafka-0:9092
listener.security.protocol.map=INTERNAL:SSL,EXTERNAL:SSL

# SSL broker settings
# The Kafka documentation discourages the legacy value SSL; TLSv1.2 or TLSv1.3 is recommended.
ssl.protocol=TLSv1.2
ssl.key.password=${private key password}
ssl.endpoint.identification.algorithm=HTTPS

ssl.keystore.type=jks
ssl.keystore.password=${keystore password}
ssl.keystore.location=${path to the server.keystore.jks}

ssl.truststore.type=jks
ssl.truststore.password=${truststore password}
ssl.truststore.location=${path to the server.truststore.jks}

Each Kafka client should have its own keystore/truststore pair, which should be added to the client configuration:

Spring Boot

spring:
  kafka:
    security:
      protocol: SSL
    ssl:
      key-store-location: ${path to the client.keystore.jks}
      key-store-password: ${keystore password}
      trust-store-location: ${path to the client.truststore.jks}
      trust-store-password: ${truststore password}

VertX

Producer/consumer settings include the following values from environment variables:

security.protocol: SSL
ssl.keystore.password: ${keystore password}
ssl.keystore.location: ${path to the client.keystore.jks}
ssl.truststore.password: ${truststore password}
ssl.truststore.location: ${path to the client.truststore.jks}
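
The Kafka command-line tools need the same client settings, supplied as a properties file (for example the file passed via --command-config). A minimal sketch that writes such a file is shown below. The helper name write_client_properties and the single shared password are assumptions; ssl.key.password is included on the assumption that the private key is password-protected, as in the generated keystores.

```shell
# Hypothetical helper: writes an SSL client properties file.
# Assumes one password for keystore, private key and truststore.
write_client_properties() {
  out=$1
  keystore=$2
  truststore=$3
  password=$4
  cat > "$out" <<EOF
security.protocol=SSL
ssl.keystore.location=$keystore
ssl.keystore.password=$password
ssl.key.password=$password
ssl.truststore.location=$truststore
ssl.truststore.password=$password
EOF
}

# Example call (paths are illustrative):
#   write_client_properties client/client.properties \
#     /tmp/client/client.keystore.jks /tmp/client/client.truststore.jks secret
```

Passing the password as an argument keeps the sketch short; in a shared environment it would be safer to read it from a protected file or variable.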

ACL

Enabling ACL

To enable ACLs, the following values should be added to the Kafka server.properties file:

ssl.client.auth=required
allow.everyone.if.no.acl.found=false
authorizer.class.name=kafka.security.authorizer.AclAuthorizer
super.users=User:kafka
# These rules translate the DN as follows:
#   CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown -> serviceuser
#   CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown -> adminuser@admin
ssl.principal.mapping.rules=RULE:^CN=(.*?),OU=ServiceUsers.*$/$1/,RULE:^CN=(.*?),OU=(.*?),O=(.*?),L=(.*?),ST=(.*?),C=(.*?)$/$1@$2/L,RULE:^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/$1/L,DEFAULT
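
To make the effect of the mapping rules easier to follow, the sketch below approximates them with sed. It is not Kafka's implementation: the function map_principal is hypothetical, and [^,]* stands in for the lazy (.*?) groups, which POSIX extended regular expressions do not support. Rules are tried in order and the first match wins, with the full DN as the DEFAULT fallback.

```shell
# Hypothetical approximation of ssl.principal.mapping.rules.
map_principal() {
  dn=$1
  if printf '%s\n' "$dn" | grep -Eq '^CN=[^,]*,OU=ServiceUsers'; then
    # Rule 1: keep the CN unchanged.
    printf '%s\n' "$dn" | sed -E 's/^CN=([^,]*),OU=ServiceUsers.*$/\1/'
  elif printf '%s\n' "$dn" | grep -Eq '^CN=[^,]*,OU=[^,]*,O=[^,]*,L=[^,]*,ST=[^,]*,C=[^,]*$'; then
    # Rule 2: CN@OU, lower-cased by the trailing /L flag.
    printf '%s\n' "$dn" | sed -E 's/^CN=([^,]*),OU=([^,]*),.*$/\1@\2/' | tr '[:upper:]' '[:lower:]'
  elif printf '%s\n' "$dn" | grep -Eq '[Cc][Nn]=[a-zA-Z0-9.]*'; then
    # Rule 3: letters, digits and dots following CN=, lower-cased.
    printf '%s\n' "$dn" | sed -E 's/^.*[Cc][Nn]=([a-zA-Z0-9.]*).*$/\1/' | tr '[:upper:]' '[:lower:]'
  else
    # DEFAULT: the full DN is the principal name.
    printf '%s\n' "$dn"
  fi
}

map_principal 'CN=serviceuser,OU=ServiceUsers,O=Unknown,L=Unknown,ST=Unknown,C=Unknown'  # prints serviceuser
map_principal 'CN=adminUser,OU=Admin,O=Unknown,L=Unknown,ST=Unknown,C=Unknown'           # prints adminuser@admin
map_principal 'CN=kafka-0,O=ServiceUsers,C=RU'                                           # prints kafka
```

The last call uses an illustrative DN: because the character class in the third rule excludes the hyphen, a CN such as kafka-0 is cut to kafka, which is the name listed in super.users.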

Enabling access to topics with a prefix

Script to grant ${username} producer/consumer access to all topics whose names start with ${prefix}:

kafka-acls.sh --bootstrap-server ${kafkaHost} --command-config ${configPath} --add --allow-principal User:${username} --producer --topic ${prefix} --resource-pattern-type prefixed
kafka-acls.sh --bootstrap-server ${kafkaHost} --command-config ${configPath} --add --allow-principal User:${username} --consumer --topic ${prefix} --group ${consumerGroup} --resource-pattern-type prefixed

Script to revoke producer/consumer access:

kafka-acls.sh --bootstrap-server ${kafkaHost} --command-config ${configPath} --remove --allow-principal User:${username} --producer --topic ${prefix} --resource-pattern-type prefixed
kafka-acls.sh --bootstrap-server ${kafkaHost} --command-config ${configPath} --remove --allow-principal User:${username} --consumer --topic ${prefix} --group ${consumerGroup} --resource-pattern-type prefixed
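
After granting or revoking access, it can help to list the ACLs currently attached to the prefix. A minimal sketch using the --list option of kafka-acls.sh follows. The wrapper list_prefixed_acls and the KAFKA_ACLS variable are hypothetical conveniences; the arguments mirror the placeholders used in the commands above.

```shell
# Path to the tool; override KAFKA_ACLS if kafka-acls.sh is not on PATH.
KAFKA_ACLS=${KAFKA_ACLS:-kafka-acls.sh}

# Hypothetical wrapper: lists ACLs bound to a prefixed topic pattern.
#   $1 = bootstrap server, $2 = client properties file, $3 = topic prefix
list_prefixed_acls() {
  "$KAFKA_ACLS" --bootstrap-server "$1" --command-config "$2" \
    --list --topic "$3" --resource-pattern-type prefixed
}

# Example call (host, path and prefix are illustrative):
#   list_prefixed_acls kafka-0:9092 /tmp/client/client.properties orders.
```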