[关闭]
@bergus 2017-04-23T14:03:19.000000Z 字数 4303 阅读 1506

基于nsq的rpc探索和远程代码执行demo

python rpc nsq


server

  1. # -*- coding: utf-8 -*-
  2. import logging
  3. import ssl
  4. import msgpack
  5. import nsq
  6. from tornado.ioloop import IOLoop
  7. log = logging.getLogger(__name__)
  8. import redis
  9. class RPCServer(object):
  10. def __init__(self):
  11. self.__code_obj = {}
  12. nsq.Reader(
  13. topic='mq_input',
  14. channel='mq',
  15. name="mq_input.mq",
  16. nsqd_tcp_addresses=['127.0.0.1:4150'],
  17. lookupd_http_addresses=['http://127.0.0.1:4161'],
  18. message_handler=self._handle_monitor,
  19. heartbeat_interval=10,
  20. tls_options={'cert_reqs': ssl.CERT_NONE},
  21. output_buffer_size=4096,
  22. output_buffer_timeout=100,
  23. max_tries=5,
  24. max_in_flight=9,
  25. lookupd_poll_interval=60,
  26. low_rdy_idle_timeout=10,
  27. max_backoff_duration=128,
  28. lookupd_poll_jitter=0.3,
  29. lookupd_connect_timeout=1,
  30. lookupd_request_timeout=2,
  31. )
  32. self.writer = nsq.Writer(['127.0.0.1:4150'])
  33. self.redis_client = redis.Redis(host='127.0.0.1', port=6379)
  34. def __package(self, name):
  35. c = self.__code_obj.get(name)
  36. if c:
  37. return c
  38. redis_client = self.redis_client
  39. if '#' in name:
  40. _p, _f = name.split("#")
  41. _p = "." + _p
  42. else:
  43. _p = ""
  44. _f = name
  45. __s = redis_client.hget("__code__" + _p, _f)
  46. self.__code_obj[name] = self.__e(_f.split(".")[-1], __s)
  47. return self.__code_obj[name]
  48. def import_(self, name):
  49. return self.__package(name)
  50. def __e(self, name, source):
  51. try:
  52. exec source
  53. __c = eval(name)
  54. setattr(__c, "import_", self.import_)
  55. return __c
  56. except Exception, e:
  57. log.error(e.message)
  58. return e.message
  59. def f_m(self, conn, data):
  60. print conn, data
  61. def _handle_monitor(self, msg):
  62. _pp = msgpack.unpackb(msg.body, use_list=False)
  63. print _pp
  64. _name, _id, _package, _p_params, _method, _m_params = _pp
  65. _o = self.__code_obj.get((_package, str(_p_params)))
  66. if not _o:
  67. res = self.import_(_package)
  68. print res
  69. _o = res(_p_params)
  70. self.__code_obj[(_package, str(_p_params))] = _o
  71. __o = getattr(_o, _method)
  72. _r = __o(_m_params)
  73. print _r
  74. self.writer.pub(_name, msgpack.packb((_id, _r)), callback=self.f_m)
  75. return True
  76. if __name__ == '__main__':
  77. RPCServer()
  78. loop = IOLoop.instance()
  79. loop.start()

client

  1. # -*- coding: utf-8 -*-
  2. import inspect
  3. import logging
  4. import sys
  5. import uuid
  6. from os import listdir
  7. from os.path import splitext
  8. import msgpack
  9. import nsq
  10. import redis
  11. from tornado import gen
  12. from tornado import ioloop
  13. from tornado.concurrent import Future
  14. from tornado.ioloop import IOLoop
  15. log = logging.getLogger(__name__)
  16. class _Package(object):
  17. def __init__(self, req, _name, class_path, class_args):
  18. self.__req = req
  19. self.name = _name
  20. self.class_path = class_path
  21. self.class_args = class_args
  22. self.writer = nsq.Writer(['127.0.0.1:4150'])
  23. def f_m(self, conn, data):
  24. # print conn, data
  25. pass
  26. def call(self, _method, _m_args):
  27. _id = str(uuid.uuid4())
  28. self.__req[_id] = Future()
  29. self.writer.pub('mq_input', msgpack.packb((
  30. self.name,
  31. _id,
  32. self.class_path,
  33. self.class_args,
  34. _method,
  35. _m_args
  36. )), callback=self.f_m
  37. )
  38. return self.__req[_id]
  39. class RPCClient(object):
  40. __req = {}
  41. name = 'mq_output'
  42. code_path = 'test'
  43. def __init__(self):
  44. nsq.Reader(
  45. topic=self.name,
  46. channel='mq',
  47. name="mq_output.mq",
  48. # nsqd_tcp_addresses=['127.0.0.1:4150'],
  49. lookupd_http_addresses=['http://127.0.0.1:4161'],
  50. message_handler=self._handle_monitor,
  51. heartbeat_interval=10,
  52. output_buffer_size=4096,
  53. output_buffer_timeout=100,
  54. max_tries=5,
  55. max_in_flight=9,
  56. lookupd_poll_interval=60,
  57. low_rdy_idle_timeout=10,
  58. max_backoff_duration=128,
  59. lookupd_poll_jitter=0.3,
  60. lookupd_connect_timeout=1,
  61. lookupd_request_timeout=2,
  62. )
  63. import test
  64. self.code_path = test.__path__[0]
  65. print self.code_path
  66. self.redis_client = redis.Redis(host='127.0.0.1', port=6379)
  67. _p = self.code_path.split("/").pop()
  68. print(_p)
  69. for name, code in self.upload_code():
  70. self.redis_client.hset("__code__." + _p, name, code)
  71. def _handle_monitor(self, msg):
  72. _id, _res = msgpack.unpackb(msg.body, use_list=False)
  73. print _id, _res
  74. try:
  75. self.__req[_id].set_result(_res)
  76. except:
  77. pass
  78. return True
  79. def upload_code(self):
  80. code_path = self.code_path
  81. sys.path.append(code_path)
  82. for f in listdir(code_path):
  83. name, f_ext = splitext(f)
  84. if name.startswith('__') or name.endswith("__") or f_ext != '.py':
  85. continue
  86. __obj = __import__(name)
  87. for k, v in inspect.getmembers(__obj):
  88. if v.__class__.__name__ == 'type':
  89. yield "{}.{}".format(name, k), "# -*- coding: utf-8 -*-\n\n\n{}".format(inspect.getsource(v))
  90. def package(self, class_path, args=None):
  91. return _Package(self.__req, self.name, class_path, args)
  92. @gen.coroutine
  93. def _call():
  94. _a = []
  95. for i in xrange(10):
  96. a = yield p.call('__call__', "jj")
  97. _a.append(a)
  98. raise gen.Return(_a)
  99. @gen.coroutine
  100. def _call1():
  101. _a = yield _call()
  102. print _a
  103. for i in xrange(10):
  104. b = yield s.call("get", 'http://www.baidu.com')
  105. print b
  106. if __name__ == '__main__':
  107. c = RPCClient()
  108. p = c.package("test#hello.Hello", ("ssss", dict(www=2)))
  109. s = c.package("test#spider.Spider")
  110. ioloop.PeriodicCallback(_call1, 1000).start()
  111. loop = IOLoop.instance()
  112. loop.start()
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注