Extending Oracle Blockchain Events with OCI - Part 3 (Function Producer)

February 19, 2020 | 6 minute read
Tamer Qumhieh
Master Principal Technology Evangelist

Introduction

This is Part 3 of the “Extending Oracle Blockchain Events with OCI” series. In Part 1, I introduced the solution architecture and the components used to extend Oracle Blockchain Platform with OCI services. In Part 2, I went through the steps needed to prepare your OCI environment. In this post, we will build the Oracle Functions "Event Producer" and push its Docker image to the Oracle Cloud Infrastructure Registry (OCIR). We will not deploy the function, as that will be automated in the next blog post.

Prerequisites

Docker 17.10.0-ce or later installed and running

Fn Project CLI installed

Oracle Functions

As explained before, “Oracle Functions” is a fully managed, highly scalable, on-demand, Functions-as-a-Service platform, built on enterprise-grade Oracle Cloud Infrastructure and powered by the Fn Project open source engine. With Functions, you only need to focus on writing code to meet business requirements. You don't have to worry about the underlying infrastructure, because Oracle Functions will ensure your app is highly available, scalable, secure, and monitored.

To create a new Fn function, use the Fn CLI to scaffold a new project. Fn supports many programming languages (Java, Node, Python, Go, C#, Ruby); we will be using Java.

> fn init obpeventsfunc --runtime java

This creates the structure below:

.
├── func.yaml
├── pom.xml
└── src
    ├── main
    │   └── java
    │       └── com
    │           └── example
    │               └── fn
    │                   └── HelloFunction.java
    └── test
        └── java
            └── com
                └── example
                    └── fn
                        └── HelloFunctionTest.java

11 directories, 4 files

The fn init command generates sample code, but I usually prefer to delete it and create my own. First, delete the "obpeventsfunc/src/test" folder. Then, under "obpeventsfunc/src/main/java", create a new "producer" folder and delete the "obpeventsfunc/src/main/java/com" folder. Finally, under "obpeventsfunc/src/main/java/producer", create a new Java file named "EventProducer.java". Your final folder structure should look like this:

.
├── func.yaml
├── hello.iml
├── pom.xml
└── src
    └── main
        └── java
            └── producer
                └── EventProducer.java

Finally, edit the file "obpeventsfunc/func.yaml" and change the value of the "cmd" property so that it points to your newly created Java class and its handler method: "cmd: producer.EventProducer::handleRequest".

schema_version: 20180708
name: obpeventsfunc
version: 0.0.1
runtime: java
build_image: fnproject/fn-java-fdk-build:jdk11-1.0.105
run_image: fnproject/fn-java-fdk:jre11-1.0.105
cmd: producer.EventProducer::handleRequest

Now that the folder structure is in place, start by adding the required Maven dependencies to your function's pom.xml:


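    <dependency>
        <groupId>com.oracle.oci.sdk</groupId>
        <artifactId>oci-java-sdk-keymanagement</artifactId>
        <version>1.12.5</version>
    </dependency>

    <dependency>
        <groupId>javax.activation</groupId>
        <artifactId>javax.activation-api</artifactId>
        <version>1.2.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.0</version>
    </dependency>

    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.30</version>
    </dependency>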

Start by creating the handleRequest method that takes as input parameter an Event object. The Event object is the actual event payload sent by Blockchain Platform:

public String handleRequest(Event event) {

    // OBP sends a test event when initially subscribing with its Event service, in that case, do nothing and return
    if (event.eventType.equalsIgnoreCase("test")) {
        return "done";
    }
    return "done";
}

When Oracle Blockchain Platform emits an event, the event is in JSON format. To use it in our Java function, we need to create corresponding Java classes that map to the structure of the JSON payload. Fn takes care of marshalling and unmarshalling the data.

Oracle Blockchain Platform Event Payload:

{
  "eventType": "string",
  "subid": "string",
  "channel": "string",
  "eventMsg": {
    "chaincodeId": "string",
    "txId": "string",
    "eventName": "string",
    "payload": {
      "type": "string",
      "data": "string"
    }
  }
}

In EventProducer.java, create three static nested classes that map to the above JSON object:

public static class Event{
     public String eventType;
     public String subid;
     public String channel;
     public EventMsg eventMsg;

     @Override public String toString() {
         return eventMsg.payload.data;
     }
 }

 public static class EventMsg{
     public String chaincodeId;
     public String txId;
     public String eventName;
     public Payload payload;
 }

 public static class Payload{
     public String type;
     public String data;
 }
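The guard in handleRequest calls equalsIgnoreCase on event.eventType, and toString walks eventMsg.payload.data, so both would throw a NullPointerException if a field were absent from the payload. The following is a minimal standalone sketch of null-tolerant versions of those two checks. It redeclares the three classes so that it compiles without the Fn FDK; the class name EventGuardSketch and the helpers isTestEvent and payloadData are hypothetical and not part of the original function.

```java
import java.util.Optional;

// Standalone sketch: mirrors the Event classes from this post so it compiles without the Fn FDK.
final class EventGuardSketch {

    static class Payload { String type; String data; }
    static class EventMsg { String chaincodeId; String txId; String eventName; Payload payload; }
    static class Event { String eventType; String subid; String channel; EventMsg eventMsg; }

    // True for the "test" event sent on subscription; tolerates a null event or eventType.
    static boolean isTestEvent(Event event) {
        return event != null && "test".equalsIgnoreCase(event.eventType);
    }

    // Walks eventMsg -> payload -> data and returns empty if any level is missing.
    static Optional<String> payloadData(Event event) {
        return Optional.ofNullable(event)
                .map(e -> e.eventMsg)
                .map(m -> m.payload)
                .map(p -> p.data);
    }

    public static void main(String[] args) {
        Event event = new Event();
        event.eventType = "Test";
        System.out.println("test event? " + isTestEvent(event));
        System.out.println("has payload data? " + payloadData(event).isPresent());
    }
}
```

Whether to skip, log, or reject an event with no payload data is a design choice for your own function; the sketch only makes the missing case explicit.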

Now you need to implement the function logic. When the function is triggered and the event is received, it needs to connect to OCI Streaming Service and produce a new message. As discussed in Part 1, OCI Streaming is mostly API-compatible with Apache Kafka, so you can use the Apache Kafka APIs to produce messages to it. I'm not going to go into the details of the Apache Kafka Producer APIs, but in order to connect you need the stream ID, a username, and a password.

Rather than hardcoding these values into the function, we are going to fetch them from the "function configurations". As discussed before, the username and password will be encrypted using OCI KMS. For now, you will only code the function to read the configurations; you will not create them yet. Function configurations are created at deployment time, which is explained in the next blog post.

For now, assume that your function reads a set of key-value configurations when it executes, and that the userName and authToken values are encrypted and therefore need to be decrypted using KMS.

To instruct your function to fetch its configurations, create a new config method that is annotated with "@FnConfiguration" and takes the Fn "RuntimeContext" as input:

@FnConfiguration
public void config(RuntimeContext ctx) {

bootstrapServers = ctx.getConfigurationByKey("BOOT_STRAP_SERVERS").orElse("");
streamOCID = ctx.getConfigurationByKey("STREAM_OCID").orElse("");
tenancyName = ctx.getConfigurationByKey("TENANT_NAME").orElse("");
userName = ctx.getConfigurationByKey("USER_NAME").orElse("");
authToken = ctx.getConfigurationByKey("AUTH_TOKEN").orElse("");
kmsEndpoint = ctx.getConfigurationByKey("KMS_ENDPOINT").orElse("");
kmsKeyId = ctx.getConfigurationByKey("KMS_KEY_ID").orElse("");
}

You need to define the following String fields in the class:

// OCI StreamingPool URL
private String bootstrapServers;
// OCI Stream ID
private String streamOCID;
// OCI Tenant Name
private String tenancyName;
// Your Username
private String userName;
// OCI User Specific Auth Token
private String authToken;
// OCI KMS Service Encryption/Decryption Endpoint
private String kmsEndpoint;
// OCI KMS "Key" ID that is used for encryption/decryption
private String kmsKeyId;
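Because the config method falls back to an empty string with orElse(""), a missing configuration key only shows up later, as a failed Kafka or KMS call. One option is to check the values once, right after they are read. The sketch below is a standalone illustration under that assumption: it uses a plain Map as a stand-in for the values returned by RuntimeContext, and the class name ConfigCheckSketch and the method missingKeys are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Standalone sketch: a Map stands in for the function configuration read via RuntimeContext.
final class ConfigCheckSketch {

    // The configuration keys read by the config method in this post.
    static final List<String> REQUIRED_KEYS = List.of(
            "BOOT_STRAP_SERVERS", "STREAM_OCID", "TENANT_NAME",
            "USER_NAME", "AUTH_TOKEN", "KMS_ENDPOINT", "KMS_KEY_ID");

    // Returns the required keys that are absent or blank, in declaration order.
    static List<String> missingKeys(Map<String, String> config) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED_KEYS) {
            String value = config.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        // Hypothetical partial configuration, for illustration only.
        Map<String, String> config = Map.of("BOOT_STRAP_SERVERS", "example-host:9092");
        System.out.println("Missing configuration keys: " + missingKeys(config));
    }
}
```

In the real function, a non-empty result could be turned into an exception thrown from the config method, so that a misconfigured deployment fails on the first invocation with a clear message.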

Next, create the decryptData method, which uses KMS to decrypt each ciphertext so that the function can connect to OCI Streaming. We will use the OCI KMS Java SDK to decrypt the text.

The ciphertexts are assumed to be already encrypted with KMS; that encryption is configured in the next blog post.

private String decryptData(String cipherText){

    // Create OCI Resource Principal Authentication Provider
    AbstractAuthenticationDetailsProvider provider = ResourcePrincipalAuthenticationDetailsProvider.builder().build();

    // Create OCI KMS Client
    KmsCryptoClient cryptoClient = KmsCryptoClient.builder().endpoint(kmsEndpoint).build(provider);

    // Decrypt cipherText
    DecryptDataDetails decryptDataDetails = DecryptDataDetails.builder().keyId(kmsKeyId).ciphertext(cipherText).build();
    DecryptRequest decryptRequest = DecryptRequest.builder().decryptDataDetails(decryptDataDetails).build();
    DecryptResponse decryptResponse = cryptoClient.decrypt(decryptRequest);

    // Get the decrypted text; the result is a base64-encoded string
    final String base64String = decryptResponse.getDecryptedData().getPlaintext();

    // Decode from base64 to String
    byte[] byteArray = Base64.decodeBase64(base64String);
    String value = new String(byteArray);

    // Return final result
    return value;
}
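The last step of decryptData, turning the base64 plaintext returned by KMS back into the original string, can be tried on its own without any OCI dependency. The sketch below swaps in java.util.Base64 from the JDK for the commons-codec class used above, and decodes with an explicit UTF-8 charset instead of the platform default. The class name PlaintextDecodeSketch and the sample value are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Standalone sketch of the base64 decoding step, using the JDK decoder instead of commons-codec.
final class PlaintextDecodeSketch {

    // Decodes a base64 string and interprets the bytes as UTF-8 text.
    static String decodePlaintext(String base64Plaintext) {
        byte[] raw = Base64.getDecoder().decode(base64Plaintext);
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Hypothetical secret, encoded here only to have something to decode.
        String encoded = Base64.getEncoder()
                .encodeToString("my-auth-token".getBytes(StandardCharsets.UTF_8));
        System.out.println(encoded + " decodes to " + decodePlaintext(encoded));
    }
}
```

Note that the JDK's basic decoder throws IllegalArgumentException on characters outside the base64 alphabet, including line breaks, so the two decoders are not interchangeable for every input.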

Finally connect to OCI Streaming Service to publish the received event:

public String handleRequest(Event event) {

    // OBP sends a test event when initially subscribing with its Event service, in that case, do nothing and return
    if(event.eventType.equalsIgnoreCase("test"))
    {
        return "done";
    }

    // Dynamically create a topic based on OBP event Name
    String topic = event.eventMsg.eventName;
    // Prepare Kafka authentication credentials, all values are passed in as Function Configuration values.
    // Keep in mind userName and password are encrypted using KMS, hence they need to be decrypted before use
    String saslJassConfig = String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s/%s/%s\" password=\"%s\";" , tenancyName,decryptData(userName),streamOCID,decryptData(authToken));

    // Set Kafka settings
    Properties props = new Properties();
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG , bootstrapServers);
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG , StringSerializer.class.getName());
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJassConfig);
    props.put(ProducerConfig.RETRIES_CONFIG, 5); // retries on transient errors and load balancing disconnection
    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024); // limit request size to 1MB

    // Create Kafka Producer
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, event.toString());

    // Send Kafka message
    producer.send(record);

    // Flush and close
    producer.flush();
    producer.close();

    return "done";

}
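To see what the connection settings look like once the placeholders are filled in, the sketch below rebuilds the JAAS string and the security-related properties using only the JDK. The property keys are the string values behind the Kafka constants used above (for example, SaslConfigs.SASL_JAAS_CONFIG is "sasl.jaas.config"). The class name StreamingClientConfigSketch and all sample values are hypothetical, and the sketch assumes the credentials contain no double quotes.

```java
import java.util.Properties;

// Standalone sketch: builds the same settings as handleRequest, without the Kafka client on the classpath.
final class StreamingClientConfigSketch {

    // Formats the PlainLoginModule entry; the username has three slash-separated parts.
    static String jaasConfig(String tenancy, String user, String streamOcid, String authToken) {
        return String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"%s/%s/%s\" password=\"%s\";",
                tenancy, user, streamOcid, authToken);
    }

    // Collects the bootstrap and SASL settings under their literal Kafka property names.
    static Properties securityProperties(String bootstrapServers, String jaasConfig) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config", jaasConfig);
        return props;
    }

    public static void main(String[] args) {
        // Hypothetical values, for illustration only.
        String jaas = jaasConfig("acme", "alice", "ocid1.stream.example", "s3cret");
        securityProperties("example-host:9092", jaas)
                .forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Printing the resolved properties like this is useful only with dummy values; with real credentials the JAAS string contains the decrypted auth token and should not be logged.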

This is the list of Java classes you need to import:

import com.fnproject.fn.api.FnConfiguration;
import com.fnproject.fn.api.RuntimeContext;
import com.oracle.bmc.auth.AbstractAuthenticationDetailsProvider;
import com.oracle.bmc.auth.ResourcePrincipalAuthenticationDetailsProvider;
import com.oracle.bmc.keymanagement.KmsCryptoClient;
import com.oracle.bmc.keymanagement.model.DecryptDataDetails;
import com.oracle.bmc.keymanagement.requests.DecryptRequest;
import com.oracle.bmc.keymanagement.responses.DecryptResponse;
import org.apache.commons.codec.binary.Base64;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

Now that your function is ready, the next step is to build it and push its Docker image to OCIR. To build the function, make sure Docker is running, and then execute the following command in a terminal from the root of the "obpeventsfunc" folder:

obpeventsfunc> fn build

After the command finishes, list your Docker images to confirm that the image was created:

obpeventsfunc> docker images
REPOSITORY                           TAG                 IMAGE ID            CREATED             SIZE
obpeventsfunc                        0.0.1               3ec577e58d1e        4 minutes ago       248MB

Push Image to OCIR

In order to push the image to Oracle OCIR, you need to tag the image with the format:
REGION_KEY.ocir.io/OBJECT_STORAGE_NAMESPACE/obpeventsfunc:0.0.1

REGION_KEY: the “Region Key” value from the OCI documentation that corresponds to the home_region you captured in Part 2.

OBJECT_STORAGE_NAMESPACE: the value you captured in Part 2 (shown as NAMESPACE in the commands below).

docker tag obpeventsfunc:0.0.1 REGION_KEY.ocir.io/NAMESPACE/obpeventsfunc:0.0.1
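As a worked example of the tag format, the sketch below fills in the placeholders with sample values. The class name OcirImageRefSketch, the region key "FRA", and the namespace "mytenancy" are hypothetical; substitute the values you captured in Part 2. The region key is lower-cased because the documentation lists region keys in upper case, while registry hostnames are conventionally written in lower case.

```java
import java.util.Locale;

// Standalone sketch: assembles an OCIR image reference from its parts.
final class OcirImageRefSketch {

    // Produces REGION_KEY.ocir.io/NAMESPACE/NAME:VERSION with a lower-cased region key.
    static String imageRef(String regionKey, String namespace, String name, String version) {
        return String.format("%s.ocir.io/%s/%s:%s",
                regionKey.toLowerCase(Locale.ROOT), namespace, name, version);
    }

    public static void main(String[] args) {
        // Hypothetical region key and namespace, for illustration only.
        System.out.println(imageRef("FRA", "mytenancy", "obpeventsfunc", "0.0.1"));
    }
}
```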

Your image is now ready to be pushed to OCIR, but you first need to log in to OCIR:

> docker login REGION_KEY.ocir.io

Docker will ask you for a username and password: 

  • Username: NAMESPACE/YOUR_USER_NAME
  • Password: is the AUTH_TOKEN you captured before in Part 2.

If all values are correct, you should see a “Login Succeeded” message in your terminal.
Finally, push the image to OCIR:

> docker push REGION_KEY.ocir.io/NAMESPACE/obpeventsfunc:0.0.1

If the command succeeds, you can check the pushed image in the OCI console.

Next Step

Part 4: Provisioning Your Infrastructure


Tamer Qumhieh is a technology evangelist with more than 15 years of technology and IT experience, specializing in blockchain, mobile, AppDev, AI chatbots, and machine learning. At Oracle A-Team, Tamer works closely with engineers, product management, customers, and partners to ensure smooth and proper adoption of cutting-edge technologies in the market.

