This is Part 3 of the “Extending Oracle Blockchain Events with OCI” series. In Part 1 I introduced the solution architecture and the different components that will be utilized to extend Oracle Blockchain Platform with OCI services. In Part 2 I went through the steps needed to prepare your OCI environment. In this blog post, we will build the Oracle Functions "Event Producer" and push its corresponding Docker image to the Oracle OCI Registry (OCIR). We will not deploy the function, as that will be automated in the next blog post.
Prerequisite: Docker 17.10.0-ce or later installed and running.
As explained before, “Oracle Functions” is a fully managed, highly scalable, on-demand, Functions-as-a-Service platform, built on enterprise-grade Oracle Cloud Infrastructure and powered by the Fn Project open source engine. With Functions, you only need to focus on writing code to meet business requirements. You don't have to worry about the underlying infrastructure, because Oracle Functions will ensure your app is highly available, scalable, secure, and monitored.
To create a new Fn function, you simply use the Fn CLI to scaffold a new project. Fn supports many programming languages (Java, Node, Python, Go, C#, Ruby); however, we will be using Java.
> fn init obpeventsfunc --runtime java
This will create the structure below:
.
├── func.yaml
├── pom.xml
└── src
    ├── main
    │   └── java
    │       └── com
    │           └── example
    │               └── fn
    │                   └── HelloFunction.java
    └── test
        └── java
            └── com
                └── example
                    └── fn
                        └── HelloFunctionTest.java

11 directories, 4 files
The fn init command creates sample code, but I usually prefer to delete it and create my own. First, delete the "obpeventsfunc/src/test" folder, then under "obpeventsfunc/src/main/java" create a new "producer" folder. Now you can delete the "obpeventsfunc/src/main/java/com" folder. Under "obpeventsfunc/src/main/java/producer" create a new Java file "EventProducer.java". Your final folder structure should look like this:
.
├── func.yaml
├── hello.iml
├── pom.xml
└── src
    └── main
        └── java
            └── producer
                └── EventProducer.java
Finally, edit the file "obpeventsfunc/func.yaml" and modify the value of the "cmd" property to point to your newly created Java class: "cmd: producer.EventProducer::handleRequest".
schema_version: 20180708
name: obpeventsfunc
version: 0.0.1
runtime: java
build_image: fnproject/fn-java-fdk-build:jdk11-1.0.105
run_image: fnproject/fn-java-fdk:jre11-1.0.105
cmd: producer.EventProducer::handleRequest
Now that we have our folder structure in place, let us start by importing the required Maven dependencies into your function's pom.xml:
<dependency>
    <groupId>com.oracle.oci.sdk</groupId>
    <artifactId>oci-java-sdk-keymanagement</artifactId>
    <version>1.12.5</version>
</dependency>
<dependency>
    <groupId>javax.activation</groupId>
    <artifactId>javax.activation-api</artifactId>
    <version>1.2.0</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-simple</artifactId>
    <version>1.7.30</version>
</dependency>
Start by creating the handleRequest method, which takes an Event object as its input parameter. The Event object is the actual event payload sent by Blockchain Platform:
public String handleRequest(Event event) {
    // OBP sends a test event when initially subscribing with its Event service; in that case, do nothing and return
    if (event.eventType.equalsIgnoreCase("test")) {
        return "done";
    }
    return "done";
}
When Oracle Blockchain Platform emits an event, the event generated is in JSON format; however, in order to use it in our Java function, we need to create corresponding Java classes that map to the structure of the JSON payload. Fn will do the rest of the marshalling and unmarshalling of data.
Oracle Blockchain Platform Event Payload:
{
  "eventType": "string",
  "subid": "string",
  "channel": "string",
  "eventMsg": {
    "chaincodeId": "string",
    "txId": "string",
    "eventName": "string",
    "payload": {
      "type": "string",
      "data": "string"
    }
  }
}
In EventProducer.java, create three static nested classes that map to the above JSON object:
public static class Event {
    public String eventType;
    public String subid;
    public String channel;
    public EventMsg eventMsg;

    @Override
    public String toString() {
        return eventMsg.payload.data;
    }
}

public static class EventMsg {
    public String chaincodeId;
    public String txId;
    public String eventName;
    public Payload payload;
}

public static class Payload {
    public String type;
    public String data;
}
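Fn performs the JSON binding automatically, but it is worth noticing that the overridden toString() surfaces only the chaincode payload data, since that is all we will publish to the stream later. A minimal, self-contained sketch wiring the nested classes up by hand (the sample values are made up for illustration):

```java
public class EventMappingDemo {

    // Mirrors the nested classes defined in EventProducer.java
    public static class Payload {
        public String type;
        public String data;
    }

    public static class EventMsg {
        public String chaincodeId;
        public String txId;
        public String eventName;
        public Payload payload;
    }

    public static class Event {
        public String eventType;
        public String subid;
        public String channel;
        public EventMsg eventMsg;

        @Override
        public String toString() {
            return eventMsg.payload.data;
        }
    }

    // Builds an Event the way Fn would after unmarshalling the JSON payload
    public static Event sampleEvent() {
        Payload p = new Payload();
        p.type = "string";
        p.data = "asset-created";
        EventMsg msg = new EventMsg();
        msg.eventName = "AssetEvent";
        msg.payload = p;
        Event e = new Event();
        e.eventType = "chaincode";
        e.eventMsg = msg;
        return e;
    }

    public static void main(String[] args) {
        // Only the payload data comes back, not the full event envelope
        System.out.println(sampleEvent().toString());
    }
}
```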
Now you need to implement the function logic. When the function is triggered and the event is received, you need to connect to the OCI Streaming service and produce a new message. As discussed in Part 1, OCI Streaming is largely API-compatible with Apache Kafka, hence you can use the Apache Kafka APIs to produce messages to it. I'm not going to go into the details of the Apache Kafka producer APIs, but in order to connect you need the stream ID, a username, and a password.
Rather than hardcoding these values into the function, we are going to fetch them from the "function configurations". Also, as discussed before, the username and password will be encrypted using OCI KMS. For now you will only code the function to use the configurations but will not create them; function configurations are created at deployment time, which is explained in the next blog post.
So just assume your function reads a set of key-value configuration pairs when it executes, and that the userName and password are encrypted and hence need to be decrypted using KMS.
To instruct your function to fetch its configurations, create a new config method that is annotated with "@FnConfiguration" and takes the Fn "RuntimeContext" as input:
@FnConfiguration
public void config(RuntimeContext ctx) {
    bootstrapServers = ctx.getConfigurationByKey("BOOT_STRAP_SERVERS").orElse("");
    streamOCID = ctx.getConfigurationByKey("STREAM_OCID").orElse("");
    tenancyName = ctx.getConfigurationByKey("TENANT_NAME").orElse("");
    userName = ctx.getConfigurationByKey("USER_NAME").orElse("");
    authToken = ctx.getConfigurationByKey("AUTH_TOKEN").orElse("");
    kmsEndpoint = ctx.getConfigurationByKey("KMS_ENDPOINT").orElse("");
    kmsKeyId = ctx.getConfigurationByKey("KMS_KEY_ID").orElse("");
}
You need to define the below global String variables:
// OCI Streaming pool URL
private String bootstrapServers;
// OCI Stream ID
private String streamOCID;
// OCI tenancy name
private String tenancyName;
// Your username
private String userName;
// OCI user-specific auth token
private String authToken;
// OCI KMS service encryption/decryption endpoint
private String kmsEndpoint;
// OCI KMS "Key" ID that is used for encryption/decryption
private String kmsKeyId;
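Note that getConfigurationByKey returns an Optional&lt;String&gt;, so a missing configuration key falls back to the empty string rather than throwing. A stdlib-only sketch of that lookup behavior (the Map here is just a stand-in for the real Fn RuntimeContext, and the OCID value is made up):

```java
import java.util.Map;
import java.util.Optional;

public class ConfigLookupDemo {

    // Stand-in for RuntimeContext.getConfigurationByKey(key).orElse("")
    static String lookup(Map<String, String> cfg, String key) {
        return Optional.ofNullable(cfg.get(key)).orElse("");
    }

    public static void main(String[] args) {
        Map<String, String> cfg = Map.of("STREAM_OCID", "ocid1.stream.oc1..example");
        // Present key resolves to its value
        System.out.println(lookup(cfg, "STREAM_OCID"));
        // Absent key resolves to "" instead of null
        System.out.println("[" + lookup(cfg, "KMS_KEY_ID") + "]");
    }
}
```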
Next, create the decryptData method, which uses KMS to decrypt all cipher text so the function can connect to OCI Streaming. We will be using the OCI KMS Java SDK to decrypt the text.
The cipher texts are already encrypted using KMS, which will be configured in the next blog post.
private String decryptData(String cipherText) {
    // Create OCI Resource Principal authentication provider
    AbstractAuthenticationDetailsProvider provider =
            ResourcePrincipalAuthenticationDetailsProvider.builder().build();
    // Create OCI KMS client
    KmsCryptoClient cryptoClient = KmsCryptoClient.builder().endpoint(kmsEndpoint).build(provider);
    // Decrypt cipherText
    DecryptDataDetails decryptDataDetails =
            DecryptDataDetails.builder().keyId(kmsKeyId).ciphertext(cipherText).build();
    DecryptRequest decryptRequest =
            DecryptRequest.builder().decryptDataDetails(decryptDataDetails).build();
    DecryptResponse decryptResponse = cryptoClient.decrypt(decryptRequest);
    // Get the decrypted text; the result is a base64-encoded string
    final String base64String = decryptResponse.getDecryptedData().getPlaintext();
    // Decode from base64 to String
    byte[] byteArray = Base64.decodeBase64(base64String.getBytes());
    String value = new String(byteArray);
    // Return final result
    return value;
}
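The decrypt response carries the plaintext as a Base64 string; the method above decodes it with commons-codec, but the JDK's own java.util.Base64 works the same way. A stdlib-only sketch of just that final decode step (the sample token value is made up, and the encoder call stands in for what KMS getPlaintext() would return):

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class DecodeDemo {

    // Equivalent of the commons-codec Base64.decodeBase64 call in decryptData
    static String decodePlaintext(String base64Plaintext) {
        byte[] bytes = Base64.getDecoder().decode(base64Plaintext);
        return new String(bytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // KMS returns something like this from decryptResponse.getDecryptedData().getPlaintext()
        String fromKms = Base64.getEncoder()
                .encodeToString("my-secret-auth-token".getBytes(StandardCharsets.UTF_8));
        System.out.println(decodePlaintext(fromKms));
    }
}
```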
Finally connect to OCI Streaming Service to publish the received event:
public String handleRequest(Event event) {
    // OBP sends a test event when initially subscribing with its Event service; in that case, do nothing and return
    if (event.eventType.equalsIgnoreCase("test")) {
        return "done";
    }
    // Dynamically create a topic based on the OBP event name
    String topic = event.eventMsg.eventName;
    // Prepare Kafka authentication credentials; all values are passed in as function configuration values.
    // Keep in mind userName and authToken are encrypted using KMS, hence they need to be decrypted before use
    String saslJaasConfig = String.format(
            "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s/%s/%s\" password=\"%s\";",
            tenancyName, decryptData(userName), streamOCID, decryptData(authToken));
    // Set Kafka settings
    Properties props = new Properties();
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    props.put(SaslConfigs.SASL_JAAS_CONFIG, saslJaasConfig);
    props.put(ProducerConfig.RETRIES_CONFIG, 5); // retries on transient errors and load balancer disconnections
    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 1024 * 1024); // limit request size to 1MB
    // Create Kafka producer
    KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, event.toString());
    // Send Kafka message
    producer.send(record);
    // Flush and close
    producer.flush();
    producer.close();
    return "done";
}
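The trickiest part of the producer setup is the SASL JAAS string: OCI Streaming expects the username in the form tenancy/user/streamOCID, with the (decrypted) auth token as the password. A small sketch isolating just that formatting step, with made-up credential values:

```java
public class JaasConfigDemo {

    // Builds the SASL_SSL / PLAIN JAAS config line used in handleRequest
    static String jaasConfig(String tenancy, String user, String streamOcid, String token) {
        return String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"%s/%s/%s\" password=\"%s\";",
                tenancy, user, streamOcid, token);
    }

    public static void main(String[] args) {
        // All four inputs come from the function configuration (two of them KMS-decrypted)
        System.out.println(jaasConfig("mytenancy", "obp.user@example.com",
                "ocid1.stream.oc1..example", "myAuthToken"));
    }
}
```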
This is the list of Java imports you need:
import com.fnproject.fn.api.FnConfiguration;
import com.fnproject.fn.api.RuntimeContext;
import com.oracle.bmc.auth.AbstractAuthenticationDetailsProvider;
import com.oracle.bmc.auth.ResourcePrincipalAuthenticationDetailsProvider;
import com.oracle.bmc.keymanagement.KmsCryptoClient;
import com.oracle.bmc.keymanagement.model.DecryptDataDetails;
import com.oracle.bmc.keymanagement.requests.DecryptRequest;
import com.oracle.bmc.keymanagement.responses.DecryptResponse;
import org.apache.commons.codec.binary.Base64;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
Now that your function is ready, the next step is to build it and push its corresponding Docker image to OCIR. To build the function, make sure Docker is started, then execute the following command from a terminal at the root of the "obpeventsfunc" folder:
obpeventsfunc> fn build
After the command finishes, you can check the Docker images to ensure the image was created:
obpeventsfunc> docker images
REPOSITORY      TAG     IMAGE ID       CREATED         SIZE
obpeventsfunc   0.0.1   3ec577e58d1e   4 minutes ago   248MB
In order to push the image to Oracle OCIR, you need to tag the image with the format:
REGION_KEY.ocir.io/OBJECT_STORAGE_NAMESPACE/obpeventsfunc:0.0.1
REGION_KEY: Copy the “Region Key” value from the documentation that maps to the home_region you captured in Part 2.
OBJECT_STORAGE_NAMESPACE: the value you captured before in Part 2.
docker tag obpeventsfunc:0.0.1 REGION_KEY.ocir.io/NAMESPACE/obpeventsfunc:0.0.1
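The tag is just a concatenation of the Part 2 values with the image name and version; a tiny sketch of that assembly, using a placeholder region key and namespace:

```java
public class ImageTagDemo {

    // Assembles the OCIR image path: REGION_KEY.ocir.io/NAMESPACE/image:version
    static String ocirTag(String regionKey, String namespace, String image, String version) {
        return String.format("%s.ocir.io/%s/%s:%s", regionKey, namespace, image, version);
    }

    public static void main(String[] args) {
        // "fra" and "mytenancy-ns" are placeholders for your own Part 2 values
        System.out.println(ocirTag("fra", "mytenancy-ns", "obpeventsfunc", "0.0.1"));
    }
}
```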
Now that your image is ready to be pushed to OCIR, you first need to log in to OCIR:
> docker login REGION_KEY.ocir.io
Docker will ask you for a username and password. The username is your Object Storage namespace followed by your OCI username ("OBJECT_STORAGE_NAMESPACE/USER_NAME"), and the password is your user auth token.
If all values are correct, you should see a “Login Succeeded” message in your terminal.
Finally, push the image to OCIR:
> docker push REGION_KEY.ocir.io/NAMESPACE/obpeventsfunc:0.0.1
If the command succeeds, you can verify the pushed image from the OCI console.
Tamer Qumhieh, A technology evangelist with more than 15 years of technology and IT skillsets specialized in Blockchain, mobile, AppDev, AI Chatbots and Machine Learning. At Oracle A-Team, Tamer works closely with engineers, product management, customers and partners to ensure smooth and proper adoption of cutting-edge technologies in the market.