基于WebSocket实现微信小程序的消息推送

小程序端开发

微信小程序支持通过基于WebSocket进行消息推送,提供了相应的API,例如创建连接示例代码:

1
2
3
4
5
6
7
8
9
10
11
wx.connectSocket({
url: 'test.php',
data:{
x: '',
y: ''
},
header:{
'content-type': 'application/json'
},
method:"GET"
})

API详细说明可参见微信API文档,这里主要讲解服务端如何实现基于WebSocket的消息推送。

服务端开发

证书申请

服务端整体部署架构为Nginx+Tomcat,因为微信小程序要求使用WSS协议进行通信,因此,需要配置Nginx支持SSL,可以通过阿里云的证书服务申请免费证书:
阿里云证书申请
按照提示操作即可。

Nginx配置

证书申请成功后,在Nginx中配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
upstream websocket-server {
server 127.0.0.1:8080;
}

server {
listen 443;
server_name xxx.com;
ssl on;
ssl_certificate /usr/local/nginx/conf/server.pem;
ssl_certificate_key /usr/local/nginx/conf/server.key;
ssl_protocols TLSv1 TLSv1.1 TLSv1.2;

location ^~/websocket/ {
proxy_pass http://websocket-server;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
proxy_connect_timeout 3600s;
proxy_read_timeout 3600s;
proxy_send_timeout 3600s;
}
}

其中需要注意的是:

  1. 使用“proxy_set_header”设置转发后HTTP报文头的Upgrade和Connection属性;
  2. 使用“proxy_connect_timeout”、“proxy_read_timeout”、“proxy_send_timeout”设置连接超时时间,因为WebSocket是长连接,而Nginx反向代理默认超时时间是60秒,这里需要将超时时间设置长一些。

服务端配置

WebSocket服务是使用Spring Boot实现的。
配置类代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {

@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(onlineWebSocketHandler(), "/websocket").setAllowedOrigins("*").addInterceptors(onlineHandshakeInterceptor()).withSockJS();
}

@Bean
public WebSocketHandler onlineWebSocketHandler() {
return new PerConnectionWebSocketHandler(OnlineWebSocketHandler.class);
}

@Bean
public HandshakeInterceptor onlineHandshakeInterceptor() {
return new OnlineHandshakeInterceptor();
}

@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}

}

其中通过WebSocketHandlerRegistry实例注册WebSocket处理器,并注册拦截器进行权限控制。需要注意的是,在创建处理器时,使用了PerConnectionWebSocketHandler进行了封装,保证每个WebSocket连接使用一个处理器进行处理,这样处理器就可以是有状态的。

服务端连接建立和消息推送

WebSocket处理器代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class OnlineWebSocketHandler extends TextWebSocketHandler {

private static Logger logger = LoggerFactory.getLogger(OnlineWebSocketHandler.class);

private static Map<Integer, WebSocketSession> sessionMap = Maps.newConcurrentMap();

private Integer userId;
private WebSocketSession session;

@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
this.userId = (Integer) session.getAttributes().get(Constant.USER_ID);
this.session = session;
sessionMap.put(this.userId, this.session);
logger.info("add session, userId={}, total={}", this.userId, sessionMap.size());
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status)
throws Exception {
sessionMap.remove(this.userId);
logger.info("remove session, userId={}, total={}", this.userId, sessionMap.size());
}

protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
logger.info(message.getPayload());
}

public static void sendMessage(Integer userId, String message) throws IOException {
if (sessionMap.containsKey(userId)) {
sessionMap.get(userId).sendMessage(new TextMessage(message));
}
}

}

其中,实现了WebSocketHandler接口的afterConnectionEstablished和afterConnectionClosed方法,在afterConnectionEstablished方法中,当WebSocket连接创建后,将连接所对应处理器实例的userId和session注册到静态Map对象中,在 afterConnectionClosed中,当WebSocket连接关闭后,将连接所对应处理器实例的userId和session从静态Map对象中删除。OnlineWebSocketHandler提供了一个静态方法sendMessage,通过调用这个方法,可以向某个指定用户推送消息,其内部是根据userId从静态Map对象查找session,并调用session的sendMessage方法推送消息。

服务端权限校验

WebSocket拦截器代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class OnlineHandshakeInterceptor implements HandshakeInterceptor {

private static Logger logger = LoggerFactory.getLogger(OnlineHandshakeInterceptor.class);

@Autowired
private UserService userService;

@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
HttpServletRequest httpRequest = ((ServletServerHttpRequest) request).getServletRequest();
String userIdStr = httpRequest.getParameter(Constant.USER_ID);
String session = httpRequest.getParameter(Constant.SESSION);
if (StringUtils.isBlank(userIdStr) || StringUtils.isBlank(session)) {
logger.info("userId and session needed, now useId={}, session={}", userIdStr, session);
throw new BaseException(BaseExceptionEnum.NOT_ALLOWED);
}
int userId = Integer.valueOf(userIdStr);
if (!userService.checkLogin(userId, session)) {
throw new BaseException(BaseExceptionEnum.NOT_ALLOWED);
}
attributes.put(Constant.USER_ID, userId);
return true;
}

@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {

}

}

其中,实现了HandshakeInterceptor接口的beforeHandshake方法,在连接创建开始时,从request中获取url参数userId和session进行权限判断。

服务端启动

Spring Boot启动类代码如下:

1
2
3
4
5
6
7
8
9
@SpringBootApplication
@EnableWebMvc
@EnableTransactionManagement
@EnableWebSocket
public class Application {

public static void main(String[] args) throws Exception {
SpringApplication.run(Application.class, args);
}

通过EnableWebSocket注解开启WebSocket。