Oracle 23c 中用于事务事件队列的 Kafka Java 客户端

2023-11-02 18:43:50 浏览数 (2)

除了 Oracle 对微服务已经全面的支持之外,新功能还使跨服务事务的实现变得更加简单。

Kafka 应用程序现在可以利用高性能事务事件队列 (TxEventQ),直接在 Oracle 数据库上运行,只需进行最少的代码更改。

事务事件队列 (TxEventQ) 现在支持 Apache Kafka 中的 KafkaProducer 和 KafkaConsumer 类。

Oracle 数据库现在可以用作使用 Kafka API 的应用程序的源或目标。

  • 用于 Oracle 事务事件队列的 Kafka API

Oracle 事务事件队列 (TxEventQ) 可以轻松实现基于事件的应用程序。它还与 Apache Kafka 高度集成,Apache Kafka 是一个由 LinkedIn 开发并捐赠给 Apache 软件基金会的开源流处理软件平台,用 Scala 和 Java 编写。除了使使用 Kafka API 的应用程序能够透明地在 Oracle TxEventQ 上操作之外,Oracle TxEventQ 还支持 TxEventQ 和 Kafka 之间的双向信息流,以便近乎实时地尽快在 TxEventQ 或 Kafka 中提供更改。

Apache Kafka Connect 是 Apache Kafka 中包含的一个框架,用于将 Kafka 与其他系统集成。Oracle TxEventQ将提供标准的JMS包和相关的JDBC、Transaction包来建立连接并完成事务性数据流。Oracle TxEventQ 配置标准 Kafka JMS 连接器以建立互操作性并完成两个消息系统之间的数据流。

  • 用于事务事件队列的 Kafka Java 客户端

Oracle Database 21c 引入了 Kafka 应用程序与 Oracle 数据库的兼容性。Oracle Database 23c 为 Kafka 应用程序与 Oracle 数据库提供了更精细的兼容性。这使得 Kafka Java 应用程序可以轻松迁移到事务事件队列 (TxEventQ)。Kafka Java API 现在可以连接到 Oracle 数据库服务器并使用 TxEventQ 作为消息传递平台。

该图显示了 Kafka API 库,其中包含 Kafka Java API 的 Oracle 特定实现,该实现依赖于 kafka-clients-2.8.0.jar 文件。此实现在内部调用 AQ-JMS API,而 AQ-JMS API 又使用 JDBC 驱动程序与 Oracle 数据库进行通信。

开发人员现在可以使用 okafka.jar 将使用 Kafka 的现有 Java 应用程序迁移到 Oracle 数据库。该客户端库允许 Kafka 应用程序连接到 Oracle 数据库而不是 Kafka 集群,并透明地使用 TxEventQ 的消息传递平台。

  • 为事务事件队列配置 Kafka Java 客户端

以下是在 Oracle 数据库中为 TxEventQ 配置和运行 Kafka Java 客户端的先决条件。

  1. 创建数据库用户。

注意:通常最好为数据库用户分配或授予表空间上的特定配额,而不是在默认表空间中授予无限配额。 可以创建一个表空间,并使用以下命令向数据库用户授予特定表空间的配额。

代码语言:javascript复制
ALTER USER user QUOTA  UNLIMITED /* or size-clause */ on tablespace_name

2.授予用户以下权限。

代码语言:javascript复制
GRANT EXECUTE on DBMS_AQ to user.

GRANT EXECUTE on DBMS_AQADM to user.

GRANT SELECT on GV_$SESSION to user;

GRANT SELECT on V_$SESSION to user;

GRANT SELECT on GV_$INSTANCE to user;

GRANT SELECT on GV_$LISTENER_NETWORK to user;

GRANT SELECT on GV_$PDBS to user;

GRANT SELECT on USER_QUEUE_PARTITION_ASSIGNMENT_TABLE to user;

exec DBMS_AQADM.GRANT_PRIV_FOR_RM_PLAN('user');

3.设置正确的数据库配置参数以使用 TxEventQ。

代码语言:javascript复制
SET STREAMS_POOL_SIZE=400M

注意:根据您的工作负载适当设置大小。 无法为共享自治数据库设置 STREAMS_POOL_SIZE。 它是自动配置的。 如果设置,则会被忽略。

4.设置 LOCAL_LISTENER 数据库参数

代码语言:javascript复制
SET LOCAL_LISTENER= (ADDRESS=(PROTOCOL=TCP)(HOST=<HOST_NAME.DOMAIN_NAME/ IP> )(PORT=<PORT NUMBER>))
  • 连接配置

Kafka API 库使用 JDBC Thin Driver 连接到 Oracle 数据库。要建立此连接,Kafka 应用程序可以以纯文本形式提供用户名和密码,或者应用程序可以配置 SSL。要针对 OCI 上的 Oracle 自治数据库 (ADB) 运行 Kafka 应用程序,仅支持 SSL 配置。对于其他部署,您可以使用 PLAINTEXT 或 SSL 连接到 Oracle 数据库。

  • PLAINTEXT:在此安全协议中,使用 TCP 协议设置与 Oracle 数据库的 JDBC 连接,并在 ojdbc.properties 文件中以明文形式提供用户名和密码。 要使用 PLAINTEXT 协议,应用程序必须设置以下属性:
    • oracle.service.name = <实例上运行的服务的名称>
    • bootstrap.servers = <host:port>
    • security.protocol=PLAINTEXT
    • oracle.net.tns_admin = <ojdbc.properties 文件的位置>

ojdbc.properties 文件中应存在以下属性:

  • user = <nameofdatabaseuser>
  • password = <userpassword>

  • SSL:要使用 SSL 安全协议连接到 ATP 数据库,请执行以下附加步骤。
    1. ‍‍JDBC瘦驱动程序连接SSL安全的先决条件:
      • JDK8u162或更高版本。
      • oraclepki.jar、osdt_cert.jar和osdt_core.jar
      • 18.3或更高版本JDBC精简驱动程序

为了利用JDBC SSL安全性连接到Oracle数据库实例,用户必须提供以下属性。JDBC通过两种方式支持到Oracle数据库的SSL安全连接。‍‍

2.使用钱包。要使用钱包:

  • 在类路径中添加使用Oracle钱包所需的依赖jar。

下载oraclepki.jar、osdt_cert.jar和osdt_core.jar文件以及JDBC瘦驱动程序,并将这些jar添加到类路径中。

  • 启用Oracle PKI提供程序

如果SSO钱包(即cwallet.SSO)用于提供SSL安全性,则在java.security文件(位于$JRE_HOME/JRE/lib/security/java.security)的末尾添加OraclePKIProvider。例如:

代码语言:javascript复制
security.provider.1=sun.security.provider.Sun
security.provider.2=sun.security.rsa.SunRsaSign
security.provider.3=com.sun.net.ssl.internal.ssl.Provider
security.provider.4=com.sun.crypto.provider.SunJCE
security.provider.5=sun.security.jgss.SunProvider
security.provider.6=com.sun.security.sasl.Provider
security.provider.7=oracle.security.pki.OraclePKIProvider

要将ewallet.p12用于SSL安全性,请在java.security文件中将OraclePKIProvider放在sun提供程序之前。例如:

代码语言:javascript复制
security.provider.1=sun.security.provider.Sun
security.provider.2=sun.security.rsa.SunRsaSign
security.provider.3=oracle.security.pki.OraclePKIProvider
security.provider.4=com.sun.net.ssl.internal.ssl.Provider
security.provider.5=com.sun.crypto.provider.SunJCE
security.provider.6=sun.security.jgss.SunProvider
security.provider.7=com.sun.security.sasl.Provider
代码语言:javascript复制
security.protocol=SSL
oracle.net.tns_admin=<location of tnsnames.ora file>
tns.alias=<alias of connection string in tnsnames.ora>             

在ojdbc.properties文件中设置以下特性。此文件必须在oracle.net.tns_admin属性指定的位置可用。

代码语言:javascript复制
user(in smallletters)=nameofdatabaseuser
password(in smallletters)=userpassword
oracle.net.ssl_server_dn_match=true
oracle.net.wallet_location=“(SOURCE=(METHOD=FILE)
                            (METHOD_DATA=(DIRECTORY=/location../wallet_dbname)))”

3.要在Java KeyStore中使用JDBC SSL安全性,请在应用程序中提供以下属性:

代码语言:javascript复制
security.protocol=SSL             
oracle.net.tns_admin=<location of tnsnames.ora file>              
tns.alias=<alias of connection string in tnsnames.ora>

在ojdbc.properties文件中设置以下特性。此文件必须在oracle.net.tns_admin属性指定的位置可用。

代码语言:javascript复制
user(in smallletters)=nameofdatabaseuser
password(in smallletters)=userpassword
oracle.net.ssl_server_dn_match=true
javax.net.ssl.trustStore==${TNS_ADMIN}/truststore.jks
javax.net.ssl.trustStorePassword=password
javax.net.ssl.keyStore=${TNS_ADMIN}/keystore.jks
javax.net.ssl.keyStorePassword=password
  • Kafka客户端接口

Kafka应用程序主要使用Producer、Consumer和AdminAPI与Kafka集群进行通信。此版本的用于TxEventQ的Kafka客户端仅支持Apache Kafka 2.8.0的Producer、Consumer和Admin API和属性的一个子集。有了okafka.jar客户端库,Kafka应用程序将能够使用Oracle TxEventQ平台。okafka.jar库需要JRE 9或更高版本。

Kafka API Examples

示例: 创建一个 Oracle Kafka Topic

代码语言:javascript复制
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;

import org.oracle.okafka.clients.admin.AdminClient;

public class SimpleAdminOKafka {

  public static void main(String[] args) {
    Properties props = new Properties();
    //IP or Host name where Oracle Database 23c is running and Database Listener's Port
    props.put("bootstrap.servers", "localhost:1521");

    //name of the service running on the database instance
    props.put("oracle.service.name", "freepdb1");
    props.put("security.protocol","PLAINTEXT");

    // location for ojdbc.properties file where user and password properties are saved
    props.put("oracle.net.tns_admin","."); 

    try (Admin admin = AdminClient.create(props)) {
      //Create Topic named TEQ with 10 Partitions.
      CreateTopicsResult result = admin.createTopics(
          Arrays.asList(new NewTopic("TEQ", 10, (short)0)));
      try {
        KafkaFuture<Void> ftr =  result.all();
        ftr.get();
      } catch ( InterruptedException | ExecutionException e ) {

        throw new IllegalStateException(e);
      }
      System.out.println("Closing  OKafka admin now");
    }
    catch(Exception e)
    {
      System.out.println("Exception while creating topic "   e);
      e.printStackTrace();
    }
  }
}

示例: Kafka Consumer API

代码语言:javascript复制
import java.util.Properties;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.oracle.okafka.clients.consumer.KafkaConsumer;

public class SimpleConsumerOKafka {

  // Dummy implementation of ConsumerRebalanceListener interface
  // It only maintains the list of assigned partitions in assignedPartitions list
  static class ConsumerRebalance implements ConsumerRebalanceListener {

    public List<TopicPartition> assignedPartitions = new ArrayList<>();
    
    @Override
    public synchronized void onPartitionsAssigned(Collection<TopicPartition>  partitions) { 
      System.out.println("Newly Assigned Partitions:");
      for (TopicPartition tp :partitions ) {
        System.out.println(tp);
        assignedPartitions.add(tp);
      }
    } 
    
    @Override
    public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions) {
      System.out.println("Revoked previously assigned partitions. ");
      for (TopicPartition tp :assignedPartitions ) {
        System.out.println(tp);
      }
      assignedPartitions.clear();
    }
  }

  public static void main(String[] args) {
    //System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "TRACE");
    Properties props = new Properties();
    //IP or Host name where Oracle Database 23c is running and Database Listener's Port
    props.put("bootstrap.servers", "localhost:1521");
    
    //name of the service running on the database instance
    props.put("oracle.service.name", "freepdb1");
    props.put("security.protocol","PLAINTEXT");
    
    // location for ojdbc.properties file where user and password properties are saved
    props.put("oracle.net.tns_admin","."); 
    
    //Consumer Group Name
    props.put("group.id" , "CG1");
    props.put("enable.auto.commit","false");
    
    // Maximum number of records fetched in single poll call
    props.put("max.poll.records", 2000);

    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    Consumer<String , String> consumer = new KafkaConsumer<String, String>(props);    
    ConsumerRebalanceListener rebalanceListener = new ConsumerRebalance();
    
    //Subscribe to a single topic named 'TEQ'.
    consumer.subscribe(Arrays.asList("TEQ"), rebalanceListener);
    
    int expectedMsgCnt = 40000;
    int msgCnt = 0;
    Instant startTime = Instant.now();
    try {
      while(true) {
        try {
          //Consumes records from the assigned partitions of 'TEQ' topic
          ConsumerRecords <String, String> records = consumer.poll(Duration.ofMillis(10000));
          //Print consumed records
          for (ConsumerRecord<String, String> record : records)  
          {
            System.out.printf("partition = %d, offset = %d, key = %s, value =%sn ", record.partition(), record.offset(), record.key(), record.value());
            for(Header h: record.headers())
            {
              System.out.println("Header: "  h.toString());
            } 
          }
          //Commit all the consumed records
          if(records != null && records.count() > 0) {
            msgCnt  = records.count();
            System.out.println("Committing records "   records.count());
            try {
              consumer.commitSync();
            }catch(Exception e)
            {
              System.out.println("Exception in commit "   e.getMessage());
              continue;
            }
            if(msgCnt >= expectedMsgCnt )
            {
              System.out.println("Received "   msgCnt   " Expected "   expectedMsgCnt  ". Exiting Now.");
              break;
            }
          }
          else {
            System.out.println("No Record Fetched. Retrying in 1 second");
            Thread.sleep(1000);
          }

        }catch(Exception e)
        {
          System.out.println("Inner Exception "   e.getMessage());
          throw e;
        }      
      }
    }catch(Exception e)
    {
      System.out.println("Exception from OKafka consumer "   e);
      e.printStackTrace();
    }finally {
      long runDuration = Duration.between(startTime, Instant.now()).toMillis();
      System.out.println("Closing OKafka Consumer. Received "  msgCnt  " records. Run Duration "   runDuration);
      consumer.close();
    }
  }
}

示例: Kafka Producer API

代码语言:javascript复制
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeader;

import org.oracle.okafka.clients.producer.KafkaProducer;

import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import java.util.concurrent.Future;

public class SimpleProducerOKafka {
  public static void main(String[] args) {
    try {
      Properties props = new Properties();
      //IP or Host name where Oracle Database 23c is running and Database Listener's Port
      props.put("bootstrap.servers", "localhost:1521");
      
      //name of the service running on the database instance
      props.put("oracle.service.name", "freepdb1");
      props.put("security.protocol","PLAINTEXT");
      
      // location for ojdbc.properties file where user and password properties are saved
      props.put("oracle.net.tns_admin","."); 

      
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");    
      
      String baseMsg = "This is a test message ";
      // Creates OKafka Producer
      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      
      Future<RecordMetadata> lastFuture = null;
      int msgCnt = 40000;
      Instant startTime = Instant.now();
      
      //Headers, common for all records
      RecordHeader rH1 = new RecordHeader("CLIENT_ID", "FIRST_CLIENT".getBytes());
      RecordHeader rH2 = new RecordHeader("REPLY_TO", "REPLY_TOPIC_NAME".getBytes());
      
      //Produce 40000 messages into topic named "TEQ".
      for(int i=0;i<msgCnt;i  ) {
        ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("TEQ", "" i, baseMsg   i);
        producerRecord.headers().add(rH1).add(rH2);
        lastFuture =producer.send(producerRecord);
      }
      //Waits until the last message is acknowledged
      lastFuture.get();
      long runTime = Duration.between( startTime, Instant.now()).toMillis();
      System.out.println("Produced "  msgCnt  " messages. Run Duration "   runTime);
      //Closes the OKafka producer
      producer.close();
    }    
    catch(Exception e)
    {
      System.out.println("Exception in Main "   e );
      e.printStackTrace();
    }
  }
}

示例: 删除一个 Oracle Kafka Topic

代码语言:javascript复制
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.Admin;

import org.oracle.okafka.clients.admin.AdminClient;

public class SimpleAdminDeleteTopic {

  public static void main(String[] args) {
    
    Properties props = new Properties();
    //IP or Host name where Oracle Database 23c is running and Database Listener's Port
    props.put("bootstrap.servers", "localhost:1521");

    //name of the service running on the database instance
    props.put("oracle.service.name", "freepdb1");
    props.put("security.protocol","PLAINTEXT");

    // location for ojdbc.properties file where user and password properties are saved
    props.put("oracle.net.tns_admin","."); 

    try (Admin admin = AdminClient.create(props)) {
      //Throws Exception if failed to delete the topic. Returns null on successful deletion.
      org.apache.kafka.clients.admin.DeleteTopicsResult delResult =
          admin.deleteTopics(Collections.singletonList("TEQ"));
      Thread.sleep(1000);
      System.out.println("Closing admin now");
    }
    catch(Exception e)
    {
      System.out.println("Exception while creating topic "   e);
      e.printStackTrace();
    }
  }

}

0 人点赞