本示例为基于行字符串的回显应用示例,用户端通过标准输入获取行字符串,并将字符串发送到服务端;服务端读取用户端行数据,解码之后将数据回写给用户端,同事服务端实现与用户端的读写心跳检测等;
服务端基于netty创立,解码器为LineBasedFrameDecoder和StringDecoder,主要功能是将ByteBuf类型数据解码转换为String类型数据;心跳机制基于IdleStateHandler实现;
SocketEchoServer 源码:
public class SocketEchoServer { private static int PORT = 9999; public static void main(String[] args){ SocketEchoServer.startServer(PORT); } private static void startServer(int port){ NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap sb = new ServerBootstrap(); sb.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 256) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast("lineDecodeHandler", new LineBasedFrameDecoder(1024, false, false)) .addLast("stringDecoder", new StringDecoder()) .addLast("heartBeatHandler", new IdleStateHandler(0,0, 30)) .addLast("echoHandler", new SocketEchoServerHandler()); } }); ChannelFuture cf = sb.bind(PORT).sync(); System.out.println("---- server bind success on port:" + PORT + " ----"); cf.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}
说明:
SocketEchoServerHandler源码:
public class SocketEchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { InetSocketAddress address = (InetSocketAddress)ctx.channel().remoteAddress(); System.out.println("---- client active: host=" + address.getHostName() + ",port=" + address.getPort() + "----"); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { InetSocketAddress address = (InetSocketAddress)ctx.channel().remoteAddress(); System.out.println("---- client inactive: host=" + address.getHostName() + ",port=" + address.getPort() + "----"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof String){ String str = (String)msg; ctx.writeAndFlush(Unpooled.copiedBuffer(str.getBytes())); System.out.println("recv form client:" + str); return; } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if(evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent) evt; DateTimeFormatter format = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss"); String dt = DateTime.now().toString(format); switch (event.state()){ case ALL_IDLE: System.out.println(dt + ":服务端与用户端读写超时!"); break; case READER_IDLE: System.out.println(dt + ":服务端读超时!"); break; case WRITER_IDLE: System.out.println(dt + ":服务端写超时!"); break; } } }}
channelRead:读取解码器解码转换后的String对象,将对象回写到对应的用户端;
userEventTriggered:若事件类型为IdleStateEvent表明为心跳检测事件,获取对应事件类型并打印;
用户端主要功能为连接服务器,从标准输入中读取行字符串,将行数据发送给服务端,同时接收服务端回写的数据;
SocketEchoClient源码:
public class SocketEchoClient { private static String LOCAL_HOST = "127.0.0.1"; private static int SERVER_PORT = 9999; public static void main(String[] args){ SocketEchoClient.startClient(SERVER_PORT); } private static void startClient(int serverPort){ NioEventLoopGroup group = new NioEventLoopGroup(1); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline().addLast("lineDecodeHandler", new LineBasedFrameDecoder(1024, false, false)) .addLast("stringDecoder",new StringDecoder()) .addLast("echoClientHandler", new SocketEchoClientHandler()); } }); ChannelFuture cf = b.connect(LOCAL_HOST, SERVER_PORT).sync(); System.out.println("client connect to server success! host:" + LOCAL_HOST + ":" + SERVER_PORT); cf.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); }finally { group.shutdownGracefully(); } } }
说明:
SocketEchoClientHandler源码:
public class SocketEchoClientHandler extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof String){ String str = (String)msg; System.out.println("recv form server:" + str); return; } } @Override public void channelActive(final ChannelHandlerContext ctx) throws Exception { try { new Thread(new Runnable() { public void run() { Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()){ String line = scanner.nextLine(); if(line.startsWith("##stop")){ ctx.close(); break; } line += System.getProperty("line.separator"); System.out.println(line); ctx.writeAndFlush(Unpooled.copiedBuffer(line.getBytes())); } } }).start(); }catch (Exception e){ e.printStackTrace(); ctx.close(); } }}
channelRead:读取服务端回写数据;
channelActive:用户端连接成功后,创立并启动读取标准输入解决的线程,线程中读取行数据并将数据发送给服务端;
以下为服务端及用户端的实例输入输出,序号为时序。
服务端:
服务端.png用户端:
用户端.png应用示例源码: zhaozhou11/netty-demo.git
Netty源码愫读:
Netty源码愫读(一)ByteBuf相关源码学习 【https://www.songma.com/p/016daa404957】
Netty源码愫读(二)Channel相关源码学习【https://www.songma.com/p/02eac974258e】
Netty源码愫读(三)ChannelPipeline、ChannelHandlerContext相关源码学习【https://www.songma.com/p/be82d0fcdbcc】
Netty源码愫读(四)ChannelHandler相关源码学习【https://www.songma.com/p/6ee0a3b9d73a】
Netty源码愫读(五)EventLoop与EventLoopGroup相关源码学习【https://www.songma.com/p/05096995d296】
Netty源码愫读(六)ServerBootstrap相关源码学习【https://www.songma.com/p/a71a9a0291f3】
Netty应用实战: