- 该文档中,oracle版本为11g,jdk版本1.8,java项目使用maven构建,并使用了定时任务来做AQ监听的重连功能,解决由于外部原因导致连接断裂之后,需要手动重启项目才能恢复连接的问题
一、创建队列
1.1.管理员登录执行
- 管理员登录,执行授权操作,oracle使用队列需要单独的授权,默认未开启,须手动开启,授权命令如下,username使用自己的用户名即可
GRANT EXECUTE ON SYS.DBMS_AQ to 'username';GRANT EXECUTE ON SYS.DBMS_AQADM to 'username';GRANT EXECUTE ON SYS.DBMS_AQ_BQVIEW to 'username';GRANT EXECUTE ON SYS.DBMS_AQIN to 'username';GRANT EXECUTE ON SYS.DBMS_JOB to 'username';
1.2.用户登录执行执行
1.2.1. 创建消息负荷payload
- 创建的此type用来封装队列所带的,根据实际需求进行创建
CREATE OR REPLACE TYPE TYPE_QUEUE_INFO AS OBJECT( param_1 VARCHAR2(100), param_2 VARCHAR2(100))
1.2.2. 创建队列表
- 创建对列表,并指定队列数据的类型,队列表名自定义即可,数据类型使用上面刚创建的type
begin sys.dbms_aqadm.create_queue_table( queue_table => 'QUEUE_TABLE', queue_payload_type => 'TYPE_QUEUE_INFO', sort_list => 'ENQ_TIME', compatible => '10.0.0', primary_instance => 0, secondary_instance => 0);end;
1.2.3. 创建队列并启动
- 创建名称为QUEUE_TEST的队列,并指定对列表名【同一个oracle用户下,可以有多个对列表,同一个对列表中,可以有多个队列】
begin sys.dbms_aqadm.create_queue( queue_name => 'QUEUE_TEST', queue_table => 'QUEUE_TABLE', queue_type => sys.dbms_aqadm.normal_queue, max_retries => 5, retry_delay => 0, retention_time => 0);end;
- 刚创建的队列的状态默认是未开启的,需要手动开启一下,同理,存在删除、停止等操作
begin -- 启动队列 sys.dbms_aqadm.start_queue( queue_name => 'QUEUE_TEST' ); -- 暂停队列 --sys.dbms_aqadm.STOP_QUEUE( -- queue_name => 'QUEUE_TEST' --); -- 删除队列 --sys.dbms_aqadm.DROP_QUEUE( -- queue_name => 'QUEUE_TEST' --); -- 删除对列表 --sys.dbms_aqadm.DROP_QUEUE_TABLE( -- queue_table => 'QUEUE_TABLE' --);end;
1.2.4. 创建存储过程
- 储存过程的作用为把数据加载到队列中,生成的新的队列会自动添加进绑定的对列表中,等待消费者进行消费
CREATE OR REPLACE PROCEDURE pro_queue(param_1 VARCHAR2, param_2 VARCHAR2) as r_enqueue_options DBMS_AQ.ENQUEUE_OPTIONS_T; r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T; v_message_handle RAW(16); o_payload TYPE_QUEUE_INFO;begin -- 封装最终消息 o_payload := TYPE_QUEUE_INFO(param_1, param_2); -- 入队操作,指定队列 dbms_aq.enqueue(queue_name => 'QUEUE_TEST', enqueue_options => r_enqueue_options, message_properties => r_message_properties, payload => o_payload, msgid => v_message_handle); -- 出队操作 --dbms_aq.enqueue(queue_name => 'QUEUE_TEST', -- dequeue_options => r_dequeue_options, -- message_properties => r_message_properties, -- payload => o_payload, -- msgid => v_message_handle);end pro_queue;
二、Java中JMS的使用
2.1. 项目配置
2.1.1. maven
<dependency> <groupId>com.oracle</groupId> <artifactId>jmscommon</artifactId> <version>1.2</version></dependency><dependency> <groupId>com.oracle</groupId> <artifactId>orai18n</artifactId> <version>1.2</version></dependency><dependency> <groupId>com.oracle</groupId> <artifactId>jta</artifactId> <version>1.2</version></dependency><dependency> <groupId>com.oracle</groupId> <artifactId>aqapi_g</artifactId> <version>1.2</version></dependency>
2.1.2. yml
datasource: url: jdbc:oracle:thin:@ip:port/sid username: ** password: ** queue: aq: # 该队列是否可用,用来控制队列的加载和重连,不可省略 enable: true # 队列名称,不可省略 name: QUEUE_TEST # 队列重连的定时任务对应的时间表达式,不可省略 cron: 0 */1 * * * ?
2.2. AQ初始化
- 在项目启动结束后立即运行此类,会根据所配置的队列名称监听对应的队列
import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.CommandLineRunner;import org.springframework.stereotype.Component;/** * @Title: MessageAQInit.java * @Description: AQ 初始化 * @author wangqq * @date 2020年6月28日 下午3:45:23 * @version 1.0 */@Componentpublic class MessageAQInit implements CommandLineRunner { @Autowired private MessageAQConfig aqConfig; @Autowired private MessageAQListener listener; @Override public void run(String... args) throws RuntimeException { // 检查消息队列是否启用 if (aqConfig.enable) { // 设置AQ的消息监听器 MessageAQConnection.setListener(listener); // 设置oracle配置 if (!MessageAQConnection.initFactory(aqConfig)) { throw new RuntimeException("Message Oracle AQ initialization failed!"); } // 建立连接 if (!MessageAQConnection.establishConnection(aqConfig)) { throw new RuntimeException("Message Oracle AQ connection failed!"); } } }}
2.3. 配置信息类
- 配置类,将yml的配置文件转为java对象【时间表达式在代码中不会以对象属性的方式被使用,因此在该类中没有设置】
import org.springframework.beans.factory.annotation.Value;import org.springframework.stereotype.Component;/** * @Title: MessageAQConfig.java * @Description: ORACLE 消息队列配置 * @author wangqq * @date 2020年6月28日 下午3:36:08 * @version 1.0 */@Componentpublic class MessageAQConfig { /** 是否开启MessageAq功能 */ @Value("${queue.aq.enable}") public Boolean enable; /** 数据库用户名 */ @Value("${datasource.username}") public String userName; /** 数据库密码 */ @Value("${datasource.password}") public String password; /** 数据库地址url */ @Value("${datasource.url}") public String url; /** 队列名称 */ @Value("${queue.aq.name}") public String queue;}
2.4. AQ 连接工厂类
- AQ 链接的核心类,根据配置对象以及注入的监听对象,动态监听AQ队列
import javax.jms.Queue;import javax.jms.Session;import lombok.extern.slf4j.Slf4j;import oracle.jms.AQjmsConnection;import oracle.jms.AQjmsConnectionFactory;import oracle.jms.AQjmsConsumer;import oracle.jms.AQjmsSession;/** * @Title: MessageAQConnection.java * @Description: AQ 连接 * @author wangqq * @date 2020年6月28日 下午3:50:32 * @version 1.0 */@Slf4jpublic class MessageAQConnection { private static AQjmsConnectionFactory aQjmsConnectionFactory; private static AQjmsConsumer aQjmsConsumer; private static AQjmsSession aQjmsSession; private static AQjmsConnection aQjmsConnection; private static MessageAQListener listener; /** * 设置JMS监听器 * * @param messageAqJmsListener * @author wangqq * @date 2020年7月6日 上午8:33:57 */ public static void setListener(MessageAQListener messageAqJmsListener) { listener = messageAqJmsListener; } /** * 初始化 AQ 连接 Factory * * @param aqConfig 消息队列配置 * @return 是否成功 */ public static boolean initFactory(MessageAQConfig aqConfig) { try { aQjmsConnectionFactory = new AQjmsConnectionFactory(); aQjmsConnectionFactory.setJdbcURL(aqConfig.url); aQjmsConnectionFactory.setUsername(aqConfig.userName); aQjmsConnectionFactory.setPassword(aqConfig.password); return true; } catch (Exception e) { log.error(e.getMessage(), e); return false; } } /** * 连接消息队列 * * @param aqConfig 消息队列配置 * @return 是否成功 */ public static boolean establishConnection(MessageAQConfig aqConfig) { try { aQjmsConnection = (AQjmsConnection) aQjmsConnectionFactory.createConnection(); aQjmsSession = (AQjmsSession) aQjmsConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); aQjmsConnection.start(); Queue queue = aQjmsSession.getQueue(aqConfig.userName, aqConfig.queue); aQjmsConsumer = (AQjmsConsumer) aQjmsSession.createConsumer(queue, null, MessageORAData.getFactory(), null, false); aQjmsConsumer.setMessageListener(listener); return true; } catch (Exception e) { log.error(e.getMessage(), e); return false; } } /** * 关闭消息队列连接 * * @return 是否成功 */ public static boolean closeConnection() { try { aQjmsConsumer.close(); aQjmsSession.close(); aQjmsConnection.close(); return true; } catch (Exception e) { log.error(e.getMessage(), e); return false; } }}
2.5. 创建AQ 数据承载类
- 用来接收oracle队列中所带的参数,基本保证与数据库中的type格式相同即可
import lombok.Data;/** * @Title: Test.java * @Description: AQ 数据承载类 * @author wangqq * @date 2021-01-20 16:19:16 * @version 1.0 */@Datapublic class Test { private String param_1; private String param_2; }
2.6. 数据类型转换
- 将oracleAq所承载的数据,转化为我们自己需要的实例对象,及上述中的Test对象
package com.synjones.message.oracleaq;import java.sql.Connection;import java.sql.SQLException;import java.sql.Struct;import com.synjones.message.vo.MessageInfoVo;import lombok.NoArgsConstructor;import lombok.extern.slf4j.Slf4j;import oracle.sql.Datum;import oracle.sql.ORAData;import oracle.sql.ORADataFactory;/** * @Title: MessageORAData.java * @Description: 数据类型转换类 * @author synjones * @date 2018年12月3日 上午11:29:50 * @version 1.0 */@Slf4j@NoArgsConstructorpublic class MessageORAData implements ORAData, ORADataFactory { @SuppressWarnings("unused") private Object[] rawData = new Object[8]; private static final MessageORAData MESSAGE_FACTORY = new MessageORAData(); public static ORADataFactory getFactory() { return MESSAGE_FACTORY; } @Override public ORAData create(Datum datum, int sqlType) throws SQLException { if (datum == null) { return null; } else { try { MessageORAData payOraData = new MessageORAData(); Struct aStruct = (Struct) datum; payOraData.rawData = aStruct.getAttributes(); return payOraData; } catch (Exception e) { log.error(e.getMessage(), e); return null; } } } @Override public Datum toDatum(Connection arg0) throws SQLException { return null; } /** * 消息内容解析并封装 * * @return * @author wangqq * @date 2020年7月6日 上午8:38:01 */ public Test getContent() { try { return Test.builder() .param_1(rawData[0] == null ? null : rawData[0].toString()) .param_2(rawData[0] == null ? null : rawData[0].toString()) .build(); } catch (Exception e) { log.error(e.getMessage(), e); return null; } }}
2.7. AQ 监听
import javax.jms.Message;import javax.jms.MessageListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import lombok.extern.slf4j.Slf4j;import oracle.jms.AQjmsAdtMessage;/** * @Title: JMSListener.java * @Description: JMS监听ORACLEAQ的队列消息 * @author wangqq * @date 2020年6月28日 上午11:23:42 * @version 1.0 */@Slf4j@Componentpublic class MessageAQListener implements MessageListener { @Override public void onMessage(Message message1) { AQjmsAdtMessage adtMessage = (AQjmsAdtMessage)message1; try { MessageORAData payload = (MessageORAData)adtMessage.getAdtPayload(); // 获取消息内容 Test test = payload.getContent(); // 个人业务代码 } catch (Exception e) { log.error(e.getMessage(), e); } }}
2.8. AQ 监控任务, 在AQ断开后重连
- 通过定时任务,定时查询是否有入队时间在5分钟之内的队列未被消费【队列入队后,会在对列表中产生一条数据,消费之后该数据会被清除掉】,若存在,则说明监听异常,需要重新创建连接监听队列
- 数据库对列表中的入队时间在本次测试中为0时区的时间,故而在代码中转换了一下时区,否则无法根据入队时间查询数据
import com.synjones.core.tool.utils.DateUtil;import com.synjones.core.tool.utils.StringUtil;import com.synjones.message.mapper.MessageAqMapper;import lombok.extern.slf4j.Slf4j;/** * @Title: MessageAQMonitor.java * @Description: AQ 监控任务, 在AQ断开后重连 * @author wangqq * @date 2020年6月28日 下午4:35:31 * @version 1.0 */@Slf4j@Componentpublic class MessageAQMonitor { @Autowired private MessageAQConfig aqConfig; @Autowired private MessageAqMapper aqMapper; @Scheduled(cron = "${message.queue.aq.cron}") private void monitorJob() { // 检查消息队列是否启用 if (!aqConfig.enable) { return; } // 获取当前时间,并向前推5分钟 String formatDateTime = DateUtil.formatDateTime(new Date(System.currentTimeMillis() - 300000)); // 将该时间转为0时区的时间【数据库中存储的队列时间为0时区的时间】 String zeroZoneTime = DateUtil.timeConvert(formatDateTime, "+08:00", "+00:00", "yyyy-MM-dd HH:mm:ss"); // 查询是否存在5分钟以前的队列未被消费 int selectCount = aqMapper.selectCount(aqConfig.queue, zeroZoneTime); if (selectCount != 0) { // 若存在,则重新启动监听 if (MessageAQConnection.closeConnection()) { log.info("--> AQ connection has been closed."); if (MessageAQConnection.establishConnection(aqConfig)) { log.info("--> AQ connection has been re-established."); } } } }}
2.9. 队列表中队列数量的查询
- 根据队列名称和入队时间,查询在入队时间之后入对的队列数量
import org.apache.ibatis.annotations.Mapper;import org.apache.ibatis.annotations.Select;/** * @Title: MessageAqMapper.java * @Description: oracleAQ的查询 * @author wangqq * @date 2020年6月28日 下午4:04:50 * @version 1.0 */@Mapperpublic interface MessageAqMapper { /** * * 查询数据库中的队列表中符合条件的队列的条数 * * @param qName 队列名称 * @param minDatetime 队列入队的最小时间 * @return * @author wangqq * @date 2020-07-10 15:44:43 */ @Select("select count(msgid) from T_QUEUE_TABLE t where t.q_name = #{qName,jdbcType=VARCHAR} " + "and to_char(cast(t.enq_time AS DATE), 'yyyy-MM-dd HH24:mi:ss') < #{minDatetime,jdbcType=VARCHAR}") int selectCount(String qName, String minDatetime);}
原文转载:http://www.shaoqun.com/a/512387.html
crowd:https://www.ikjzd.com/w/880
patents:https://www.ikjzd.com/w/857
该文档中,oracle版本为11g,jdk版本1.8,java项目使用maven构建,并使用了定时任务来做AQ监听的重连功能,解决由于外部原因导致连接断裂之后,需要手动重启项目才能恢复连接的问题一、创建队列1.1.管理员登录执行管理员登录,执行授权操作,oracle使用队列需要单独的授权,默认未开启,须手动开启,授权命令如下,username使用自己的用户名即可GRANTEXECUTEONSYS.
孙琦:孙琦
hemingway:hemingway
跨境电商大卖们去年的净利率有多少 :跨境电商大卖们去年的净利率有多少
人形骨簪 :人形骨簪
十一北京去哪里购物好?十一北京旅游购物攻略:十一北京去哪里购物好?十一北京旅游购物攻略
没有评论:
发表评论