Netty 기초
Netty - async and event-driven
기존 blocking model 은 아래와 같음
매 Thread 마다 socket connection 을 가지게 하고, blocking I/O 모델일 경우 다음과 같은 drawback 이 발생할 수 있다
- 많은 thread 가 I/O 이벤트 결과가 나타날 때까지 기다려야 함
- 각 연결마다 thread 가 배정되는 형태이기 때문에, connection 이 늘어난다면 thread 도 늘어나게 될 것이고 그에 따른 context-switching 비용도 늘어날 것이다
그래서 Java NIO 가 도입되었음
- setsockopt() : socket의 read/write call 이 즉시 리턴됨
- non-blocking socket 을 등록할 수 있음: event notification API (e.g. select(), poll() ) 를 이용해서 데이터가 읽기/쓰기 준비가 되었는지 확인할 수 있음
java.nio.channels.Selector : event notification API 를 이용해 어떤 socket이 I/O 준비가 되었는지 체크
Figure 1.2 모델을 사용하면서
- 적은 thread 로부터 많은 connection을 핸들링 할 수 있음, memory 관리나 context-switching 으로부터 오버헤드가 줄어듬
- 스레드는 핸들할 I/O 가 없을 경우 다른 task 의 작업을 할 수 있게 됨
즉 Netty는 Asynchrounous 하고 event-driven 이다
- operation 이 끝날 때까지 블록되지 않게 함. async method 는 즉시 리턴되며 완료 시 user에게 알림한다
- Selector는 적은 수의 thread 로부터 이벤트에 대한 많은 연결을 할 수 있게 함
Netty의 Core Component 로는 다음과 같은 것들이 있다
- Channels:파일이나 네트워크 소켓과 같은 것들의 연결이라고 볼 수 있음, I/O operation 들을 수행함
- an open connection to an entity such as a hardware device, a file, a network socket, or a program component that is capable of performing one or more distinct I/O operations, for example reading or writing.
- Callbacks: Netty는 콜백을 이벤트를 핸들링할때 사용한다 interface ChannelHandler 구현하면서 이벤트가 핸들링되며, 콜백이 트리거 되게 된다
public class ConnectHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) //새로운 connection 이 만들어 질때 트리거됨
throws Exception {
System.out.println(
"Client " + ctx.channel().remoteAddress() + " connected");
}
}
- Futures: async operation 의 결과에 대한 placeholder 로 작동, 미래의 어떤 순간에 완료될 것이며 결과에 대한 access 를 제공한다 Netty에서는 ChannelFuture 제공, ChannelFutureListner 사용해서 operation 이 완료되었는지 에러로 종료되었는지 여부를 알 수 있다 Nettty의 outbound I/O operation 은 ChannelFuture 를 반환하게 된다
Channel channel = ...;
// Does not block
ChannelFuture future = channel.connect(
new InetSocketAddress("192.168.0.1", 25));
future.addListener(new ChannelFutureListener() {//operation 이 완료되면 알림받는 리스너 등록
@Override
public void operationComplete(ChannelFuture future) { //완료될 시에 호출됨
if (future.isSuccess()){
ByteBuf buffer = Unpooled.copiedBuffer(
"Hello",Charset.defaultCharset());
ChannelFuture wf = future.channel()
.writeAndFlush(buffer);
....
} else {
Throwable cause = future.cause();
cause.printStackTrace();
}
}
});
- Event & Handlers:
모든 event 는 user가 구현한 handlers 에 의해 처리됨
Netty의 ChannelHandler 는 hanlder 에 대한 abstraction 을 제공
Netty application
//다수 Channel 에서도 이 Handler 가 안전하게 사용될 수 있음
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
System.out.println(
"Server received: " + in.toString(CharsetUtil.UTF_8));
ctx.write(in);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
ChannelHandler 는 다양한 유형의 이벤트에 대해 호출됨
애플리케이션은 이벤트 수명 주기에 연결 및 커스텀 로직을 작성하기 위해 ChannelHandler 를 구현할 수 있음
public class EchoSever {
private final int port;
public EchoSever(int port) {
this.port = port;
}
public static void main(String[] args) throws Exception{
if (args.length != 1){
System.err.println("Usage: "+EchoSever.class.getSimpleName() +" <port");
}
int port = Integer.parseInt(args[0]);
new EchoSever(port).start();
}
public void start() throws Exception{
final EchoServerHandler serverHandler = new EchoServerHandler();
EventLoopGroup group = new NioEventLoopGroup(); //create the event loop group
try{
ServerBootstrap b = new ServerBootstrap(); //create the server bootstrap
b.group(group)
.channel(NioServerSocketChannel.class) //specify the use of NIO transport Channel
.localAddress(new InetSocketAddress(port)) //sets the socket address using port
.childHandler(new ChannelInitializer<SocketChannel>() { //add EchoServerHanlder to the channel's channelPipleline
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(serverHandler);
}
});
ChannelFuture f = b.bind().sync(); //bind the server asychronously
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
}
Channelnitializer: 새로운 connection 이 허용되면, 자식 Channel 이 만들어지고 Channel Initializer 가 ChannelPipleline 에 EchoServerHandler 를 추가함
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new EchoClientHandler());
}
});
ChannelFuture f = b.connect().sync();
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println(
"Usage: " + EchoClient.class.getSimpleName() +
" <host> <port>");
return;
}
String host = args[0];
int port = Integer.parseInt(args[1]);
new EchoClient(host, port).start();
}
}
@Sharable
public class EchoClientHandler extends
SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!",
CharsetUtil.UTF_8));
}
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
System.out.println(
"Client received: " + in.toString(CharsetUtil.UTF_8));
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,
Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
Netty components and design
Channel
Transports
Selector: Channel 의 상태에 변화가 있을 때 알림받을 수 있음
가능한 상태의 변화는 다음과 같음
- New Channel 이 accept 되고 준비되었을 때
- Channel 연결이 완료되었을 때
- Channel 이 데이터가 존재하고 읽기가 준비되었을 때
- Channel 이 데이터 쓰기가 가능할 때
아래의 패턴은 java.nio.channels.SelectionKey 에 정의된 bit pattern 임
상태에 변화가 있을 때 이 패턴들을 결합해서 사용함
select() 대신 epoll() 사용