一、客户端-普通的socket
Socket client; try { long start = System.nanoTime(); client = new Socket("127.0.0.1", 9123); OutputStream out = client.getOutputStream(); RequestBeanProtos.RequestBean request = RequestBeanProtos.RequestBean .newBuilder().setAccount("811") .setDeviceId("device1").setGroup("sync") .setType("card").build(); out.write(request.toByteArray()); InputStream in = client.getInputStream(); byte[] data = readContent(in, 4096); if (data != null && data.length > 0) { // ServiceResponseBeanProtos.ResponseBean response = ServiceResponseBeanProtos.ResponseBean // .parseFrom(data); // System.out.println(response.getContent()); } else { System.out.println("content is empty!"); } out.close(); in.close(); client.close();
二、服务端
public static void main(String[] args) { try { // 启动socket服务 ExecutorService executor = Executors.newCachedThreadPool(); NioSocketAcceptor acceptor = new NioSocketAcceptor(Runtime.getRuntime().availableProcessors() + 1); acceptor.setReuseAddress(true); acceptor.getSessionConfig().setReadBufferSize(2048); acceptor.getSessionConfig().setIdleTime(IdleStatus.BOTH_IDLE, 8); acceptor.getSessionConfig().setReceiveBufferSize(1024); acceptor.getSessionConfig().setSoLinger(0); acceptor.getFilterChain().addLast("codec", new StreamWriteFilter()); acceptor.getFilterChain().addLast("threadPool", new ExecutorFilter(executor)); acceptor.setHandler(new ServiceRouteExcutorHandler()); IoBuffer.setUseDirectBuffer(false); IoBuffer.setAllocator(new SimpleBufferAllocator()); try { acceptor.bind(new InetSocketAddress(Configuration .getConfiguration().getServerConfig().getPort())); } catch (IOException e) { log.error("acceptor bind IOException=" + e.toString()); } } catch (RouteEngineException e) { log.error("initialize machines RouteEngineException=" + e.toString()); } }
Handler处理
private Logger log = Logger.getLogger(getClass()); @Override public void exceptionCaught(IoSession session, Throwable arg1) throws Exception { log.error("exceptionCaught called.exception=" + arg1.toString()); session.close(true); } @Override public void messageReceived(IoSession session, Object message) throws Exception { if (log.isDebugEnabled()) { log.debug("messageReceived start."); } IoBuffer buffer = (IoBuffer) message; ByteBuffer bf = buffer.buf(); byte[] tempBuffer = new byte[bf.limit()]; bf.get(tempBuffer); RouteEngine engine = Configuration.getConfiguration().getEngine(); byte []data = engine.response(tempBuffer); buffer.free(); int len = data.length; IoBuffer resp = IoBuffer.allocate(len); resp.put(data, 0, len); resp.flip(); session.write(resp); resp.free(); if (log.isDebugEnabled()) { log.debug("messageReceived end."); } } @Override public void messageSent(IoSession arg0, Object arg1) throws Exception { if (log.isDebugEnabled()) { log.debug("messageSent called."); } } @Override public void sessionClosed(IoSession arg0) throws Exception { if (log.isDebugEnabled()) { log.debug("sessionClosed called."); } } @Override public void sessionCreated(IoSession session) throws Exception { if (log.isDebugEnabled()) { log.debug("sessionCreated called."); } } @Override public void sessionIdle(IoSession session, IdleStatus arg1) throws Exception { if (log.isDebugEnabled()) { log.debug("sessionIdle called.IdleStatus=" + arg1.toString()); } session.close(true); } @Override public void sessionOpened(IoSession arg0) throws Exception { if (log.isDebugEnabled()) { log.debug("sessionOpened called."); } }