You can read the initial Kafka Oauthbearer OIDC Keycloak setup tutorial for a better understanding of the concepts mentioned here and for the installation of the dependencies (librdkafka, confluent_kafka). In this tutorial, the reader will learn how to connect Kafka clients to Azure Event Hubs and use Azure AD (Active Directory) for Kafka OAuth Bearer, with or without the OpenID Connect (OIDC) setup.
Table of Contents
- Introduction
- Azure Event Hubs
- Connecting with Python confluent_kafka
- Azure AD(Active Directory)
- Python confluent_kafka OIDC connection
- Python confluent_kafka Oauth connection
- C++ librdkafka OIDC connection
- C++ librdkafka Oauth connection
- Kafka CLI OpenID Connect(OIDC) connection
- Conclusion
Introduction
Azure Event Hubs is a big data streaming platform and event ingestion service. It also provides an Apache Kafka endpoint on an event hub, which enables users to connect to the event hub using the Kafka protocol. Event Hubs integrates with Azure Active Directory (Azure AD), which provides an OAuth 2.0 compliant centralized authorization server.
In practice, this means that one can use Kafka clients with Azure Event Hubs (acting as the Kafka brokers) and Azure AD (acting as the token provider) by specifying SASL_SSL for the protocol and OAUTHBEARER for the mechanism.
We will do the Azure Event Hubs and Azure AD setup, then connect to the Kafka endpoints with the Kafka CLI, C++ librdkafka, and Python confluent_kafka.
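In confluent_kafka terms, every connection variant below boils down to a config shaped roughly like this (a skeleton with placeholder values, not a working config):
conf = {
    'bootstrap.servers': '<namespace>.servicebus.windows.net:9093',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    # plus either the OIDC properties or a custom token refresh callback,
    # as shown in the sections below
}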
Azure Event Hubs
Following the official Azure tutorial at Create an event hub using Azure portal, we create a resource group, a namespace, and an event hub. Make sure you use the Standard pricing tier for the Event Hubs namespace, since the Kafka protocol is supported for the Standard, Premium, and Dedicated SKUs only.

Let's quickly verify the Azure Event Hubs setup works with SAS (Shared Access Signature) first — without OAUTHBEARER.
Connecting with Python confluent_kafka
For simplicity, we will use Python and confluent_kafka. Modify the producer config from the official Azure GitHub repo example and connect with it.
conf = {
    'bootstrap.servers': 'advancedEhub.servicebus.windows.net:9093',
    'security.protocol': 'SASL_SSL',
    'ssl.ca.location': '/etc/ssl/certs/ca-certificates.crt',
    'sasl.mechanism': 'PLAIN',
    'sasl.username': '$ConnectionString',
    'sasl.password': 'Endpoint=sb://advancedehub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=ahn9mS62O6DmsqEDPNfy+8bPPDioRhcES+AEhAfkOns=',
    'client.id': 'python-example-producer'
}
sasl.password can be obtained from the "Connection string-primary key" property in the Shared access policies tab.

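For context, producer.py from the Azure example repo boils down to roughly the following (a simplified sketch, not the verbatim example; it takes the topic name as a CLI argument and reuses the conf above):
import sys
from confluent_kafka import Producer

topic = sys.argv[1]  # e.g. tryhub
p = Producer(conf)

def delivery_callback(err, msg):
    # Called once per message with either an error or delivery metadata.
    if err:
        sys.stderr.write('%% Message failed delivery: %s\n' % err)
    else:
        sys.stderr.write('%% Message delivered to %s [%d] @ %d\n'
                         % (msg.topic(), msg.partition(), msg.offset()))

for i in range(3):
    p.produce(topic, 'msg #%d' % i, callback=delivery_callback)
print('%% Waiting for %d deliveries' % len(p))
p.flush()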
python producer.py tryhub
% Waiting for 3 deliveries
% Message delivered to tryhub [0] @ 0
% Message delivered to tryhub [0] @ 1
% Message delivered to tryhub [0] @ 2
Azure AD(Active Directory)
Now, we will secure the Kafka connection with OAuth Bearer using Azure AD. Navigate to Azure AD -> App registrations -> New registration to create the client we will use with OAUTHBEARER. Specify the name only; all the other settings keep their defaults.

In order to use a Kafka OAUTHBEARER client with Azure AD, we will need four properties: token_endpoint_url, client_id, client_secret, and scope.
client_id is listed directly as "Application (client) ID" on the screenshot below. For the client_credentials flow against Event Hubs, the scope is the namespace's /.default scope: https://advancedEhub.servicebus.windows.net/.default. Getting the scope right matters, since it determines the token's audience, and Event Hubs rejects tokens issued for a different audience.

For the client_secret, navigate to the "Client credentials" tab, highlighted on the screenshot above, and generate a client secret there.

token_endpoint_url can be viewed in the Endpoints tab of the registered app as "OAuth 2.0 token endpoint (v2)".

Let's validate that we can retrieve an OAUTHBEARER token with Postman.

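The same request can be scripted. Here is a minimal Python equivalent of the Postman call (reusing the values from this setup) that also decodes the token payload to sanity-check its claims:
import base64
import json

import requests

resp = requests.post(
    'https://login.microsoftonline.com/48d627e4-e0f7-451f-b57d-f2f0cd7fbf09/oauth2/v2.0/token',
    data={
        'grant_type': 'client_credentials',
        'client_id': '52c9cfa5-a2a0-4477-9d28-34cacb5b2207',
        'client_secret': 'v0O8Q~W9kmhh4eir5jcf9_lVXG8v8FD~KyatvbN9',
        'scope': 'https://advancedEhub.servicebus.windows.net/.default',
    })
token = resp.json()['access_token']
# The JWT payload is the second dot-separated, base64url-encoded segment.
seg = token.split('.')[1]
claims = json.loads(base64.urlsafe_b64decode(seg + '=' * (-len(seg) % 4)))
print(claims['aud'], claims['exp'])  # aud should correspond to the namespace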
That's not all: we also need to allow the s2KafkaTest registered app to access our event hub. The scope of the access can vary; I will add the subscription-scoped role "Azure Event Hubs Data Owner". Read more at Authenticate an application with Azure Active Directory to access Event Hubs resources.

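If you prefer scripting the role assignment, the Azure CLI equivalent is roughly the following (substitute your own subscription ID, or narrow the scope down to the namespace resource):
az role assignment create --assignee 52c9cfa5-a2a0-4477-9d28-34cacb5b2207 --role "Azure Event Hubs Data Owner" --scope /subscriptions/<subscription-id>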
Python confluent_kafka OIDC connection
Assuming the library is installed, we will modify the producer example from azure-event-hubs-for-kafka.
conf = {
    'bootstrap.servers': '%s:9093' % namespace,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    'sasl.oauthbearer.method': 'oidc',
    'sasl.oauthbearer.client.id': '52c9cfa5-a2a0-4477-9d28-34cacb5b2207',
    'sasl.oauthbearer.client.secret': 'v0O8Q~W9kmhh4eir5jcf9_lVXG8v8FD~KyatvbN9',
    'sasl.oauthbearer.token.endpoint.url': 'https://login.microsoftonline.com/48d627e4-e0f7-451f-b57d-f2f0cd7fbf09/oauth2/v2.0/token',
    'sasl.oauthbearer.scope': 'https://%s/.default' % namespace,
}
python producer.py advancedEhub.servicebus.windows.net tryhub
% Waiting for 2 deliveries
% Message delivered to tryhub [0] @ 3
% Message delivered to tryhub [0] @ 4
Then we can verify with a CLI consumer or Azure UI that data is indeed produced.

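If you prefer to verify from Python too, a minimal consumer sketch reusing the same OIDC conf could look like this (group.id is an arbitrary name chosen for illustration):
from confluent_kafka import Consumer

consumer_conf = dict(conf)  # same SASL_SSL / OAUTHBEARER settings as above
consumer_conf.update({'group.id': 'tryhub-verify', 'auto.offset.reset': 'earliest'})
c = Consumer(consumer_conf)
c.subscribe(['tryhub'])
try:
    while True:
        msg = c.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print('Consumer error: %s' % msg.error())
            continue
        print('Received: %s' % msg.value().decode('utf-8'))
finally:
    c.close()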
Python confluent_kafka Oauth connection
librdkafka allows providing a custom oauthbearer_refresh_cb, as mentioned in its configuration documentation. The callback implementation idea is taken from the confluent_kafka OAuth producer example.
For the raw implementation, we will perform the same POST request as we did with Postman, using the client_credentials grant type.
import time
from functools import partial

import requests

def oauth_cb(namespace, config):
    payload = {
        'grant_type': 'client_credentials',
        'scope': 'https://%s/.default' % namespace
    }
    resp = requests.post("https://login.microsoftonline.com/48d627e4-e0f7-451f-b57d-f2f0cd7fbf09/oauth2/v2.0/token",
                         auth=(
                             "52c9cfa5-a2a0-4477-9d28-34cacb5b2207",
                             "v0O8Q~W9kmhh4eir5jcf9_lVXG8v8FD~KyatvbN9"
                         ),
                         data=payload
                         )
    token = resp.json()
    # If the JWT is RFC 9068 compliant, one should take exp from the token claims instead - https://auth0.com/docs/secure/tokens/json-web-tokens/json-web-token-claims
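    # For example (a sketch, assuming a standard three-segment JWT):
    #   import base64, json
    #   seg = token['access_token'].split('.')[1]
    #   claims = json.loads(base64.urlsafe_b64decode(seg + '=' * (-len(seg) % 4)))
    #   return token['access_token'], float(claims['exp'])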
    return token['access_token'], time.time() + float(token['expires_in'])
Then we need to specify 'oauth_cb': partial(oauth_cb, namespace) in the configuration.
conf = {
    'bootstrap.servers': '%s:9093' % namespace,
    'security.protocol': 'SASL_SSL',
    'sasl.mechanism': 'OAUTHBEARER',
    'oauth_cb': partial(oauth_cb, namespace),
}
Another option is using the Azure identity library for oauth_cb as 'oauth_cb': partial(oauth_cb, cred, namespace), where cred = azure.identity.DefaultAzureCredential(). The cred instance picks up the env variables AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID.
def oauth_cb(cred, namespace, config):
    access_token = cred.get_token('https://%s/.default' % namespace)
    return access_token.token, access_token.expires_on
C++ librdkafka OIDC connection
We are going to modify the producer config from the official librdkafka producer example.
conf->set("sasl.oauthbearer.client.id", "52c9cfa5-a2a0-4477-9d28-34cacb5b2207", errstr);
conf->set("sasl.oauthbearer.client.secret", "v0O8Q~W9kmhh4eir5jcf9_lVXG8v8FD~KyatvbN9", errstr);
conf->set("sasl.oauthbearer.token.endpoint.url", "https://login.microsoftonline.com/48d627e4-e0f7-451f-b57d-f2f0cd7fbf09/oauth2/v2.0/token", errstr);
conf->set("sasl.oauthbearer.scope", "https://advancedEhub.servicebus.windows.net/.default", errstr);Compiling with g++ producer.cpp -lrdkafka++ — std=c++17
C++ librdkafka Oauth connection
Without OIDC, one has to specify the oauthbearer_token_refresh_cb, so let's create a class for that.
The actual POST request will be made with the cpr and nlohmann/json libraries to approximate Python-like simplicity.
#include <cstdlib>
#include <ctime>
#include <iostream>

#include <cpr/cpr.h>
#include <nlohmann/json.hpp>
#include <librdkafka/rdkafkacpp.h>

using json = nlohmann::json;

class AzureOAuthBearerTokenRefreshCb : public RdKafka::OAuthBearerTokenRefreshCb
{
public:
    void oauthbearer_token_refresh_cb(RdKafka::Handle *handle, const std::string &oauthbearer_config) override
    {
        std::string token;
        time_t expires_time_ms;
        std::string errstr;
        cpr::Response r = cpr::Post(
            cpr::Url{"https://login.microsoftonline.com/48d627e4-e0f7-451f-b57d-f2f0cd7fbf09/oauth2/v2.0/token"},
            cpr::Authentication{"52c9cfa5-a2a0-4477-9d28-34cacb5b2207", "v0O8Q~W9kmhh4eir5jcf9_lVXG8v8FD~KyatvbN9", cpr::AuthMode::BASIC},
            cpr::Payload{{"grant_type", "client_credentials"}, {"scope", "https://advancedEhub.servicebus.windows.net/.default"}});
        // std::cout << "My log response: " << r.status_code << " " << r.text << "\n";
        json j = json::parse(r.text);
        token = j["access_token"].get<std::string>();
        expires_time_ms = (time(nullptr) + j["expires_in"].get<time_t>()) * 1000;
        // If the JWT is RFC 9068 compliant, one should take exp*1000 from the token instead - https://auth0.com/docs/secure/tokens/json-web-tokens/json-web-token-claims
        // std::cout << "Token: " << token << "\nExpiration time: " << expires_time_ms << "\n";
        auto err_code = handle->oauthbearer_set_token(token, expires_time_ms, "", {}, errstr);
        if (err_code != RdKafka::ErrorCode::ERR_NO_ERROR)
        {
            std::cerr << "Failed setting token: " << err_code << " " << errstr << "\n";
            err_code = handle->oauthbearer_set_token_failure(errstr);
            if (err_code != RdKafka::ErrorCode::ERR_NO_ERROR)
            {
                std::cout << "Failed to set token error - panicking...\n";
                exit(1);
            }
        }
    }
};
Then we set the callback for the oauthbearer_token_refresh_cb property and remove the previously mentioned OIDC configuration.
AzureOAuthBearerTokenRefreshCb azure_refresh_cb;
if (conf->set("oauthbearer_token_refresh_cb", &azure_refresh_cb, errstr) != RdKafka::Conf::CONF_OK)
{
    std::cerr << errstr << std::endl;
    exit(1);
}
Compiling with g++ producer.cpp -lrdkafka++ -std=c++17 -lcpr
Kafka CLI OpenID Connect(OIDC) connection
For both the Kafka console producer and consumer, the config changes for OAUTHBEARER are the same. Note that OIDC support in the Java client arrived with KIP-768 in Apache Kafka 3.1; in newer versions the login callback handler lives at org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler (without the .secured package), so adjust sasl.login.callback.handler.class to your Kafka version.
security.protocol=SASL_SSL
sasl.oauthbearer.token.endpoint.url=https://login.microsoftonline.com/48d627e4-e0f7-451f-b57d-f2f0cd7fbf09/oauth2/v2.0/token
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerLoginCallbackHandler
sasl.mechanism=OAUTHBEARER
sasl.jaas.config= \
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId='52c9cfa5-a2a0-4477-9d28-34cacb5b2207' \
scope='https://advancedEhub.servicebus.windows.net/.default' \
clientSecret='v0O8Q~W9kmhh4eir5jcf9_lVXG8v8FD~KyatvbN9';
Then we verify the setup by starting both producer and consumer:
- bin/kafka-console-producer.sh --producer.config config/producer_azure.properties --topic tryhub --bootstrap-server advancedEhub.servicebus.windows.net:9093
- bin/kafka-console-consumer.sh --consumer.config config/consumer_azure.properties --topic tryhub --bootstrap-server advancedEhub.servicebus.windows.net:9093 --from-beginning
Conclusion
I expect a reader to start this tutorial after reading my first post, Kafka Oauth Bearer setup with OpenID Connect(OIDC) and Keycloak, as this one intentionally does not repeat the details covered there.
After reading this tutorial, a reader should be able to do the Azure Event Hubs and Azure Active Directory(AD) configuration in the context of a SASL_SSL OAUTHBEARER Kafka cluster. Then, one should be capable of connecting to it with the CLI, C++, or Python, using either OIDC or a custom callback implementation.
In the next tutorial, you will learn how to run the entire Kafka OAUTHBEARER setup in Docker with multiple listeners (including SASL_SSL).