logo头像

路漫漫兮其修远兮

消息推送

1 什么是消息推送

很多手机APP会不定时的给用户推送消息,例如一些新闻APP会给用户推送用户可能感兴趣的新闻,或者APP有更新了,会给用户推送是否选择更新的消息等等,这就是所谓的“消息推送”。

对于APP或者桌面客户端这种C/S架构的软件,实现消息推送其实比较简单,只需要维护TCP连接就行了,因为TCP本身是全双工的,客户端和服务端都能发送消息。但Web环境就不太一样了,目前的Web软件大多数都是B/S架构(即浏览器/服务端),使用的消息传输协议也大多数是HTTP,HTTP1.0和HTTP1.1都无法实现服务端向客户端(浏览器)主动发送消息,所以实现的手段主要就是客户端定时或者不定时轮询(例如间隔时间动态变化),这种方式实现并不算复杂,最大的问题就是性能,轮询是需要消耗CPU资源的,如果很长一段时间内,服务端都没有消息要给客户端,那么这个CPU空轮询的占比就比较大了,而且轮询也会对服务端造成压力,因为如果服务端没有消息要给客户端,那么其实这样的“请求-响应”是没有意义的,算是服务端的额外压力。

可能有朋友难以理解上面所描述的情况,下面我画个图来描述这个问题:

FudDcn.png

可以从图中看到,客户端老是不断的孜孜不倦的跑去问服务端“有没有新的推送消息”,而且还不长记性,每次都问同样的问题(HTTP是无状态的协议),这事给谁谁都得烦,是吧,服务端老被客户端“骚扰”,所以有时候就会“罢工”不干了!(服务端压力过大,短暂不可用)那怎么解决这个问题呢?也就是说让服务端过得舒服一些?

2 解决方案

目前主要有两种主流的解决方案:

  • 利用新版的HTTP2.0,重构原有的代码,因为HTTP2.0支持服务端主动发送数据给客户端,但这种方案实现其实是比较困难的,一是服务端实现起来并不简单,而是一般也不用来推送大量的数据,常见的使用场景是请求.html,然后服务端把HTML作为响应给前端,并且同时把CSS,JS文件都“推”给前端(在传统的HTTP中,要拿到这三个东西,至少需要三次HTTP请求)。本文不讨论该方案。
  • 利用WebSocket协议,WebSocket是一种在浏览器环境下可以全双工通信的应用层协议(等会儿会给出简单介绍)。这种方案就是利用WebSocket的全双工通信的特性,使得B/S架构的软件也能像C/S架构的软件那样简单的实现消息推送。本文主要介绍的就是这种解决方案。

3 WebSocket协议

WebSocket和HTTP,FTP等一样,都属于应用层协议,诞生于2008年,与2011年称为国际标准(说实话,这段时间真的很短,说明WebSocket确实解决了一些问题)。下面是维基百科上的定义:

WebSocket是一种在单个TCP连接上进行全双工通信的协议。WebSocket通信协议于2011年被IETF定为标准RFC 6455,并由RFC7936补充规范。WebSocket API也被W3C定为标准。

WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

其优点也不少(这里所提到的优点是相对于HTTP来说的):

  • 支持全双工通信,这对于某些场景尤其重要,例如消息推送。
  • 更好的二进制支持,HTTP本身是文本传输协议,对于二进制的数据需要特殊处理(不过2.0版本也对二进制做了补充),而WebSocket定义了二进制帧,所以传输二进制数据的时候不需要像HTTP那样特殊处理。
  • 较少的控制开销。连接创建后,ws客户端、服务端进行数据交换时,协议控制的数据包头部较小。在不包含头部的情况下,服务端到客户端的包头只有2~10字节(取决于数据包长度),客户端到服务端的的话,需要加上额外的4字节的掩码。而HTTP协议每次通信都需要携带完整的头部。
  • ……………

那WebSocket连接是如何建立的呢?答案是:握手协议。

在真正建立WebSocket之前,会先建立一个HTTP连接,然后服务端响应状态码101,表示切换协议,之后通信协议会升级成WebSocket,这样WebSocket连接才算是建立起来。下面是一个WebSocket握手的示例:

客户端请求:

1
2
3
4
5
6
7
8
GET / HTTP/1.1
#Upgrade就表示要把协议升级成WebSocket
Upgrade: websocket
Connection: Upgrade
Host: example.com
Origin: http://example.com
Sec-WebSocket-Key: sN9cRrP/n9NdMgdcy2VJFQ==
Sec-WebSocket-Version: 13

服务端响应:

1
2
3
4
5
6
#客户端收到该HTTP报文之后,会将通信协议升级成WebSocket,之后的数据传输就都使用WebSocket了
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: fFBooB7FAkLlXgRSz0BT3v4hq5s=
Sec-WebSocket-Location: ws://example.com/

这就是所谓的握手了(双方建立友好关系)。

4 利用Netty + WebSocket实现消息推送

在动手之前,先声明一点:本文不会介绍WebSocket的简单使用,因为本文的标题是“消息推送”,而不是“WebSocket入门”,Netty也是同理。

首先,我们先确定一些设计方案:

  1. 客户端和服务端使用WebSocket作为通信协议,当服务端有新的推送消息的时候,主动把消息“推”给客户端。
  2. Netty作为网络通信的基础框架。
  3. 服务端监听消息队列,当消息队列中有新的消息时,把消息发送给客户端。
  4. 可以还有另外一个专门往消息队列里放入消息的服务,至此整个系统就形成一个完整的消息推送系统了。

大致了解了方案之后,可以着手实现了,先来看看服务端的实现:

服务启动类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
public class WebSocketServer {

//RabbitMQ客户端连接工厂
private static ConnectionFactory connectionFactory = new ConnectionFactory();

//客户端连接
private static Connection connection;

//客户端Channel
private static com.rabbitmq.client.Channel channel;

//Jackson,序列化用的
private static ObjectMapper objectMapper = new ObjectMapper();


public static void main(String[] args) {
//初始化ServerBootstrap
ServerBootstrap server = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();

server.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new HttpServerCodec());
ch.pipeline().addLast(new HttpObjectAggregator(16 * 16 * 1024));
ch.pipeline().addLast(new ChunkedWriteHandler());
ch.pipeline().addLast(new WebSocketServerHandler());
}
});

//初始化RabbitMQ的配置
connectionFactory.setHost("xxx.xxx.xxx.xxx");
connectionFactory.setUsername("xxx");
connectionFactory.setPassword("xxx");

try {
connection = connectionFactory.newConnection();
channel = connection.createChannel();
channel.queueDeclare(RabbitMQConfig.PUSH_MSG_QUEUE, false, false, true, null);
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}

//服务端绑定8081端口
server.bind(8081).syncUninterruptibly().addListener(future -> {
if (future.isSuccess()) {
System.out.println("绑定成功");
startPushMessage();
startMQListener();
} else {
System.out.println("绑定失败");
}
});


}

//开启消息队列的监听,当有消息的时候,就把消息推送给客户端
private static void startMQListener() {
new Thread(() -> {
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
//这里就比较粗暴的获取ChannleGroup了,建议读者尝试的时候用更好的方法
ChannelGroup group = WebSocketServerHandler.group;
if (group != null) {

group.writeAndFlush(new TextWebSocketFrame(message));
}
}


};
System.out.println("开始监听");
try {
channel.basicConsume(RabbitMQConfig.PUSH_MSG_QUEUE, true, consumer);
} catch (IOException e) {
e.printStackTrace();
}
}).start();
}

private static final AtomicLong id = new AtomicLong(0);

//随机生成一个消息
private static Notify generateNotify() {
Notify notify = new Notify();
notify.setId(id.getAndIncrement());
notify.setTitle(UUID.randomUUID().toString());
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 500; i++) {
builder.append(getRandomChar());
}
notify.setContent(builder.toString());
notify.setPushTime(new Date());
return notify;
}

private static char getRandomChar() {
String str = "";
int hightPos; //
int lowPos;

Random random = new Random();

hightPos = (176 + Math.abs(random.nextInt(39)));
lowPos = (161 + Math.abs(random.nextInt(93)));

byte[] b = new byte[2];
b[0] = (Integer.valueOf(hightPos)).byteValue();
b[1] = (Integer.valueOf(lowPos)).byteValue();

try {
str = new String(b, "GBK");
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
System.out.println("错误");
}

return str.charAt(0);
}


//这里我不另外写专门的生产消息的服务了,直接定时的往消息队列里放入消息
private static void startPushMessage() {
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
service.scheduleAtFixedRate(() -> {
try {
Notify notify = generateNotify();
String message = objectMapper.writeValueAsString(notify);
channel.basicPublish("", RabbitMQConfig.PUSH_MSG_QUEUE, null, message.getBytes());
} catch (IOException e) {
e.printStackTrace();
}
}, 5, 5, TimeUnit.SECONDS);
}
}

WebSocketHandler类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {

//WebSocket握手
private static WebSocketServerHandshaker handshaker;

//客户端的群组
public static ChannelGroup group;

//客户端在线人数
private static AtomicLong onlineCount = new AtomicLong(0);

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
onlineCount.incrementAndGet();
System.out.println("有用户上线,当前在线人数是: " + onlineCount.get());
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
//第一次请求肯定是HTTP请求,所以先去处理HTTP请求,在该处理方法里做WebSocket握手的操作
if (msg instanceof FullHttpRequest) {
handlerHttpRequest(ctx, (FullHttpRequest)msg);
}

//能到这,肯定是连接成功了的
if (onlineCount.get() == 1) {
//创建群组
group = new DefaultChannelGroup(ctx.executor());
group.add(ctx.channel());
} else {
group.add(ctx.channel());
}

//之后的请求就都是WebSocket帧了,不过对于我们的系统来说,这倒不是特别主要的
//之所以还要处理,是为了处理客户端主动关闭连接的情况以及维持心跳
if (msg instanceof WebSocketFrame) {
handlerWebSocketFrame(ctx, (WebSocketFrame)msg);
}
}



private void handlerHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
//如果request解析失败或者upgrade不是websocket,那么就直接发送BAD_REQUEST状态即可
if (!request.decoderResult().isSuccess()
|| !"websocket".equals(request.headers().get("upgrade"))) {
try {
sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
return;
}

//如果一切正常,那么就开始进行WebSocket握手
WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory("ws://localhost:8081/ws", null, false);
handshaker = factory.newHandshaker(request);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
//这里就是握手操作了
handshaker.handshake(ctx.channel(), request);
}
}

private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request,
FullHttpResponse response) throws UnsupportedEncodingException {
if (response.status().code() != 200) {
ByteBuf buf = ctx.alloc().buffer();
buf.writeBytes("发生错误哦".getBytes("utf-8"));
response.content().writeBytes(buf);
buf.release();
}

ChannelFuture future = ctx.channel().writeAndFlush(response);
if (request.headers().get("Keep-Alive") == null) {
future.addListener(ChannelFutureListener.CLOSE);
}
}


private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
if (frame instanceof CloseWebSocketFrame) {
//如果该帧是CloseWebSocketFrame类型的,也就是说客户端主动关闭连接
//那么就做相应的处理
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
group.remove(ctx.channel());
onlineCount.decrementAndGet();
System.out.println("有用户下线,当前在线人数是: " + onlineCount.get());
return;
}

//WebSocket的客户段会发送心跳数据包,返回PongWebSocketFrame就行了
if (frame instanceof PingWebSocketFrame) {
ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
return;
}

//本系统只支持本文数据
if (!(frame instanceof TextWebSocketFrame)) {
throw new UnsupportedOperationException("不支持该类型消息");

}
//向所有在线的用户发送消息
group.writeAndFlush(new TextWebSocketFrame(LocalDateTime.now().toString()));
}
}

不知道各位注意到没有,我们的系统中不存在WebSocketFrame的编解码器,熟悉Netty的朋友应该知道,如果真的没有WebSocketFrame的编解码器的话,我们的系统是无法处理WebSocket传输的数据的。其实Netty在进行WebSocket握手的时候,就自动的帮我们添加了编解码器,如下是handshaker.handshake(ctx.channel(), request)的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public ChannelFuture handshake(Channel channel, FullHttpRequest req) {
return handshake(channel, req, null, channel.newPromise());
}

public final ChannelFuture handshake(Channel channel, FullHttpRequest req,
HttpHeaders responseHeaders, final ChannelPromise promise) {

if (logger.isDebugEnabled()) {
logger.debug("{} WebSocket version {} server handshake", channel, version());
}
FullHttpResponse response = newHandshakeResponse(req, responseHeaders);
ChannelPipeline p = channel.pipeline();
if (p.get(HttpObjectAggregator.class) != null) {
p.remove(HttpObjectAggregator.class);
}
if (p.get(HttpContentCompressor.class) != null) {
p.remove(HttpContentCompressor.class);
}
ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);
final String encoderName;
if (ctx == null) {
// this means the user use a HttpServerCodec
ctx = p.context(HttpServerCodec.class);
if (ctx == null) {
promise.setFailure(
new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));
return promise;
}
//就是这里了,加入默认的编解码器
p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());
p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
encoderName = ctx.name();
} else {
p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());

encoderName = p.context(HttpResponseEncoder.class).name();
p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());
}
channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ChannelPipeline p = future.channel().pipeline();
p.remove(encoderName);
promise.setSuccess();
} else {
promise.setFailure(future.cause());
}
}
});
return promise;
}

服务端完事了,接下来看看客户端的代码,其实就是前端代码了(代码是我网上直接抄的):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WebSocket测试</title>
</head>
<body>
<h1>WebSocket测试</h1>
<div id="context"></div>
<script src="https://code.jquery.com/jquery-3.3.1.min.js"></script>
<script type="application/javascript">
var websocket = null;

// 判断当前浏览器是否支持WebSocket
if ('WebSocket' in window) {
// 创建WebSocket 对象,连接服务器端点
websocket = new WebSocket("ws://localhost:8081/ws");
} else {
alert('您的浏览器不支持websocket');
}

// 连接发生错误的回调方法
websocket.onerror = function() {
appendMessage ("WebSocket连接失败");
}

// 连接成功建立的回调方法
websocket.onopen = function(event) {
appendMessage ("WebSocket连接成功");
}

// 接收到消息的回调方法
websocket.onmessage = function (event) {
console.log("收到消息")
jsonObject = JSON.parse(event.data)
console.log(jsonObject)
appendMessage(jsonObject.title);
}

websocket.onclose = function() {
appendMessage("关闭连接");
}

websocket.onbeforeupload = function() {
websocket.close();
}

function appendMessage(message) {
var context = $('#context').html() + '<br>' + message;
$('#context').html(context);
}

function closeWebSocket() {
websocket.close();
}

function sendMessage() {
var message = $('#message').val();
websocket.send(message);
}
</script>
</body>
</html>

接下来启动服务,然后直接打开该文件,应该就能看到效果了,如下所示:

FurGo6.png

你也可以多打开几个客户端试试,会发现消息会传递给每个客户端了,而且这期间不存在什么客户端主动请求的情况,即对于客户端来说,这些个消息就好像“天上掉下来”的一样,这就简单实现了消息推送系统,现在再来看看客户端和服务端的通信情况:

Furfyj.png

对比上面的那张图,是不是觉得更加“清爽”了?现在不再需要在客户端不断去轮询,去骚扰服务端了,当有新的推送消息的时候,服务端就主动的把消息“推”给客户端了,这样服务端的压力也减少了很多,客户端的CPU也不用一直做没有意义的事了。

补充(于2019-01-14)

其实上面的WebSocketServerHandler可以实现得更加简单(我也是今天才知道),Netty自带了一个WebSocketServerProtocolHandler类,该类可以帮助我们完成握手操作,这样我们就不需要自己手动的编写握手操作了。先往pipiline里添加该handler,如下所示:

1
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));  //字符串即websocket的Path

之后重写WebSocketServerHandler,这时候就非常简单了,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

private static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
System.out.println("服务端收到消息:" + msg.text());
group.writeAndFlush(new TextWebSocketFrame(LocalDateTime.now().toString() + " : " +msg.text()));
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
group.remove(ctx.channel());
System.out.println("用户退出,id是:" + ctx.channel().id().asLongText());
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
group.add(ctx.channel());
System.out.println("用户登录,id是:" + ctx.channel().id().asLongText());
}
}

注意,这里SimpleChannelInboundHandler里的泛型类型我写的是TextWebSocketFrame(实际上,比较好的方式是写WebSocketFrame),可以这么写的原因是WebSocketServerProtocolHandler其实同时是个编解码器,解码后的对象类似是WebSocketFrame。如果处理逻辑复杂,也可以写多个Handler用来处理其他类型WebSocketFrame,这就因人而异了。除此之外,原来代码中的用户登录和用户退出的功能就无法那样实现了,取而代之的是利用Channel的生命周期方法,channelActive和channelInactive,其实这样实现更加简单,语义也更加明确了。其他部分的代码可以不做改动,应该不会出现什么问题。

5 Spring Boot + WebSocket实现消息推送

我还想介绍一种实现方案,这种方案相较于Netty的实现更加的简单,那就是利用Spring 对WebSocket的支持来实现。就不多说废话了,直接来看实现吧:

建立好Spring Boot项目之后,加入如下依赖(maven):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependency>
<!--RabbitMQ的支持-->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<!--web mvc的支持-->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<!--websocket的支持-->
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

之后来做两个配置,一是配置RabbitMQ,而是配置WebSocket:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
//RabbitMQ配置
@Configuration
public class RabbitMQConfig {

//三个分别是队列,交换器以及路由键
public static final String PUSH_MSG_EXCHANGE = "push_msg_exchange";

public static final String PUSH_MSG_QUEUE = "push_msg_queue";

public static final String PUSH_MSG_ROUTE_KEY = "push_msg.direct";

@Bean
public DirectExchange pushMsgExchange() {
return new DirectExchange(PUSH_MSG_EXCHANGE, true, true);
}

@Bean
public Queue pushMsgQueue() {
return new Queue(PUSH_MSG_QUEUE, true, false, true);
}

//将队列和交换器绑定
@Bean
public Binding pushMsgBinding() {
return BindingBuilder.bind(pushMsgQueue()).to(pushMsgExchange()).with(PUSH_MSG_ROUTE_KEY);
}
}

//WebSocket配置
@Configuration
public class WebSocketConfig {

//只需要配置ServerEndpointExporter这个Bean就行了
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}

接下来就是实现类了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@ServerEndpoint("/ws")
@Service
public class WebSocketService {

private static Set<WebSocketService> webSocketServiceSet = new CopyOnWriteArraySet<>();

private Session session;

private static AtomicLong onlineCount = new AtomicLong(0);

@OnOpen
public void onOpen(Session session) {
this.session = session;
onlineCount.incrementAndGet();
webSocketServiceSet.add(this);
System.out.println("有用户上线,当前在线人数有:" + onlineCount.get());
}

@OnClose
public void onClose() {
webSocketServiceSet.remove(this);
onlineCount.decrementAndGet();
System.out.println("有用户下线,当前在线人数有: " + onlineCount.get());
}

@OnMessage
public void onMessage(Session session, String message) {
System.out.println("来自客户端的消息,客户端IP:PORT是 : ");
System.out.println(session.getRequestURI().getHost() + ":" + session.getRequestURI().getPort());
System.out.println("消息是: " + message);
}

@OnError
public void onError(Throwable throwable) {
System.out.println("服务端异常");
throwable.printStackTrace();
}

private void sendMessage(String message) {
try {
this.session.getBasicRemote().sendText(message);
} catch (IOException e) {
e.printStackTrace();
}
}

@RabbitListener(queues = RabbitMQConfig.PUSH_MSG_QUEUE)
public void ListenMessageFromMQ(String message) {
for (WebSocketService webSocketService : webSocketServiceSet) {
//消息推送了,向每个在线客户端发送消息
webSocketService.sendMessage(message);
}
}
}

对于每个客户端,都会有一个与之对应的WebSocketService实例对象以及Session,就好像Netty里的Channel一样(只是一个比喻,并不等同),所以用一个静态的Set来保存这些实例对象,当需要发送消息的时候,直接取出来,调用Session的发送消息的方法就行了。

好了,就是那么简单,几个注解@OnOpen,@OnMessage,@OnClose,@OnError,听名字应该就知道啥意思了吧,不多说了,相比于Netty的实现,简单的太多了。不过最后还差一点,消息从哪来呢?和之前一样,我这里开启定时任务,定时的往消息队列里塞消息就行了,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
public class MyTask {


@Autowired
private RabbitTemplate rabbitTemplate;

@Scheduled(cron = "*/5 * * * * ?")
public void sendMessageToMQ() {
//这里的消息我就隐编码了,实际上可以有多种方式来构造消息
String message = "Hello, Websocket!!!";
rabbitTemplate.convertAndSend(RabbitMQConfig.PUSH_MSG_EXCHANGE,
RabbitMQConfig.PUSH_MSG_ROUTE_KEY,
message);
}
}

最后,别忘了应用主类上加入@EnableScheduling,否则定时任务不会生效。这里的实现效果和之前的实现几乎一样,客户端也不需要修改什么,就不多说了。

6 小结

消息推送系统是一个用途广泛的系统,本文简单介绍了两种实现方法,分别是Netty+WebSocket和Spring Boot+WebSocket,后者其实是基于Servlet实现的,所以性能上和Netty还是有一些差异的。不过无论哪种实现吧,最核心的部分还是WebSocket协议,

有些代码写的不太合理,望谅解。

7 参考资料

《Netty 权威指南》WebSocket相关章节

Spring Boot 使用 WebSocket 实现消息推送 及 WebSocket原理

WebSocket 教程