?????????TcpServer??????£?
public class TcpServer {
private final AttributeKey<ListenSession> LISTENSESSIONKEY = AttributeKey.valueOf("LISTENSESSIONKEY");
private final AttributeKey<ServerSession> SERVERSESSIONKEY = AttributeKey.valueOf("SERVERSESSIONKEY");
private final ServerBootstrap bootstrap = new ServerBootstrap();
private EventLoopGroup bossGroup = null;
private EventLoopGroup workerGroup = null;
private ArrayList<ListenSession> listenSessions = new ArrayList<ListenSession>();
...
private void start() {
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup(4);
bootstrap.group(bossGroup?? workerGroup);
bootstrap.channel(NioServerSocketChannel.class);
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encode"?? new ObjectEncoder());
pipeline.addLast("decode"?? new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
pipeline.addLast(workerGroup?? new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ListenSession listenSession = ctx.channel().parent().attr(LISTENSESSIONKEY).get();
ServerSession serverSession = listenSession.createServerSession();
ctx.channel().attr(SERVERSESSIONKEY).set(serverSession);
serverSession.setChannel(ctx.channel());
serverSession.onActive();
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
ServerSession serverSession = ctx.channel().attr(SERVERSESSIONKEY).get();
serverSession.onInactive();
}
...
}
...
private void tryListen(ListenSession listenSession) {
if (!listenSession.isWorking()) {
return;
}
final int port = listenSession.getLocalPort();
final int interval = listenSession.getRelistenInterval();
ChannelFuture f = bootstrap.bind(port);
f.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture f) throws Exception {
if (f.isSuccess()) {
f.channel().attr(LISTENSESSIONKEY).set(listenSession);
} else {
f.channel().eventLoop().schedule(() -> tryListen(listenSession)?? interval?? TimeUnit.SECONDS);
}
}
});
}
}
?????????????????interval?????????????????????ServerSession??????Channel??
????TcpClient???????С????????????????Channel Inactive??????????
/**
 * Sketch of a Netty-based TCP client that keeps one outbound connection per
 * {@code ClientSession} and reconnects automatically.
 *
 * <p>{@link #tryConnect} starts a connection attempt and retries on failure;
 * when an established channel goes inactive the pipeline handler schedules a
 * new attempt after the session's reconnect interval.
 *
 * <p>Parts of this listing were elided in the original ("..."); the elisions
 * are preserved below as comments.
 */
public class TcpClient {
    private final AttributeKey<ClientSession> SESSIONKEY = AttributeKey.valueOf("SESSIONKEY");
    private final Bootstrap bootstrap = new Bootstrap();
    private EventLoopGroup workerGroup = null;
    private ArrayList<ClientSession> clientSessions = new ArrayList<ClientSession>();
    // ... (further members omitted in the original listing)

    /**
     * Creates the worker event loop group and configures the shared bootstrap
     * and the channel pipeline.
     */
    private void start() {
        workerGroup = new NioEventLoopGroup();
        bootstrap.group(workerGroup);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                // NOTE(review): ObjectEncoder/ObjectDecoder rely on Java native
                // serialization; confirm that the remote peer is trusted.
                pipeline.addLast("encode", new ObjectEncoder());
                pipeline.addLast("decode", new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
                pipeline.addLast(new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) throws Exception {
                        // NOTE(review): this relies on tryConnect()'s listener
                        // having stored the session before channelActive fires
                        // -- confirm the ordering holds for the transport in use.
                        ClientSession clientSession = ctx.channel().attr(SESSIONKEY).get();
                        clientSession.setChannel(ctx.channel());
                        clientSession.onActive();
                    }

                    @Override
                    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                        ClientSession clientSession = ctx.channel().attr(SESSIONKEY).get();
                        // Without an associated session there is nobody to
                        // notify and no reconnect parameters to use.
                        if (clientSession == null) {
                            return;
                        }
                        clientSession.onInactive();
                        final int interval = clientSession.getReconnectInterval();
                        ctx.channel().eventLoop().schedule(() -> tryConnect(clientSession), interval, TimeUnit.SECONDS);
                    }
                    // ... (further handler callbacks omitted in the original listing)
                }
    // ... (remainder omitted in the original listing)

    /**
     * Starts a connection attempt to the session's remote host and port and,
     * if the attempt fails, retries after the session's reconnect interval
     * (in seconds). Does nothing once the session reports that it is no
     * longer working.
     *
     * @param clientSession the session to connect; stored on the channel under
     *                      {@code SESSIONKEY} once the connect succeeds
     */
    private void tryConnect(final ClientSession clientSession) {
        if (!clientSession.isWorking()) {
            return;
        }
        final String host = clientSession.getRemoteHost();
        final int port = clientSession.getRemotePort();
        final int interval = clientSession.getReconnectInterval();
        ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress(host, port));
        connectFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture f) throws Exception {
                if (f.isSuccess()) {
                    f.channel().attr(SESSIONKEY).set(clientSession);
                } else {
                    // Schedule on the group instead of f.channel().eventLoop():
                    // a channel whose creation or registration failed has no
                    // event loop, and asking for it would throw and end the
                    // retry loop.
                    workerGroup.schedule(() -> tryConnect(clientSession), interval, TimeUnit.SECONDS);
                }
            }
        });
    }
}
???????????????????????????????????????????????ClientSession/ListenSession???ɡ??磺
    private TcpServer tcpServer = new TcpServer();
    private LSServer lsServer = new LSServer();
    private LSClient lsClient = new LSClient();
    lsServer.setLocalPort(30001);
    lsServer.setRelistenInterval(10);
    tcpServer.attach(lsServer);
    lsClient.setLocalPort(40001);
    lsClient.setRelistenInterval(10);
    tcpServer.attach(lsClient);
??????????????????????????????????bind???????????′???
    f.channel().closeFuture().sync();
????????????????????????????????main loop?????????????????У????main????????????????????????tick??????????main loop????????????????Щ?????μ????????????????????????????