[关闭]
@Dubyoo 2016-02-29T08:28:38.000000Z 字数 6041 阅读 3322

ACE Reactor (ACE反应器)

ACE


1. ACE Reactor schedule_timer

要点:
1. 继承 ACE_Event_Handler
2. 重载回调函数 handle_timeout()
3. 设置定时器 reactor()->schedule_timer( , , , )
4. 循环调用Reactor事件处理 ACE_Reactor::instance()->handle_events()

一个计时器的例子

  1. // ACEReactorTimerTest.h
  2. #pragma once
  3. #include <iostream>
  4. #include "ace/Reactor.h"
  5. #include "ace/OS.h"
  6. #include "ace/Log_Msg.h"
  7. using namespace std;
  8. class MyTimerHandler : public ACE_Event_Handler
  9. {
  10. private:
  11. int delay;
  12. int inteval;
  13. int timerid;
  14. public:
  15. MyTimerHandler(int delay, int inteval)
  16. : delay(delay)
  17. , inteval(inteval)
  18. { }
  19. int open()
  20. {
  21. ACE_Time_Value delaytime(delay);
  22. ACE_Time_Value intevaltime(inteval);
  23. // Reactor中的定时器
  24. timerid = reactor()->schedule_timer(
  25. this, // 第1个参数:指定一个ACE_Event_Handler类指针,到达delaytime时,会调用该类下的handle_timeout()函数
  26. "Hello timeout", // 第2个参数:向handle_timeout(const ACE_Time_Value&, const void*)的传参,类型为const void*
  27. delaytime, // 第3个参数:延时delaytime,到达时间后调用ACE_Event_Handler下的handle_timeout()
  28. intevaltime // 第4个参数:间隔时间,每隔一段时间调用handle_timeout();若为0,则只执行一次
  29. );
  30. return timerid;
  31. }
  32. int close()
  33. {
  34. return reactor()->cancel_timer(timerid);
  35. }
  36. int handle_timeout(const ACE_Time_Value& current_time, const void* arg)
  37. {
  38. time_t epoch = ((timespec_t)current_time).tv_sec;
  39. cout << "timeout arg:\t" << (char*)arg << endl;
  40. ACE_DEBUG((LM_INFO, ACE_TEXT("handle_timeout: %s\n"), ACE_OS::ctime(&epoch)));
  41. return 0;
  42. }
  43. };
  1. // main.cpp
  2. #pragma comment(lib, "ACED.lib")
  3. #include "ACEReactorTimerTest.h"
  4. int main(int, char*[])
  5. {
  6. MyTimerHandler * timer = new MyTimerHandler(1, 3);
  7. timer->reactor(ACE_Reactor::instance());
  8. timer->open();
  9. int i = 0;
  10. while (++i)
  11. {
  12. cout << "Repeat begin\t" << i << endl;
  13. ACE_Reactor::instance()->handle_events();
  14. cout << "Repeat end\t" << i << endl;
  15. }
  16. return 0;
  17. }

2. ACE Reactor Client

要点:
1. 继承 ACE_Event_Handler
2. 重载回调函数 handle_*()
3. 登记到反应器 ACE_Reactor::instance()->register_handler()
4. 循环调用Reactor事件处理 ACE_Reactor::instance()->handle_events()

MyClient.h

  1. // MyClient.h
  2. #pragma once
  3. #include <iostream>
  4. #include "ace/Reactor.h"
  5. #include "ace/SOCK_Stream.h"
  6. #include "ace/SOCK_Connector.h"
  7. #include "ace/OS.h"
  8. using namespace std;
  9. class MyClient : public ACE_Event_Handler
  10. {
  11. public:
  12. ACE_HANDLE get_handle() const;
  13. int handle_input(ACE_HANDLE fd);
  14. int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
  15. ACE_SOCK_STREAM& Peer();
  16. int handle_timeout(const ACE_Time_Value& current_time, const void* arg);
  17. private:
  18. ACE_SOCK_STREAM peer;
  19. };
  20. ACE_HANDLE MyClient::get_handle() const
  21. {
  22. return peer.get_handle();
  23. }
  24. int MyClient::handle_input(ACE_HANDLE fd)
  25. {
  26. int recv = 0;
  27. char buffer[1024] = "";
  28. memset(buffer, 0, 1024);
  29. if ((recv = peer.recv(buffer, 1024)) <= 0)
  30. return -1;
  31. buffer[recv] = '\0';
  32. cout << "recv:\t" << buffer << endl;
  33. // peer.send(buffer, recv + 1);
  34. return 0;
  35. }
  36. int MyClient::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
  37. {
  38. cout << "Connection closed." << endl;
  39. return ACE_Event_Handler::handle_close(handle, close_mask);
  40. }
  41. ACE_SOCK_STREAM& MyClient::Peer()
  42. {
  43. return peer;
  44. }
  45. int MyClient::handle_timeout(const ACE_Time_Value& current_time, const void* arg)
  46. {
  47. cout << "send:\t" << (char*)arg << endl;
  48. return peer.send((char*)arg, strlen((char*)arg) + 1);
  49. }
  50. void RunClient()
  51. {
  52. MyClient client;
  53. ACE_SOCK_CONNECTOR connector;
  54. ACE_INET_Addr addr(3000, "127.0.0.1");
  55. ACE_Time_Value timeout(5, 0);
  56. if (connector.connect(client.Peer(), addr, &timeout) != 0)
  57. {
  58. cout << endl << "Connect fail." << endl;
  59. }
  60. cout << "Connected." << endl;
  61. ACE_Reactor::instance()->register_handler(&client, ACE_Event_Handler::READ_MASK);
  62. ACE_Time_Value delaytime(5);
  63. ACE_Time_Value intevaltime(2);
  64. ACE_Reactor::instance()->schedule_timer(&client, "Keep alive...", delaytime, intevaltime);
  65. while (true)
  66. {
  67. ACE_Reactor::instance()->handle_events();
  68. }
  69. }

main.cpp

  1. // main.cpp
  2. #pragma comment(lib, "ACED.lib")
  3. #include "MyClient.h"
  4. int main(int, char*[])
  5. {
  6. RunClient();
  7. return 0;
  8. }

3.ACE Reactor Server

MyServer.h

  1. // MyServer.h
  2. #pragma once
  3. #include <iostream>
  4. #include "ace/Reactor.h"
  5. #include "ace/SOCK_Stream.h"
  6. #include "ace/SOCK_Connector.h"
  7. #include "ace/SOCK_Acceptor.h"
  8. using namespace std;
  9. class MyServer : public ACE_Event_Handler
  10. {
  11. public:
  12. ACE_SOCK_STREAM& Peer();
  13. int open();
  14. ACE_HANDLE get_handle() const;
  15. virtual int handle_input(ACE_HANDLE fd);
  16. virtual int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);
  17. private:
  18. ACE_SOCK_STREAM peer;
  19. };
  20. class MyAcceptor : public ACE_Event_Handler
  21. {
  22. public:
  23. ~MyAcceptor();
  24. int open(const ACE_INET_Addr& addr);
  25. ACE_HANDLE get_handle() const;
  26. virtual int handle_input(ACE_HANDLE fd);
  27. private:
  28. ACE_SOCK_Acceptor acceptor;
  29. };
  30. ACE_SOCK_STREAM& MyServer::Peer()
  31. {
  32. return peer;
  33. }
  34. int MyServer::open()
  35. {
  36. return ACE_Reactor::instance()->register_handler(this, ACE_Event_Handler::READ_MASK);
  37. }
  38. ACE_HANDLE MyServer::get_handle() const
  39. {
  40. return peer.get_handle();
  41. }
  42. int MyServer::handle_input(ACE_HANDLE fd)
  43. {
  44. int recv = 0;
  45. char buffer[1024] = "";
  46. memset(buffer, 0, 1024);
  47. ACE_INET_Addr client_addr;
  48. peer.get_remote_addr(client_addr);
  49. if ((recv = peer.recv(buffer, 1024)) > 0)
  50. {
  51. buffer[recv] = '\0';
  52. cout << "recv from client(" << client_addr.get_host_addr() << ":" << client_addr.get_port_number() << "):\t" << buffer << endl;
  53. return 0;
  54. }
  55. return -1;
  56. }
  57. int MyServer::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
  58. {
  59. cout << endl << "Connection closed." << endl;
  60. return ACE_Event_Handler::handle_close(handle, close_mask);
  61. }
  62. MyAcceptor::~MyAcceptor()
  63. {
  64. this->handle_close(ACE_INVALID_HANDLE, 0);
  65. }
  66. int MyAcceptor::open(const ACE_INET_Addr& listen_addr)
  67. {
  68. if (acceptor.open(listen_addr, 1) == -1)
  69. {
  70. cout << endl << "Open port fail." << endl;
  71. return -1;
  72. }
  73. cout << "Open port:\t" << listen_addr.get_host_addr() << ":" << listen_addr.get_port_number() << endl;
  74. return ACE_Reactor::instance()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK);
  75. }
  76. ACE_HANDLE MyAcceptor::get_handle() const
  77. {
  78. return acceptor.get_handle();
  79. }
  80. int MyAcceptor::handle_input(ACE_HANDLE fd)
  81. {
  82. MyServer * server = new MyServer();
  83. if (acceptor.accept(server->Peer()) == -1)
  84. {
  85. cout << endl << "Accept client fail." << endl;
  86. return -1;
  87. }
  88. ACE_INET_Addr client_addr;
  89. server->Peer().get_remote_addr(client_addr);
  90. cout << "Accept client:\t" << client_addr.get_host_addr() << ":" << client_addr.get_port_number() << endl;
  91. server->Peer().send("Hello client! You are online.", 30);
  92. ACE_Reactor::instance()->register_handler(this, ACE_Event_Handler::READ_MASK);
  93. if (server->open() == -1)
  94. server->handle_close(ACE_INVALID_HANDLE, 0);
  95. return 0;
  96. }
  97. void RunServer()
  98. {
  99. ACE_INET_Addr server_addr(3000, "127.0.0.1");
  100. MyAcceptor acceptor;
  101. acceptor.open(server_addr);
  102. while (true)
  103. {
  104. ACE_Reactor::instance()->handle_events();
  105. }
  106. }

main.cpp

  1. // main.cpp
  2. #pragma comment(lib, "ACED.lib")
  3. #include "MyServer.h"
  4. int main(int, char*[])
  5. {
  6. RunServer();
  7. return 0;
  8. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注