Netty编解码技术(三)

编解码技术就是java序列化技术,序列化的目的就两个,第一进行网络传输,第二对象持久化。虽然我们可以使用java进行对象序列化,nettty去传输,但是java序列化的硬伤太多,比如java序列化没法跨语言、序列化后码流太大、序列化性能太低等。这里就要依赖主流的编解码框架:

  • JBoss的Marshalling包
  • google的Protobuf
  • 基于Protobuf的Kyro
  • MessagePack框架

JBoss Marshalling

JBoss Marshalling是一个对象序列化包,对JDK默认的序列化框架进行了优化,但由保持跟java.io.Serializable接口的兼容,同时增加了一些可调的参数和附加特性。类库:jboss-marshalling-x.x.x、jboss-marshalling-serial-x.x.x。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Server {

public static void main(String[] args) throws Exception{

EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup();

ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
//设置日志
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ReadTimeoutHandler(5));
sc.pipeline().addLast(new ServerHandler());
}
});

ChannelFuture cf = b.bind(8765).sync();

cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully();

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ServerHandler extends ChannelHandlerAdapter{

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {

}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Request request = (Request)msg;
System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage());
Response response = new Response();
response.setId(request.getId());
response.setName("response" + request.getId());
response.setResponseMessage("响应内容" + request.getId());
ctx.writeAndFlush(response);//.addListener(ChannelFutureListener.CLOSE);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}



}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public final class MarshallingCodeCFactory {

/**
* 创建Jboss Marshalling解码器MarshallingDecoder
* @return MarshallingDecoder
*/
public static MarshallingDecoder buildMarshallingDecoder() {
//首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
//创建了MarshallingConfiguration对象,配置了版本号为5
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
//根据marshallerFactory和configuration创建provider
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
//构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
return decoder;
}

/**
* 创建Jboss Marshalling编码器MarshallingEncoder
* @return MarshallingEncoder
*/
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
//构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public class Client {

private static class SingletonHolder {
static final Client instance = new Client();
}

public static Client getInstance(){
return SingletonHolder.instance;
}

private EventLoopGroup group;
private Bootstrap b;
private ChannelFuture cf ;

private Client(){
group = new NioEventLoopGroup();
b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
//超时handler(当服务器端与客户端在指定时间以上没有任何进行通信,则会关闭响应的通道,主要为减小服务端资源占用)
sc.pipeline().addLast(new ReadTimeoutHandler(5));
sc.pipeline().addLast(new ClientHandler());
}
});
}

public void connect(){
try {
this.cf = b.connect("127.0.0.1", 8765).sync();
System.out.println("远程服务器已经连接, 可以进行数据交换..");
} catch (Exception e) {
e.printStackTrace();
}
}

public ChannelFuture getChannelFuture(){

if(this.cf == null){
this.connect();
}
if(!this.cf.channel().isActive()){
this.connect();
}

return this.cf;
}

public static void main(String[] args) throws Exception{
final Client c = Client.getInstance();
//c.connect();

ChannelFuture cf = c.getChannelFuture();
for(int i = 1; i <= 3; i++ ){
Request request = new Request();
request.setId("" + i);
request.setName("pro" + i);
request.setRequestMessage("数据信息" + i);
cf.channel().writeAndFlush(request);
TimeUnit.SECONDS.sleep(4);
}

cf.channel().closeFuture().sync();


new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("进入子线程...");
ChannelFuture cf = c.getChannelFuture();
System.out.println(cf.channel().isActive());
System.out.println(cf.channel().isOpen());

//再次发送数据
Request request = new Request();
request.setId("" + 4);
request.setName("pro" + 4);
request.setRequestMessage("数据信息" + 4);
cf.channel().writeAndFlush(request);
cf.channel().closeFuture().sync();
System.out.println("子线程结束.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();

System.out.println("断开连接,主线程结束..");

}



}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class ClientHandler extends ChannelHandlerAdapter{

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {

}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
Response resp = (Response)msg;
System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());
} finally {
ReferenceCountUtil.release(msg);
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
public class Request implements Serializable{

private static final long SerialVersionUID = 1L;

private String id ;
private String name ;
private String requestMessage ;

//getter...
//setter...


}
1
2
3
4
5
6
7
8
9
10
11
12
13
public class Response implements Serializable{

private static final long serialVersionUID = 1L;

private String id;
private String name;
private String responseMessage;

//getter...
//setter...


}

Netty的WebSocket

WebSocket将网络套接字引入到了客户端和服务端,我们之前实现聊天功能,可能需要古老的Socket技术,亦或是古老的DWR框架,反向Ajax技术,,再有可能就是Comet服务器技术,H5的WebSocket很轻松的可以进行聊天功能的实现,Netty和H5的WebSocket结合非常简单,Netty为我们封装了其协议类,可以很方百你的使用,WebSocket特点:

  • 单一的TCP连接,双方可通信
  • 对代理、防火墙和路由器透明
  • 无头部信息、Cookie和身份验证
  • 无安全开销
  • 通过ping/pong帧保持链路激活
  • 服务器可主动传递消息给客户端,不再需要客户端轮询
-------------本文结束感谢您的阅读-------------