基于OKHttp实现WebSocket客户端

新建WebSocketService类

新建接口类为了后续回调使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

/**
* WebSocket监听事件接口
*/
public interface IWebSocketServiceListener {

void onWSClosed(WebSocket webSocket, int code, String reason);

void onWSClosing(WebSocket webSocket, int code, String reason);

void onWSFailure(WebSocket webSocket, Throwable t, Response response);

void onWSMessage(WebSocket webSocket, String text);

void onWSMessage(WebSocket webSocket, ByteString bytes);

}

新建内部类继承WebSocketListener以及自定义回调接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
class MyWebSocketListener extends WebSocketListener {

@Override
public void onOpen(WebSocket webSocket, Response response) {
super.onOpen(webSocket, response);
//自定义心跳检测
heartBeatTimer = new Timer();
heartBeatTimerTask = new TimerTask() {
@Override
public void run() {
webSocket.send("ping");
}
};
heartBeatTimer.schedule(heartBeatTimerTask, 0, 10000);
}

@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
super.onClosed(webSocket, code, reason);
webSocketServiceListener.onWSClosed(webSocket, code, reason);
}

@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
super.onClosing(webSocket, code, reason);
webSocketServiceListener.onWSClosing(webSocket, code, reason);
//失败重试
if(retryTimes<MAX_RETRY_TIMES){
connect();
retryTimes ++;
}
}

@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
super.onFailure(webSocket, t, response);
webSocketServiceListener.onWSFailure(webSocket, t, response);
//失败重试
// if(retryTimes<MAX_RETRY_TIMES){
// connect();
// retryTimes ++;
// }
}

@Override
public void onMessage(WebSocket webSocket, String text) {
super.onMessage(webSocket, text);
JSONObject json = JSONObject.parseObject(text);
if("ping".equals(json.getString("type"))){
heartBeat();
}
webSocketServiceListener.onWSMessage(webSocket, text);
}

@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
super.onMessage(webSocket, bytes);
webSocketServiceListener.onWSMessage(webSocket, bytes);
}
}

完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
public class WebSocketService {

private static OkHttpClient mClient;
private WebSocket websocket;
private IWebSocketServiceListener webSocketServiceListener;
/**
* 重试次数
*/
private int retryTimes = 0;
/**
* 最大重试次数
*/
private static int MAX_RETRY_TIMES = 5;

public WebSocketService(IWebSocketServiceListener webSocketServiceListener){
this.webSocketServiceListener = webSocketServiceListener;
connect();
}

/**
* 建立websocket连接
*/
public void connect(){
if(mClient == null){
mClient = new OkHttpClient.Builder()
.readTimeout(5, TimeUnit.SECONDS)//设置读取超时时间
.writeTimeout(5, TimeUnit.SECONDS)//设置写的超时时间
.connectTimeout(5, TimeUnit.SECONDS)//设置连接超时时间
// .pingInterval(10, TimeUnit.SECONDS) //OkHttp自带心跳检测,如果有自定义的需要使用heartBeatTimer进行定时发送心跳检测
.build();
}
String url = "wss://...";
Request request = new Request.Builder().get().url(url).build();
websocket = mClient.newWebSocket(request, new MyWebSocketListener());
}

/**
* 建立websocket连接
*/
public void reConnect(){
close();
connect();
}

/**
* 发送消息
* @param msg
* @return
*/
public boolean sendMsg(String msg){
return websocket.send(msg);
}

public void close(){
if(websocket != null){
websocket.close(1001, "客户端主动关闭连接");
websocket = null;
}
}

class MyWebSocketListener extends WebSocketListener {

@Override
public void onOpen(WebSocket webSocket, Response response) {
super.onOpen(webSocket, response);
//自定义心跳检测
heartBeatTimer = new Timer();
heartBeatTimerTask = new TimerTask() {
@Override
public void run() {
webSocket.send("ping");
}
};
heartBeatTimer.schedule(heartBeatTimerTask, 0, 10000);
}

@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
super.onClosed(webSocket, code, reason);
webSocketServiceListener.onWSClosed(webSocket, code, reason);
}

@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
super.onClosing(webSocket, code, reason);
webSocketServiceListener.onWSClosing(webSocket, code, reason);
//失败重试
if(retryTimes<MAX_RETRY_TIMES){
reConnect();
retryTimes ++;
}
}

@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
super.onFailure(webSocket, t, response);
webSocketServiceListener.onWSFailure(webSocket, t, response);
//失败重试
// if(retryTimes<MAX_RETRY_TIMES){
// reConnect();
// retryTimes ++;
// }
}

@Override
public void onMessage(WebSocket webSocket, String text) {
super.onMessage(webSocket, text);
JSONObject json = JSONObject.parseObject(text);
if("ping".equals(json.getString("type"))){
heartBeat();
}
webSocketServiceListener.onWSMessage(webSocket, text);
}

@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
super.onMessage(webSocket, bytes);
webSocketServiceListener.onWSMessage(webSocket, bytes);
}
}
}

使用方法

首先在需要使用的类中实现IWebSocketServiceListener接口
然后初始化webSocketService

1
WebSocketService = new WebSocketService(this);

然后在接口方法中实现具体的业务逻辑就可以了


本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 协议 ,转载请注明出处!