RedisTemplate实现消息队列并且批量插入数据。
2020-05-22 16:04:31来源:博客园 阅读 ()
RedisTemplate实现消息队列并且批量插入数据。
早期由于生产环境业务量小。所以日志是一条一条commit的。运行也没出过问题。
后来随着业务扩大并发量上来后,日志写入因为频繁与数据库打交道导致数据库连接池经常占满,直至程序崩溃。
因为日志并非需要实时响应。所以考虑改用异步+批量提交的方式。
为了缓解jvm内存压力。采用redis做消息队列(因为原项目有集成过redis,公司不想使用其他mq增加维护成本)。
所以在网上找了篇springboot整合redistemplate做消息队列的资料。稍微改了一下。
参考资料:https://blog.csdn.net/qq_38553333/article/details/82833273
首先是redisConfig。
import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.cache.annotation.EnableCaching; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.redis.connection.RedisConnectionFactory; import org.springframework.data.redis.core.*; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer; import org.springframework.data.redis.serializer.StringRedisSerializer; @Configuration @EnableCaching //开启注解 public class RedisConfig { /** * retemplate相关配置 * @param factory * @return */ @Bean public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) { RedisTemplate<String, Object> template = new RedisTemplate<>(); // 配置连接工厂 template.setConnectionFactory(factory); //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式) Jackson2JsonRedisSerializer jacksonSeial = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper om = new ObjectMapper(); // 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); // 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常 om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); jacksonSeial.setObjectMapper(om); // 值采用json序列化 template.setValueSerializer(jacksonSeial); //使用StringRedisSerializer来序列化和反序列化redis的key值 template.setKeySerializer(new StringRedisSerializer()); // 设置hash key 和value序列化模式 template.setHashKeySerializer(new StringRedisSerializer()); template.setHashValueSerializer(jacksonSeial); template.afterPropertiesSet(); return template; } @Bean public RedisMessageListenerContainer container(RedisConnectionFactory redisConnectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(redisConnectionFactory); return container; } }
消息实体Message
import com.alibaba.fastjson.JSON; import lombok.Data; import java.util.UUID; @Data public class Message { private String id; private Integer retryCount; private String content; private Integer status; private String topic; public Message() { } public Message(String topic, Object object) { this.id = UUID.randomUUID().toString().replace("-", ""); this.retryCount = 0; this.content = JSON.toJSONString(object); this.status = 0; this.topic = topic; } }
Redis订阅管理,采用观察者模式。
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; @Component public class TopicSubscriber { private final Map<String, Set<String>> subscriberMap = new HashMap(); @Autowired private RedisTemplate<String, Object> redisTemplate; // 观察者模式实现消费者注册。 public Boolean addConsumer(String topic, String consumer) { Set<String> consumerList = subscriberMap.get(topic); if (consumerList == null) { consumerList = new HashSet<>(); } Boolean b = consumerList.add(consumer); subscriberMap.put(topic, consumerList); return b; } public Boolean removeConsumer(String topic, String comsumer) { Set<String> consumerList = subscriberMap.get(topic); Boolean b = false; if (consumerList != null) { b = consumerList.remove(comsumer); subscriberMap.put(topic, consumerList); } return b; } //消息广播 public void broadcast(String topic, String id) { if (subscriberMap.get(topic) != null) { for (String consumer : subscriberMap.get(topic)) { String key = String.join("_", topic, consumer, id); if (!redisTemplate.hasKey("fail_" + key)) { redisTemplate.opsForValue().set(key, id); redisTemplate.opsForList().leftPush(topic + "_" + consumer, topic); } } } } }
然后是Redis发布者
import com.alibaba.fastjson.JSON; import com.redis.mq.subscriber.TopicSubscriber; import io.netty.util.CharsetUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Component public class RedisPublisher { @Autowired private RedisTemplate<String, Object> redisTemplate; @Autowired TopicSubscriber subscriber; @PostConstruct public void init() throws Exception { // todo test thread /*new Thread(() -> { int count = 0; try { Thread.sleep(3000l); } catch (InterruptedException e) { e.printStackTrace(); } while (count < 14) { try { Thread.sleep(100l); Generate generate = new Generate(); generate.setIdNo("" + count); this.publish("GenerateLog", generate); count++; } catch (Exception e) { } } }).start();*/ } public void publish(String topic, Object content) { //消息发布到redis Message message = new Message(topic, content); subscriber.broadcast(topic, message.getId()); redisTemplate.getConnectionFactory().getConnection().publish( topic.getBytes(CharsetUtil.UTF_8), JSON.toJSONString(message).getBytes() ); } }
Redis消费者。实现MessageListener的onMessage就可以。为了易于扩展,这里使用了泛型。
import com.alibaba.fastjson.JSON; import com.cache.redis.mq.subscriber.TopicSubscriber; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.connection.MessageListener; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.listener.RedisMessageListenerContainer; import java.lang.reflect.ParameterizedType; import java.util.concurrent.TimeUnit; public abstract class RedisListener<T> implements MessageListener { @Autowired protected RedisTemplate<String, Object> redisTemplate; @Autowired protected RedisMessageListenerContainer messageListenerContainer; @Autowired protected TopicSubscriber subscriber; @Override public void onMessage(org.springframework.data.redis.connection.Message message, byte[] bytes) { String name = this.getClass().getSimpleName(); String topic = new String(message.getChannel()); String content = new String(message.getBody()); Message m = JSON.parseObject(content, Message.class); String key = String.join("_", topic, name, m.getId()); Object b = redisTemplate.opsForList().rightPop(topic + "_" + name); if (b != null && b.equals(m.getTopic())) { T t = JSON.parseObject(m.getContent(), ((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[0]); handler(t); // 处理redis消息。 // set data expire.使用redis的expire接口直接丢弃消费过的数据。 redisTemplate.expire(key, 1, TimeUnit.NANOSECONDS); } else { // todo retry redisTemplate.opsForValue().set("fail_" + key, content); } } protected abstract void handler(T t); }
到这里,基础的redisMq就差不多了。下面涉及具体的业务及批量插入。
首先,加一个logHander接口。
public interface LogHandler { void process(); }
写一个抽象类继承RedisListener并且实现LogHander。这里用到了redis的put和poll阻塞队列。
因为使用了mybatisplus又不想重新写mybatis foreach批量查询语句。所以这里偷懒直接用mybatis的sqlsession的单条预编译,批量commit。
import com.cache.redis.mq.RedisListener; import com.server.log.store.LogStore; import lombok.extern.slf4j.Slf4j; import org.apache.ibatis.session.ExecutorType; import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSessionFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.listener.ChannelTopic; import javax.annotation.PostConstruct; import java.lang.reflect.ParameterizedType; import java.util.List; @Slf4j public abstract class AbstractLogHandler<T, M> extends RedisListener<T> implements LogHandler { @Autowired SqlSessionFactory factory; @PostConstruct public void addListener() { messageListenerContainer.addMessageListener(this, new ChannelTopic(getTopic())); subscriber.addConsumer(getTopic(), this.getClass().getSimpleName()); process(); } @Override protected void handler(T t) { getStore().put(t); //阻塞直到能新写入。这里其实可以加个超时时间。避免一直阻塞。 } protected abstract String getTopic(); protected abstract LogStore<T> getStore(); protected void commit(List<T> data) { if (data == null || data.isEmpty()) return; SqlSession session = factory.openSession(ExecutorType.BATCH); try { M mapper = session.getMapper( (Class<M>) (((ParameterizedType) this.getClass().getGenericSuperclass()).getActualTypeArguments()[1]) ); save(data, mapper); session.commit(); } catch (Exception e) { log.error(String.format("topic %s 数据批量写入失败。{}", getTopic()), e); session.rollback(); }finally { session.close(); } data.forEach(o -> o = null); data.clear(); } protected abstract void save(List<T> data, M m); }
LogStore阻塞队列
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; @Slf4j public class LogStore<T> { private static final Integer QUEUE_CAPACITY = 10000; private BlockingQueue<T> logQueue; public LogStore() { this(QUEUE_CAPACITY); } public LogStore(int capacity) { this.logQueue = new LinkedBlockingQueue<>(capacity); } public void put(T t) { try { logQueue.put(t); } catch (InterruptedException e) { log.info("logStore put exception:{}", e); } } public T poll(long seconds) { try { return logQueue.poll(seconds, TimeUnit.SECONDS); } catch (InterruptedException e) { return null; } } }
到这里,基础的业务代码就写的差不多了。然后我们看下具体的业务处理类怎么写。
比如我们的注册日志,只要实现抽象类AbstraceLogHandler就可以了
import comcommon.constant.Constant; import com.common.po.RegLog; import com.dao.mapper.RegLogMapper; import com.server.log.store.LogStore; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @Component public class RegisterLogHandler extends AbstractLogHandler<RegLog, RegLogMapper> { private final LogStore<RegLog> store = new LogStore<>(); private String topic = Constant.TOPIC_REGISTER_LOG; // todo 可配置 private final Integer batchSize = 300; private final Integer waitSeconds = 2; ExecutorService executor = Executors.newSingleThreadExecutor(); @Override protected String getTopic() { return this.topic; } @Override protected LogStore<RegLog> getStore() { return this.store; } @Override public void process() { executor.execute(() -> { //开启线程从redis中poll数据。 List<RegLog> data = new ArrayList<>(batchSize); while (true) { RegLog generate = this.store.poll(waitSeconds); if (generate != null) { if (data.size() >= batchSize) { commit(data); } data.add(generate); } else { //处理不足batchSize的尾巴数据。 if (data.size() > 0) { commit(data); } } } }); } @Override protected void save(List<RegLog> data, RegLogMapper mapper) { data.forEach(o -> { if (o.getRegNo() == null) { String genNo = UUID.randomUUID().toString(); o.setRegNo(genNo); } mapper.insert(o); //因为不想写mybatis的foreach语句。所以这里直接用mybatisplus的insert单条语句。到这里sqlssesion并没有commit. }); } }
调用:
@Autowired protected RedisPublisher publisher; publisher.publish(Constant.TOPIC_REGISTER_LOG, log);
原文链接:https://www.cnblogs.com/braska/p/12936937.html
如有疑问请与原作者联系
标签:
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有
- DES/3DES/AES 三种对称加密算法实现 2020-06-11
- 数据源管理 | Kafka集群环境搭建,消息存储机制详解 2020-06-11
- SpringBoot + Vue + ElementUI 实现后台管理系统模板 -- 后 2020-06-10
- Spring Boot 实现定时任务的 4 种方式 2020-06-10
- JSP+SSH+Mysql+DBCP实现的租车系统 2020-06-09
IDC资讯: 主机资讯 注册资讯 托管资讯 vps资讯 网站建设
网站运营: 建站经验 策划盈利 搜索优化 网站推广 免费资源
网络编程: Asp.Net编程 Asp编程 Php编程 Xml编程 Access Mssql Mysql 其它
服务器技术: Web服务器 Ftp服务器 Mail服务器 Dns服务器 安全防护
软件技巧: 其它软件 Word Excel Powerpoint Ghost Vista QQ空间 QQ FlashGet 迅雷
网页制作: FrontPages Dreamweaver Javascript css photoshop fireworks Flash