In my previous tutorials, I showed how to set up Kafka OAUTHBEARER producers/consumers with Azure Event Hubs & Azure AD and with Keycloak & Kafka, in C++, Python, and the CLI.

In this tutorial, you will learn how to run a Kafka OAUTHBEARER setup in Docker with multiple listeners, including SASL_SSL. Afterwards, we will verify it by simulating a workload with a Python script.

Introduction

To configure the complete setup, we will need:

  1. docker-kafka (e097b944c505655fca3884bd0632dcb389828057), a repository that runs both Kafka and Zookeeper in the same container. Why? There is no dependency on an external Zookeeper host or a linked container: Zookeeper and Kafka are configured to work together out of the box.
  2. keycloak/keycloak (20.0.3) docker container as the OAuth server for Kafka OAUTHBEARER. Why? We want to run the OAuth server separately from the Kafka container so that different token providers can be configured.
  3. confluent_kafka Python library to test the Kafka listeners. Why? Python is used for simplicity and workload simulation.

This Docker setup is intended for testing environments, so we will use self-signed SSL certificates for the SASL_SSL listener and PLAINTEXT as security.inter.broker.protocol.

docker-kafka

To build the needed image, one can use build-suported-versions.sh.

export KAFKA_VERSION=3.3.1
export SCALA_VERSION=2.13
build_and_tag oauth

Uncomment all but the oauth docker image and execute ./build-suported-versions.sh 1 to build the needed images.

REPOSITORY                                                                        TAG                       IMAGE ID       CREATED          SIZE
memsql/kafka_oauth                                                                latest                    98821ceb8b59   16 minutes ago   1.92GB
psy3.memcompute.com/schema_kafka-oauth                                            3.3.1.1                   98821ceb8b59   16 minutes ago   1.92GB
memsql/kafka                                                                      latest                    cbd98355ae1b   16 minutes ago   1.92GB
psy3.memcompute.com/schema_kafka                                                  3.3.1.1                   cbd98355ae1b   16 minutes ago   1.92GB
memsql/kafka_base                                                                 1                         d24f9c8f6d6f   21 minutes ago   1.81GB

For this tutorial, we will use psy3.memcompute.com/schema_kafka-oauth, which exposes SSH, PLAINTEXT, schema registry, SASL_PLAINTEXT, and SASL_SSL ports. psy3.memcompute.com/schema_kafka, on the other hand, has a PLAINTEXT listener only.

We will verify that the image actually works later with Python, since the setup also requires Keycloak client configuration; for now, let's dig into the build scripts.

Internally, build-suported-versions.sh depends on Dockerfile.oauth and start-kafka-oauth.sh. We configure three listeners: PLAINTEXT for unsecured data ingestion, and SASL_PLAINTEXT and SASL_SSL for OAUTHBEARER.

# Kafka 3.3.1: The advertised.port and advertised.host.name configurations were removed. Please use advertised.listeners instead.
LISTENERS="PLAINTEXT:\/\/:9092, SASL_PLAINTEXT:\/\/:9093, SASL_SSL:\/\/:9094"
ADVERTISED_LISTENERS="PLAINTEXT:\/\/${HOSTNAME}:9092, SASL_PLAINTEXT:\/\/${HOSTNAME}:9093, SASL_SSL:\/\/${HOSTNAME}:9094"

When the docker container starts, the script expects the OAUTHBEARER_CONFIG environment variable: a base64-encoded JSON object with the OAuth config fields client_id, client_secret, token_url, jwks_url, and audience.
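Such a value can be produced from a plain dict with Python's standard json and base64 modules. A minimal sketch; the field values below are placeholders, not the real client settings:

```python
import base64
import json

# Placeholder values: substitute the real Keycloak client settings.
oauth_config = {
    "client_id": "kafka-broker",
    "client_secret": "<client-secret>",
    "token_url": "http://172.17.0.2:8080/realms/master/protocol/openid-connect/token",
    "jwks_url": "http://172.17.0.2:8080/realms/master/protocol/openid-connect/certs",
    "audience": "account",
}

# The start script decodes this with `base64 -d` and extracts fields via jq.
OAUTHBEARER_CONFIG = base64.standard_b64encode(
    json.dumps(oauth_config).encode()
).decode()
print(OAUTHBEARER_CONFIG)
```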

...


if ! grep -r -q "sasl.enabled.mechanisms=OAUTHBEARER" ${CONFIG_PATH}; then
    CONFIG_JSON=$(echo "${OAUTHBEARER_CONFIG}" | base64 -d)
    echo "Setting OAUTHBEARER config settings ${CONFIG_JSON}"

    CLIENT_ID=$(echo "${CONFIG_JSON}" | jq -r '.client_id')
    CLIENT_SECRET=$(echo "${CONFIG_JSON}" | jq -r '.client_secret')


    cat << EOF >> ${CONFIG_PATH}

sasl.enabled.mechanisms=OAUTHBEARER
sasl.mechanism.inter.broker.protocol=OAUTHBEARER
security.inter.broker.protocol=PLAINTEXT

sasl.oauthbearer.token.endpoint.url=$(echo "${CONFIG_JSON}" | jq -r '.token_url')
sasl.oauthbearer.jwks.endpoint.url=$(echo "${CONFIG_JSON}" | jq -r '.jwks_url')
sasl.oauthbearer.expected.audience=$(echo "${CONFIG_JSON}" | jq -r '.audience')

listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
listener.name.sasl_plaintext.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId=${CLIENT_ID} clientSecret=${CLIENT_SECRET} unsecuredLoginStringClaim_sub='sub';

listener.name.sasl_ssl.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
listener.name.sasl_ssl.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
listener.name.sasl_ssl.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required clientId=${CLIENT_ID} clientSecret=${CLIENT_SECRET} unsecuredLoginStringClaim_sub='sub';
EOF
fi

For the actual token request and validation logic, we will use the OIDC implementation provided by Kafka after KIP-768: OAuthBearerValidatorCallbackHandler and OAuthBearerLoginCallbackHandler.

keycloak docker

docker pull keycloak/keycloak:20.0.3
20.0.3: Pulling from keycloak/keycloak
Digest: sha256:b8f2a453a17a244a829fdafdb08dd77f719d3622bc3987c76a81771c0913b882
Status: Image is up to date for keycloak/keycloak:20.0.3
docker.io/keycloak/keycloak:20.0.3

After deploying the container, we will need to create a client_credentials client using kcadm.sh create clients and the config exported from the Keycloak setup in my first tutorial.

To export the client configuration as JSON, click Clients -> Client Details -> Action -> Export.


The content of kafka-broker.json should then look similar to this:

{
  "clientId": "kafka-broker",
  "name": "",
  "description": "",
  "rootUrl": "",
  "adminUrl": "",
  "baseUrl": "",
  "surrogateAuthRequired": false,
  "enabled": true,
  "alwaysDisplayInConsole": false,
  "clientAuthenticatorType": "client-secret",
  "secret": "CHnRHGf5Crr1YC4pU6eULk9lPBuVdJba",
  "redirectUris": [],
  "webOrigins": [],
  "notBefore": 0,
  "bearerOnly": false,
  "consentRequired": false,
  "standardFlowEnabled": false,
  "implicitFlowEnabled": false,
  "directAccessGrantsEnabled": false,
  "serviceAccountsEnabled": true,
  "publicClient": false,
  "frontchannelLogout": true,
  "protocol": "openid-connect",
  "attributes": {
 "access.token.lifespan": "3600",
    "oidc.ciba.grant.enabled": "false",
    "client.secret.creation.time": "1674663139",
    "backchannel.logout.session.required": "true",
    "oauth2.device.authorization.grant.enabled": "false",
    "display.on.consent.screen": "false",
    "backchannel.logout.revoke.offline.tokens": "false"
  },
  "authenticationFlowBindingOverrides": {},
  "fullScopeAllowed": true,
  "nodeReRegistrationTimeout": -1,
  "protocolMappers": [
    {
      "name": "Client Host",
      "protocol": "openid-connect",
      "protocolMapper": "oidc-usersessionmodel-note-mapper",
      "consentRequired": false,
      "config": {
        "user.session.note": "clientHost",
        "id.token.claim": "true",
        "access.token.claim": "true",
        "claim.name": "clientHost",
        "jsonType.label": "String"
      }
    },
    {
      "name": "Client ID",
      "protocol": "openid-connect",
      "protocolMapper": "oidc-usersessionmodel-note-mapper",
      "consentRequired": false,
      "config": {
        "user.session.note": "clientId",
        "id.token.claim": "true",
        "access.token.claim": "true",
        "claim.name": "clientId",
        "jsonType.label": "String"
      }
    },
    {
      "name": "Client IP Address",
      "protocol": "openid-connect",
      "protocolMapper": "oidc-usersessionmodel-note-mapper",
      "consentRequired": false,
      "config": {
        "user.session.note": "clientAddress",
        "id.token.claim": "true",
        "access.token.claim": "true",
        "claim.name": "clientAddress",
        "jsonType.label": "String"
      }
    }
  ],
  "defaultClientScopes": [
    "web-origins",
    "acr",
    "profile",
    "roles",
    "email"
  ],
  "optionalClientScopes": [
    "address",
    "phone",
    "offline_access",
    "microprofile-jwt"
  ],
  "access": {
    "view": true,
    "configure": true,
    "manage": true
  }
}
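The exported JSON already carries the clientId and secret that OAUTHBEARER_CONFIG needs, so one can assemble the broker config directly from the file. A small sketch; the helper name and the Keycloak URL are illustrative, not part of the repository:

```python
import json

def oauth_config_from_export(path, keycloak_url):
    """Build the OAUTHBEARER_CONFIG fields from an exported Keycloak client JSON."""
    with open(path) as f:
        client = json.load(f)
    return {
        "client_id": client["clientId"],      # e.g. "kafka-broker"
        "client_secret": client["secret"],
        "token_url": keycloak_url + "/realms/master/protocol/openid-connect/token",
        "jwks_url": keycloak_url + "/realms/master/protocol/openid-connect/certs",
        "audience": "account",
    }

# server_config = oauth_config_from_export("kafka-broker.json", "http://172.17.0.2:8080")
```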

Keycloak docker deployment

Let's write a Python script to do the deployment. The idea is to start a Keycloak docker container, wait until it's up, add a client using the exported config, and request a token to verify the setup.

import base64
import subprocess

# LocalContainer and WaitOnlineGenericContainer are helpers defined in the combined test_setup.py.
class KeycloakOauthContainer(LocalContainer):
    KEYCLOAK_CLIENT_JSON = base64.standard_b64encode(open('keycloak_client.json').read().encode()).decode()

    def __init__(self):
        super().__init__(
            'keycloak/keycloak:20.0.3',
            'start-dev',
            [('8080', '8080')],
            { 'KEYCLOAK_ADMIN':'admin', 'KEYCLOAK_ADMIN_PASSWORD':'admin' }
        )

        auth_cmd = '--server http://%s:8080 --realm master --user admin --password admin' % self.ip
        cmd = "docker exec %s /opt/keycloak/bin/kcadm.sh get realms %s" % (self.container_id, auth_cmd)

        def ping_fn():
            subprocess.check_output(cmd, shell=True)

        WaitOnlineGenericContainer(ping_fn, PING_TIME=8, err_msg='Failed to init Keycloak')

        # Alternative is installing kcadm.sh to avoid echo | base64 logic
        #
        cmd = "echo '%s' | base64 -d > /tmp/oauth_client.json && " \
            "/opt/keycloak/bin/kcadm.sh create clients %s -f /tmp/oauth_client.json" % (
            KeycloakOauthContainer.KEYCLOAK_CLIENT_JSON,
            auth_cmd
        )
        cmd = "docker exec %s bash -c \"%s\"" % (self.container_id, cmd)

        print('Adding OpenID Keycloak client with %s' % (cmd))
        subprocess.check_output(cmd, shell=True)

        self.client_id = "kafka-client"
        self.client_secret = "CHnRHGf5Crr1YC4pU6eULk9lPBuVdJba"
        self.token_url = "http://%s:8080/realms/master/protocol/openid-connect/token" % self.ip
        self.server_config = {
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "token_url": self.token_url,
            "jwks_url": "http://%s:8080/realms/master/protocol/openid-connect/certs" % self.ip,
            "audience": "account"
        }
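WaitOnlineGenericContainer is another helper from test_setup.py. A possible implementation is a plain retry loop; this is a hypothetical sketch, assuming ping_fn raises until the service is reachable:

```python
import time

def WaitOnlineGenericContainer(ping_fn, PING_TIME=5, RETRIES=40,
                               err_msg='Container failed to start'):
    """Call ping_fn until it stops raising, sleeping PING_TIME seconds between tries."""
    for _ in range(RETRIES):
        try:
            return ping_fn()
        except Exception:
            time.sleep(PING_TIME)
    raise RuntimeError(err_msg)
```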

For the token request, we will use the Python requests library. The format is described in my first tutorial.

    def get_token(self):
        resp = requests.post(
            self.token_url,
            auth=(self.client_id, self.client_secret),
            data='grant_type=client_credentials',
            headers={'Content-Type': 'application/x-www-form-urlencoded'}
        )
        return resp.json()
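Since the response contains a JWT access_token, its claims (azp, aud, exp) can also be sanity-checked by decoding the payload segment. A small helper for debugging only; it deliberately skips signature verification:

```python
import base64
import json

def jwt_claims(token):
    """Decode a JWT payload without verifying the signature (debugging only)."""
    payload = token.split('.')[1]
    payload += '=' * (-len(payload) % 4)  # restore the base64 padding stripped by JWT encoding
    return json.loads(base64.urlsafe_b64decode(payload))

# e.g. jwt_claims(keycloak.get_token()['access_token'])['azp']
```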

Let's verify the execution with the script below.

if __name__ == '__main__':
    keycloak = KeycloakOauthContainer()
    print("Token:", keycloak.get_token())
Running docker run -d --rm -p 8080:8080 -e KEYCLOAK_ADMIN=admin -e KEYCLOAK_ADMIN_PASSWORD=admin  keycloak/keycloak:20.0.3 start-dev 
Container running with id=38db2ee26fae374efb4d31c1e9bfc0910057129db87e9652f595c921dd829e57, ip=172.17.0.2
Logging into http://172.17.0.2:8080 as user admin of realm master
Failed to send request - Connect to 172.17.0.2:8080 [/172.17.0.2] failed: Connection refused (Connection refused)
Logging into http://172.17.0.2:8080 as user admin of realm master
Failed to send request - Connect to 172.17.0.2:8080 [/172.17.0.2] failed: Connection refused (Connection refused)
Logging into http://172.17.0.2:8080 as user admin of realm master

Adding OpenID Keycloak client with docker exec 38db2ee26fae374efb4d31c1e9bfc0910057129db87e9652f595c921dd829e57 bash -c "echo 'ewogICJjbGllbnRJZCI6ICJrYWZrYS1icm9rZXIiLAogICJuYW1lIjogIiIsCiAgImRlc2NyaXB0aW9uIjogIiIsCiAgInJvb3RVcmwiOiAiIiwKICAiYWRtaW5VcmwiOiAiIiwKICAiYmFzZVVybCI6ICIiLAogICJzdXJyb2dhdGVBdXRoUmVxdWlyZWQiOiBmYWxzZSwKICAiZW5hYmxlZCI6IHRydWUsCiAgImFsd2F5c0Rpc3BsYXlJbkNvbnNvbGUiOiBmYWxzZSwKICAiY2xpZW50QXV0aGVudGljYXRvclR5cGUiOiAiY2xpZW50LXNlY3JldCIsCiAgInNlY3JldCI6ICJDSG5SSEdmNUNycjFZQzRwVTZlVUxrOWxQQnVWZEpiYSIsCiAgInJlZGlyZWN0VXJpcyI6IFtdLAogICJ3ZWJPcmlnaW5zIjogW10sCiAgIm5vdEJlZm9yZSI6IDAsCiAgImJlYXJlck9ubHkiOiBmYWxzZSwKICAiY29uc2VudFJlcXVpcmVkIjogZmFsc2UsCiAgInN0YW5kYXJkRmxvd0VuYWJsZWQiOiBmYWxzZSwKICAiaW1wbGljaXRGbG93RW5hYmxlZCI6IGZhbHNlLAogICJkaXJlY3RBY2Nlc3NHcmFudHNFbmFibGVkIjogZmFsc2UsCiAgInNlcnZpY2VBY2NvdW50c0VuYWJsZWQiOiB0cnVlLAogICJwdWJsaWNDbGllbnQiOiBmYWxzZSwKICAiZnJvbnRjaGFubmVsTG9nb3V0IjogdHJ1ZSwKICAicHJvdG9jb2wiOiAib3BlbmlkLWNvbm5lY3QiLAogICJhdHRyaWJ1dGVzIjogewoJImFjY2Vzcy50b2tlbi5saWZlc3BhbiI6ICIzNjAwIiwKICAgICJvaWRjLmNpYmEuZ3JhbnQuZW5hYmxlZCI6ICJmYWxzZSIsCiAgICAiY2xpZW50LnNlY3JldC5jcmVhdGlvbi50aW1lIjogIjE2NzQ2NjMxMzkiLAogICAgImJhY2tjaGFubmVsLmxvZ291dC5zZXNzaW9uLnJlcXVpcmVkIjogInRydWUiLAogICAgIm9hdXRoMi5kZXZpY2UuYXV0aG9yaXphdGlvbi5ncmFudC5lbmFibGVkIjogImZhbHNlIiwKICAgICJkaXNwbGF5Lm9uLmNvbnNlbnQuc2NyZWVuIjogImZhbHNlIiwKICAgICJiYWNrY2hhbm5lbC5sb2dvdXQucmV2b2tlLm9mZmxpbmUudG9rZW5zIjogImZhbHNlIgogIH0sCiAgImF1dGhlbnRpY2F0aW9uRmxvd0JpbmRpbmdPdmVycmlkZXMiOiB7fSwKICAiZnVsbFNjb3BlQWxsb3dlZCI6IHRydWUsCiAgIm5vZGVSZVJlZ2lzdHJhdGlvblRpbWVvdXQiOiAtMSwKICAicHJvdG9jb2xNYXBwZXJzIjogWwogICAgewogICAgICAibmFtZSI6ICJDbGllbnQgSG9zdCIsCiAgICAgICJwcm90b2NvbCI6ICJvcGVuaWQtY29ubmVjdCIsCiAgICAgICJwcm90b2NvbE1hcHBlciI6ICJvaWRjLXVzZXJzZXNzaW9ubW9kZWwtbm90ZS1tYXBwZXIiLAogICAgICAiY29uc2VudFJlcXVpcmVkIjogZmFsc2UsCiAgICAgICJjb25maWciOiB7CiAgICAgICAgInVzZXIuc2Vzc2lvbi5ub3RlIjogImNsaWVudEhvc3QiLAogICAgICAgICJpZC50b2tlbi5jbGFpbSI6ICJ0cnVlIiwKICAgICAgICAiYWNjZXNzLnRva2VuLmNsYWltI
jogInRydWUiLAogICAgICAgICJjbGFpbS5uYW1lIjogImNsaWVudEhvc3QiLAogICAgICAgICJqc29uVHlwZS5sYWJlbCI6ICJTdHJpbmciCiAgICAgIH0KICAgIH0sCiAgICB7CiAgICAgICJuYW1lIjogIkNsaWVudCBJRCIsCiAgICAgICJwcm90b2NvbCI6ICJvcGVuaWQtY29ubmVjdCIsCiAgICAgICJwcm90b2NvbE1hcHBlciI6ICJvaWRjLXVzZXJzZXNzaW9ubW9kZWwtbm90ZS1tYXBwZXIiLAogICAgICAiY29uc2VudFJlcXVpcmVkIjogZmFsc2UsCiAgICAgICJjb25maWciOiB7CiAgICAgICAgInVzZXIuc2Vzc2lvbi5ub3RlIjogImNsaWVudElkIiwKICAgICAgICAiaWQudG9rZW4uY2xhaW0iOiAidHJ1ZSIsCiAgICAgICAgImFjY2Vzcy50b2tlbi5jbGFpbSI6ICJ0cnVlIiwKICAgICAgICAiY2xhaW0ubmFtZSI6ICJjbGllbnRJZCIsCiAgICAgICAgImpzb25UeXBlLmxhYmVsIjogIlN0cmluZyIKICAgICAgfQogICAgfSwKICAgIHsKICAgICAgIm5hbWUiOiAiQ2xpZW50IElQIEFkZHJlc3MiLAogICAgICAicHJvdG9jb2wiOiAib3BlbmlkLWNvbm5lY3QiLAogICAgICAicHJvdG9jb2xNYXBwZXIiOiAib2lkYy11c2Vyc2Vzc2lvbm1vZGVsLW5vdGUtbWFwcGVyIiwKICAgICAgImNvbnNlbnRSZXF1aXJlZCI6IGZhbHNlLAogICAgICAiY29uZmlnIjogewogICAgICAgICJ1c2VyLnNlc3Npb24ubm90ZSI6ICJjbGllbnRBZGRyZXNzIiwKICAgICAgICAiaWQudG9rZW4uY2xhaW0iOiAidHJ1ZSIsCiAgICAgICAgImFjY2Vzcy50b2tlbi5jbGFpbSI6ICJ0cnVlIiwKICAgICAgICAiY2xhaW0ubmFtZSI6ICJjbGllbnRBZGRyZXNzIiwKICAgICAgICAianNvblR5cGUubGFiZWwiOiAiU3RyaW5nIgogICAgICB9CiAgICB9CiAgXSwKICAiZGVmYXVsdENsaWVudFNjb3BlcyI6IFsKICAgICJ3ZWItb3JpZ2lucyIsCiAgICAiYWNyIiwKICAgICJwcm9maWxlIiwKICAgICJyb2xlcyIsCiAgICAiZW1haWwiCiAgXSwKICAib3B0aW9uYWxDbGllbnRTY29wZXMiOiBbCiAgICAiYWRkcmVzcyIsCiAgICAicGhvbmUiLAogICAgIm9mZmxpbmVfYWNjZXNzIiwKICAgICJtaWNyb3Byb2ZpbGUtand0IgogIF0sCiAgImFjY2VzcyI6IHsKICAgICJ2aWV3IjogdHJ1ZSwKICAgICJjb25maWd1cmUiOiB0cnVlLAogICAgIm1hbmFnZSI6IHRydWUKICB9Cn0K' | base64 -d > /tmp/oauth_client.json && /opt/keycloak/bin/kcadm.sh create clients --server http://172.17.0.2:8080 --realm master --user admin --password admin -f /tmp/oauth_client.json"
Logging into http://172.17.0.2:8080 as user admin of realm master
Created new client with id '00668095-9b3c-443d-ba10-795dbe433f38'
Token: {'access_token': 'eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJ0Z0Z2YlpKLUtqTTZmQmhqbnliTmxyX0lFdlFKU0JXQXllc1JYMFFNLXpnIn0.eyJleHAiOjE2ODgyMzY2MzgsImlhdCI6MTY4ODIzMzAzOCwianRpIjoiNWRkOWQzY2UtYzY4MS00YWEyLWI3MmQtMTE2YjY5Yjg1OTYxIiwiaXNzIjoiaHR0cDovLzE3Mi4xNy4wLjI6ODA4MC9yZWFsbXMvbWFzdGVyIiwiYXVkIjoiYWNjb3VudCIsInN1YiI6IjhmY2Q1OTE2LWJkYmEtNGM3Yy04NGZiLWNhZTM2ZDI5OTliMiIsInR5cCI6IkJlYXJlciIsImF6cCI6ImthZmthLWJyb2tlciIsImFjciI6IjEiLCJhbGxvd2VkLW9yaWdpbnMiOlsiLyoiXSwicmVhbG1fYWNjZXNzIjp7InJvbGVzIjpbImRlZmF1bHQtcm9sZXMtbWFzdGVyIiwib2ZmbGluZV9hY2Nlc3MiLCJ1bWFfYXV0aG9yaXphdGlvbiJdfSwicmVzb3VyY2VfYWNjZXNzIjp7ImFjY291bnQiOnsicm9sZXMiOlsibWFuYWdlLWFjY291bnQiLCJtYW5hZ2UtYWNjb3VudC1saW5rcyIsInZpZXctcHJvZmlsZSJdfX0sInNjb3BlIjoicHJvZmlsZSBlbWFpbCIsImNsaWVudElkIjoia2Fma2EtYnJva2VyIiwiY2xpZW50SG9zdCI6IjE3Mi4xNy4wLjEiLCJlbWFpbF92ZXJpZmllZCI6ZmFsc2UsInByZWZlcnJlZF91c2VybmFtZSI6InNlcnZpY2UtYWNjb3VudC1rYWZrYS1icm9rZXIiLCJjbGllbnRBZGRyZXNzIjoiMTcyLjE3LjAuMSJ9.Y5rHixr6tOPbToZjtnPuHnOjRykBYB6TLpIDJ9p3ZWAQ4IuhffVBm_FZuc2IB_EI2qJHgOHEKuoTr5D2HJZcXJXaIzKkqZHbFTHFSClZYlUDVzdgk5T2tZaHpVPg-YwFAII0D9jYdAiiK9eiLu1LRlWurwB55uOU6BHJ33x5s01sUd3P8veLPzDJbcJOz-0ZytYVD-LCcfNAe7fxcRdOporamYYTi257OU1lRxiVOLLxG4jVDwQAq-XZTS2dslFXsbY6xkynPz2VzCzU0pujclVzLGQRHqHIU53hFU0R9XidwYD9gUHGTfpKi-qfCTGMEdHwlUAFLGWweXfT9vV5YA', 'expires_in': 3600, 'refresh_expires_in': 0, 'token_type': 'Bearer', 'not-before-policy': 0, 'scope': 'profile email'}

As we can see, the token request was successful. Now, let's build the Kafka setup, which consumes the KeycloakOauthContainer.server_config field as base64-encoded JSON.

Kafka docker deployment

To start the Kafka docker container, we set the OAUTHBEARER_CONFIG environment variable from the created Keycloak client, wait until Kafka is up, and then copy the SSL-related files out of the Kafka container.

class KafkaOauthSSLContainer(LocalContainer):
    def __init__(self, oauth_json_conf):
        oauth_json_conf = base64.standard_b64encode(json.dumps(oauth_json_conf).encode()).decode()
        container_cmd = 'export OAUTHBEARER_CONFIG=%s' % oauth_json_conf
        container_cmd += ' &&  bash /scripts/start.sh | tee /var/log/script_start.log'
        container_cmd = "bash -c '%s'" % container_cmd

        super().__init__(
            'psy3.memcompute.com/schema_kafka-oauth:3.3.1.1',
            container_cmd,
            [("2122", "2122"), ("9092-9094", "9092-9094")]
        )

        def ping_fn():
            # https://stackoverflow.com/questions/61226910/how-to-programmatically-check-if-kafka-broker-is-up-and-running-in-python
            self.producer = kafka.KafkaProducer(bootstrap_servers=('%s:9092' % self.ip), acks='all')

        WaitOnlineGenericContainer(ping_fn)

        print("Copying ssl files") # alternative is using ssh
        for src, dest in [
            ("/var/private/ssl/ca-cert", "/tmp/file.ca"),
            ("/var/private/ssl/client_memsql_client.pem", "/tmp/file.cert"),
            ("/var/private/ssl/client_memsql_client.key", "/tmp/file.key")]:

            subprocess.check_call('docker cp %s:%s %s' % (self.container_id, src, dest), shell=True)

Deploying Kafka with oauth_kafka = KafkaOauthSSLContainer(keycloak.server_config) results in:

Running docker run -d --rm -p 2122:2122 -p 9092-9094:9092-9094  psy3.memcompute.com/schema_kafka-oauth:3.3.1.1 bash -c 'export OAUTHBEARER_CONFIG=ImV5SmpiR2xsYm5SZmFXUWlPaUFpYTJGbWEyRXRZbkp2YTJWeUlpd2dJbU5zYVdWdWRGOXpaV055WlhRaU9pQWlRMGh1VWtoSFpqVkRjbkl4V1VNMGNGVTJaVlZNYXpsc1VFSjFWbVJLWW1FaUxDQWlkRzlyWlc1ZmRYSnNJam9nSW1oMGRIQTZMeTh4TnpJdU1UY3VNQzR5T2pnd09EQXZjbVZoYkcxekwyMWhjM1JsY2k5d2NtOTBiMk52YkM5dmNHVnVhV1F0WTI5dWJtVmpkQzkwYjJ0bGJpSXNJQ0pxZDJ0elgzVnliQ0k2SUNKb2RIUndPaTh2TVRjeUxqRTNMakF1TWpvNE1EZ3dMM0psWVd4dGN5OXRZWE4wWlhJdmNISnZkRzlqYjJ3dmIzQmxibWxrTFdOdmJtNWxZM1F2WTJWeWRITWlMQ0FpWVhWa2FXVnVZMlVpT2lBaVlXTmpiM1Z1ZENKOSI= &&  bash /scripts/start.sh | tee /var/log/script_start.log' 
Container running with id=6cbe714dde0593475ea9c6c2b47f174bd8e10cc71ab800239b3ff422f0d18e39, ip=172.17.0.3
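The LocalContainer base class used by both containers also lives in test_setup.py. Judging by the logged commands, it assembles a docker run invocation roughly like the function below; this is a hypothetical sketch, not the helper's actual code:

```python
def docker_run_cmd(image, command='', ports=(), env=None):
    """Assemble a `docker run` command line like the ones printed in the logs."""
    parts = ['docker run -d --rm']
    parts += ['-p %s:%s' % pair for pair in ports]
    parts += ['-e %s=%s' % item for item in (env or {}).items()]
    parts.append(image)
    if command:
        parts.append(command)
    return ' '.join(parts)

print(docker_run_cmd(
    'keycloak/keycloak:20.0.3', 'start-dev',
    [('8080', '8080')],
    {'KEYCLOAK_ADMIN': 'admin', 'KEYCLOAK_ADMIN_PASSWORD': 'admin'},
))
```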

Kafka listeners configs

We will run a simple test_producer_consumer check for each of the Kafka listeners: PLAINTEXT, SASL_PLAINTEXT, and SASL_SSL.

def get_plaintext_config(kafka):
    return {
        'bootstrap.servers': '%s:9092' % kafka.ip,
        'security.protocol': 'PLAINTEXT'
    }


def get_sasl_plaintext_config(kafka, keycloak):
    return {
        'bootstrap.servers': '%s:9093' % kafka.ip,
        'sasl.mechanism': 'OAUTHBEARER',
        'sasl.oauthbearer.method': 'oidc',
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.oauthbearer.client.id': keycloak.client_id,
        'sasl.oauthbearer.client.secret': keycloak.client_secret,
        'sasl.oauthbearer.token.endpoint.url': keycloak.token_url
    }


def get_sasl_ssl_config(kafka, keycloak):
    return {
        'bootstrap.servers': '%s:9094' % kafka.ip,
        'sasl.mechanism': 'OAUTHBEARER',
        'sasl.oauthbearer.method': 'oidc',
        'security.protocol': 'SASL_SSL',
        'sasl.oauthbearer.client.id': keycloak.client_id,
        'sasl.oauthbearer.client.secret': keycloak.client_secret,
        'sasl.oauthbearer.token.endpoint.url': keycloak.token_url,
        'ssl.ca.location': '/tmp/file.ca',
        'ssl.certificate.location': '/tmp/file.cert',
        'ssl.key.location': '/tmp/file.key',
        'ssl.key.password': 'abcdefgh'
    }

Python workflow verification

For the actual workload verification, we will produce and consume one message through each Kafka listener.

import confluent_kafka

def test_producer_consumer(conf):
    proto = conf['security.protocol']
    print('\nTest', proto)

    topic = 'topic_' + proto
    producer = confluent_kafka.Producer(acks='all', **conf)
    produce_msg = 'some %s text' % proto
    producer.produce(topic, produce_msg)
    producer.poll(0)
    print('Waiting for delivery')
    producer.flush()

    consumer_conf = conf.copy()
    consumer_conf.update({
        'group.id':'group_id',
        'auto.offset.reset':'earliest'
    })
    print('Setting up consumer')
    consumer = confluent_kafka.Consumer(consumer_conf)
    consumer.subscribe([topic])
    msg = consumer.poll()
    print('Received: %s' % msg.value())

Finally, let's test each listener.

    test_producer_consumer(get_plaintext_config(oauth_kafka))
    test_producer_consumer(get_sasl_plaintext_config(oauth_kafka, keycloak))
    test_producer_consumer(get_sasl_ssl_config(oauth_kafka, keycloak))

The tail of the output should be as below.

Test PLAINTEXT
Waiting for delivery
Setting up consumer
Received: b'some PLAINTEXT text'

Test SASL_PLAINTEXT
Waiting for delivery
Setting up consumer
Received: b'some SASL_PLAINTEXT text'

Test SASL_SSL
Waiting for delivery
Setting up consumer
Received: b'some SASL_SSL text'

All combined code for this tutorial can be found as test_setup.py on GitHub.

Conclusion

After this tutorial, the reader knows how to run a Kafka OAUTHBEARER setup in Docker with PLAINTEXT, SASL_PLAINTEXT, and SASL_SSL listeners. The setup involves the docker-kafka repository, the keycloak docker image, and a Python script that starts the docker containers and then runs simple producer-consumer logic.

For more detailed steps on OAUTHBEARER/OIDC/Keycloak configuration and on Kafka CLI / C++ librdkafka connections, please check the tutorials mentioned in the Introduction.