SpringBoot 对IBM MQ进行数据监听接收以及数据发送

2019-11-27 16:04:43来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

SpringBoot 对IBM MQ进行数据监听接收以及数据发送

一、需求介绍

后端使用 Spring Boot 2.0 框架,需要通过 JMS 实时监听并接收 IBM MQ 队列中的数据进行处理,处理完成后生成回执,再通过 MQ 队列发送出去。

二、引入依赖jar包

<dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <!-- No explicit version: let Spring Boot's dependency management pick the
             spring-jms release that matches the Spring Framework version of the
             application. Pinning 4.3.x under Spring Boot 2.0 mixes Spring 4 and
             Spring 5 jars. Assumes the Spring Boot parent POM or BOM is in place,
             as the version-less javax.jms-api entry below already requires. -->
</dependency>
<dependency>
        <groupId>javax.jms</groupId>
        <artifactId>javax.jms-api</artifactId>
</dependency>
<dependency>
        <groupId>com.ibm.mq</groupId>
        <artifactId>com.ibm.mq.allclient</artifactId>
        <version>9.1.0.0</version>
</dependency>

三、监听实现

代码中分为三大块:

1、MQ通道连接:我这边使用用户名和密码进行连接;如果通道不需要密码认证,可以不传入用户名和密码。

2、MQ的队列连接并实现监听

3、MQ发送

/**
 * Spring configuration that wires the IBM MQ JMS infrastructure: the connection
 * factory (with user credentials), a listener container for the receive queue,
 * and a {@link JmsOperations} bean for sending messages.
 *
 * <p>All connection settings are read from {@link MqProperties}.
 */
@Configuration
public class MqTestConfig {

    @Autowired
    private MqProperties mqProperties;

    /* ======================= MQ connection factory ======================= */

    /**
     * Creates the IBM MQ queue connection factory in client transport mode,
     * using host, port, channel, CCSID and queue manager from {@link MqProperties}.
     *
     * @return the configured connection factory
     * @throws IllegalStateException if IBM MQ rejects any of the settings; the
     *         application must not start with a half-configured factory
     */
    @Bean(name="mqQueueConnectionFactory")
    public MQQueueConnectionFactory mqQueueConnectionFactory(){
        MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory();
        try {
            mqQueueConnectionFactory.setHostName(mqProperties.getHostName());
            mqQueueConnectionFactory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            mqQueueConnectionFactory.setCCSID(mqProperties.getCcsid());
            mqQueueConnectionFactory.setChannel(mqProperties.getChannel());
            mqQueueConnectionFactory.setPort(mqProperties.getPort());
            mqQueueConnectionFactory.setQueueManager(mqProperties.getQueueManager());
        } catch (JMSException e) {
            // Fail fast: returning a partially configured factory would only defer
            // the failure to the first connection attempt and hide the real cause.
            throw new IllegalStateException("Failed to configure IBM MQ connection factory", e);
        }
        return mqQueueConnectionFactory;
    }

    /**
     * Wraps the MQ connection factory so that every connection is opened with
     * the user name and password from {@link MqProperties}.
     *
     * @param mqQueueConnectionFactory the raw IBM MQ connection factory
     * @return the credential-applying adapter
     */
    @Bean(name="userCredentialsConnectionFactoryAdapter")
    public UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter(MQQueueConnectionFactory mqQueueConnectionFactory){
        UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter = new UserCredentialsConnectionFactoryAdapter();
        userCredentialsConnectionFactoryAdapter.setUsername(mqProperties.getUserName());
        userCredentialsConnectionFactoryAdapter.setPassword(mqProperties.getPassword());
        userCredentialsConnectionFactoryAdapter.setTargetConnectionFactory(mqQueueConnectionFactory);
        return userCredentialsConnectionFactoryAdapter;
    }

    /* ======================= MQ message listening / receiving ======================= */

    /**
     * Defines the queue that incoming messages are received from.
     *
     * @return the receive queue destination
     * @throws IllegalStateException if the queue name or queue manager name is rejected
     */
    @Bean(name="mqueue")
    public MQQueue mqueue(){
        MQQueue mqQueue = new MQQueue();
        try {
            mqQueue.setBaseQueueName(mqProperties.getBaseQueueNameRecv());
            mqQueue.setBaseQueueManagerName(mqProperties.getBaseQueueManagerName());
        } catch (JMSException e) {
            // Fail fast instead of handing the listener container a queue without a name.
            throw new IllegalStateException("Failed to configure IBM MQ receive queue", e);
        }
        return mqQueue;
    }

    /**
     * Creates the listener container that delivers messages from the receive
     * queue to {@link DecMqRiskRecvService}.
     *
     * @param userCredentialsConnectionFactoryAdapter connection factory used by the container
     * @param mqueue the queue to listen on
     * @return the configured listener container
     */
    @Bean(name="simpleMessageListenerContainer")
    public SimpleMessageListenerContainer simpleMessageListenerContainer(UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter,MQQueue mqueue){
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setConnectionFactory(userCredentialsConnectionFactoryAdapter);
        simpleMessageListenerContainer.setDestination(mqueue);
        simpleMessageListenerContainer.setMessageListener(decMqRiskRecvService());
        return simpleMessageListenerContainer;
    }

    /**
     * Registers the message handler used by the listener container.
     *
     * <p>NOTE(review): {@link DecMqRiskRecvService} is also annotated with
     * {@code @Service}, so component scanning may register a second definition
     * under the same bean name. Confirm that bean-definition overriding is
     * allowed in this application, or drop one of the two registrations.
     *
     * @return the message handler
     */
    @Bean(name="decMqRiskRecvService")
    public DecMqRiskRecvService decMqRiskRecvService(){
        return new DecMqRiskRecvService();
    }

    /* ======================= MQ message sending ======================= */

    /**
     * Creates a caching connection factory for sending, with a session cache
     * of 5 and automatic reconnect on exception.
     *
     * @param userCredentialsConnectionFactoryAdapter the credential-applying factory to wrap
     * @return the caching connection factory
     */
    @Bean(name="cachingConnectionFactory")
    public CachingConnectionFactory cachingConnectionFactory(UserCredentialsConnectionFactoryAdapter userCredentialsConnectionFactoryAdapter){
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setTargetConnectionFactory(userCredentialsConnectionFactoryAdapter);
        cachingConnectionFactory.setSessionCacheSize(5);
        cachingConnectionFactory.setReconnectOnException(true);
        return cachingConnectionFactory;
    }

    /**
     * Creates the JMS transaction manager bound to the caching connection factory.
     *
     * @param cachingConnectionFactory the connection factory to manage transactions for
     * @return the transaction manager
     */
    @Bean(name="jmsTransactionManager")
    public PlatformTransactionManager jmsTransactionManager(CachingConnectionFactory cachingConnectionFactory){
        JmsTransactionManager jmsTransactionManager = new JmsTransactionManager();
        jmsTransactionManager.setConnectionFactory(cachingConnectionFactory);
        return jmsTransactionManager;
    }

    /**
     * Creates the {@link JmsOperations} bean used to send messages, with the
     * receive timeout taken from {@link MqProperties}.
     *
     * @param cachingConnectionFactory the connection factory used for sending
     * @return the JMS template
     */
    @Bean(name="jmsOperations")
    public JmsOperations jmsOperations(CachingConnectionFactory cachingConnectionFactory){
        JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
        jmsTemplate.setReceiveTimeout(mqProperties.getReceiveTimeout());
        return jmsTemplate;
    }

}

MQ配置属性类(对应配置文件中以 mq 为前缀的配置项)

记得要添加get和set方法

/**
 * Connection settings for IBM MQ, bound from configuration properties that
 * start with the {@value #MQ_PREFIX} prefix (for example {@code mq.hostName}).
 *
 * <p>Getters and setters are provided for every field: the setters are needed
 * for property binding, the getters are read by the MQ configuration and the
 * message handler.
 */
@Configuration
@ConfigurationProperties(prefix=MqProperties.MQ_PREFIX)
public class MqProperties {
    /** Prefix of all MQ-related configuration properties. */
    public static final String MQ_PREFIX = "mq";
    /** Host name of the MQ server. */
    private String hostName;
    /** Listener port of the MQ server. */
    private int port;
    /** Server-connection channel name. */
    private String channel;
    /** Coded character set identifier passed to the connection factory. */
    private int ccsid;
    /** User name used when opening connections. */
    private String userName;
    /** Password used when opening connections. */
    private String password;
    /** Queue manager the connection factory connects to. */
    private String queueManager;
    /** Queue manager name set on the receive queue. */
    private String baseQueueManagerName;
    /** Name of the queue messages are received from. */
    private String baseQueueNameRecv;
    /** Name of the queue receipts are sent to. */
    private String baseQueueNameSend;
    /** Receive timeout handed to the JMS template. */
    private long receiveTimeout;

    public String getHostName() {
        return hostName;
    }

    public void setHostName(String hostName) {
        this.hostName = hostName;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getChannel() {
        return channel;
    }

    public void setChannel(String channel) {
        this.channel = channel;
    }

    public int getCcsid() {
        return ccsid;
    }

    public void setCcsid(int ccsid) {
        this.ccsid = ccsid;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getQueueManager() {
        return queueManager;
    }

    public void setQueueManager(String queueManager) {
        this.queueManager = queueManager;
    }

    public String getBaseQueueManagerName() {
        return baseQueueManagerName;
    }

    public void setBaseQueueManagerName(String baseQueueManagerName) {
        this.baseQueueManagerName = baseQueueManagerName;
    }

    public String getBaseQueueNameRecv() {
        return baseQueueNameRecv;
    }

    public void setBaseQueueNameRecv(String baseQueueNameRecv) {
        this.baseQueueNameRecv = baseQueueNameRecv;
    }

    public String getBaseQueueNameSend() {
        return baseQueueNameSend;
    }

    public void setBaseQueueNameSend(String baseQueueNameSend) {
        this.baseQueueNameSend = baseQueueNameSend;
    }

    public long getReceiveTimeout() {
        return receiveTimeout;
    }

    public void setReceiveTimeout(long receiveTimeout) {
        this.receiveTimeout = receiveTimeout;
    }

}

报文处理类及回执发送

1、实现类要实现MessageListener,重写onMessage方法,Message就是监听到的消息。

2、读取报文时为防止乱码,我这边按照消息类型(BytesMessage 和 TextMessage)分两种方式读取并转码。

3、发送回执:之前发送时发现接收方收到的报文多出了一些报文头信息(MQRFH2 头),所以在队列名称后面加了 targetClient=1 参数:

  "queue:///" + mqProperties.getBaseQueueNameSend() + "?targetClient=1"

  这样目标会被当作非JMS(原生MQ)客户端,发送的报文就不再带有多余的报文头信息。

/**
 * JMS listener that reads an incoming MQ message, processes it, and sends a
 * receipt to the configured send queue.
 *
 * <p>Messages whose body cannot be read (unsupported message type, empty text
 * body, or a JMS error while reading) are logged and skipped; no receipt is
 * sent for them.
 */
@Service
public class DecMqRiskRecvService implements MessageListener {

    private static final java.util.logging.Logger LOG =
            java.util.logging.Logger.getLogger(DecMqRiskRecvService.class.getName());

    @Autowired
    private JmsOperations jmsOperations;
    @Autowired
    private MqProperties mqProperties;

    /**
     * Handles one message delivered by the listener container.
     *
     * @param message the received JMS message
     */
    @Override
    public void onMessage(Message message) {
        // 1. Read the message body
        final String body;
        try {
            body = readBody(message);
        } catch (JMSException e) {
            LOG.log(java.util.logging.Level.SEVERE, "Failed to read MQ message body", e);
            return;
        }
        if (body == null) {
            // Do not acknowledge with a receipt something that was never read.
            LOG.warning("Skipping MQ message with unsupported type or empty body: "
                    + (message == null ? "null" : message.getClass().getName()));
            return;
        }

        // 2. Process the message

        // 3. Build and send the receipt
        String receipt = "";
        try {
            // targetClient=1 is appended so that the receipt is sent without the
            // extra message header information.
            jmsOperations.convertAndSend(
                    "queue:///" + mqProperties.getBaseQueueNameSend() + "?targetClient=1",
                    receipt.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        } catch (JmsException e) {
            LOG.log(java.util.logging.Level.SEVERE, "Failed to send MQ receipt", e);
        }
    }

    /**
     * Extracts the body of a bytes or text message as a string.
     *
     * @param message the received message
     * @return the decoded body, or {@code null} if the message is neither a
     *         {@link BytesMessage} nor a {@link TextMessage}, or its text is {@code null}
     * @throws JMSException if the JMS provider fails while reading the message
     */
    private static String readBody(Message message) throws JMSException {
        if (message instanceof BytesMessage) {
            BytesMessage bytesMessage = (BytesMessage) message;
            byte[] content = new byte[(int) bytesMessage.getBodyLength()];
            bytesMessage.readBytes(content);
            return new String(content, java.nio.charset.StandardCharsets.UTF_8);
        }
        if (message instanceof TextMessage) {
            String text = ((TextMessage) message).getText();
            if (text == null) {
                return null;
            }
            // Re-decode the text as UTF-8 to avoid garbled characters.
            // NOTE(review): this assumes the sender's UTF-8 bytes arrive decoded as
            // ISO-8859-1; confirm against the producer's encoding.
            return new String(
                    text.getBytes(java.nio.charset.StandardCharsets.ISO_8859_1),
                    java.nio.charset.StandardCharsets.UTF_8);
        }
        return null;
    }

}

 


原文链接:https://www.cnblogs.com/cloudam/p/11942373.html
如有疑问请与原作者联系

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:springboot~maven集成开发里的docker构建

下一篇:【并发编程】Java对并发编程的支持历史