[关闭]
@phper 2016-12-09T03:03:58.000000Z 字数 9385 阅读 878

swoole深入学习 2. tcp Server和tcp Client

swoole


这节来学习Swoole最基础的ServerClient。会通过创建一个tcp Server来讲解。

server

  1. <?php
  2. class Server
  3. {
  4. private $serv;
  5. public function __construct()
  6. {
  7. $this->serv = new Swoole\Server('127.0.0.1', 9501);
  8. //当启动一个Swoole应用时,一共会创建2 + n + m个进程,2为一个Master进程和一个Manager进程,其中n为Worker进程数。m为TaskWorker进程数。
  9. //默认如果不设置,swoole底层会根据当前机器有多少CPU核数,启动对应数量的Reactor线程和Worker进程。我机器为4核的。Worker为4。TaskWorker为0。
  10. //下面我来设置worker_num = 10。看下启动了多少个进程
  11. $this->serv->set([
  12. 'worker_num' => 10,
  13. //'task_worker_num' => 2,
  14. 'deamonize' => true,
  15. ]);
  16. //启动10个work,总共12个进程。
  17. /*
  18. ➜ Event git:(master) pstree |grep server.php
  19. | \-+= 54172 yangyi php server.php #Master进程
  20. | \-+- 54173 yangyi php server.php # Manager 进程
  21. | |--- 54174 yangyi php server.php #Work 进程
  22. | |--- 54175 yangyi php server.php
  23. | |--- 54176 yangyi php server.php
  24. | |--- 54177 yangyi php server.php
  25. | |--- 54178 yangyi php server.php
  26. | |--- 54179 yangyi php server.php
  27. | |--- 54180 yangyi php server.php
  28. | |--- 54181 yangyi php server.php
  29. | |--- 54182 yangyi php server.php
  30. | \--- 54183 yangyi php server.php
  31. *
  32. */
  33. //增加新的监控的ip:post:mode
  34. $this->serv->addlistener("::1", 9500, SWOOLE_SOCK_TCP);
  35. //监听事件
  36. /*
  37. *
  38. * - onStart
  39. * - onShutdown
  40. * - onWorkerStart
  41. * - onWorkerStop
  42. * - onTimer
  43. * - onConnect
  44. * - onReceive
  45. * - onClose
  46. * - onTask
  47. * - onFinish
  48. * - onPipeMessage
  49. * - onWorkerError
  50. * - onManagerStart
  51. * - onManagerStop
  52. */
  53. $this->serv->on('Start', array($this, 'onStart'));
  54. $this->serv->on('Connect', array($this, 'onConnect'));
  55. $this->serv->on('Receive', array($this, 'onReceive'));
  56. $this->serv->on('Close', array($this, 'onClose'));
  57. //master进程启动后, fork出Manager进程, 然后触发ManagerStart
  58. $this->serv->on('ManagerStart', function (\swoole_server $server){
  59. echo "On manager start.";
  60. });
  61. //manager进程启动,启动work进程的时候调用 workid表示第几个id, 从0开始。
  62. $this->serv->on('WorkerStart', function($serv, $workerId) {
  63. echo $workerId . '---';
  64. });
  65. //当一个work进程死掉后,会触发
  66. $this->serv->on('WorkerStop', function() {
  67. echo '--stop';
  68. });
  69. //启动
  70. $this->serv->start();
  71. }
  72. //启动server时候会触发。
  73. public function onStart( $serv ) {
  74. echo "Start\n";
  75. }
  76. //client连接成功后触发。
  77. public function onConnect( $serv, $fd, $from_id ) {
  78. $a = $serv->send( $fd, "Hello {$fd}!" );
  79. //var_dump($a); //成功返回true
  80. }
  81. //接收client发过来的请求
  82. public function onReceive( swoole_server $serv, $fd, $from_id, $data ) {
  83. echo "Get Message From Client {$fd}:{$data}\n";
  84. //$serv->send($fd, $data);
  85. //关闭该work进程
  86. //$serv->stop();
  87. //宕机
  88. //$serv->shutdown();
  89. //主动关闭 客户端连接,也会触发onClose事件
  90. //$serv->close($fd);
  91. $serv->send($fd, $data);
  92. //$list = $serv->connection_list();
  93. // foreach ($list as $fd) {
  94. // $serv->send($fd, $data);
  95. // }
  96. }
  97. }
  98. //客户端断开触发
  99. public function onClose( $serv, $fd, $from_id ) {
  100. echo "Client {$fd} close connection\n";
  101. }
  102. }
  103. //输出swoole的版本
  104. echo swoole_version(); // 1.9.0
  105. //输出本机iP
  106. var_dump(swoole_get_local_ip());
  107. /**
  108. array(1) {
  109. 'en4' =>
  110. string(13) "172.16.71.149"
  111. }
  112. */
  113. // 启动服务器
  114. $server = new Server();

我们启动服务端server:

  1. $ php server.php
  2. 0--start
  3. 2--start
  4. Start
  5. 1--start
  6. 3--start
  7. 4--start
  8. 5--start
  9. 6--start
  10. manager start.
  11. 7--start
  12. 9--start
  13. 8--start

我们来分析整个server 启动的步骤:

  1. 启动php server.php后,当前进程fork出Master进程,然后退出。
  2. Master进程启动成功之后,fork出Manager进程,并触发OnManagerStart事件。
  3. Manager进程启动成功时候,fork出Worker进程,并触发OnWorkerStart事件。

同步client

server端好了,那么就会需要client端来连接,swoole里面client分为同步和异步,先来一个同步clent客户端。

  1. <?php
  2. // sync 同步客户端
  3. class client
  4. {
  5. private $client;
  6. public function __construct()
  7. {
  8. $this->client = new Swoole\Client(SWOOLE_SOCK_TCP | SWOOLE_KEEP);
  9. $this->client->connect('127.0.0.1', 9501, 1);
  10. }
  11. public function connect()
  12. {
  13. //fwrite(STDOUT, "请输入消息:");
  14. //$msg = trim(fgets(STDIN));
  15. $msg = rand(1,12);
  16. //发送给消息到服务端
  17. $this->client->send( $msg );
  18. //接受服务端发来的信息
  19. $message = $this->client->recv();
  20. echo "Get Message From Server:{$message}\n";
  21. //关闭客户端
  22. $this->client->close();
  23. }
  24. }
  25. $client = new Client();
  26. $client->connect();

同步client是同步阻塞的。一整套connect->send()->rev()->close()是同步进行的。

所以,如果是大量的循环数据,就不适合同步client了:

比如下面:

  1. <?php
  2. // sync 同步客户端
  3. class client
  4. {
  5. private $client;
  6. public function __construct()
  7. {
  8. var_dump(swoole_get_local_ip());
  9. $this->client = new Swoole\Client(SWOOLE_SOCK_TCP | SWOOLE_KEEP);
  10. $this->client->connect('127.0.0.1', 9501, 1);
  11. $i = 0;
  12. while ($i < 100) {
  13. $this->client->send($i."\n");
  14. $message = $this->client->recv();
  15. echo "Get Message From Server:{$message}\n";
  16. $i++;
  17. }
  18. }
  19. }
  20. $client = new Client();

打印的结果就是顺序执行。要是想要异步了。

异步client

  1. <?php
  2. //异步客户端
  3. $client = new Swoole\Client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
  4. $client->on("connect", function($cli) {
  5. var_dump($cli->isConnected()); // true
  6. var_dump($cli->getsockname()); //['port' => 57305, 'host'=> '127.0.0.1']
  7. var_dump($cli->sock); // 5
  8. $i = 0;
  9. while ($i < 100) {
  10. $cli->send($i."\n");
  11. $i++;
  12. }
  13. //关闭
  14. //$cli->close();
  15. });
  16. $client->on("receive", function($cli, $data){
  17. echo "Receive: $data";
  18. });
  19. $client->on("error", function(swoole_client $cli){
  20. echo "error\n" . $cli->errCode;
  21. });
  22. $client->on("close", function(swoole_client $cli){
  23. echo "Connection close\n";
  24. });
  25. $client->connect('127.0.0.1', 9501);

这样就是一个异步的client了,处理更快,但是只支持php的cli模式

server与client交互

总结一下client与server的连接过程:

  1. Client主动Connect的时候,Client实际上是与Master进程中的某个Reactor线程发生了连接。
  2. 当TCP的三次握手成功了以后,由这个Reactor线程将连接成功的消息告诉Manager进程,再由Manager进程转交给Worker进程。
  3. 在这个Worker进程中触发了OnConnect的方法。
  4. 当Client向Server发送了一个数据包的时候,首先收到数据包的是Reactor线程,同时Reactor线程会完成组包,再将组好的包交给Manager进程,由Manager进程转交给Worker。
  5. 此时Worker进程触发OnReceive事件。
  6. 如果在Worker进程中做了什么处理,然后再用Send方法将数据发回给客户端时,数据则会沿着这个路径逆流而上。

关于上面说到的几个进程,解释下:

Master进程是一个多线程进程,其中有一组非常重要的线程,叫做Reactor线程(组),每当一个客户端连接上服务器的时候,都会由Master进程从已有的Reactor线程中,根据一定规则挑选一个,专门负责向这个客户端提供维持链接、处理网络IO与收发数据等服务。

而Manager进程,某种意义上可以看做一个代理层,它本身并不直接处理业务,其主要工作是将Master进程中收到的数据转交给Worker进程,或者将Worker进程中希望发给客户端的数据转交给Master进程进行发送。另外,Manager进程还负责监控Worker进程,如果Worker进程因为某些意外挂了,Manager进程会重新拉起新的Worker进程,有点像Supervisor的工作。

Worker进程了,顾名思义,Worker进程其实就是处理各种业务工作的进程,Manager将数据包转交给Worker进程,然后Worker进程进行具体的处理,并根据实际情况将结果反馈给客户端。

task_worker

在swoole中work进程分为EventWorker和TaskWorker,对应的配置文件设置为:

  1. $this->serv->set([
  2. 'worker_num' => 10, #EventWorker
  3. 'task_worker_num' => 2, #TaskWorker
  4. 'deamonize' => true,
  5. ]);

worker是基于event触发,而task则是manager直接生成的子进程。那么他们有什么区别呢?

共同点是:他们都是最底层负责处理业务的进程。

Swoole的业务逻辑部分是同步阻塞运行的,如果遇到一些耗时较大的操作,例如访问数据库、广播消息等,就会影响服务器的响应速度。因此Swoole提供了Task功能,将这些耗时操作放到另外的进程去处理,当前woker进程继续执行后面的逻辑。运行Task,需要在swoole服务中配置参数task_worker_num,即可开启task功能。此外,必须给swoole_server绑定两个回调函数:onTaskonFinish。这两个回调函数分别用于执行Task任务和处理Task任务的返回结果。

先来写一个demo,来如何用 taskWoker来处理业务。

taskServer.php

  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: yangyi
  5. * Date: 2016/12/7
  6. * Time: 16:16
  7. */
  8. class taskServer
  9. {
  10. private $serv;
  11. /**
  12. * [__construct description]
  13. * 构造方法中,初始化 $serv 服务
  14. */
  15. public function __construct() {
  16. $this->serv = new Swoole\Server('0.0.0.0', 9501);
  17. //初始化swoole服务
  18. $this->serv->set(array(
  19. 'worker_num' => 8,
  20. 'daemonize' => false, //是否作为守护进程,此配置一般配合log_file使用
  21. 'max_request' => 1000,
  22. 'log_file' => './swoole.log',
  23. 'task_worker_num' => 8
  24. ));
  25. //设置监听
  26. $this->serv->on('Start', array($this, 'onStart'));
  27. $this->serv->on('Connect', array($this, 'onConnect'));
  28. $this->serv->on("Receive", array($this, 'onReceive'));
  29. $this->serv->on("Close", array($this, 'onClose'));
  30. $this->serv->on("Task", array($this, 'onTask'));
  31. $this->serv->on("Finish", array($this, 'onFinish'));
  32. //开启
  33. $this->serv->start();
  34. }
  35. public function onStart($serv) {
  36. echo SWOOLE_VERSION . " onStart\n";
  37. }
  38. public function onConnect($serv, $fd) {
  39. echo $fd."Client Connect.\n";
  40. }
  41. public function onReceive($serv, $fd, $from_id, $data) {
  42. echo "Get Message From Client {$fd}:{$data}\n";
  43. // send a task to task worker.
  44. $param = array(
  45. 'fd' => $fd
  46. );
  47. // start a task
  48. $serv->task(json_encode($param));
  49. echo "Continue Handle Worker\n";
  50. }
  51. public function onClose($serv, $fd) {
  52. echo "Client Close.\n";
  53. }
  54. public function onTask($serv, $task_id, $from_id, $data) {
  55. echo "This Task {$task_id} from Worker {$from_id}\n";
  56. echo "Data: {$data}\n";
  57. for($i = 0 ; $i < 200 ; $i ++ ) {
  58. sleep(1);
  59. echo "Task {$task_id} Handle {$i} times...\n";
  60. }
  61. $fd = json_decode($data, true);
  62. $serv->send($fd['fd'] , "Data in Task {$task_id}");
  63. return "Task {$task_id}'s result";
  64. }
  65. public function onFinish($serv,$task_id, $data) {
  66. echo "Task {$task_id} finish\n";
  67. echo "Result: {$data}\n";
  68. }
  69. }
  70. $server = new taskServer();

taskClient.php 异步的客户端

  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: yangyi
  5. * Date: 2016/12/7
  6. * Time: 16:18
  7. */
  8. class taskClient
  9. {
  10. private $client;
  11. public function __construct() {
  12. $this->client = new Swoole\Client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
  13. $this->client->on('Connect', array($this, 'onConnect'));
  14. $this->client->on('Receive', array($this, 'onReceive'));
  15. $this->client->on('Close', array($this, 'onClose'));
  16. $this->client->on('Error', array($this, 'onError'));
  17. }
  18. public function connect() {
  19. if(!$fp = $this->client->connect("127.0.0.1", 9501 , 1)) {
  20. echo "Error: {$fp->errMsg}[{$fp->errCode}]\n";
  21. return;
  22. }
  23. }
  24. //connect之后,会调用onConnect方法
  25. public function onConnect($cli) {
  26. fwrite(STDOUT, "Enter Msg:");
  27. swoole_event_add(STDIN,function(){
  28. fwrite(STDOUT, "Enter Msg:");
  29. $msg = trim(fgets(STDIN));
  30. $this->send($msg);
  31. });
  32. }
  33. public function onClose($cli) {
  34. echo "Client close connection\n";
  35. }
  36. public function onError() {
  37. }
  38. public function onReceive($cli, $data) {
  39. echo "Received: ".$data."\n";
  40. }
  41. public function send($data) {
  42. $this->client->send($data);
  43. }
  44. public function isConnected($cli) {
  45. return $this->client->isConnected();
  46. }
  47. }
  48. $client = new taskClient();
  49. $client->connect();

运行一下:

php taskServer.php
php taskClient.php

服务端打印:

  1. $ php task_server.php
  2. 1.9.0 onStart
  3. 1Client Connect.
  4. Get Message From Client 1:12345
  5. Continue Handle Worker
  6. This Task 0 from Worker 3
  7. Data: {"fd":1}
  8. Task 0 Handle 0 times...
  9. Task 0 Handle 1 times...
  10. Task 0 finish
  11. Result: Task 0's result

客户端打印:

  1. $ php task_client.php
  2. Enter Msg:12345
  3. Enter Msg:Received: Data in Task 0

这里面有几点需要注意:
1. 运行Task,必须要在swoole服务中配置参数task_worker_num,此外,必须给swoole_server绑定两个回调函数:onTaskonFinish
2. onTash 要return 数据
3. onFinish 会接收到onTash的数据,标记成完成。
4. swoole_event_add 把输入绑定成事件,这个后续将,这样client就可以连续的多次输入。

swoole的架构

上面说了这么,图表总结一下swoole结构:

swoole采用 多线程Reactor+多进程Worker

swoole架构

swoole的处理连接流程图如下:

swoole处理流程图

当请求到达时,swoole是这样处理的:

请求到达 Main Reactor
|
|
Main Reactor根据Reactor的情况,将请求注册给对应的Reactor
(每个Reactor都有epoll。用来监听客户端的变化)
|
|
客户端有变化时,交给worker来处理
|
|
worker处理完毕,通过进程间通信(比如管道、共享内存、消息队列)发给对应的reactor。
|
|
reactor将响应结果发给相应的连接
|
|
请求处理完成

因为reactor基于epoll,所以每个reactor可以处理无数个连接请求。 如此,swoole就轻松的处理了高并发。

参考资料:

http://rango.swoole.com/archives/305

https://github.com/szyhf/swoole_study/blob/master/Swoole%E7%9A%84%E8%BF%9B%E7%A8%8B%E6%A8%A1%E5%9E%8B.md

http://happyliu.blog.51cto.com/501986/1574923

https://segmentfault.com/a/1190000007614502

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注