I had difficulties setting up a Kafka cluster to use SASL_PLAINTEXT/SASL_SSL OAUTHBEARER, optionally with OpenID Connect (OIDC) token providers such as Azure AD, Keycloak, or Ory Hydra, optionally in Docker, and then connecting to it from Python and C++. Moreover, I didn't find web pages addressing that issue with easy-to-use instructions. So, in this post and the following ones, I will show readers how to do exactly that.


Introduction

KIP-255 introduced OAuth authentication via SASL/OAUTHBEARER, and KIP-768 added a default implementation of the login/validator callback handlers (aka OIDC support) that uses the client_credentials OAuth 2.0 grant type.

In this tutorial, we will set up Kafka with OAUTHBEARER OIDC over SASL_PLAINTEXT, using Keycloak as the token provider/validator. To start, download Keycloak 20.0.3 and Kafka 3.3.1. Agenda:

  1. Configuring Keycloak
  2. Kafka broker setup
  3. Connecting from the Kafka CLI producer and consumer
  4. Connecting from Python (confluent_kafka) and C++ (librdkafka)

Keycloak setup

If you want to set up Keycloak using the CLI, I suggest creating the client in the UI first and then exporting its config, which can later be passed as an argument to the kcadm.sh tool, roughly as sketched below.
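A minimal kcadm.sh sketch of that approach, assuming the admin user created in step 2 below is named admin and the client config exported from the UI is saved as kafka-client.json (both names are hypothetical):

    # Log in to the Keycloak admin REST API (prompts for the admin password)
    bin/kcadm.sh config credentials --server http://localhost:8080 --realm master --user admin
    # Create the client from the JSON config exported via the UI
    bin/kcadm.sh create clients -r master -f kafka-client.json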

  1. Start the Keycloak server with bin/kc.sh start-dev.
  2. Create an administrator user on the first Keycloak login at http://localhost:8080.
  3. Log in to the administration console with the user created in step 2 and create a client_credentials client on the master realm. For simplicity, we use a single client for the Kafka broker, producer, and consumer.

4. The above steps are enough for the Keycloak setup, because Keycloak uses the account audience claim by default. When using other token providers, however, we might need one additional step: configuring the audience claim.

The default validator callback uses a Java JWT consumer that expects the audience claim in the token to be non-empty. Otherwise, JWT token validation might fail as below. In particular, you will face this issue when using Ory Hydra.

org.apache.kafka.common.security.oauthbearer.secured.ValidateException: Could not validate the access token: JWT (claims->{"exp":1677440955,"iat":1677440895,"jti":"a679e025-744f-4049-a926-6657e2d36767","iss":"http://0.0.0.0:8080/realms/master","aud":"account","sub":"863424ee-bdcd-455b-a34c-cdf0f15849c5","typ":"Bearer","azp":"kafka-broker","acr":"1","realm_access":{"roles":["default-roles-master","offline_access","uma_authorization"]},"resource_access":{"account":{"roles":["manage-account","manage-account-links","view-profile"]}},"scope":"profile email","clientId":"kafka-broker","email_verified":false,"clientHost":"192.168.1.34","preferred_username":"service-account-kafka-broker","clientAddress":"192.168.1.34"}) rejected due to invalid claims or other invalid content. Additional details: [[8] Audience (aud) claim [account] doesn't contain an acceptable identifier. Expected account2 as an aud value.
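
To see what audience your provider actually puts into the token, you can decode the claims locally. A minimal Python sketch (the token value is a placeholder; paste a real access token):

    import base64
    import json

    token = "<access_token>"  # placeholder: paste a token returned by your provider

    # A JWT is three dot-separated base64url parts; the middle one carries the claims
    payload = token.split(".")[1]
    payload += "=" * (-len(payload) % 4)  # restore the stripped base64 padding
    claims = json.loads(base64.urlsafe_b64decode(payload))
    print(claims.get("aud"))  # must be non-empty, e.g. "account" for Keycloak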

5. Validate that the token request logic works using Postman; you can also set client_id and client_secret in the Authorization tab with the Basic Auth type. Alternatively, the same check can be scripted, as sketched below.
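
A rough Python equivalent of the Postman check, using the requests library (the client ID and secret below come from the client created above; adjust to your own values):

    import requests

    # client_credentials grant: authenticate with the client ID/secret via HTTP Basic auth
    resp = requests.post(
        "http://localhost:8080/realms/master/protocol/openid-connect/token",
        data={"grant_type": "client_credentials"},
        auth=("kafka-broker", "mTCgOCh39yrevYCf60yIuo7WH1heozWN"),
    )
    resp.raise_for_status()
    print(resp.json()["access_token"])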


Kafka broker setup

Modify the Kafka config to add a SASL_PLAINTEXT OAUTHBEARER listener. You can find the supported configuration options in the Apache Kafka docs, and the explanation behind the setup on the Confluent page Configuring OAUTHBEARER.

# Single OAUTHBEARER listener; inter-broker traffic also goes over SASL_PLAINTEXT
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.enabled.mechanisms=OAUTHBEARER
sasl.mechanism.inter.broker.protocol=OAUTHBEARER

# KIP-768 default handlers: login fetches tokens, server validates incoming tokens
listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
# Keycloak endpoints: token endpoint for login, JWKS endpoint for signature validation
listener.name.sasl_plaintext.oauthbearer.sasl.oauthbearer.token.endpoint.url=http://0.0.0.0:8080/realms/master/protocol/openid-connect/token
listener.name.sasl_plaintext.oauthbearer.sasl.oauthbearer.expected.audience=account
sasl.oauthbearer.jwks.endpoint.url=http://0.0.0.0:8080/realms/master/protocol/openid-connect/certs
listener.name.sasl_plaintext.oauthbearer.sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
 clientId='kafka-broker' \
 clientSecret='mTCgOCh39yrevYCf60yIuo7WH1heozWN' \
 unsecuredLoginStringClaim_sub='sub';

Let's start the Kafka broker and see if it works: bin/kafka-server-start.sh config/server_keycloak.properties.

Connect from Kafka CLI

For both Kafka consumer and producer, the config changes for OAUTHBEARER will be the same.

security.protocol=SASL_PLAINTEXT
sasl.oauthbearer.token.endpoint.url=http://0.0.0.0:8080/realms/master/protocol/openid-connect/token
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.mechanism=OAUTHBEARER

sasl.jaas.config= \
  org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    clientId='kafka-broker' \
    clientSecret='mTCgOCh39yrevYCf60yIuo7WH1heozWN';

Start the Kafka producer with bin/kafka-console-producer.sh --producer.config config/producer_keycloak.properties --topic tryhub --bootstrap-server localhost:9092.

Produce a few messages and consume them later…

Start the Kafka consumer with bin/kafka-console-consumer.sh --consumer.config config/consumer_keycloak.properties --topic tryhub --bootstrap-server localhost:9092 --from-beginning to verify that the setup works.

Connect from C++ librdkafka

First, download, build, and install librdkafka following the instructions at https://github.com/confluentinc/librdkafka#building. In this post, we will use librdkafka 1.9.2.

Make sure to install the needed OAuth bearer OIDC dependencies; otherwise, the OIDC option will be disabled in the build. I suggest building librdkafka from source rather than installing it as a package, because you can see exactly which library versions and config options end up in the final build. Keep an eye on the dependency checks that ./configure prints.


Double-check that you have #define WITH_OAUTHBEARER_OIDC 1 in the config.h file after running ./configure; the quick check below does it in one line.
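
Run this from the librdkafka source root, where ./configure generated config.h:

    grep WITH_OAUTHBEARER_OIDC config.h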

We will modify the existing producer example from the librdkafka repo.

    conf->set("bootstrap.servers", "localhost:9092", errstr);
    conf->set("security.protocol", "SASL_PLAINTEXT", errstr);
    conf->set("sasl.mechanism", "OAUTHBEARER", errstr);
    conf->set("sasl.oauthbearer.method", "oidc", errstr);
    conf->set("sasl.oauthbearer.client.id", "kafka-broker", errstr);
    conf->set("sasl.oauthbearer.client.secret", "mTCgOCh39yrevYCf60yIuo7WH1heozWN", errstr);
    conf->set("sasl.oauthbearer.token.endpoint.url", "http://0.0.0.0:8080/realms/master/protocol/openid-connect/token", errstr);

Then build and test with g++ producer.cpp -lrdkafka++ -std=c++17 && ./a.out.

Connect from Python confluent_kafka

We can either build confluent_kafka from source, using the librdkafka produced in the step above, with pip install --no-binary=:all: confluent-kafka==1.9.2, or use a wheel with the librdkafka C library bundled, if one is available for your platform: pip install confluent-kafka==1.9.2.
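
To double-check which librdkafka version the installed package is linked against:

    import confluent_kafka

    # version() reports the Python client version, libversion() the bundled/linked librdkafka
    print(confluent_kafka.version(), confluent_kafka.libversion())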

A producer example can be taken from the confluent_kafka examples or the azure-event-hubs-for-kafka examples. After that, we modify the config to use the Keycloak endpoint with OIDC.

    conf = {
        'bootstrap.servers': 'localhost:9092',
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanism': 'OAUTHBEARER',
        'sasl.oauthbearer.method': 'oidc',
        'sasl.oauthbearer.client.id': 'kafka-broker',
        'sasl.oauthbearer.client.secret': 'mTCgOCh39yrevYCf60yIuo7WH1heozWN',
        'sasl.oauthbearer.token.endpoint.url': 'http://0.0.0.0:8080/realms/master/protocol/openid-connect/token'
    }
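
With that conf in place, a minimal sketch to verify the whole chain (the tryhub topic matches the CLI example; the delivery_report callback name is my own):

    from confluent_kafka import Producer

    def delivery_report(err, msg):
        # Called once per message with the delivery outcome
        if err is not None:
            print("delivery failed:", err)
        else:
            print("delivered to:", msg.topic())

    producer = Producer(conf)
    producer.produce("tryhub", value=b"hello from Python", callback=delivery_report)
    producer.flush()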

Conclusion

The idea of this tutorial was to provide exact steps for quickly and easily configuring a Kafka cluster with OAuth Bearer OpenID Connect. We used Keycloak as the token provider and deployed the Kafka cluster with a SASL_PLAINTEXT listener.

In addition to CLI communication, I also highlighted the main points of connecting from C++ and Python, which might be more practical.

In the next tutorial, you will learn how to use Kafka OAuth Bearer with Azure Event Hubs (Kafka endpoint) and Azure Active Directory (token provider).