【Spring Boot】集成Netty Socket.IO通讯框架
2019-01-11 08:35:57来源:博客园 阅读 ()
服务端
1 @Configuration 2 public class NettySocketConfig { 3 4 private static final Logger logger = LoggerFactory.getLogger(NettySocketConfig.class); 5 6 @Bean 7 public SocketIOServer socketIOServer() { 8 //创建Socket,并设置监听端口 9 com.corundumstudio.socketio.Configuration config = new com.corundumstudio.socketio.Configuration(); 10 // 设置主机名,默认是0.0.0.0 11 config.setHostname("192.168.8.107"); 12 // 设置监听端口 13 config.setPort(9096); 14 // 协议升级超时时间(毫秒),默认10000。HTTP握手升级为ws协议超时时间 15 config.setUpgradeTimeout(10000); 16 // Ping消息间隔(毫秒),默认25000。客户端向服务器发送一条心跳消息间隔 17 config.setPingInterval(60000); 18 // Ping消息超时时间(毫秒),默认60000,这个时间间隔内没有接收到心跳消息就会发送超时事件 19 config.setPingTimeout(180000); 20 // 这个版本0.9.0不能处理好namespace和query参数的问题。所以为了做认证必须使用全局默认命名空间 21 config.setAuthorizationListener(new AuthorizationListener() { 22 @Override 23 public boolean isAuthorized(HandshakeData data) { 24 // 可以使用如下代码获取用户密码信息 25 //String username = data.getSingleUrlParam("username"); 26 //String password = data.getSingleUrlParam("password"); 27 //logger.info("连接参数:username=" + username + ",password=" + password); 28 //ManagerInfo managerInfo = managerInfoService.findByUsername(username); 29 // 30 //String salt = managerInfo.getSalt(); 31 //String encodedPassword = ShiroKit.md5(password, username + salt); 32 //// 如果认证不通过会返回一个Socket.EVENT_CONNECT_ERROR事件 33 //return encodedPassword.equals(managerInfo.getPassword()); 34 35 return true; 36 } 37 }); 38 39 final SocketIOServer server = new SocketIOServer(config); 40 System.out.println("注入SocketIOServer"); 41 return server; 42 } 43 44 @Bean 45 public SpringAnnotationScanner springAnnotationScanner(SocketIOServer socketServer) { 46 return new SpringAnnotationScanner(socketServer); 47 } 48 }
1 @Component 2 public class MessageEventHandler { 3 4 private static final Logger logger = LoggerFactory.getLogger(MessageEventHandler.class); 5 6 /** 7 * 服务器socket对象 8 */ 9 public static SocketIOServer socketIoServer; 10 11 /** 12 * 客户端集合 13 */ 14 static ArrayList<UUID> listClient = new ArrayList<>(); 15 16 /** 17 * 超时时间 18 */ 19 static final int limitSeconds = 60; 20 21 @Autowired 22 public LoginService loginService; 23 24 /** 25 * 初始化消息事件处理器 26 * 27 * @param server 服务器socket对象 28 */ 29 @Autowired 30 public MessageEventHandler(SocketIOServer server) { 31 logger.info("初始化SOCKET消息事件处理器"); 32 this.socketIoServer = server; 33 } 34 35 /** 36 * 客户端发起连接时触发 37 * 38 * @param client 客户端Socket对象信息 39 */ 40 @OnConnect 41 public void onConnect(SocketIOClient client) { 42 logger.info("客户端{}已连接", client.getSessionId()); 43 listClient.add(client.getSessionId()); 44 } 45 46 /** 47 * 客户端断开连接时触发 48 * 49 * @param client 客户端Socket对象信息 50 */ 51 @OnDisconnect 52 public void onDisconnect(SocketIOClient client) { 53 logger.info("客户端{}断开连接", client.getSessionId()); 54 if (listClient.contains(client.getSessionId())) { 55 listClient.remove(client.getSessionId()); 56 } 57 } 58 59 60 /** 61 * 客户端发送消息时触发 62 * 63 * @param client 客户端Socket对象信息 64 * @param request AckRequest 回调对象 65 * @param data 消息信息实体 66 */ 67 @OnEvent(value = SocketConstants.SocketEvent.MESSAGE) 68 public void onEvent(SocketIOClient client, AckRequest request, MessageInfo data) { 69 System.out.println("发来消息:" + data.getMsgContent()); 70 socketIoServer.getClient(client.getSessionId()).sendEvent("messageevent", "back data"); 71 } 72 73 /** 74 * 效验连接事件并存储客户端信息 75 * 76 * @param client 客户端Socket对象信息 77 * @param data 客户端数据 78 * @param request AckRequest 回调对象 79 */ 80 @OnEvent(value = SocketConstants.SocketEvent.HEALTH_CHECK) 81 public void onEventByHealthCheck(SocketIOClient client, String data, AckRequest request) { 82 //logger.info("客户端{}效验连接请求", client.getSessionId()); 83 ////解析请求数据 84 //HealthCheckRequest healthCheckRequest = JSON.parseObject(data, HealthCheckRequest.class); 85 //if (healthCheckRequest != null) { 86 // //存储客户端信息 87 // SocketInstance instance = SocketInstance.getSocketInstance(); 88 // System.out.println(data); 89 // instance.insertSocketClient(healthCheckRequest.getEnCode(), client); 90 // logger.info("客户端{}效验连接响应:{}", client.getSessionId(), "OK"); 91 // //响应客户端 92 // request.sendAckData("OK"); 93 //} 94 } 95 96 /** 97 * 登录事件 98 * 99 * @param client 客户端Socket对象信息 100 * @param data 客户端数据 101 * @param request AckRequest 回调对象 102 */ 103 @OnEvent(value = SocketConstants.SocketEvent.LOGIN) 104 public void onEventByLogin(SocketIOClient client, String data, AckRequest request) { 105 logger.info("客户端{}登录请求:{}", client.getSessionId(), data); 106 AppResponseBase appResponseBase = new AppResponseBase(0, "通讯成功"); 107 //业务响应对象 108 LoginResponse loginResponse = null; 109 try { 110 //解析请求数据 111 LoginRequest loginRequest = JSON.parseObject(data, LoginRequest.class); 112 if (loginRequest == null) { 113 throw new AppException(AppResultCode.LoginAnalysis_Fail); 114 } 115 //调用登陆接口 116 loginResponse = loginService.appLogin(loginRequest); 117 if (loginResponse == null) { 118 throw new AppException(AppResultCode.LoginCloud_Fail); 119 } 120 if (EnumResult.Success.equals(loginResponse.getResultCode())) { 121 //保存客户端Socket信息 122 SocketInstance instance = SocketInstance.getSocketInstance(); 123 instance.insertSocketClient(loginRequest.getEnCode(), client); 124 } 125 } catch (AppException ex) { 126 loginResponse = new LoginResponse(ex.getAppResultCode().getCode(), ex.getAppResultCode().getMsg()); 127 } catch (Exception ex) { 128 loginResponse = new LoginResponse(AppResultCode.Exceptions.getCode(), AppResultCode.Exceptions.getMsg()); 129 ex.printStackTrace(); 130 } 131 appResponseBase.setRespData(loginResponse); 132 String result = JSON.toJSONString(appResponseBase); 133 logger.info("客户端{}登录响应:{}", client.getSessionId(), result); 134 //响应客户端 135 request.sendAckData(result); 136 } 137 138 /** 139 * 交易下单事件 140 * @param callPayRequest 下单请求信息实体 141 * @return 142 */ 143 public static String sendByPayEvent(CallPayRequest callPayRequest) { 144 String result = ""; 145 //获取客户端信息 146 SocketInstance instance = SocketInstance.getSocketInstance(); 147 SocketIOClient client = instance.getClientSocket(callPayRequest.getEnCode()); 148 if (client != null) { 149 //请求报文 150 String requestParam = JSON.toJSONString(callPayRequest); 151 //请求下单 152 client.sendEvent(SocketConstants.SocketEvent.PAY, new AckCallback<String>(String.class) { 153 @Override 154 public void onSuccess(String s) { 155 //响应信息 156 System.out.println("ack from client: " + client.getSessionId() + " data: " + s.toString()); 157 } 158 }, requestParam); 159 160 } else { 161 //客户端已断开连接 162 163 } 164 return result; 165 } 166 }
1 @Component 2 @Order(value = 1) 3 public class MyCommandLineRunner implements CommandLineRunner { 4 5 private final SocketIOServer server; 6 7 @Autowired 8 public MyCommandLineRunner(SocketIOServer server) { 9 System.out.println("初始化MyCommandLineRunner"); 10 this.server = server; 11 } 12 13 @Override 14 public void run(String... args) { 15 try { 16 server.start(); 17 System.out.println("socket.io启动成功!"); 18 } catch (Exception ex) { 19 ex.printStackTrace(); 20 } 21 } 22 }
1 public class SocketConstants { 2 3 /** 4 * Socket事件类 5 */ 6 public class SocketEvent { 7 8 /** 9 * 效验连接状况 10 */ 11 public static final String HEALTH_CHECK = "HEALTH_CHECK"; 12 13 /** 14 * 消息接收事件名称 15 */ 16 public static final String MESSAGE = "message"; 17 18 /** 19 * 登录事件名称 20 */ 21 public static final String LOGIN = "LOGIN"; 22 23 /** 24 * 获取交易要素事件名称 25 */ 26 public static final String QUERY_PAY_FIELDS = "QUERY_PAY_FIELDS"; 27 28 /** 29 * 创建订单事件名称 30 */ 31 public static final String CREATE_ORDER = "CREATE_ORDER"; 32 33 /** 34 * 监控订单状态事件名称 35 */ 36 public static final String CHECK_ORDER_STATUS = "CHECK_ORDER_STATUS"; 37 38 /** 39 * 获取订单事件名称 40 */ 41 public static final String QUERY_ORDER = "QUERY_ORDER"; 42 43 /** 44 * 支付事件名称 45 */ 46 public static final String PAY = "PAY"; 47 } 48 }
1 public class SocketInstance { 2 3 /** 4 * 客户端Socket连接对象容器 5 */ 6 private static Map<String, SocketIOClient> socketClients = null; 7 8 /** 9 * 私有构造 10 */ 11 private SocketInstance() { 12 //从缓存中获取socketClients 13 socketClients = new HashMap<>(); 14 } 15 16 /** 17 * 定义一个私有的内部类,在第一次用这个嵌套类时,会创建一个实例。而类型为SocketInstanceHolder的类,只有在SocketInstance.getSocketInstance()中调用, 18 * 由于私有的属性,他人无法使用SocketInstanceHolder,不调用SocketInstance.getSocketInstance()就不会创建实例。 19 * 优点:达到了lazy loading的效果,即按需创建实例。 20 * 无法适用于分布式集群部署 21 */ 22 private static class SocketInstanceHolder { 23 /** 24 * 创建全局唯一实例 25 */ 26 private final static SocketInstance instance = new SocketInstance(); 27 } 28 29 /** 30 * 获取全局唯一实例 31 * 32 * @return SocketInstance对象 33 */ 34 public static SocketInstance getSocketInstance() { 35 return SocketInstanceHolder.instance; 36 } 37 38 /** 39 * 新增客户端连接到容器 40 * 41 * @param encode 设备En号 42 * @param socketIOClient 客户端socket对象 43 */ 44 public void insertSocketClient(String encode, SocketIOClient socketIOClient) { 45 SocketIOClient oldSocketIOClient = socketClients.get(encode); 46 if (oldSocketIOClient != null) { 47 try { 48 //关闭客户端连接 49 oldSocketIOClient.disconnect(); 50 } catch (Exception ex) { 51 ex.printStackTrace(); 52 } 53 } 54 socketClients.put(encode, socketIOClient); 55 } 56 57 /** 58 * 获取客户端Socket对象 59 * 60 * @param encode 设备encode 61 * @return 客户端Socket对象 62 */ 63 public SocketIOClient getClientSocket(String encode) { 64 return socketClients.get(encode); 65 } 66 }
Android客户端
1 public class SocketClient { 2 3 /** 4 * 最大重连次数 5 */ 6 private int maxReConnectionCount = 5; 7 8 /** 9 * 重连次数 10 */ 11 private int reConnectionCount = 0; 12 13 /** 14 * 等待框对象 15 */ 16 private static ProgressDialog progressdialog; 17 18 /** 19 * 提示框 20 */ 21 private static AlertDialog.Builder dialogExitBuilder; 22 23 /** 24 * Toast提示对象 25 */ 26 private static Toast toast; 27 28 /** 29 * Socket客户端对象信息 30 */ 31 public static Socket socket; 32 33 /** 34 * 主页面对象,每个页面onCreate时必须设置,可在每个页面监控Socket连接状况 35 */ 36 public static Context nowContext; 37 38 /** 39 * Socket连接提示handler(等待框) 40 */ 41 Handler dialogMessageHandler = new Handler() { 42 @Override 43 public void handleMessage(Message msg) { 44 super.handleMessage(msg); 45 Bundle bundle = msg.getData(); 46 String message = bundle.getString(MessageUtil.MESSAGE); 47 setDialogMessage(message); 48 } 49 }; 50 51 /** 52 * Socket连接失败退出提示handler(提示框) 53 */ 54 Handler dialogExitHandler = new Handler() { 55 @Override 56 public void handleMessage(Message msg) { 57 super.handleMessage(msg); 58 Bundle bundle = msg.getData(); 59 String message = bundle.getString(MessageUtil.MESSAGE); 60 dialogExit(message); 61 } 62 }; 63 64 /** 65 * Socket连接提示handler(Toast) 66 */ 67 Handler toastMessageHandler = new Handler() { 68 @Override 69 public void handleMessage(Message msg) { 70 super.handleMessage(msg); 71 Bundle bundle = msg.getData(); 72 String message = bundle.getString(MessageUtil.MESSAGE); 73 showToast(message, Toast.LENGTH_SHORT); 74 } 75 }; 76 77 /** 78 * 等待框 79 * 80 * @param message 提示文字 81 */ 82 private static void setDialogMessage(String message) { 83 if (progressdialog == null) { 84 progressdialog = new ProgressDialog(nowContext); 85 } 86 progressdialog.setTitle("学通宝收银"); 87 progressdialog.setMessage(message); 88 progressdialog.setCancelable(false); 89 progressdialog.show(); 90 } 91 92 /** 93 * 退出提示框 94 * 95 * @param message 提示文字 96 */ 97 private void dialogExit(String message) { 98 //初始化退出builder 99 if (dialogExitBuilder == null) { 100 dialogExitBuilder = new AlertDialog.Builder(nowContext); 101 } 102 dialogExitBuilder.setMessage(message); 103 dialogExitBuilder.setTitle("提示"); 104 dialogExitBuilder.setIcon(R.mipmap.warning); 105 dialogExitBuilder.setPositiveButton("确认", new DialogInterface.OnClickListener() { 106 public void onClick(DialogInterface dialog, int which) { 107 dialog.dismiss(); 108 //参数用作状态码;根据惯例,非 0 的状态码表示异常终止。 109 System.exit(0); 110 } 111 }); 112 dialogExitBuilder.create().show(); 113 } 114 115 /** 116 * Toast消息提醒 117 * 118 * @param text 标题 119 * @param duration 时长 120 */ 121 public void showToast(String text, int duration) { 122 //只创建一次 123 if (toast == null) { 124 toast = Toast.makeText(nowContext, text, duration); 125 } else { 126 toast.setText(text); 127 toast.setDuration(duration); 128 } 129 toast.show(); 130 } 131 132 public void startSocket() throws URISyntaxException { 133 //初始化Socket配置 134 IO.Options options = new IO.Options(); 135 options.transports = new String[]{"websocket"}; 136 options.reconnectionAttempts = maxReConnectionCount; // 设置一个重连的最大尝试次数,超过这个值后Socket.io会使用所有允许的其他连接方式尝试重连,直到最终失败。 137 options.reconnectionDelay = 500; //为Socket.io的重连设置一个时间间隔,内部会在多次重连尝试时采用该值的指数值间隔,用来避免性能损耗(500 > 1000 > 2000 > 4000 > 8000) 138 options.reconnection = true; //当连接终止后,是否允许Socket.io自动进行重连 139 options.timeout = 9000; //连接超时时间(ms) 140 options.forceNew = true; 141 options.query = "appid=cn.xuetongbao.xtbpay"; 142 socket = IO.socket("http://192.168.8.107:9096/", options); 143 //连接成功 144 socket.on(Socket.EVENT_CONNECT, new Emitter.Listener() { 145 @Override 146 public void call(Object... args) { 147 //重连机制 148 if (reConnectionCount > 0) { 149 //连接存储客户端信息 150 DeviceInfoInstance instance = DeviceInfoInstance.getSocketInstance(); 151 HealthCheckRequest healthCheckRequest = new HealthCheckRequest(); 152 healthCheckRequest.setEnCode(instance.getDeviceInfo().getEnCode()); 153 socket.emit(SocketConstants.SocketEvent.HEALTH_CHECK, RequestUtil.createObject(healthCheckRequest), (Ack) args1 -> { 154 System.out.println("args1:" + args1.toString()); 155 }); 156 } 157 System.out.println("连接成功..."); 158 toastMessageHandler.sendMessage(MessageUtil.createMessage("服务器连接成功")); 159 //关闭等待框 160 if (progressdialog != null) { 161 progressdialog.dismiss(); 162 } 163 } 164 }); 165 166 //连接失败事件 167 socket.on(Socket.EVENT_CONNECT_ERROR, new Emitter.Listener() { 168 @Override 169 public void call(Object... args) { 170 System.out.println("Socket.EVENT_CONNECT_ERROR"); 171 System.out.println("reConnectionCount:" + reConnectionCount); 172 if (reConnectionCount >= maxReConnectionCount) { 173 dialogExitHandler.sendMessage(MessageUtil.createMessage("服务器连接失败,请稍后再试")); 174 } else { 175 dialogMessageHandler.sendMessage(MessageUtil.createMessage("服务器连接失败,正在重新连接...")); 176 } 177 } 178 }); 179 180 //连接中事件 181 socket.on(Socket.EVENT_RECONNECTING, new Emitter.Listener() { 182 @Override 183 public void call(Object... args) { 184 reConnectionCount++; 185 System.out.println("Socket.EVENT_RECONNECTING"); 186 dialogMessageHandler.sendMessage(MessageUtil.createMessage("正在连接服务器...")); 187 } 188 }); 189 190 //连接超时事件 191 socket.on(Socket.EVENT_CONNECT_TIMEOUT, new Emitter.Listener() { 192 @Override 193 public void call(Object... args) { 194 System.out.println("Socket.EVENT_CONNECT_TIMEOUT"); 195 if (nowContext != null) { 196 dialogMessageHandler.sendMessage(MessageUtil.createMessage("与服务器连接超时,正在重新建立连接...")); 197 socket.connect(); 198 } 199 } 200 }); 201 202 //心跳包 203 socket.on(Socket.EVENT_PING, new Emitter.Listener() { 204 @Override 205 public void call(Object... args) { 206 System.out.println("Socket.EVENT_PING"); 207 } 208 }); 209 //心跳包 210 socket.on(Socket.EVENT_PONG, new Emitter.Listener() { 211 @Override 212 public void call(Object... args) { 213 System.out.println("Socket.EVENT_PONG"); 214 } 215 }); 216 217 //消息接收事件 218 socket.on(Socket.EVENT_MESSAGE, new Emitter.Listener() { 219 @Override 220 public void call(Object... args) { 221 System.out.println("-----------接受到消息啦--------" + Arrays.toString(args)); 222 } 223 }); 224 225 //连接断开事件 226 socket.on(Socket.EVENT_DISCONNECT, new Emitter.Listener() { 227 @Override 228 public void call(Object... args) { 229 reConnectionCount = 0; 230 System.out.println("客户端断开连接啦。。。"); 231 if (nowContext != null) { 232 dialogMessageHandler.sendMessage(MessageUtil.createMessage("似乎与服务器断开连接,正在重新建立连接...")); 233 socket.connect(); 234 } 235 } 236 }); 237 238 //交易事件 239 socket.on(SocketConstants.SocketEvent.PAY, new Emitter.Listener() { 240 @Override 241 public void call(Object... args) { 242 Object data = args[0]; 243 Object ackCallBack = args[1]; 244 System.out.println("接收到服务端交易下单消息" + data); 245 CallPayRequest callPayRequest = JSON.parseObject(data.toString(), CallPayRequest.class); 246 if (callPayRequest != null) { 247 248 } 249 //data 250 CallPayResponse callPayResponse = new CallPayResponse(); 251 callPayResponse.setResultCode(AppResultCode.Success.getCode()); 252 callPayResponse.setResultMsg(AppResultCode.Success.getMsg()); 253 254 //响应服务端 255 ((Ack) ackCallBack).call(JSON.toJSONString(callPayResponse)); 256 } 257 }); 258 System.out.println("准备连接服务器..."); 259 socket.connect(); 260 } 261 }
注:仅供学习参考
标签:
版权申明:本站文章部分自网络,如有侵权,请联系:west999com@outlook.com
特别注意:本站所有转载文章言论不代表本站观点,本站所提供的摄影照片,插画,设计作品,如需使用,请与原作者联系,版权归原作者所有
- Spring系列.ApplicationContext接口 2020-06-11
- springboot2配置JavaMelody与springMVC配置JavaMelody 2020-06-11
- 给你一份超详细 Spring Boot 知识清单 2020-06-11
- SpringBoot 2.3 整合最新版 ShardingJdbc + Druid + MyBatis 2020-06-11
- 掌握SpringBoot-2.3的容器探针:实战篇 2020-06-11
IDC资讯: 主机资讯 注册资讯 托管资讯 vps资讯 网站建设
网站运营: 建站经验 策划盈利 搜索优化 网站推广 免费资源
网络编程: Asp.Net编程 Asp编程 Php编程 Xml编程 Access Mssql Mysql 其它
服务器技术: Web服务器 Ftp服务器 Mail服务器 Dns服务器 安全防护
软件技巧: 其它软件 Word Excel Powerpoint Ghost Vista QQ空间 QQ FlashGet 迅雷
网页制作: FrontPages Dreamweaver Javascript css photoshop fireworks Flash