Using the Kafka API
Kafka provides a rich set of APIs and clients across a broad range of languages. The APIs include the core API, the Streams API, and the Connect API.
- Kafka's core API (Consumer, Producer, and Admin API)

  Use to send and receive messages directly from one or more Kafka topics. The Kafka Admin client provides a simple interface through the Kafka API for managing Kafka resources. You can create, delete, and manage topics. You can also use the Admin client to manage consumer groups and configurations, as shown in the sketch after this list.

- Streams API

  A higher-level stream processing API to easily consume, transform, and produce events between topics.

- Connect API

  A framework that allows reusable or standard integrations to stream events into and out of external systems, such as databases.
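For example, the Admin client mentioned above can create topics. The following is a minimal Java sketch, not taken from the Event Streams samples: the topic name, partition and replica counts, and the `kafka.properties` resource are illustrative assumptions, and the connection properties it loads are described later in this topic.

```java
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicExample {
    public static void main(String[] args) throws Exception {
        // Connection and authentication properties as described in
        // "Configuring your Kafka API client" later in this topic.
        Properties props = new Properties();
        props.load(CreateTopicExample.class.getResourceAsStream("/kafka.properties"));

        try (Admin admin = Admin.create(props)) {
            // Create a topic with 3 partitions and a replication factor of 3
            // (illustrative values; choose what fits your plan).
            NewTopic topic = new NewTopic("my-topic", 3, (short) 3);
            admin.createTopics(List.of(topic)).all().get();
        }
    }
}
```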
The following table summarizes what you can use with Event Streams:
| | Enterprise plan | Standard plan | Lite plan |
| --- | --- | --- | --- |
| Kafka version on cluster | Kafka 3.6 | Kafka 3.6 | Kafka 3.6 |
| Minimum recommended Kafka client version | Kafka 2.6.0, or later | Kafka 2.6.0, or later | Kafka 2.6.0, or later |
| Supported client versions | See Support summary for all recommended clients | | |
| Kafka Connect supported | Yes | Yes | No |
| Kafka Streams supported | Yes | Yes | No |
| ksqlDB supported | Yes | No | No |
| Authentication requirements | Client must support authentication by using the SASL Plain mechanism and use the Server Name Indication (SNI) extension to the TLSv1.2 protocol. | Client must support authentication by using the SASL Plain mechanism and use the Server Name Indication (SNI) extension to the TLSv1.2 protocol. | Client must support authentication by using the SASL Plain mechanism and use the Server Name Indication (SNI) extension to the TLSv1.2 protocol. |
Choosing a Kafka client to use with Event Streams
The official client for the Kafka API is written in Java, and as such contains the latest features and bug fixes. For more information about this API, see Kafka Producer API 3.6 and Kafka Consumer API 3.6.
For other languages, use one of the following clients, all of which are tested with Event Streams.
Support summary for all recommended clients
| Client | Language | Recommended version | Minimum version supported [1] | Link to sample |
| --- | --- | --- | --- | --- |
| Official Apache Kafka client: | | | | |
| Apache Kafka client | Java | 3.6.2, or later | 2.5.0 | Java console sample |
| Third-party clients: | | | | |
| node-rdkafka | Node.js | Latest | 2.8.0 | Node.js sample |
| confluent-kafka-python | Python | Latest | 1.4.0 | Kafka Python sample |
| confluent-kafka-go | Go | Latest | 1.4.0 | |
| librdkafka | C or C++ | Latest | 1.4.0 | |
| sarama | Go | Latest | 1.26.3 | Sarama examples |
Connecting your client to Event Streams
For information about how to configure your Java client to connect to Event Streams, see Configuring your client.
Configuring your Kafka API client
To establish a connection, clients must be configured to use SASL PLAIN or SASL OAUTHBEARER over TLSv1.2 at a minimum, and they require a username and a list of the bootstrap servers. TLSv1.2 ensures that connections are encrypted and validates the authenticity of the brokers (to prevent man-in-the-middle attacks). SASL enforces authentication on all connections.
To retrieve the username, password, and list of bootstrap servers, a service credentials object (service key) is required for the service instance. For more information about creating these objects, see Connecting to Event Streams.
Using SASL PLAIN
Use the following strings and properties.
- Use the `bootstrap_endpoints` string as the list of bootstrap servers and pass this string of host and port pairs to your Kafka client.
- Use the `user` and `api_key` properties as the username and password.
For a Java client, the following example shows the minimum set of properties, where `${USERNAME}`, `${PASSWORD}`, and `${BOOTSTRAP_ENDPOINTS}` are to be replaced by the values that you retrieved previously.
```
bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${USERNAME}" password="${PASSWORD}";
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
```
If you use a Kafka client earlier than version 0.10.2.1, the `sasl.jaas.config` property isn't supported, and you must instead provide the client configuration in a JAAS configuration file.
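As an illustration, the following minimal sketch builds the same properties programmatically and sends a single record. The topic name, key, value, and serializer choices are assumptions, and the `${...}` placeholders must be replaced with the values from your service credentials.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PlainProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Replace the placeholders with values from your service credentials object.
        props.put("bootstrap.servers", "${BOOTSTRAP_ENDPOINTS}");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"${USERNAME}\" password=\"${PASSWORD}\";");
        props.put("security.protocol", "SASL_SSL");
        props.put("ssl.protocol", "TLSv1.2");
        props.put("ssl.enabled.protocols", "TLSv1.2");
        props.put("ssl.endpoint.identification.algorithm", "HTTPS");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Send one record to an example topic; "my-topic" is an assumption.
            producer.send(new ProducerRecord<>("my-topic", "key", "hello"));
        }
    }
}
```

Closing the try-with-resources block flushes and closes the producer, so the record is delivered before the program exits.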
Using SASL OAUTHBEARER
Before you configure the SASL mechanism for the Java client, note the following two prerequisites.

- The minimum supported Kafka Java client version is 3.1.0.
- An additional JAR package must be downloaded from Maven Central and made available on the classpath.
If you use Maven as your build system, add the following dependency to the `dependencies` section of your `pom.xml` file.

```xml
<dependency>
    <groupId>com.ibm.cloud.eventstreams</groupId>
    <artifactId>oauth-client</artifactId>
    <version>1.3.1</version>
</dependency>
```
If you use Gradle as your build system, add the following dependency to the `dependencies` section of your `build.gradle` file.

```groovy
implementation 'com.ibm.cloud.eventstreams:oauth-client:1.3.1'
```
Use the following strings and properties.
- Use the `bootstrap_endpoints` string as the list of bootstrap servers and pass this string of host and port pairs to your Kafka client.
- Use the `api_key` string as the API key.
- The `IAMOAuthBearerLoginCallbackHandler` is provided by the JAR package `com.ibm.cloud.eventstreams:oauth-client:+`.
- The IBM Cloud® Identity and Access Management token endpoint `https://iam.cloud.ibm.com/identity/token` is configured to generate a token from the API key by using the grant type specified in the JAAS config. Because the token exchange happens on the client side, the API key is never sent to the server, which provides better security than transmitting a long-lived API key.
- The IBM Cloud Identity and Access Management keys endpoint `https://iam.cloud.ibm.com/identity/keys` is configured to validate the token.
For a Java client, the following example shows the minimum set of properties, where `${BOOTSTRAP_ENDPOINTS}` and `${APIKEY}` are to be replaced by the values that you retrieved previously.
```
bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:apikey" apikey="${APIKEY}";
sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
```
For complete sample code, refer to the Event Streams samples.
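As a sketch of how those properties are used from code (the group ID and topic name are assumptions, not taken from the Event Streams samples), a minimal OAUTHBEARER consumer might look like the following.

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OAuthConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // The same minimum OAUTHBEARER properties as in the example above.
        props.put("bootstrap.servers", "${BOOTSTRAP_ENDPOINTS}");
        props.put("sasl.mechanism", "OAUTHBEARER");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required "
                + "grant_type=\"urn:ibm:params:oauth:grant-type:apikey\" apikey=\"${APIKEY}\";");
        props.put("sasl.login.callback.handler.class",
                "com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler");
        props.put("sasl.oauthbearer.token.endpoint.url", "https://iam.cloud.ibm.com/identity/token");
        props.put("sasl.oauthbearer.jwks.endpoint.url", "https://iam.cloud.ibm.com/identity/keys");
        props.put("security.protocol", "SASL_SSL");
        props.put("ssl.protocol", "TLSv1.2");
        props.put("ssl.enabled.protocols", "TLSv1.2");
        props.put("ssl.endpoint.identification.algorithm", "HTTPS");
        props.put("group.id", "example-group"); // assumption
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("my-topic")); // assumption
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(10));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
        }
    }
}
```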
For other Kafka client libraries, refer to their documentation about how to implement OAUTHBEARER support. For example:

- sarama: an implementation of the `AccessTokenProvider` interface is required.
- librdkafka: an implementation of the `oauthbearer_token_refresh_cb` callback is required.
For information about how to generate an IBM Cloud IAM token by using an API key, see the IBM Cloud® Identity and Access Management documentation.
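For illustration only, the following sketch requests an IAM token directly over HTTP, using the same token endpoint and grant type shown in the configuration above. In normal operation the login callback handler performs this exchange for you, so this is just one way to verify an API key.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class IamTokenExample {
    public static void main(String[] args) throws Exception {
        String apiKey = System.getenv("APIKEY"); // supply your API key via the environment

        // Form-encoded body with the API-key grant type used in the JAAS config above.
        String body = "grant_type="
                + URLEncoder.encode("urn:ibm:params:oauth:grant-type:apikey", StandardCharsets.UTF_8)
                + "&apikey=" + URLEncoder.encode(apiKey, StandardCharsets.UTF_8);

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://iam.cloud.ibm.com/identity/token"))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // The JSON response contains an "access_token" field with the bearer token.
        System.out.println(response.body());
    }
}
```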
- The earliest version that was validated in continual testing. Typically, this is the initial version available within the last 12 months, or newer if significant issues are known to exist. If you can't run any of the clients that are listed, you can use other third-party clients that meet the following minimum requirements (for example, librdkafka):
  1. Supports Kafka 1.4.0, or later.
  2. Can connect and authenticate by using SASL PLAIN with TLSv1.2.
  3. Supports the SNI extension for TLS, where the server's hostname is included in the TLS handshake.
  4. Supports elliptic curve cryptography.

  In all cases, use the latest version of the client. ↩︎