peace唠叨

netty入门学习

本文简单介绍下netty。Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

全部代码下载:Github链接:github链接,点击惊喜;写文章不易,欢迎大家采我的文章,以及给出有用的评论,当然大家也可以关注一下我的github;多谢;

1.netty介绍:

Netty 是一个异步的,事件驱动的网络编程框架和工具,使用Netty 可以快速开发出可维护的,高性能、高扩展能力的协议服务及其客户端应用。
也就是说,Netty 是一个基于NIO的客户,服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。Netty相当简化和流线化了网络应用的编程开发过程,例如,TCP和UDP的socket服务开发。
“快速”和“简单”并不意味着会让你的最终应用产生维护性或性能上的问题。Netty 是一个吸收了多种协议的实现经验,这些协议包括FTP,SMPT,HTTP,各种二进制,文本协议,并经过相当精心设计的项目,最终,Netty成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。

2.开发准备:

  1. 引入jar包:netty-all-4.0.41.Final.jar (我使用的,大家可以选择自己合适的) 下载地址见:http://netty.io/downloads.html
  2. 使用maven:
    1
    2
    3
    4
    5
    6
    <dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty</artifactId> <!-- Use 'netty-all' for 4.0 or above -->
    <version>X.Y.Z.Q</version>
    <scope>compile</scope>
    </dependency>

3.Hello world工程:

下面简单介绍一个入门工程,包括服务端和客户端代码。详细见代码注释

3.1 server端的建立

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package cn.wpeace.hello;
import java.net.InetAddress;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* netty服务端建立
* 1.设置线程池
* 2.设置处理器
* 3.绑定端口
* @author peace
*
*/

public class NettyServer {
public static void main(String[] args) {
int port=8081;//服务器监听端口
EventLoopGroup bossGroup=new NioEventLoopGroup();//boss线程池
EventLoopGroup worerGroup=new NioEventLoopGroup();//work线程池
try {
ServerBootstrap bootstrap=new ServerBootstrap();//服务启动器
bootstrap.group(bossGroup,worerGroup);//指定Netty的Boss线程和work线程
bootstrap.channel(NioServerSocketChannel.class);//设置服务器通道类
bootstrap.childHandler(new ChannelInitializer<NioSocketChannel>(){//设置处理器
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
// 以("\n")为结尾分割的 解码器,用于消息识别
channel.pipeline().addLast("split",new DelimiterBasedFrameDecoder(1000, Delimiters.lineDelimiter()));
channel.pipeline().addLast("decoder",new StringDecoder());//对字符串进行处理 解码器
channel.pipeline().addLast("encoder",new StringEncoder());//对字符串进行处理 编码器
channel.pipeline().addLast("hander",new FirstServerHandler());//自定义的处理器

}
});
ChannelFuture future = bootstrap.bind(port).sync();//设置绑定的端口
future.channel().closeFuture().sync();

} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
worerGroup.shutdownGracefully();
}
}
}
/**
* 自定义处理类
* @author peace
*
*/

class FirstServerHandler extends SimpleChannelInboundHandler<String>{
/**
* 消息过来后执行此方法
*/

@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println(ctx.channel().remoteAddress()+":"+msg);
ctx.writeAndFlush("received your message:"+msg);
}
/**
* 通道被客户端激活时执行此方法
*/

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("RamoteAddress : " + ctx.channel().remoteAddress() + " active !");//通道激活
ctx.writeAndFlush( "Welcome to " + InetAddress.getLocalHost().getHostName() + " service!\n");//回送进入服务系统
}

}

3.2 client端的建立

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package cn.wpeace.hello;
import java.util.Scanner;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* 客户端建立
* 1.设置线程池
* 2.设置处理器
* 3.连接端口
* @author peace
*
*/

public class NettyClient {
public static void main(String[] args) {
String host="127.0.0.1";//服务端IP
int port=8081;//端口
EventLoopGroup group=new NioEventLoopGroup();//线程池
try {
Bootstrap bootstrap=new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {//设置处理器
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
channel.pipeline().addLast("split",new DelimiterBasedFrameDecoder(1000, Delimiters.lineDelimiter()));
channel.pipeline().addLast("decoder",new StringDecoder());
channel.pipeline().addLast("encoder",new StringEncoder());
channel.pipeline().addLast("hander",new FirstClientHandler());//自定义客户端处理器
}
});
Channel channel = bootstrap.connect(host,port).sync().channel();
Scanner scanner=new Scanner(System.in);//读取键盘输入
while(true){
String line = scanner.nextLine();
if(line==null||"".equals(line)){
continue;
}
if("exit".equals(line)){//退出字符串
channel.close();
break;
}
channel.writeAndFlush(line+ "\r\n");//\n为了分隔识别,\r为了格式化输出
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
group.shutdownGracefully();
}

}
}
class FirstClientHandler extends SimpleChannelInboundHandler<String>{
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("Server :"+msg);//读取消息

}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client active");//通道激活调用
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Client close");//通道退出调用
super.channelInactive(ctx);
}
}

3.3运行效果:

1.服务端:
01
2.客户端:
02

4.核心类和方法介绍

4.1 EventLoopGroup用于管理Channel连接的

  1. bossGroup线程组:
    boss线程负责接收socket的连接请求,每接收一个连接请求就产生一个Channel,并把这个channel交给ServerBootstrap初始化时指定的 ServerSocketChannelFactory来处理,boss线程则继续处理socket的连接请求。

注意:ServerBootstrap监听几个端口对应几个boss线程。

  1. workerGroup线程组:
    worker线程负责继续处理channel请求。
    ServerSocketChannelFactory会从worker线程组中找出一个worker线程来继续处理这个请求。

如果是OioServerSocketChannelFactory也就是阻塞型IO时,这个channel上所有的socket消息,从开始到 channel(socket)关闭,都只由这个特定的worker来处理。
如果是NioServerSocketChannelFactory也就是非阻塞型IO时,每个worker可以服务不同的socket或者说channel,worker线程和channel不再有一一对应的关系。
worker的生命周期:对于普通IO,worker线程从连接建立后就保持。对于NIO则消息来临时从worker线程组中取出一个可用的线程,执行相应的ChannelPipeline(类似过滤器)等后,如果没有产生异常则会被回收。

4.2 Bootstrap

ServerBootstrap用于绑定端口,以及初始化设置线程组,handler等

Boostarp用于连接对应的IP和端口,以及初始化。

4.3ChannelPipeline和ChannelHandler

  1. channerlPipeLine:
    Netty中的每一个Channel,都有一个独立的ChannelPipeline,中文称为“通道水管”。只不过这个水管是双向的里面流淌着数据,数据可以通过这个“水管”流入到服务器,也可以通过这个“水管”从服务器流出。
  2. ChannelHandler:
    在ChannelPipeline中,有若干的过滤器。我们称之为“ChannelHandler”(处理器或者过滤器)。同“流入”和“流出”的概念向对应:用于处理/过滤 流入数据的ChannelHandler,称之为“ChannelInboundHandler”;用于处理/过滤 流出数据的ChannelHandler,称之为“ChannelOutboundHandler”。

类似Servlet中的过滤器,也是典型的责任链模式。需要注意,虽然数据管道中的Handler是按照顺序执行的,但不代表某一个Handler会处理任何一种由“上一个handler”发送过来的数据。某些Handler会检查传来的数据是否符合要求,如果不符合自己的处理要求,则不进行处理。

4.4一些Handler类举例

  1. StringDecoder:实现了对接收的byte数据转换为String
    (blog.wpeace.cn)

  2. HttpRequestDecoder:实现了Http协议的数据输入格式的解析。这个类将数据编码为HttpMessage对象,并交由下一个ChannelHandler进行处理。

  3. ByteArrayDecoder:最基础的数据流输入处理器,将所有的byte转换为ByteBuf对象(一般的实现类是:io.netty.buffer.UnpooledUnsafeDirectByteBuf)。我们进行一般的文本格式信息传输到服务器时,最好使用这个Handler将byte数组转换为ByteBuf对象。
  4. DelimiterBasedFrameDecoder:这个数据流输入处理器,会按照外部传入的数据中给定的某个关键字符/关键字符串,重新将数据组装为新的段落并发送给下一个Handler处理器。后文中,我们将使用这个处理器进行TCP半包的问题。
  5. 还有很多直接支持标准数据格式解析的处理器,例如支持Google Protocol Buffers 数据格式解析的ProtobufDecoder和ProtobufVarint32FrameDecoder处理器。
  6. 相反的有对应成对使用的解码器:HttpResponseEncoder,ByteArrayEncoder,StringEncoder;

本文来自伊豚(blog.wpeace.cn)

Peace wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!
坚持原创技术分享,您的支持将鼓励我继续创作!