공부/Java

Netty 기초

흑개1 2025. 4. 22. 23:12

Netty - async and event-driven

기존 blocking model 은 아래와 같음

 

매 Thread 마다 socket connection 을 가지게 하고, blocking I/O 모델일 경우 다음과 같은 drawback 이 발생할 수 있다

 

  • 많은 thread 가 I/O 이벤트 결과가 나타날 때까지 기다려야 함
  • 각 연결마다 thread 가 배정되는 형태이기 때문에, connection 이 늘어난다면 thread 도 늘어나게 될 것이고 그에 따른 context-switching 비용도 늘어날 것이다

그래서 Java NIO 가 도입되었음

  • setsockopt() : socket의 read/write call 이 즉시 리턴됨
  • non-blocking socket 을 등록할 수 있음: event notification API (e.g. select(), poll() ) 를 이용해서 데이터가 읽기/쓰기 준비가 되었는지 확인할 수 있음

 

java.nio.channels.Selector : event notification API 를 이용해 어떤 socket이 I/O 준비가 되었는지 체크

Figure 1.2 모델을 사용하면서

  • 적은 thread 로부터 많은 connection을 핸들링 할 수 있음, memory 관리나 context-switching 으로부터 오버헤드가 줄어듬
  • 스레드는 핸들할 I/O 가 없을 경우 다른 task 의 작업을 할 수 있게 됨

즉 Netty는 Asynchrounous 하고 event-driven 이다

  • operation 이 끝날 때까지 블록되지 않게 함. async method 는 즉시 리턴되며 완료 시 user에게 알림한다
  • Selector는 적은 수의 thread 로부터 이벤트에 대한 많은 연결을 할 수 있게 함

Netty의 Core Component 로는 다음과 같은 것들이 있다

  • Channels:파일이나 네트워크 소켓과 같은 것들의 연결이라고 볼 수 있음, I/O operation 들을 수행함
  • an open connection to an entity such as a hardware device, a file, a network socket, or a program component that is capable of performing one or more distinct I/O operations, for example reading or writing.
  • Callbacks: Netty는 콜백을 이벤트를 핸들링할때 사용한다 interface ChannelHandler 구현하면서 이벤트가 핸들링되며, 콜백이 트리거 되게 된다
public class ConnectHandler extends ChannelInboundHandlerAdapter {
@Override
	public void channelActive(ChannelHandlerContext ctx) //새로운 connection 이 만들어 질때 트리거됨
	throws Exception {
	System.out.println(
	"Client " + ctx.channel().remoteAddress() + " connected");
	}
}
  • Futures: async operation 의 결과에 대한 placeholder 로 작동, 미래의 어떤 순간에 완료될 것이며 결과에 대한 access 를 제공한다 Netty에서는 ChannelFuture 제공, ChannelFutureListner 사용해서 operation 이 완료되었는지 에러로 종료되었는지 여부를 알 수 있다 Nettty의 outbound I/O operation 은 ChannelFuture 를 반환하게 된다
Channel channel = ...;
// Does not block
ChannelFuture future = channel.connect(
new InetSocketAddress("192.168.0.1", 25));
	future.addListener(new ChannelFutureListener() {//operation 이 완료되면 알림받는 리스너 등록
			@Override
			public void operationComplete(ChannelFuture future) { //완료될 시에 호출됨
			if (future.isSuccess()){
			ByteBuf buffer = Unpooled.copiedBuffer(
			"Hello",Charset.defaultCharset());
			ChannelFuture wf = future.channel()
			.writeAndFlush(buffer);
			....
			} else {
			Throwable cause = future.cause();
			cause.printStackTrace();
			}
			}
});
  • Event & Handlers:

모든 event 는 user가 구현한 handlers 에 의해 처리됨

Netty의 ChannelHandler 는 hanlder 에 대한 abstraction 을 제공

Netty application

//다수 Channel 에서도 이 Handler 가 안전하게 사용될 수 있음
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        System.out.println(
                "Server received: " + in.toString(CharsetUtil.UTF_8));
        ctx.write(in);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
                .addListener(ChannelFutureListener.CLOSE);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
                                Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

ChannelHandler 는 다양한 유형의 이벤트에 대해 호출됨

애플리케이션은 이벤트 수명 주기에 연결 및 커스텀 로직을 작성하기 위해 ChannelHandler 를 구현할 수 있음

public class EchoSever {
    private final int port;

    public EchoSever(int port) {
        this.port = port;
    }

    public static void main(String[] args) throws Exception{
        if (args.length != 1){
            System.err.println("Usage: "+EchoSever.class.getSimpleName() +" <port");
        }
        int port = Integer.parseInt(args[0]);
        new EchoSever(port).start();
    }

    public void start() throws Exception{
        final EchoServerHandler serverHandler = new EchoServerHandler();
        EventLoopGroup group = new NioEventLoopGroup(); //create the event loop group
        try{
            ServerBootstrap b = new ServerBootstrap(); //create the server bootstrap
            b.group(group)
                    .channel(NioServerSocketChannel.class) //specify the use of NIO transport Channel
                    .localAddress(new InetSocketAddress(port))  //sets the socket address using port
                    .childHandler(new ChannelInitializer<SocketChannel>() { //add EchoServerHanlder to the channel's channelPipleline
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(serverHandler);
                        }
                    });
            ChannelFuture f = b.bind().sync(); //bind the server asychronously
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
}

Channelnitializer: 새로운 connection 이 허용되면, 자식 Channel 이 만들어지고 Channel Initializer 가 ChannelPipleline 에 EchoServerHandler 를 추가함

public class EchoClient {
    private final String host;
    private final int port;
    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host, port))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new EchoClientHandler());
                        }
                    });
            ChannelFuture f = b.connect().sync();
            f.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println(
                    "Usage: " + EchoClient.class.getSimpleName() +
                            " <host> <port>");
            return;
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        new EchoClient(host, port).start();
    }
}
@Sharable
public class EchoClientHandler extends
        SimpleChannelInboundHandler<ByteBuf> {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!",
                CharsetUtil.UTF_8));
    }
    @Override
    public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
        System.out.println(
                "Client received: " + in.toString(CharsetUtil.UTF_8));
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,
                                Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

Netty components and design

Channel

Transports

Selector: Channel 의 상태에 변화가 있을 때 알림받을 수 있음

가능한 상태의 변화는 다음과 같음

  • New Channel 이 accept 되고 준비되었을 때
  • Channel 연결이 완료되었을 때
  • Channel 이 데이터가 존재하고 읽기가 준비되었을 때
  • Channel 이 데이터 쓰기가 가능할 때

아래의 패턴은 java.nio.channels.SelectionKey 에 정의된 bit pattern 임

상태에 변화가 있을 때 이 패턴들을 결합해서 사용함

 

select() 대신 epoll() 사용