您现在的位置是:亿华云 > IT科技

Netty源码之Reactor模式

亿华云2025-10-04 03:04:21【IT科技】4人已围观

简介学习目标什么是Reactor模式? Reactor模式由什么组成的? Reactor模式解决什么问题? Reactor模式线程模型有哪些?演进过程?

 

学习目标

什么是码之r模Reactor模式? Reactor模式由什么组成的? Reactor模式解决什么问题? Reactor模式线程模型有哪些?演进过程?

web处理请求架构

大多数web请求处理流程可以抽象成这几个步骤:读取(read),解码(decode),码之r模处理(process),码之r模编码(encode),码之r模发送(send),码之r模如下图所示:

同时,码之r模处理web请求通常有两种架构:传统基于线程架构和事件驱动架构。码之r模

传统基于线程架构

概念

每个新连接创建一个线程来处理。码之r模对于长连接服务,码之r模如果一个client和server保持一个连接的码之r模话,有多少个client接入,码之r模server就需要创建同等的码之r模线程来处理。线程上下文切换,码之r模数据同步和内存消耗,码之r模对server来说,码之r模将是非常大的开销。

代码实现

传统基于线程架构通常采用BIO的方式来实现,代码如下:

public class Server implements Runnable {      int port;     public Server(int port) {          this.port = port;     }     @Override     public void run() {          try {              ServerSocket serverSocket = new ServerSocket(port);             while (true){                  System.out.println("等待新连接...");                 new Thread(new Handler(serverSocket.accept())).start();             }         } catch (IOException e) {              e.printStackTrace();         }     }     static class Handler implements Runnable{          private Socket socket;         public Handler(Socket socket){              this.socket = socket;         }         @Override         public void run() {              try {                  byte[] input = new byte[1024];                 this.socket.getInputStream().read(input);                 byte[] output = process(input);                 this.socket.getOutputStream().write(output);                 this.socket.getOutputStream().flush();                 this.socket.close();                 System.out.println("响应完成!");             } catch (IOException e) {                  e.printStackTrace();             }         }         private byte[] process(byte[] input) {              System.out.println("读取内容:" + new String(input));             return input;         }     }     public static void main(String[] args) throws InterruptedException {          Thread thread = new Thread(new Server(2021));         thread.setDaemon(true);         thread.start();         synchronized (Server.class) {              Server.class.wait();         }     } } 

为了避免线程创建销毁的开销,我们通常会采用线程池,但是同样也有很大的弊端:

同步阻塞IO,读写阻塞,线程等待时间过长 在制定线程策略的时候,香港云服务器只能根据CPU的数目来限定可用线程资源,不能根据连接并发数目来制定,也就是连接有限制。否则很难保证对客户端请求的高效和公平。 多线程之间的上下文切换,造成线程使用效率并不高,并且不易扩展 状态数据以及其他需要保持一致的数据,需要采用并发同步控制

应用场景

既然传统基于线程架构弊端这么多,它存在还有什么价值?它的应用场景是什么?

传统基于线程架构适用于连接数目比较小且一次传输大量数据的场景,比如上传,下载。

事件驱动架构

事件驱动架构:可以把线程和连接解耦,线程只用于执行事件注册的回调函数。事件驱动架构由事件生产者和事件消费者组成,前者是事件的来源,它只负责监听哪些事件发生;后者是直接处理事件或者事件发生时,响应事件的实体。

Reactor模式

什么是Reactor模式?

Reactor模式是事件驱动架构的高防服务器一种具体实现方法,简而言之,就是一个单线程进行循环监听就绪IO事件,并将就绪IO事件分发给对应的回调函数。

Reactor模式由什么组成的?

Reactor模式分为两个重要组成部分,Reactor和Handler。 Reactor(反应器):循环监听就绪IO事件,并分发给回调函数。 Handler(回调函数):执行对应IO事件的实际业务逻辑。

Reactor模式解决什么问题?

反应器模式可以实现同步的多路复用,同步是指按照事件到达的顺序分发处理。反应器 接收来自不同的客户端的消息、请求和连接,尽管客户端是并发的,但是反应器可以按照事件到达的顺序触发回调函数。因此,Reactor模式将连接和线程解耦,不需要为每个连接创建单独线程。这个问题和C10K问题相同,提供了一个解决思路。服务器托管

Reactor模式下的三种模型

单线程模型:IO事件轮询,分发(accept)和IO事件执行(read,decode,compute,encode,send)都在一个线程中完成,如下图所示:

在单线程模型下,不仅IO操作在Reactor线程上,而非IO操作(handlder中process()方法)也在Reactor线程上执行了,当非IO操作执行慢的话,这会大大延迟IO请求响应,所以应该把非IO操作拆出来,来加速Reactor线程对IO请求响应,就出现多线程模型。

单线程模型实现:

// reactor public class Reactor implements Runnable {      int port;     Selector selector;     ServerSocketChannel serverSocket;     public Reactor(int port) throws IOException {          this.port = port;         // 创建serverSocket对象         serverSocket = ServerSocketChannel.open();         // 绑定端口         serverSocket.socket().bind(new InetSocketAddress(port));         // 配置非阻塞         serverSocket.configureBlocking(false);         // 创建selector对象         selector = Selector.open();         // serversocket注册到selector上,帮忙监听accpet事件         serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(serverSocket,selector));         /** 还可以使用 SPI provider,来创建selector和serversocket对象         SelectorProvider p = SelectorProvider.provider();         selector = p.openSelector();         serverSocket = p.openServerSocketChannel();         */     }     @Override     public void run() {          try {              while (!Thread.interrupted()) {                  System.out.println("start select event...");                 selector.select();                 Set selectedKeys = selector.selectedKeys();                 Iterator it = selectedKeys.iterator();                 while (it.hasNext()) {                      dispatch((SelectionKey)it.next());                 }                 selectedKeys.clear();             }         } catch (IOException e) {              e.printStackTrace();         }     }     private void dispatch(SelectionKey key) {          Runnable r = (Runnable) key.attachment();         if (r != null) {              r.run();         }     }     public static void main(String[] args) throws IOException, InterruptedException {          Thread thread = new Thread(new Reactor(2021));         thread.start();         synchronized (Reactor.class) {              Reactor.class.wait();         }     } } // acceptor调度器 public class Acceptor implements Runnable {      ServerSocketChannel serverSocket;     Selector selector;     public Acceptor(ServerSocketChannel serverSocket,Selector selector) {          this.serverSocket = serverSocket;         this.selector = selector;     }     @Override     public void run() {          try {              SocketChannel socket = this.serverSocket.accept();             if (socket != null) {                  new Handler(selector,socket);             }         } catch (IOException e) {              e.printStackTrace();         }     } } // 回调函数handler public class Handler implements Runnable {      Selector selector;     SocketChannel socket;     SelectionKey sk;     ByteBuffer input = ByteBuffer.allocate(1024);     ByteBuffer output = ByteBuffer.allocate(1024);     static final int READING = 0, SENDING = 1;     int state = READING;     public Handler(Selector selector, SocketChannel socket) throws IOException {          this.selector = selector;         this.socket = socket;         this.socket.configureBlocking(false);         sk = this.socket.register(selector,0);         sk.attach(this);         sk.interestOps(SelectionKey.OP_READ);         selector.wakeup();     }     @Override     public void run() {          try{              if (state == READING) {                  read();             } else if (state == SENDING) {                  send();             }         } catch (IOException ex) {              ex.printStackTrace();         }     }     private void read() throws IOException {          socket.read(input);         if (inputIsComplete()) {              // 执行业务逻辑代码             process();             state = SENDING;             // Normally also do first write now             sk.interestOps(SelectionKey.OP_WRITE);         }     }     private void send() throws IOException {          socket.write(output);         socket.close();         if (outputIsComplete()) sk.cancel();     }     boolean inputIsComplete() {  return true;}     boolean outputIsComplete() { return true;}     // 处理非IO操作(业务逻辑代码)     void process(){          String msg = new String(input.array());         System.out.println("读取内容:" + msg);         output.put(msg.getBytes());         output.flip();     } }  多线程模型:与单线程模型不同的是添加一个业务线程池,将非IO操作(业务逻辑处理)交给业务线程池来处理,提高Reactor线程的IO响应,如图所示:

在多线程模型下,虽然将非IO操作拆出去了,但是所有IO操作都在Reactor单线程中完成的。在高负载、高并发场景下,也会成为瓶颈,于是对Reactor单线程进行了优化,出现了主从线程模型。

多线程模型实现:

public class Reactor implements Runnable {      int port;     Selector selector;     ServerSocketChannel serverSocket;     public Reactor(int port) throws IOException {          this.port = port;         // 创建serverSocket对象         serverSocket = ServerSocketChannel.open();         // 绑定端口         serverSocket.socket().bind(new InetSocketAddress(port));         // 配置非阻塞         serverSocket.configureBlocking(false);         // 创建selector对象         selector = Selector.open();         // serversocket注册到selector上,帮忙监听accpet事件         serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor("Acceptor",serverSocket,selector));         /** 还可以使用 SPI provider,来创建selector和serversocket对象         SelectorProvider p = SelectorProvider.provider();         selector = p.openSelector();         serverSocket = p.openServerSocketChannel();         */     }     @Override     public void run() {          try {              while (!Thread.interrupted()) {                  System.out.println("start select event...");                 selector.select();                 Set selectedKeys = selector.selectedKeys();                 Iterator it = selectedKeys.iterator();                 while (it.hasNext()) {                      dispatch((SelectionKey)it.next());                 }                 selectedKeys.clear();             }         } catch (IOException e) {              e.printStackTrace();         }     }     private void dispatch(SelectionKey key) {          SelfRunable r = (SelfRunable) key.attachment();         if (r != null) {              System.out.println("dispatch to " + r.getName() + "====");             r.run();         }     }     public static void main(String[] args) throws IOException, InterruptedException {          Thread thread = new Thread(new Reactor(2021));         thread.start();         synchronized (Reactor.class) {              Reactor.class.wait();         }     } } public class Acceptor implements SelfRunable {      ServerSocketChannel serverSocket;     Selector selector;     String name;     public Acceptor(String name, ServerSocketChannel serverSocket,Selector selector) {          this.name = name;         this.serverSocket = serverSocket;         this.selector = selector;     }     @Override     public void run() {          try {              SocketChannel socket = this.serverSocket.accept();             if (socket != null) {                  new Handler("handler_" + ((InetSocketAddress)socket.getLocalAddress()).getPort(), selector,socket);             }         } catch (IOException e) {              e.printStackTrace();         }     }     @Override     public String getName() {          return this.name;     } } public class Handler implements SelfRunable {      String name;     Selector selector;     SocketChannel socket;     SelectionKey sk;     ByteBuffer input = ByteBuffer.allocate(1024);     ByteBuffer output = ByteBuffer.allocate(1024);     static final int READING = 0, SENDING = 1,  PROCESSING = 3;     volatile int state = READING;     static ExecutorService poolExecutor = Executors.newFixedThreadPool(5);     public Handler(String name, Selector selector, SocketChannel socket) throws IOException {          this.selector = selector;         this.socket = socket;         this.name = name;         this.socket.configureBlocking(false);         sk = this.socket.register(selector,0);         sk.attach(this);         sk.interestOps(SelectionKey.OP_READ);         selector.wakeup();     }     @Override     public void run() {          try{              System.out.println("state:" + state);             if (state == READING) {                  read();             } else if (state == SENDING) {                  send();             }         } catch (IOException ex) {              ex.printStackTrace();         }     }     synchronized void read() throws IOException {          socket.read(input);         if (inputIsComplete()) {              state = PROCESSING;            poolExecutor.execute(new Processer());         }     }     synchronized void processAndHandOff() {          System.out.println("processAndHandOff=========");         process();         state = SENDING; // or rebind attachment         sk.interestOps(SelectionKey.OP_WRITE);         selector.wakeup();         System.out.println("processAndHandOff finish ! =========");     }     private void send() throws IOException {          System.out.println("start send ...");         socket.write(output);         socket.close();         System.out.println("start send finish!");         if (outputIsComplete()) sk.cancel();     }     boolean inputIsComplete() {  return true;}     boolean outputIsComplete() { return true;}     void process(){          String msg = new String(input.array());         System.out.println("读取内容:" + msg);         output.put(msg.getBytes());         output.flip();     }     @Override     public String getName() {          return this.name;     }     class Processer implements Runnable {          public void run() {  processAndHandOff(); }     } }  主从线程模型: 相比多线程模型而言,对于多核cpu,为了充分利用资源,将Reactor拆分成了mainReactor 和 subReactor,但是,主从线程模型也有弊端,不适合大量数据传输。 mainReactor:负责监听接收(accpet)新连接,将新连接后续操作交给subReactor来处理,通常由一个线程处理。 subReactor: 负责处理IO的读写操作,通常由多个线程处理。 非IO操作依然由业务线程池来处理。

主从线程模型实现:

public class Reactor implements Runnable {      int port;     Selector selector;     ServerSocketChannel serverSocket;     int SUBREACTOR_SIZE = 1;     SubReactor[] subReactorPool = new SubReactor[SUBREACTOR_SIZE];     public Reactor(int port) throws IOException {          this.port = port;         // 创建serverSocket对象         serverSocket = ServerSocketChannel.open();         // 绑定端口         serverSocket.socket().bind(new InetSocketAddress(port));         // 配置非阻塞         serverSocket.configureBlocking(false);         // 创建selector对象         selector = Selector.open();         // serversocket注册到selector上,帮忙监听accpet事件         serverSocket.register(selector, SelectionKey.OP_ACCEPT, new Acceptor("Acceptor",serverSocket,subReactorPool));         // 初始化subreactor pool         initSubReactorPool();         /** 还可以使用 SPI provider,来创建selector和serversocket对象         SelectorProvider p = SelectorProvider.provider();         selector = p.openSelector();         serverSocket = p.openServerSocketChannel();         */     }     @Override     public void run() {          try {              while (!Thread.interrupted()) {                  System.out.println("mainReactor start select event...");                 selector.select();                 Set selectedKeys = selector.selectedKeys();                 Iterator it = selectedKeys.iterator();                 while (it.hasNext()) {                      dispatch((SelectionKey)it.next());                 }                 selectedKeys.clear();             }         } catch (IOException e) {              e.printStackTrace();         }     }     void initSubReactorPool() {          try {              for (int i = 0; i < SUBREACTOR_SIZE; i++) {                  subReactorPool[i] = new SubReactor("SubReactor" + i);             }         } catch (IOException ex) {  /* ... */ }     }     private void dispatch(SelectionKey key) {          SelfRunable r = (SelfRunable) key.attachment();         if (r != null) {              System.out.println("mainReactor dispatch to " + r.getName() + "====");             r.run();         }     }     public static void main(String[] args) throws IOException, InterruptedException {          Thread thread = new Thread(new Reactor(2021));         thread.start();         synchronized (Reactor.class) {              Reactor.class.wait();         }     } } public class SubReactor implements SelfRunable {      private Selector selector;     private String name;     private List<SelfRunable> task = new ArrayList<SelfRunable>();     public SubReactor(String name) throws IOException {          this.name = name;         selector = Selector.open();         new Thread(this).start();     }     @Override     public String getName() {          return this.name;     }     @Override     public void run() {          try {              while (!Thread.interrupted()) {                  System.out.println("subReactor start select event...");                 selector.select(5000);                 Set selectedKeys = selector.selectedKeys();                 Iterator it = selectedKeys.iterator();                 while (it.hasNext()) {                      dispatch((SelectionKey)it.next());                 }                 selectedKeys.clear();             }         } catch (IOException e) {              e.printStackTrace();         }     }     private void dispatch(SelectionKey key) {          SelfRunable r = (SelfRunable) key.attachment();         if (r != null) {              System.out.println("subReactor dispatch to " + r.getName() + "====");             r.run();         }     }     public Selector getSelector(){          return this.selector;     }     public void submit(SelfRunable runnable) {          task.add(runnable);     } } public class Acceptor implements SelfRunable {      int next = 0;     String name;     SubReactor[] subReactorPool;     ServerSocketChannel serverSocket;     public Acceptor(String name, ServerSocketChannel serverSocket,SubReactor[] subReactorPool) {          this.name = name;         this.serverSocket = serverSocket;         this.subReactorPool = subReactorPool;     }     @Override     public void run() {          try {              SocketChannel socket = this.serverSocket.accept();             if (socket != null) {                  new Handler("handler", subReactorPool[next].getSelector(),socket);             }             if (++next == subReactorPool.length) { next=0;}         } catch (IOException e) {              e.printStackTrace();         }     }     @Override     public String getName() {          return this.name;     } } public class Handler implements SelfRunable {      String name;     Selector selector;     SocketChannel socket;     SelectionKey sk;     ByteBuffer input = ByteBuffer.allocate(1024);     ByteBuffer output = ByteBuffer.allocate(1024);     static final int READING = 0, SENDING = 1,  PROCESSING = 3;     volatile int state = READING;     static ExecutorService poolExecutor = Executors.newFixedThreadPool(5);     public Handler(String name, Selector selector, SocketChannel socket) throws IOException {          this.selector = selector;         this.socket = socket;         this.name = name;         this.socket.configureBlocking(false);         sk = this.socket.register(this.selector,0);         sk.attach(this);         sk.interestOps(SelectionKey.OP_READ);         selector.wakeup();     }     @Override     public void run() {          try{              System.out.println("state:" + state);             if (state == READING) {                  read();             } else if (state == SENDING) {                  send();             }         } catch (IOException ex) {              ex.printStackTrace();         }     }     synchronized void read() throws IOException {          socket.read(input);         if (inputIsComplete()) {              state = PROCESSING;            poolExecutor.execute(new Processer());         }     }     synchronized void processAndHandOff() {          System.out.println("processAndHandOff=========");         process();         state = SENDING; // or rebind attachment         sk.interestOps(SelectionKey.OP_WRITE);         selector.wakeup();         System.out.println("processAndHandOff finish ! =========");     }     private void send() throws IOException {          System.out.println("start send ...");         socket.write(output);         socket.close();         System.out.println("start send finish!");         if (outputIsComplete()) sk.cancel();     }     boolean inputIsComplete() {  return true;}     boolean outputIsComplete() { return true;}     void process(){          String msg = new String(input.array());         System.out.println("读取内容:" + msg);         output.put(msg.getBytes());         output.flip();     }     @Override     public String getName() {          return this.name;     }     class Processer implements Runnable {          public void run() {  processAndHandOff(); }     } } 

Reactor线程模型演进

模型

简介

弊端

单线程模型

IO/非IO操作都在Reactor单线程中完成

非IO操作执行慢,影响IO操作响应延迟

多线程模型

拆分非IO操作交给业务线程池执行,IO操作由Reator单线程执行

高并发,高负载场景下,Reactor单线程会成为瓶颈

主从线程模型

Reactor单线程拆分为mainReactor和subReactor

不适合大量数据传输

Netty线程模型

Reactor主从线程模型-抽象模型

创建ServerSocketChannel过程(创建channel,配置非阻塞) ServerSocketChannel注册到mainReactor的selector对象上,监听accept事件 mainReactor的selector监听到新连接SocketChannel,将SocketChannel注册到subReactor的selector对象上,监听read/write事件 subReactor的selector监听到read/write事件,移交给业务线程池(对应netty的pipeline)

Netty线程模型

我们再好好看看mainReactor和subReactor,其实这两个类功能非常相似,所以Netty将mainReactor和subReactor统一成了EventLoop。对于Netty零基础的,请参考这个Reactor主从线程模型-抽象模型和下面这张图来理解EventLoop。

很赞哦!(56)