博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
webSocket-Java开发总结
阅读量:4042 次
发布时间:2019-05-24

本文共 10451 字,大约阅读时间需要 34 分钟。

websocket实现客户端跟服务端的双向传输,解决客户端向服务端轮训请求。应用到推送GPS位置信息,弹幕,聊天信息等场景。

一、Java服务端实现

maven依赖

org.springframework.boot
spring-boot-starter-websocket

核心服务代码

@Configurationpublic class WebSocketConfig {    @Bean    public ServerEndpointExporter serverEndpointExporter() {        return new ServerEndpointExporter();    }}

主要是OnOpen OnClose OnMessage OnError方法,里面再买上业务处理;

@Slf4j@ServerEndpoint(value = "/websocket/user/{accessToken}")@Component@EnableSchedulingpublic class WebSocketUserServer {    //静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。    private static AtomicInteger onlineCount = new AtomicInteger(0);    //concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。    private static CopyOnWriteArrayList
userSockets = new CopyOnWriteArrayList<>(); //与某个客户端的连接会话,需要通过它来给客户端发送数据 private Session session; /* 连接打开时执行 */ @OnOpen public void onOpen(@PathParam("accessToken") String accessToken , Session session) { open:{ addOnlineCount(); //在线数加1 this.session = session; log.info("WebSocketUserServer 有新连接加入!当前在线人数为" + getOnlineCount()); if(accessToken ==null || "".equals(accessToken)){ log.info("WebSocketUserServer onOpen accessToken is null"); closeSession(session); break open; } //校验accessToken,获取user信息 //获取service方法 UcTokenFeign ucTokenFeign = SpringUtil.getBean(UcTokenFeign.class) ; ObjectRestResponse
userResult = ucTokenFeign.getUserInfo(accessToken) ; if(userResult.getStatus() != CodeStatus.CODE_SUCCESS.getValue() ||userResult.getData() == null){ log.info("WebSocketUserServer onOpen token feign not success:"+userResult.getMsg()); closeSession(session); break open; } //添加到用户Session对应关系中 UserSocket userSocket = new UserSocket(); userSocket.setUserId(userResult.getData().getUserId()) userSocket.setWebSocketUserServer(this); userSockets.add(userSocket) ;// try {// sendMessage("server连接成功");// } catch (IOException e) {// log.error("websocket IO异常");// } log.info("WebSocketUserServer Connected ... " + session.getId()); } } /* 服务端不接收非合规的client,进行关闭操作 */ private void closeSession(Session session){ try { session.close(); } catch (IOException e) { log.error("WebSocketUserServer close error:"+e); e.printStackTrace(); } } /** * 连接关闭调用的方法 */ @OnClose public void onClose() { subOnlineCount(); //在线数减1 log.info("WebSocketUserServer 有一连接关闭!当前在线人数为" + getOnlineCount()); userSockets.stream() .forEach(u ->{ if(u.getWebSocketUserServer() == this){ userSockets.remove(u) ; log.info("WebSocketUserServer userSockets remove user socket,user:"+u.getUserId()); } } ); log.info("WebSocketUserServer 删除关闭连接的对应关系"); } /** * 收到客户端消息后调用的方法 * * @param message 客户端发送过来的消息*/ @OnMessage public void onMessage(String message, Session session) { log.info("WebSocketUserServer 来自客户端的消息:" + message);// //群发消息// for (WebSocketUserServer item : webSocketSet) {// try {// item.sendMessage(message);// } catch (IOException e) {// e.printStackTrace();// }// } } /** * * @param session * @param error */ @OnError public void onError(Session session, Throwable error) { log.error("WebSocketUserServer 发生错误:"+error); error.printStackTrace(); } /* 给客户端发送文本信息 */ public void sendMessage(String message) throws IOException { this.session.getBasicRemote().sendText(message); log.info("WebSocketUserServer sendMessage:"+message); } /* 发送用户状态信息 */ public void sendUserPushMsgInfo(String status) throws IOException{ this.session.getBasicRemote().sendText(status); log.info("WebSocketUserServer sendUserPushMsgInfo:"+status); } /* 根据userID给客户端推送用户状态 */ public static void PushMsgInfoToUser(String userId,String os ,String businessType,String status) throws IOException { if(status !=null && !"".equals(status)){ userSockets.stream() .forEach(u->{ if(u.getUserId().equals(userId)){ try { u.getWebSocketUserServer().sendUserPushMsgInfo(status); log.info("WebSocketUserServer PushMsgInfoToUser success,userId:"+userId); } catch (IOException e) { e.printStackTrace(); log.error("WebSocketUserServer PushMsgInfoToUser error,userId:"+userId); } } }); } } /* 定时检查存活的Session,如果未存活进行处理 */ @Scheduled(cron = "0 0/2 * * * ?") public static void checkAliveSession(){ log.info("WebSocketUserServer checkAliveSession start:"+new Date()); userSockets.stream() .forEach(u->{ if(!u.getWebSocketUserServer().session.isOpen()){ userSockets.remove(u) ; log.info("WebSocketUserServer checkAlive remove not open session,userId:"+u.getUserId()); } }); log.info("WebSocketUserServer checkAliveSession end:"+new Date()); } public static synchronized int getOnlineCount() { return onlineCount.get(); } public static synchronized void addOnlineCount() { WebSocketUserServer.onlineCount.getAndIncrement(); } public static synchronized void subOnlineCount() { WebSocketUserServer.onlineCount.getAndDecrement(); }}

二、websocket服务中引入service服务

采用辅助工具类SpringUtil获取Bean,

UcTokenFeign ucTokenFeign = SpringUtil.getBean(UcTokenFeign.class) ;
@Componentpublic class SpringUtil implements ApplicationContextAware {    private static ApplicationContext applicationContext;    @Override    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {        if(SpringUtil.applicationContext == null) {            SpringUtil.applicationContext = applicationContext;        }    }    //获取applicationContext    public static ApplicationContext getApplicationContext() {        return applicationContext;    }    //通过name获取 Bean.    public static Object getBean(String name){        return getApplicationContext().getBean(name);    }    //通过class获取Bean.    public static 
T getBean(Class
clazz){ return getApplicationContext().getBean(clazz); } //通过name,以及Clazz返回指定的Bean public static
T getBean(String name,Class
clazz){ return getApplicationContext().getBean(name, clazz); }}

三、websocket的分布式实现

1.session 放到redis中,实现数据共享,但是websocket session不支持序列号,存储不了

2.加入消息中间件,实现收到消息后的共享

consumer有两种消费模式:集群消费和广播消费。集群消费:多个consumer平均消费该topic下所有mq的消息,即某个消息在某个message queue中被一个consumer消费后,其他消费者就不会消费到它;广播消费:所有consumer可以消费到发到这个topic下的所有消息。

因为Session不支持序列化,nginx分发不能保证一定指定到同一台服务器,特别移动互联网,移动设备下。

故采用消息订阅模式进行实现。每台服务器都订阅相同主题的消息,接收到消息后,关联到session则进行推送消息。

CopyOnWriteArrayList
userSockets = new CopyOnWriteArrayList<>() 这里便是维护关系点

四、客户端调用的html测试代码

WebSocket Chat

WebSocket 聊天室1:

五、nginx配置

upstream websocket {       server 1.203.115.27:8877;       server 127.0.0.1:8877;       }    server {        listen 8888;        location / {            proxy_pass http://websocket;            proxy_http_version 1.1;            proxy_set_header Upgrade $http_upgrade;            proxy_set_header Connection "upgrade";        }    }

 六、zuul负载websocket

spring cloud zuul 1.x版本临时不直接支持spring websocket,到2.×版本会支持。

1.x版本也可以支持websocket,配置比较麻烦,需要结合sock.js stomp等。详细可以参考该网站:

七、断网、弱网、切换网等场景,实际使用的问题处理

待补充

八、websocket发送消息支持的长度大小

下面代码循环执行50000次,前端还可以接收到,不过已经需要好几秒的时间,这个时间延迟已经不低。

10W次也可以接收到,已经延迟到几分钟的程度。

循环次数  length

10000    33400

20000   66800
50000  16700011

100000 33400011

StringBuffer msg =new StringBuffer()  ;//                {"centerLat":"27.403234","centerLon":"117.504426","maxLat":"31.849878","maxLon":"121.434785","minLat":"22.956590","minLon":"113.574066","locations":[{"lon":"121.434785","lat":"31.849878","vehicleNo":"京AA8866","transportStatus":"0"},{"lon":"113.574066","lat":"22.956590","vehicleNo":"京ETYUUII","transportStatus":"0"}]}                for(int i=0;i<100000;i++){                    msg.append( "                {\"centerLat\":\"27.403234\",\"centerLon\":\"117.504426\",\"maxLat\":\"31.849878\"" +                            ",\"maxLon\":\"121.434785\",\"minLat\":\"22.956590\",\"minLon\":\"113.574066\"" +                            ",\"locations\":[{\"lon\":\"121.434785\",\"lat\":\"31.849878\",\"vehicleNo\":\"京AA8866\",\"transportStatus\":\"0\"}" +                            ",{\"lon\":\"113.574066\",\"lat\":\"22.956590\",\"vehicleNo\":\"京ETYUUII\",\"transportStatus\":\"0\"}]}\n"                    );                }                sendMessage("server连接成功;"+msg.toString());

九、支持SSL(https)

修改前端websocket连接代码,原本ws://需要改为wss://(购买或)生成密钥和证书,过程省略。需要注意的是:自己生成的证书在很多浏览器上会报警告,忽略后websocket仍然能用,如Chrom、Firefox,但有些浏览器不能用,如Safari。修改/etc/nginx/conf.d/mzsg.conf

upstream cat {       server 172.168.0.2:8080 weight=5;       server 172.168.0.2:7080 weight=5; }server {    listen       443;    server_name  testsocket;    access_log  /data/logs/nginx/test.access.log;    error_log /data/logs/nginx/test.error.log;    ssl on;    ssl_certificate   cert/214745699540016.pem;    ssl_certificate_key  cert/214745699540016.key;    ssl_session_timeout 5m;    ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4;    ssl_protocols TLSv1 TLSv1.1 TLSv1.2;    ssl_prefer_server_ciphers on;    location / {        proxy_http_version 1.1;        proxy_set_header Upgrade $http_upgrade;        proxy_set_header Connection "upgrade";        proxy_set_header X-real-ip $remote_addr;        proxy_set_header X-Forwarded-For $remote_addr;        proxy_pass http://10.144.130.86:8877;    }}

 

 

转载地址:http://zradi.baihongyu.com/

你可能感兴趣的文章
拓扑结构相同子树练习题
查看>>
字符串空格替换练习题
查看>>
最长无重复字符子串练习题
查看>>
获取栈中最小值函数,时间复杂度为O(1)
查看>>
两个栈实现一个队列
查看>>
栈的反转
查看>>
栈的排序,栈顶元素最大.
查看>>
next数组计算.
查看>>
队列的滑动窗口最大值练习题.
查看>>
数组变树练习题
查看>>
打印两个链表的公共值练习题
查看>>
链表逆序问题
查看>>
链表所有为key的节点全部删除
查看>>
判断俩单链表是否相交
查看>>
前中后序遍历二叉树的非递归实现
查看>>
大数相乘,结果在2000位以内
查看>>
二叉树是否对称
查看>>
动态规划-找零钱
查看>>
动态规划-跳台阶
查看>>
动态规划-01背包问题
查看>>