X

Best Practices from Oracle Development's A‑Team

Extending Oracle Blockchain Events with OCI - Part 6 (Building a Consumer)

Tamer Qumhieh
Master Principal Technology Evangelist

Introduction

This is Part 6 of the “Extending Oracle Blockchain Events with OCI” series. In previous blog posts I covered:

In this blog series, I’ll be developing a NodeJS consumer to consume events from OCI Streaming Service. As discussed before in Part 1, Oracle OCI Streaming service is mostly API Compatible with Apache Kafka, hence you can use Apache Kafka APIs to consume messages to OCI streaming service.

I'll start by initializing a nodejs app and installing the 'kafkajs' and 'log4js' libraries using npm and requiring them. You then can create a class that will be initialized using a config object that holds the OCI Stream connection details.

class Consumer {

    constructor(config) {
        // Your Username
        this.userName = config.userName;
        // OCI User Specific Auth Token
        this.authToken = config.authToken;
        // Your OCI Tenant Object Storage Namespace you captured in Part 2: Prepare OCI Environment .
        this.objectStorageNamespace = config.objectStorageNamespace;
        // OCI Stream ID
        this.streamPoolOCID = config.streamPoolOCID;
        // “Region Identifier” value from https://docs.cloud.oracle.com/en-us/iaas/Content/General/Concepts/regions.htm#AboutRegionsandAvailabilityDomains that maps to the home_region you captured in Part 2: Prepare OCI Environment .
        this.region = config.region;
        // Oracle Blockchain Platform event name that will be emitted from your chaincode.
        this.eventName = config.eventName;
        // OCI StreamingPool URL
        this.broker = util.format('api.cell-1.%s.streaming.oci.oraclecloud.com:9092', this.region);
    }

    // Start Consumer
    async start() {

    }
}

You need to require the following libraries

const {Kafka} = require('kafkajs');
const util = require("util");
const log4js = require('log4js');
const logger = log4js.getLogger("Consumer");
logger.level = "debug";

Within the start method, you first need to initialize a Kafka client passing it the connection details, then create a consumer and connect it.

// Configure Kafka Client
const kafka = new Kafka({
    clientId: 'obpevent-consumer',
    brokers: [this.broker],
    ssl: true,
    sasl: {
        mechanism: 'plain',
        // userName: objectStorageNamespace/userName/streamPoolOCID
        username: util.format('%s/%s/%s', this.objectStorageNamespace, this.userName, this.streamPoolOCID),
        password: this.authToken
    },
});

// Create a consumer group
const consumer = kafka.consumer({
    groupId: 'obpevent-consumer-group'
});

// Connect consumer
await consumer.connect();
logger.info("Successfully connected to Stream");

Last but not least :) you need to subscribe with your topic and start consuming messages. 

// Subscribe to topic => topic is eventName
await consumer.subscribe({topic : this.eventName,fromBeginning: true});

// Start consuming messages from stream
await consumer.run({
    eachMessage: async ({
                            topic,
                            partition,
                            message
                        }) => {
        logger.info(message.value.toString())
    },
});

Now you call start your consumer by providing the details captured in Part 2: Prepare OCI Environment 

REGION: is the “Region Identifier” value from documentation that maps to the home_region you captured in Part 2: Prepare OCI Environment .

const config = {
    userName : "USER_NAME",
    authToken: "AUTH_TOKEN",
    objectStorageNamespace: "OBJECT_STORAGE_NAMESPACE",
    streamPoolOCID: "STREAM_POOL_OCID",
    region: "REGION",
    eventName: "EVENT_NAME"
};

let consumer = new Consumer(config);

consumer.start().catch(error => {logger.error(error)});

For streamPoolOCID, you can copy its value from OCI console. From main navigation menu/Analytics/Streaming

Now you have all the code in place,  you can start your consumer. As this is the first time you start your consumer, it will consume any existing events that was already pushed into the stream before. And from now  on; any new event that your chaincode will emit; the consumer will pick it up and print it out as below.

Be the first to comment

Comments ( 0 )
Please enter your name.Please provide a valid email address.Please enter a comment.CAPTCHA challenge response provided was incorrect. Please try again.Captcha