手写MQ框架(三)-客户端实现

2019-11-24 16:01:38来源:博客园 阅读 ()

新老客户大回馈,云服务器低至5折

手写MQ框架(三)-客户端实现

一、背景

书接手写MQ框架(二)-服务端实现  ,前面介绍了服务端的实现。但是具体使用框架过程中,用户肯定是以客户端的形式跟服务端打交道的。客户端的好坏直接影响了框架使用的便利性。

虽然框架目前是通过web的形式提供功能的,但是某的目标其实是通过socket实现,所以不仅需要有客户端,还要包装一下,让用户在使用过程中不需要关心服务端是如何实现的。

简单来说,就是客户端使用必须方便。

二、客户端实现

1、HttpUtil

目前客户端的核心功能是HttpUtil这个类,使用httpClient实现的,主要是为了请求服务端。

具体实现如下:

package com.shuimutong.gmq.client.util;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.http.HttpEntity;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.shuimutong.gmq.client.bean.HttpResponseBean;
import com.shuimutong.gutil.common.GUtilCommonUtil;

/**
 * http请求工具类
 * @ClassName: HttpUtil
 * @Description:(这里用一句话描述这个类的作用)
 * @author: 水木桶
 * @date: 2019年10月29日 下午9:43:54
 * @Copyright: 2019 [水木桶] All rights reserved.
 */
public class HttpUtil {
    private final static Logger log = LoggerFactory.getLogger(HttpUtil.class);
    private static CloseableHttpClient HTTP_CLIENT = HttpClients.createMinimal();
    static {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    HTTP_CLIENT.close();
                } catch (IOException e) {
                    log.error("HTTP_CLIENT-closeException", e);
                }
            }
        });
    }

    /**
     * get请求
     * 
     * @param url
     * @return
     * @throws IOException
     */
    public static HttpResponseBean get(String url) throws IOException {
        HttpResponseBean responseBean = null;
        HttpGet httpGet = new HttpGet(url);
        CloseableHttpResponse res = HTTP_CLIENT.execute(httpGet);
        try {
            HttpEntity httpEntity = res.getEntity();
            String body = EntityUtils.toString(httpEntity);
            responseBean = new HttpResponseBean(res.getStatusLine(), body);
            EntityUtils.consume(httpEntity);
        } finally {
            res.close();
        }
        return responseBean;
    }
    
    /**
     * 带参数的get请求
     * @param url
     * @param requsetParams
     * @return
     * @throws IOException
     * @throws URISyntaxException
     */
    public static HttpResponseBean get(String url, Map<String, String> requsetParams) throws IOException {
        HttpResponseBean responseBean = null;
        HttpGet httpGet;
        try {
            URIBuilder uriBuilder = new URIBuilder(url);
            if(!GUtilCommonUtil.checkListEmpty(requsetParams)) {
                List<NameValuePair> nvps = new ArrayList<NameValuePair>();
                requsetParams.forEach((k,v) -> {
                    nvps.add(new BasicNameValuePair(k, v));
                });
                uriBuilder.setParameters(nvps);
            }
            httpGet = new HttpGet(uriBuilder.build());
        } catch (Exception e) {
            throw new IOException(e);
        }
        CloseableHttpResponse res = HTTP_CLIENT.execute(httpGet);
        try {
            HttpEntity httpEntity = res.getEntity();
            String body = EntityUtils.toString(httpEntity);
            responseBean = new HttpResponseBean(res.getStatusLine(), body);
            EntityUtils.consume(httpEntity);
        } finally {
            res.close();
        }
        return responseBean;
    }

    /**
     * post请求
     * @param url
     * @param requsetParams
     * @return
     * @throws IOException
     */
    public static HttpResponseBean post(String url, Map<String, String> requsetParams) throws IOException {
        HttpResponseBean responseBean = null;
        HttpPost httpPost = new HttpPost(url);
        if(!GUtilCommonUtil.checkListEmpty(requsetParams)) {
            List<NameValuePair> nvps = new ArrayList<NameValuePair>();
            requsetParams.forEach((k,v) -> {
                nvps.add(new BasicNameValuePair(k, v));
            });
            httpPost.setEntity(new UrlEncodedFormEntity(nvps));
        }
        CloseableHttpResponse response = HTTP_CLIENT.execute(httpPost);
        try {
            HttpEntity httpEntity = response.getEntity();
            String body = EntityUtils.toString(httpEntity);
            responseBean = new HttpResponseBean(response.getStatusLine(), body);
            EntityUtils.consume(httpEntity);
        } finally {
            response.close();
        }
        return responseBean;
    }
}

 

封装了get请求和post请求,封装了响应结果。

加了一个钩子,在jvm关闭时能够主动关闭创建的资源。

2、订阅消息、生产消息

这两部分主要就是调用上面的HttpUtil,然后将结果包装一下。

具体代码请参考前文的git。

3、实例管理

为了使得用户不需要关心具体实现,所以建了实例管理类。

package com.shuimutong.gmq.client.util;

import com.shuimutong.gmq.client.cache.CommonObjCache;
import com.shuimutong.gmq.client.cache.impl.CommonObjCacheImpl;
import com.shuimutong.gmq.client.consumer.GmqConsumer;
import com.shuimutong.gmq.client.producer.GmqProducer;

public class GmqInstanceManage {
    public static GmqProducer getGmqProducer(String gmqServerUrl) {
        return new GmqProducer(gmqServerUrl);
    }
    
    public static GmqConsumer getGmqConsumer(String gmqServerUrl) {
        return new GmqConsumer(gmqServerUrl);
    }
    
    public static CommonObjCache getCommonCache(String serverUrl) {
        return new CommonObjCacheImpl(serverUrl);
    }
}

 

主要是为了封装变化。因为之后再迭代的话,实例的具体实现肯定不是目前这么简单,所以要尽量让使用者少关心具体实现。

使用时关心的越多,后续项目迭代肯定越困难。

三、使用示例

1、生产消息

@Test
    public void produceMsg() {
        GmqProducer producer = GmqInstanceManage.getGmqProducer(gmqServerUrl);
        for(int i=0; i<5; i++) {
            String message = "message:" + i;
            try {
                SendMqResult res = producer.sendMq(topic, message);
                System.out.println(res.getRes());
            } catch (SendMqException e) {
                e.printStackTrace();
            }
        }
    }

 

2、消费消息

主要思路是:消费消息之前,先查询当前已经消费到了哪条消息。消息消费之后,将消费的编号存入缓存。

典型的主动拉消息,消息是否消费由自己负责的模式。

实现如下:

@Test
    public void comsumerMsgByCache() {
        GmqConsumer comsumer = GmqInstanceManage.getGmqConsumer(gmqServerUrl);
        CommonObjCache commonCache = GmqInstanceManage.getCommonCache(gmqServerUrl);
        String gmqSign = "gmq_consumer_id";
        long consumerId = 0;
        int size = 2;
        for(int i=0; i<5; i++) {
            try {
                CacheObj cacheId = commonCache.getById(gmqSign);
                if(cacheId != null) {
                    consumerId = Long.parseLong(cacheId.getContent());
                }
                
                List<MqContent> res = comsumer.getMq(topic, consumerId, size);
                for(MqContent mq : res) {
                    System.out.println(JSONObject.toJSONString(mq));
                    if(mq.getId() > consumerId) {
                        consumerId = mq.getId();
                    }
                }
                commonCache.save(gmqSign, String.valueOf(consumerId));
                System.out.println("保存consumerId:" + consumerId);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

 

四、总结

gmq的初版至今已经完成,当然这只是开始。

后续计划先将gmvc框架替换掉,直接使用netty进行通信。

然后把消息存到数据库改为存到磁盘上。

然后就是服务的高可用改造。

届时欢迎指导。

第2版设计、开发中……

 

 

 

 

 

 

 

 

 

 


原文链接:https://www.cnblogs.com/shuimutong/p/11923420.html
如有疑问请与原作者联系

标签:

版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有

上一篇:RabbitMQ的简单模式快速入门与超时异常的处理方法

下一篇:Java程序在内存中运行详解