除了 Oracle 对微服务已经全面的支持之外,新功能还使跨服务事务的实现变得更加简单。
Kafka 应用程序现在可以利用高性能事务事件队列 (TxEventQ),直接在 Oracle 数据库上运行,只需进行最少的代码更改。
事务事件队列 (TxEventQ) 现在支持 Apache Kafka 中的 KafkaProducer 和 KafkaConsumer 类。
Oracle 数据库现在可以用作使用 Kafka API 的应用程序的源或目标。
- 用于 Oracle 事务事件队列的 Kafka API
Oracle 事务事件队列 (TxEventQ) 可以轻松实现基于事件的应用程序。它还与 Apache Kafka 高度集成,Apache Kafka 是一个由 LinkedIn 开发并捐赠给 Apache 软件基金会的开源流处理软件平台,用 Scala 和 Java 编写。除了使使用 Kafka API 的应用程序能够透明地在 Oracle TxEventQ 上操作之外,Oracle TxEventQ 还支持 TxEventQ 和 Kafka 之间的双向信息流,以便近乎实时地尽快在 TxEventQ 或 Kafka 中提供更改。
Apache Kafka Connect 是 Apache Kafka 中包含的一个框架,用于将 Kafka 与其他系统集成。Oracle TxEventQ将提供标准的JMS包和相关的JDBC、Transaction包来建立连接并完成事务性数据流。Oracle TxEventQ 配置标准 Kafka JMS 连接器以建立互操作性并完成两个消息系统之间的数据流。
- 用于事务事件队列的 Kafka Java 客户端
Oracle Database 21c 引入了 Kafka 应用程序与 Oracle 数据库的兼容性。Oracle Database 23c 为 Kafka 应用程序与 Oracle 数据库提供了更精细的兼容性。这使得 Kafka Java 应用程序可以轻松迁移到事务事件队列 (TxEventQ)。Kafka Java API 现在可以连接到 Oracle 数据库服务器并使用 TxEventQ 作为消息传递平台。
该图显示了 Kafka API 库,其中包含 Kafka Java API 的 Oracle 特定实现,该实现依赖于 kafka-clients-2.8.0.jar 文件。此实现在内部调用 AQ-JMS API,而 AQ-JMS API 又使用 JDBC 驱动程序与 Oracle 数据库进行通信。
开发人员现在可以使用 okafka.jar 将使用 Kafka 的现有 Java 应用程序迁移到 Oracle 数据库。该客户端库允许 Kafka 应用程序连接到 Oracle 数据库而不是 Kafka 集群,并透明地使用 TxEventQ 的消息传递平台。
- 为事务事件队列配置 Kafka Java 客户端
以下是在 Oracle 数据库中为 TxEventQ 配置和运行 Kafka Java 客户端的先决条件。
- 创建数据库用户。
代码语言:javascript复制注意:通常最好为数据库用户分配或授予表空间上的特定配额,而不是在默认表空间中授予无限配额。 可以创建一个表空间,并使用以下命令向数据库用户授予特定表空间的配额。
ALTER USER user QUOTA UNLIMITED /* or size-clause */ on tablespace_name
2.授予用户以下权限。
代码语言:javascript复制GRANT EXECUTE on DBMS_AQ to user.
GRANT EXECUTE on DBMS_AQADM to user.
GRANT SELECT on GV_$SESSION to user;
GRANT SELECT on V_$SESSION to user;
GRANT SELECT on GV_$INSTANCE to user;
GRANT SELECT on GV_$LISTENER_NETWORK to user;
GRANT SELECT on GV_$PDBS to user;
GRANT SELECT on USER_QUEUE_PARTITION_ASSIGNMENT_TABLE to user;
exec DBMS_AQADM.GRANT_PRIV_FOR_RM_PLAN('user');
3.设置正确的数据库配置参数以使用 TxEventQ。
代码语言:javascript复制SET STREAMS_POOL_SIZE=400M
注意:根据您的工作负载适当设置大小。 无法为共享自治数据库设置 STREAMS_POOL_SIZE。 它是自动配置的。 如果设置,则会被忽略。
4.设置 LOCAL_LISTENER 数据库参数
代码语言:javascript复制SET LOCAL_LISTENER= (ADDRESS=(PROTOCOL=TCP)(HOST=<HOST_NAME.DOMAIN_NAME/ IP> )(PORT=<PORT NUMBER>))
- 连接配置
Kafka API 库使用 JDBC Thin Driver 连接到 Oracle 数据库。要建立此连接,Kafka 应用程序可以以纯文本形式提供用户名和密码,或者应用程序可以配置 SSL。要针对 OCI 上的 Oracle 自治数据库 (ADB) 运行 Kafka 应用程序,仅支持 SSL 配置。对于其他部署,您可以使用 PLAINTEXT 或 SSL 连接到 Oracle 数据库。
- PLAINTEXT:在此安全协议中,使用 TCP 协议设置与 Oracle 数据库的 JDBC 连接,并在 ojdbc.properties 文件中以明文形式提供用户名和密码。
要使用 PLAINTEXT 协议,应用程序必须设置以下属性:
oracle.service.name = <实例上运行的服务的名称>
bootstrap.servers = <host:port>
- security.protocol=PLAINTEXT
- oracle.net.tns_admin = <ojdbc.properties 文件的位置>
ojdbc.properties 文件中应存在以下属性:
- user = <nameofdatabaseuser>
- password = <userpassword>
- SSL:要使用 SSL 安全协议连接到 ATP 数据库,请执行以下附加步骤。
- JDBC瘦驱动程序连接SSL安全的先决条件:
- JDK8u162或更高版本。
- oraclepki.jar、osdt_cert.jar和osdt_core.jar
- 18.3或更高版本JDBC精简驱动程序
- JDBC瘦驱动程序连接SSL安全的先决条件:
为了利用JDBC SSL安全性连接到Oracle数据库实例,用户必须提供以下属性。JDBC通过两种方式支持到Oracle数据库的SSL安全连接。
2.使用钱包。要使用钱包:
- 在类路径中添加使用Oracle钱包所需的依赖jar。
下载oraclepki.jar、osdt_cert.jar和osdt_core.jar文件以及JDBC瘦驱动程序,并将这些jar添加到类路径中。
- 启用Oracle PKI提供程序
如果SSO钱包(即cwallet.SSO)用于提供SSL安全性,则在java.security文件(位于$JRE_HOME/JRE/lib/security/java.security)的末尾添加OraclePKIProvider。例如:
代码语言:javascript复制security.provider.1=sun.security.provider.Sun
security.provider.2=sun.security.rsa.SunRsaSign
security.provider.3=com.sun.net.ssl.internal.ssl.Provider
security.provider.4=com.sun.crypto.provider.SunJCE
security.provider.5=sun.security.jgss.SunProvider
security.provider.6=com.sun.security.sasl.Provider
security.provider.7=oracle.security.pki.OraclePKIProvider
要将ewallet.p12用于SSL安全性,请在java.security文件中将OraclePKIProvider放在sun提供程序之前。例如:
代码语言:javascript复制security.provider.1=sun.security.provider.Sun
security.provider.2=sun.security.rsa.SunRsaSign
security.provider.3=oracle.security.pki.OraclePKIProvider
security.provider.4=com.sun.net.ssl.internal.ssl.Provider
security.provider.5=com.sun.crypto.provider.SunJCE
security.provider.6=sun.security.jgss.SunProvider
security.provider.7=com.sun.security.sasl.Provider
代码语言:javascript复制security.protocol=SSL
oracle.net.tns_admin=<location of tnsnames.ora file>
tns.alias=<alias of connection string in tnsnames.ora>
在ojdbc.properties文件中设置以下特性。此文件必须在oracle.net.tns_admin属性指定的位置可用。
代码语言:javascript复制user(in smallletters)=nameofdatabaseuser
password(in smallletters)=userpassword
oracle.net.ssl_server_dn_match=true
oracle.net.wallet_location=“(SOURCE=(METHOD=FILE)
(METHOD_DATA=(DIRECTORY=/location../wallet_dbname)))”
3.要在Java KeyStore中使用JDBC SSL安全性,请在应用程序中提供以下属性:
代码语言:javascript复制security.protocol=SSL
oracle.net.tns_admin=<location of tnsnames.ora file>
tns.alias=<alias of connection string in tnsnames.ora>
在ojdbc.properties文件中设置以下特性。此文件必须在oracle.net.tns_admin属性指定的位置可用。
代码语言:javascript复制user(in smallletters)=nameofdatabaseuser
password(in smallletters)=userpassword
oracle.net.ssl_server_dn_match=true
javax.net.ssl.trustStore==${TNS_ADMIN}/truststore.jks
javax.net.ssl.trustStorePassword=password
javax.net.ssl.keyStore=${TNS_ADMIN}/keystore.jks
javax.net.ssl.keyStorePassword=password
- Kafka客户端接口
Kafka应用程序主要使用Producer、Consumer和AdminAPI与Kafka集群进行通信。此版本的用于TxEventQ的Kafka客户端仅支持Apache Kafka 2.8.0的Producer、Consumer和Admin API和属性的一个子集。有了okafka.jar客户端库,Kafka应用程序将能够使用Oracle TxEventQ平台。okafka.jar库需要JRE 9或更高版本。
Kafka API Examples
示例: 创建一个 Oracle Kafka Topic
代码语言:javascript复制import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import org.oracle.okafka.clients.admin.AdminClient;
public class SimpleAdminOKafka {
public static void main(String[] args) {
Properties props = new Properties();
//IP or Host name where Oracle Database 23c is running and Database Listener's Port
props.put("bootstrap.servers", "localhost:1521");
//name of the service running on the database instance
props.put("oracle.service.name", "freepdb1");
props.put("security.protocol","PLAINTEXT");
// location for ojdbc.properties file where user and password properties are saved
props.put("oracle.net.tns_admin",".");
try (Admin admin = AdminClient.create(props)) {
//Create Topic named TEQ with 10 Partitions.
CreateTopicsResult result = admin.createTopics(
Arrays.asList(new NewTopic("TEQ", 10, (short)0)));
try {
KafkaFuture<Void> ftr = result.all();
ftr.get();
} catch ( InterruptedException | ExecutionException e ) {
throw new IllegalStateException(e);
}
System.out.println("Closing OKafka admin now");
}
catch(Exception e)
{
System.out.println("Exception while creating topic " e);
e.printStackTrace();
}
}
}
示例: Kafka Consumer API
代码语言:javascript复制import java.util.Properties;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.oracle.okafka.clients.consumer.KafkaConsumer;
public class SimpleConsumerOKafka {
// Dummy implementation of ConsumerRebalanceListener interface
// It only maintains the list of assigned partitions in assignedPartitions list
static class ConsumerRebalance implements ConsumerRebalanceListener {
public List<TopicPartition> assignedPartitions = new ArrayList<>();
@Override
public synchronized void onPartitionsAssigned(Collection<TopicPartition> partitions) {
System.out.println("Newly Assigned Partitions:");
for (TopicPartition tp :partitions ) {
System.out.println(tp);
assignedPartitions.add(tp);
}
}
@Override
public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions) {
System.out.println("Revoked previously assigned partitions. ");
for (TopicPartition tp :assignedPartitions ) {
System.out.println(tp);
}
assignedPartitions.clear();
}
}
public static void main(String[] args) {
//System.setProperty("org.slf4j.simpleLogger.defaultLogLevel", "TRACE");
Properties props = new Properties();
//IP or Host name where Oracle Database 23c is running and Database Listener's Port
props.put("bootstrap.servers", "localhost:1521");
//name of the service running on the database instance
props.put("oracle.service.name", "freepdb1");
props.put("security.protocol","PLAINTEXT");
// location for ojdbc.properties file where user and password properties are saved
props.put("oracle.net.tns_admin",".");
//Consumer Group Name
props.put("group.id" , "CG1");
props.put("enable.auto.commit","false");
// Maximum number of records fetched in single poll call
props.put("max.poll.records", 2000);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Consumer<String , String> consumer = new KafkaConsumer<String, String>(props);
ConsumerRebalanceListener rebalanceListener = new ConsumerRebalance();
//Subscribe to a single topic named 'TEQ'.
consumer.subscribe(Arrays.asList("TEQ"), rebalanceListener);
int expectedMsgCnt = 40000;
int msgCnt = 0;
Instant startTime = Instant.now();
try {
while(true) {
try {
//Consumes records from the assigned partitions of 'TEQ' topic
ConsumerRecords <String, String> records = consumer.poll(Duration.ofMillis(10000));
//Print consumed records
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("partition = %d, offset = %d, key = %s, value =%sn ", record.partition(), record.offset(), record.key(), record.value());
for(Header h: record.headers())
{
System.out.println("Header: " h.toString());
}
}
//Commit all the consumed records
if(records != null && records.count() > 0) {
msgCnt = records.count();
System.out.println("Committing records " records.count());
try {
consumer.commitSync();
}catch(Exception e)
{
System.out.println("Exception in commit " e.getMessage());
continue;
}
if(msgCnt >= expectedMsgCnt )
{
System.out.println("Received " msgCnt " Expected " expectedMsgCnt ". Exiting Now.");
break;
}
}
else {
System.out.println("No Record Fetched. Retrying in 1 second");
Thread.sleep(1000);
}
}catch(Exception e)
{
System.out.println("Inner Exception " e.getMessage());
throw e;
}
}
}catch(Exception e)
{
System.out.println("Exception from OKafka consumer " e);
e.printStackTrace();
}finally {
long runDuration = Duration.between(startTime, Instant.now()).toMillis();
System.out.println("Closing OKafka Consumer. Received " msgCnt " records. Run Duration " runDuration);
consumer.close();
}
}
}
示例: Kafka Producer API
代码语言:javascript复制import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.oracle.okafka.clients.producer.KafkaProducer;
import java.time.Duration;
import java.time.Instant;
import java.util.Properties;
import java.util.concurrent.Future;
public class SimpleProducerOKafka {
public static void main(String[] args) {
try {
Properties props = new Properties();
//IP or Host name where Oracle Database 23c is running and Database Listener's Port
props.put("bootstrap.servers", "localhost:1521");
//name of the service running on the database instance
props.put("oracle.service.name", "freepdb1");
props.put("security.protocol","PLAINTEXT");
// location for ojdbc.properties file where user and password properties are saved
props.put("oracle.net.tns_admin",".");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
String baseMsg = "This is a test message ";
// Creates OKafka Producer
Producer<String, String> producer = new KafkaProducer<String, String>(props);
Future<RecordMetadata> lastFuture = null;
int msgCnt = 40000;
Instant startTime = Instant.now();
//Headers, common for all records
RecordHeader rH1 = new RecordHeader("CLIENT_ID", "FIRST_CLIENT".getBytes());
RecordHeader rH2 = new RecordHeader("REPLY_TO", "REPLY_TOPIC_NAME".getBytes());
//Produce 40000 messages into topic named "TEQ".
for(int i=0;i<msgCnt;i ) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("TEQ", "" i, baseMsg i);
producerRecord.headers().add(rH1).add(rH2);
lastFuture =producer.send(producerRecord);
}
//Waits until the last message is acknowledged
lastFuture.get();
long runTime = Duration.between( startTime, Instant.now()).toMillis();
System.out.println("Produced " msgCnt " messages. Run Duration " runTime);
//Closes the OKafka producer
producer.close();
}
catch(Exception e)
{
System.out.println("Exception in Main " e );
e.printStackTrace();
}
}
}
示例: 删除一个 Oracle Kafka Topic
代码语言:javascript复制import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.oracle.okafka.clients.admin.AdminClient;
public class SimpleAdminDeleteTopic {
public static void main(String[] args) {
Properties props = new Properties();
//IP or Host name where Oracle Database 23c is running and Database Listener's Port
props.put("bootstrap.servers", "localhost:1521");
//name of the service running on the database instance
props.put("oracle.service.name", "freepdb1");
props.put("security.protocol","PLAINTEXT");
// location for ojdbc.properties file where user and password properties are saved
props.put("oracle.net.tns_admin",".");
try (Admin admin = AdminClient.create(props)) {
//Throws Exception if failed to delete the topic. Returns null on successful deletion.
org.apache.kafka.clients.admin.DeleteTopicsResult delResult =
admin.deleteTopics(Collections.singletonList("TEQ"));
Thread.sleep(1000);
System.out.println("Closing admin now");
}
catch(Exception e)
{
System.out.println("Exception while creating topic " e);
e.printStackTrace();
}
}
}