Connecting Event Streams to IBM Cloud Object Storage by using the Kubernetes Service
Get the Kafka Connect runtime to run in a Kubernetes Service cluster. Then, start the IBM Cloud® Object Storage Sink Connector to archive data from Kafka topics in Event Streams to an instance of the IBM Cloud® Object Storage service.
The Connector consumes batches of messages from Kafka and uploads the message data as objects to a bucket in the Cloud Object Storage service.
Step 1. Install the prerequisites
Ensure you have the following software and services installed:
- An Event Streams instance (Standard or Enterprise plan). You need to create credentials.
- An instance of the Cloud Object Storage service with at least one bucket.
- An IBM Cloud® Kubernetes Service cluster. You can provision a free one for testing purposes. You also need CLI access to your cluster, as shown after this list. For more information, see Setting up the CLI and API.
- A recent version of kubectl.
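If you still need to set up CLI access, the following is a minimal sketch that uses the IBM Cloud CLI with the Kubernetes Service plug-in; <cluster_name> is a placeholder for your own cluster name:

# Log in to IBM Cloud
ibmcloud login
# Download the kubeconfig for your cluster so that kubectl can reach it
ibmcloud ks cluster config --cluster <cluster_name>
# Verify connectivity to the cluster
kubectl get nodes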
Step 2. Clone the kafka-connect repositories
Clone the following two repositories that contain the required files:
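For example, assuming both repositories are in the ibm-messaging organization on GitHub:

# Contains the Kafka Connect deployment files for Kubernetes Service
git clone https://github.com/ibm-messaging/event-streams-samples.git
# Contains the Cloud Object Storage Sink Connector and its sample configuration
git clone https://github.com/ibm-messaging/kafka-connect-ibmcos-sink.git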
Step 3. Create your Kafka Connect configuration
You must set up this configuration only once. Event Streams stores it for future use.
- From the event-streams-samples project, navigate to the kafka-connect/IKS directory, edit the connect-distributed.properties file, and replace <BOOTSTRAP_SERVERS> in one place and <APIKEY> in three places with your Event Streams credentials. Provide <BOOTSTRAP_SERVERS> as a comma-separated list; if they are not valid, you get an error. Your <APIKEY> appears in clear text on your machine but is secret when pushed to IBM Cloud® Kubernetes Service.
- Kafka Connect can run multiple workers for reliability and scalability reasons. If your Kubernetes Service cluster has more than one node and you want multiple Connect workers, edit the kafka-connect.yaml file and change the replicas: 1 entry.
- Then, run the following commands:
Run the following command to create a secret:
kubectl create secret generic connect-distributed-config --from-file=connect-distributed.properties
Run the following command to create a configmap:
kubectl create configmap connect-log4j-config --from-file=connect-log4j.properties
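To confirm that the secret and the configmap were created, list them with kubectl:

# Both objects must exist before you deploy Kafka Connect
kubectl get secret connect-distributed-config
kubectl get configmap connect-log4j-config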
Step 4. Deploy Kafka Connect
Apply the configuration in the kafka-connect.yaml file by running the following command:
kubectl apply -f ./kafka-connect.yaml
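To watch the deployment come up, you can list the resources that kafka-connect.yaml creates; the exact deployment and pod names are defined in that file:

# Wait until the Kafka Connect pods report a Running status
kubectl get deployments
kubectl get pods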
Step 5. Validate Kafka Connect is running
To validate that Kafka Connect is running, port forward to the kafkaconnect-service on port 8083, as in the following example:
kubectl port-forward service/kafkaconnect-service 8083
Keep the terminal that you used for port forwarding open, and use another terminal for the next steps.
The Connect REST API is then available at http://localhost:8083. For more information about the API, see Kafka Connect REST Interface.
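As a quick check, a request to the root of the REST API returns the version of the Kafka Connect worker:

# Returns version information for the Connect worker
curl http://localhost:8083/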
You now have the Kafka Connect runtime deployed and running in Kubernetes Service. Next, configure and start the Object Storage connector.
Step 6. Configure the cos-sink JSON file
Edit the cos-sink.json file located in kafka-connect-ibmcos-sink/config/ so that, at a minimum, the required properties are completed with your information; a completed example follows the properties table. Although the configuration properties cos.object.deadline.seconds, cos.object.interval.seconds, and cos.object.records are listed as optional, you must set at least one of them to a nondefault value.
cos-sink.json file properties
Replace the placeholders in the cos-sink.json file with your own values.
Property | Description |
---|---|
cos.api.key | Required. API key used to connect to the Cloud Object Storage service instance. |
cos.bucket.location | Required. Location of the Cloud Object Storage service bucket: for example, eu-gb for a regional bucket, or eu for a global bucket. |
cos.bucket.name | Required. Name of the Cloud Object Storage service bucket to write data into. |
cos.bucket.resiliency | Required. Resiliency of the Cloud Object Storage bucket. Must be one of: cross-region, regional, or single-site. |
cos.service.crn | Required. CRN for the Cloud Object Storage service instance. Ensure you enter the correct CRN: it is the resource instance ID ending with double colons, for example, crn:v1:staging:public:cloud-object-storage:global:a/8c226dc8c8bfb9bc3431515a16957954:b25fe12c-9cf5-4ee8-8285-2c7e6ae707f6::. |
cos.endpoint.visibility | Optional. Specify public to connect to the Cloud Object Storage service over the public internet. Specify private to connect from a connector that runs inside the IBM Cloud network, for example, from an IBM Cloud Kubernetes Service cluster. The default is public. |
cos.object.deadline.seconds | Optional. The number of seconds (measured in wall clock time by the Connect Task instance) between reading the first record from Kafka and writing all of the records read so far into a Cloud Object Storage object. This property can be useful in situations that have long pauses between Kafka records being produced to a topic. It ensures that any records that are received by this connector are always written into Object Storage within the specified time. |
cos.object.interval.seconds | Optional. The number of seconds (as measured by the timestamps in Kafka records) between reading the first record from Kafka, and writing all of the records read so far into a Cloud Object Storage object. |
cos.object.records | Optional. The maximum number of Kafka records to combine into an object. |
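For illustration, the following is a minimal sketch of a completed cos-sink.json. The connector class and converter settings are assumptions based on the kafka-connect-ibmcos-sink project, and every angle-bracket value is a placeholder for your own details:

{
    "name": "cos-sink",
    "config": {
        "connector.class": "com.ibm.eventstreams.connect.cossink.COSSinkConnector",
        "tasks.max": "1",
        "topics": "<topic_name>",
        "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
        "cos.api.key": "<api_key>",
        "cos.bucket.location": "eu-gb",
        "cos.bucket.name": "<bucket_name>",
        "cos.bucket.resiliency": "regional",
        "cos.service.crn": "<service_crn>",
        "cos.object.records": "100"
    }
}

This sketch sets cos.object.records so that at least one of the three optional batching properties has a nondefault value.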
Get Object Storage credentials by using the IBM Cloud console
- Locate your Object Storage service on the dashboard.
- Click your service tile.
- Click Service Credentials.
- Click New Credential.
- Complete the details for your new credential, such as a name and role, and click Add. A new credential appears in the credentials list.
- Click View Credentials on the new credential to reveal the details in JSON format; a trimmed example follows this list.
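The credential JSON contains more fields than the connector needs. A trimmed sketch follows, showing the two fields that supply the cos.api.key and cos.service.crn connector properties (both values are placeholders):

{
    "apikey": "<api_key>",
    "resource_instance_id": "<service_crn>"
}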
Step 7. Start the connector with its configuration
Run the following command to start the Object Storage connector with the configuration that you provided in the previous step.
curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors --data "@./cos-sink.json"
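If the request succeeds, Kafka Connect responds with the name and configuration of the new connector. You can also confirm that it was created by listing all connectors:

# The output now includes cos-sink
curl http://localhost:8083/connectors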
Step 8. Monitor your connector
You can check your connector by going to the following location.
http://localhost:8083/connectors/cos-sink/status
If the state of the connector or any of its tasks is not RUNNING, restart the connector as shown below.
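You can run the same status check from the command line, and the Connect REST API provides a restart endpoint:

# Check the connector and task states
curl http://localhost:8083/connectors/cos-sink/status
# Restart the connector if it is not in the RUNNING state
curl -X POST http://localhost:8083/connectors/cos-sink/restart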
Step 9. Delete your connector
You can use the following command to delete a connector.
curl -X DELETE http://localhost:8083/connectors/cos-sink