[关闭]
@lupnfer 2017-03-01T01:06:20.000000Z 字数 13462 阅读 825

Engine Media Process

Code


IA Engine Media Process

  1. /**********************************************************************
  2. Description: IA Engine Media Process
  3. --------------------------------------------------------------------------------
  4. ChangeLog
  5. DATE NAME DESCRIPTION
  6. --------------------------------------------------------------------------------
  7. **********************************************************************/
  8. #ifndef WISEOS_IAE_MP_MEDIA_PROCESS_H_
  9. #define WISEOS_IAE_MP_MEDIA_PROCESS_H_
  #include <cassert>
  #include <cstddef>
  #include <cstdint>
  #include <deque>
  #include <exception>
  #include <fstream>
  #include <functional>
  #include <iostream>
  #include <map>
  #include <memory>
  #include <mutex>
  #include <set>
  #include <stdexcept>
  #include <string>
  #include <vector>
  #include "boost/thread.hpp"
  #include "boost/thread/condition_variable.hpp"
  #include "boost/thread/mutex.hpp"
  21. namespace ia10k {
  22. class BaseMediaElement;
  /// Kind of media process, as reported by MediaProcessInterface::getType().
  ///
  /// Declared as a named enum (instead of an alias of an unnamed enum) so the
  /// type has a name of its own; the enumerators stay unscoped and keep their
  /// numeric values, so existing uses are unaffected.
  enum MediaProcessType {
    MediaProcessTypePipe = 1,       ///< returned by BaseMediaProcessPipe
    MediaProcessTypeJoin = 2,       ///< returned by BaseMediaProcessJoin
    MediaProcessTypeSplit = 3,      ///< returned by BaseMediaProcessSplit
    MediaProcessTypeMultiplex = 4,  ///< returned by BaseMediaProcessMultiplex
    MediaProcessTypeProducer = 5,   ///< returned by BaseMediaProcessProducer
    MediaProcessTypeConsumer = 6,   ///< returned by BaseMediaProcessConsumer
    MediaProcessTypeRunloop = 7,    ///< returned by BaseMediaProcessRunloop
  };
  32. class MediaProcessInterface {
  33. public:
  34. MediaProcessInterface() {}
  35. virtual const MediaProcessType getType() const = 0;
  36. virtual const size_t getInputCount() const = 0;
  37. virtual const size_t getOutputCount() const = 0;
  38. virtual void input(const size_t &index, const std::shared_ptr<BaseMediaElement> &mediaElement) = 0;
  39. virtual void setOutputHandler(const size_t &index,
  40. std::function<void(std::shared_ptr<BaseMediaElement>)> outputHandler) = 0;
  41. // for generator only, return continue flag, if true continue, else break.
  42. virtual bool produce() = 0;
  43. // stop current produce/input
  44. virtual void stop(bool graceful) = 0;
  45. // init start operation if needed
  46. virtual void start() = 0;
  47. // wait finish
  48. virtual void wait() = 0;
  49. // reset to start status if needed
  50. virtual void reset() = 0;
  51. // error handle for produce/input.
  52. virtual void setErrorHandler(std::function<bool(const std::exception &)> errorHandler) = 0;
  53. };
  54. class BaseMediaProcess : public MediaProcessInterface {
  55. public:
  56. BaseMediaProcess() : errorHandler_(nullptr), generator_(nullptr) {}
  57. template<typename...Args>
  58. explicit BaseMediaProcess(Args... args):BaseMediaProcess() {
  59. init(0, args...);
  60. }
  61. virtual const size_t getInputCount() const {
  62. return inputCount_;
  63. }
  64. virtual const size_t getOutputCount() const {
  65. return outputCount_;
  66. }
  67. virtual void setErrorHandler(std::function<bool(const std::exception &)> errorHandler) {
  68. errorHandler_ = errorHandler;
  69. }
  70. virtual void input(const size_t &index, const std::shared_ptr<BaseMediaElement> &mediaElement) {
  71. return inputHandlers_[index](mediaElement);
  72. }
  73. virtual void setOutputHandler(const size_t &index,
  74. std::function<void(std::shared_ptr<BaseMediaElement>)> outputHandler) {
  75. outputHandlers_[index] = outputHandler;
  76. }
  77. virtual bool produce() {
  78. if (generator_) {
  79. return generator_();
  80. } else {
  81. throw std::runtime_error("not impl.");
  82. }
  83. }
  84. virtual void stop(bool graceful = true) {
  85. auto it = mps_.begin();
  86. auto itEnd = mps_.end();
  87. while (it != itEnd) {
  88. it->get()->stop(graceful);
  89. if (graceful) {
  90. it->get()->wait();
  91. }
  92. ++it;
  93. }
  94. }
  95. virtual void start() {
  96. auto it = mps_.rbegin();
  97. auto itEnd = mps_.rend();
  98. while (it != itEnd) {
  99. it->get()->start();
  100. ++it;
  101. }
  102. }
  103. virtual void wait() {
  104. auto it = mps_.rbegin();
  105. auto itEnd = mps_.rend();
  106. while (it != itEnd) {
  107. it->get()->wait();
  108. ++it;
  109. }
  110. }
  111. virtual void reset() {
  112. auto it = mps_.begin();
  113. auto itEnd = mps_.end();
  114. while (it != itEnd) {
  115. it->get()->reset();
  116. ++it;
  117. }
  118. }
  119. private:
  120. template<typename...Args>
  121. void init(size_t level, std::shared_ptr<BaseMediaProcess> mp, Args... args) {
  122. if (level == 0) {
  123. // check if start with a generator
  124. if (mp->getInputCount() == 0) {
  125. BaseMediaProcess *mpPtr = mp.get();
  126. generator_ = [mpPtr]() -> bool {
  127. return mpPtr->produce();
  128. };
  129. }
  130. }
  131. std::vector<std::shared_ptr<BaseMediaProcess> > mps({mp});
  132. return init(level, mps, args ...);
  133. }
  134. template<typename...Args>
  135. void init(size_t level, std::vector<std::shared_ptr<BaseMediaProcess> > mps, Args... args) {
  136. if (level == 0) {
  137. // collect input
  138. for (auto mp : mps) {
  139. // hold it in memory.
  140. mps_.emplace_back(mp);
  141. BaseMediaProcess *mpPtr = mp.get();
  142. size_t count = mp->getInputCount();
  143. for (size_t i = 0; i < count; ++i) {
  144. inputHandlers_[inputCount_] = [mpPtr, i](std::shared_ptr<BaseMediaElement> me) -> void {
  145. return mpPtr->input(i, me);
  146. };
  147. ++inputCount_;
  148. }
  149. }
  150. } else {
  151. // prev.output -> curr.input
  152. std::vector<std::function<void(std::shared_ptr<BaseMediaElement>)>> funcs;
  153. for (auto mp : mps) {
  154. // hold it in memory
  155. mps_.emplace_back(mp);
  156. BaseMediaProcess *mpPtr = mp.get();
  157. size_t count = mp->getInputCount();
  158. for (size_t i = 0; i < count; ++i) {
  159. funcs.emplace_back([mpPtr, i](std::shared_ptr<BaseMediaElement> me) -> void {
  160. mpPtr->input(i, me);
  161. });
  162. }
  163. }
  164. if (funcs.size() != prevOutputCount_) {
  165. throw(std::runtime_error("previous output not match current input."));
  166. }
  167. size_t j = 0;
  168. for (auto mp : mpsPrev_) {
  169. size_t count = mp->getOutputCount();
  170. for (size_t i = 0; i < count; ++i) {
  171. mp->setOutputHandler(i, funcs[j]);
  172. ++j;
  173. }
  174. }
  175. }
  176. // save to prev
  177. mpsPrev_.clear();
  178. prevOutputCount_ = 0;
  179. for (auto mp : mps) {
  180. mpsPrev_.emplace_back(mp);
  181. prevOutputCount_ += mp->getOutputCount();
  182. }
  183. return init(level + 1, args...);
  184. }
  185. template<typename...Args>
  186. void init(size_t level) {
  187. // last
  188. size_t j = 0;
  189. for (auto mp : mpsPrev_) {
  190. size_t count = mp->getOutputCount();
  191. outputCount_ += count;
  192. for (size_t i = 0; i < count; ++i) {
  193. mp->setOutputHandler(i, [this, j](std::shared_ptr<BaseMediaElement> me) -> void {
  194. if (this->outputHandlers_.find(j) != this->outputHandlers_.end()) {
  195. this->outputHandlers_[j](me);
  196. }
  197. });
  198. ++j;
  199. }
  200. }
  201. }
  202. protected:
  203. size_t inputCount_ = 0;
  204. size_t outputCount_ = 0;
  205. std::vector<std::shared_ptr<BaseMediaProcess> > mpsPrev_;
  206. size_t prevOutputCount_ = 0;
  207. std::vector<std::shared_ptr<BaseMediaProcess> > mps_;
  208. std::function<bool(const std::exception &)> errorHandler_;
  209. std::map<size_t, std::function<void(std::shared_ptr<BaseMediaElement>)> > outputHandlers_;
  210. // input handler can only changed in self or derived class.
  211. std::map<size_t, std::function<void(std::shared_ptr<BaseMediaElement>)> > inputHandlers_;
  212. // generator proxy
  213. std::function<bool()> generator_;
  214. };
  215. class BaseMediaProcessPipe : public BaseMediaProcess {
  216. public:
  217. BaseMediaProcessPipe() {}
  218. template<typename...Args>
  219. explicit BaseMediaProcessPipe(Args...args): BaseMediaProcess(args...) {
  220. assert(getOutputCount() == 1);
  221. assert(getInputCount() == 1);
  222. }
  223. virtual const MediaProcessType getType() const {
  224. return MediaProcessTypePipe;
  225. }
  226. virtual bool produce() {
  227. throw std::runtime_error("not support.");
  228. }
  229. };
  230. class BaseMediaProcessJoin : public BaseMediaProcess {
  231. public:
  232. BaseMediaProcessJoin() {}
  233. template<typename...Args>
  234. explicit BaseMediaProcessJoin(Args...args): BaseMediaProcess(args...) {
  235. assert(getOutputCount() == 1);
  236. }
  237. virtual const MediaProcessType getType() const {
  238. return MediaProcessTypeJoin;
  239. }
  240. virtual bool produce() {
  241. throw std::runtime_error("not support.");
  242. }
  243. };
  244. class BaseMediaProcessSplit : public BaseMediaProcess {
  245. public:
  246. BaseMediaProcessSplit() {}
  247. template<typename...Args>
  248. explicit BaseMediaProcessSplit(Args...args): BaseMediaProcess(args...) {
  249. assert(getInputCount() == 1);
  250. }
  251. virtual const MediaProcessType getType() const {
  252. return MediaProcessTypeSplit;
  253. }
  254. virtual bool produce() {
  255. throw std::runtime_error("not support.");
  256. }
  257. };
  258. class BaseMediaProcessMultiplex : public BaseMediaProcess {
  259. public:
  260. BaseMediaProcessMultiplex() {}
  261. template<typename...Args>
  262. explicit BaseMediaProcessMultiplex(Args...args): BaseMediaProcess(args...) {
  263. }
  264. virtual const MediaProcessType getType() const {
  265. return MediaProcessTypeMultiplex;
  266. }
  267. virtual bool produce() {
  268. throw std::runtime_error("not support.");
  269. }
  270. };
  271. // base 1 output generator
  272. class BaseMediaProcessProducer: public BaseMediaProcess {
  273. public:
  274. BaseMediaProcessProducer() {}
  275. template<typename...Args>
  276. explicit BaseMediaProcessProducer(Args...args): BaseMediaProcess(args...) {
  277. assert(getInputCount() == 0);
  278. }
  279. virtual const MediaProcessType getType() const {
  280. return MediaProcessTypeProducer;
  281. }
  282. virtual void input(const std::string &name, const std::shared_ptr<BaseMediaElement> &mediaElement) {
  283. throw std::runtime_error("not support.");
  284. }
  285. virtual bool produce() {
  286. return false;
  287. }
  288. };
  289. class BaseMediaProcessConsumer : public BaseMediaProcess {
  290. public:
  291. BaseMediaProcessConsumer() {}
  292. template<typename...Args>
  293. explicit BaseMediaProcessConsumer(Args...args): BaseMediaProcess(args...) {
  294. assert(getOutputCount() == 0);
  295. }
  296. virtual const MediaProcessType getType() const {
  297. return MediaProcessTypeConsumer;
  298. }
  299. virtual bool produce() {
  300. throw std::runtime_error("not support.");
  301. }
  302. };
  303. class BaseMediaProcessRunloop : public BaseMediaProcess {
  304. public:
  305. BaseMediaProcessRunloop() {}
  306. template<typename...Args>
  307. explicit BaseMediaProcessRunloop(Args...args): BaseMediaProcess(args...) {
  308. assert(getInputCount() == 0);
  309. assert(getOutputCount() == 0);
  310. }
  311. virtual const MediaProcessType getType() const {
  312. return MediaProcessTypeRunloop;
  313. }
  314. virtual void run() {
  315. // start sub mps
  316. BaseMediaProcess::start();
  317. {
  318. boost::unique_lock<boost::mutex> lock(mutex_);
  319. running_ = true;
  320. }
  321. while (running_ && produce()) {
  322. }
  323. {
  324. boost::unique_lock<boost::mutex> lock(mutex_);
  325. running_ = false;
  326. }
  327. // call sub mps stop
  328. BaseMediaProcess::stop(stopGraceful_);
  329. // wait sun mps go exited
  330. BaseMediaProcess::wait();
  331. }
  332. virtual void start() {
  333. boost::unique_lock<boost::mutex> lock(mutex_);
  334. if (!running_) {
  335. boost::thread t(&BaseMediaProcessRunloop::run, this);
  336. proc_.swap(t);
  337. }
  338. }
  339. virtual void stop(bool graceful = true) {
  340. // stop self thread.
  341. stopGraceful_ = graceful;
  342. {
  343. boost::unique_lock<boost::mutex> lock(mutex_);
  344. if (!running_) {
  345. return;
  346. } else {
  347. running_ = false;
  348. }
  349. }
  350. if (proc_.joinable()) {
  351. proc_.join();
  352. }
  353. }
  354. protected:
  355. bool stopGraceful_ = true;
  356. boost::mutex mutex_;
  357. boost::thread proc_;
  358. bool running_ = false;
  359. };
  360. class BaseMediaProcessThreadedPipe : public BaseMediaProcessPipe {
  361. public:
  362. explicit BaseMediaProcessThreadedPipe(const uint8_t count = 1) : count_(count) {
  363. }
  364. ~BaseMediaProcessThreadedPipe() {
  365. stop(true);
  366. wait();
  367. }
  368. virtual const size_t getInputCount() const {
  369. return 1;
  370. }
  371. virtual const size_t getOutputCount() const {
  372. return 1;
  373. }
  374. virtual void input(const size_t &index, const std::shared_ptr<BaseMediaElement> &mediaElement) {
  375. boost::unique_lock<boost::mutex> lock(mutex_);
  376. while (running_) {
  377. if (me_) {
  378. // wait out event
  379. meCondOut_.wait(lock);
  380. }
  381. if ((!me_) && running_) {
  382. me_ = mediaElement;
  383. meCondIn_.notify_one();
  384. return;
  385. }
  386. }
  387. }
  388. virtual void process(const std::shared_ptr<BaseMediaElement> &mediaElement) {
  389. throw std::runtime_error("not impl.");
  390. }
  391. virtual void start() {
  392. reset();
  393. boost::unique_lock<boost::mutex> lock(mutex_);
  394. running_ = true;
  395. for (uint8_t i = 0; i < count_; ++i) {
  396. threads_.emplace_back(&BaseMediaProcessThreadedPipe::run_, this);
  397. }
  398. }
  399. virtual void stop(bool graceful = true) {
  400. boost::unique_lock<boost::mutex> lock(mutex_);
  401. stopGraceful_ = graceful;
  402. running_ = false;
  403. cond_.notify_all();
  404. meCondOut_.notify_all();
  405. meCondIn_.notify_all();
  406. }
  407. virtual void wait() {
  408. std::vector<boost::thread> ts;
  409. {
  410. boost::unique_lock<boost::mutex> lock(mutex_);
  411. ts.swap(threads_);
  412. }
  413. std::vector<boost::thread>::iterator tEnd = ts.end();
  414. std::vector<boost::thread>::iterator t = ts.begin();
  415. while (t != tEnd) {
  416. if (t->joinable()) {
  417. t->join();
  418. }
  419. ++t;
  420. }
  421. }
  422. virtual void reset() {
  423. stop(true);
  424. wait();
  425. assert(threads_.empty());
  426. boost::unique_lock<boost::mutex> lock(mutex_);
  427. me_ = nullptr;
  428. meCondOut_.notify_one();
  429. }
  430. private:
  431. void run_() {
  432. while (running_) {
  433. std::shared_ptr<BaseMediaElement> currMe(nullptr);
  434. // try pick a media-element from input.
  435. {
  436. boost::unique_lock<boost::mutex> lock(mutex_);
  437. if (!me_) {
  438. meCondIn_.wait(lock);
  439. }
  440. if (running_ && me_) {
  441. me_.swap(currMe);
  442. meCondOut_.notify_one();
  443. }
  444. }
  445. // run with post operation
  446. if (running_ && currMe) {
  447. process(currMe);
  448. // post output operation is single-thread.
  449. std::unique_lock<std::mutex> lock(postRunMutex_);
  450. if (running_) {
  451. if (outputHandlers_.find(0) != outputHandlers_.end()) {
  452. outputHandlers_[0](currMe);
  453. }
  454. }
  455. }
  456. }
  457. // last call for graceful exit.
  458. std::unique_lock<std::mutex> lock(postRunMutex_);
  459. if (stopGraceful_) {
  460. if (me_) {
  461. if (outputHandlers_.find(0) != outputHandlers_.end()) {
  462. outputHandlers_[0](me_);
  463. }
  464. me_ = nullptr;
  465. }
  466. }
  467. }
  468. protected:
  469. bool running_ = false;
  470. // thread count
  471. uint8_t count_;
  472. // global mutex
  473. boost::mutex mutex_;
  474. // can using wait for interrupt
  475. boost::condition_variable cond_;
  476. private:
  477. boost::condition_variable meCondIn_;
  478. boost::condition_variable meCondOut_;
  479. std::shared_ptr<BaseMediaElement> me_ = nullptr;
  480. std::vector<boost::thread> threads_;
  481. std::mutex postRunMutex_;
  482. bool stopGraceful_ = true;
  483. };
  484. class BaseMediaProcessCachePipe : public BaseMediaProcessPipe {
  485. public:
  486. explicit BaseMediaProcessCachePipe(const size_t lowLevel = 0, const size_t highLevel = SIZE_MAX) :
  487. BaseMediaProcessPipe(), lowLevel_(lowLevel), highLevel_(highLevel) {
  488. }
  489. ~BaseMediaProcessCachePipe() {
  490. stop();
  491. wait();
  492. }
  493. virtual const size_t getInputCount() const {
  494. return 1;
  495. }
  496. virtual const size_t getOutputCount() const {
  497. return 1;
  498. }
  499. virtual bool dealHighLevel(const std::shared_ptr<BaseMediaElement> &mediaElement) {
  500. return false;
  501. };
  502. virtual void input(const size_t &index, const std::shared_ptr<BaseMediaElement> &mediaElement) {
  503. boost::unique_lock<boost::mutex> lock(mutex_);
  504. while (running_) {
  505. while (cache_.size() >= highLevel_) {
  506. // block here
  507. if (dealHighLevel(mediaElement)) {
  508. return;
  509. }
  510. enterLowCond_.wait(lock);
  511. }
  512. if (running_ && (cache_.size() < highLevel_)) {
  513. cache_.insert(mediaElement);
  514. if (cache_.size() == 1) {
  515. // first
  516. firstCond_.notify_one();
  517. }
  518. return;
  519. }
  520. }
  521. }
  522. virtual void start() {
  523. reset();
  524. boost::unique_lock<boost::mutex> lock(mutex_);
  525. running_ = true;
  526. boost::thread t(&BaseMediaProcessCachePipe::run_, this);
  527. proc_.swap(t);
  528. }
  529. virtual void stop(bool graceful = true) {
  530. boost::unique_lock<boost::mutex> lock(mutex_);
  531. stopGraceful_ = graceful;
  532. running_ = false;
  533. enterLowCond_.notify_all();
  534. enterLowCond_.notify_all();
  535. firstCond_.notify_all();
  536. }
  537. virtual void wait() {
  538. if (proc_.joinable()) {
  539. proc_.join();
  540. }
  541. }
  542. virtual void reset() {
  543. stop(true);
  544. wait();
  545. boost::unique_lock<boost::mutex> lock(mutex_);
  546. cache_.clear();
  547. }
  548. private:
  549. void run_() {
  550. while (running_) {
  551. std::shared_ptr<BaseMediaElement> me = nullptr;
  552. // pick out
  553. {
  554. boost::unique_lock<boost::mutex> lock(mutex_);
  555. if (cache_.size() > 0) {
  556. std::set<std::shared_ptr<BaseMediaElement> >::iterator x = cache_.begin();
  557. me = *x;
  558. cache_.erase(x);
  559. if (cache_.size() == lowLevel_) {
  560. enterLowCond_.notify_one();
  561. }
  562. } else {
  563. // wait new till 1 second.
  564. firstCond_.wait_for(lock, boost::chrono::seconds(1));
  565. }
  566. }
  567. if (me) {
  568. if (outputHandlers_.find(0) != outputHandlers_.end()) {
  569. outputHandlers_[0](me);
  570. }
  571. }
  572. }
  573. if (stopGraceful_) {
  574. // process remain data.
  575. boost::unique_lock<boost::mutex> lock(mutex_);
  576. if (outputHandlers_.find(0) != outputHandlers_.end()) {
  577. for (auto me : cache_) {
  578. outputHandlers_[0](me);
  579. }
  580. }
  581. cache_.clear();
  582. }
  583. running_ = false;
  584. }
  585. private:
  586. bool running_ = false;
  587. boost::mutex mutex_;
  588. boost::condition_variable enterLowCond_;
  589. boost::condition_variable enterHighCond_;
  590. boost::condition_variable firstCond_;
  591. boost::thread proc_;
  592. size_t lowLevel_;
  593. size_t highLevel_;
  594. std::set<std::shared_ptr<BaseMediaElement> > cache_;
  595. bool stopGraceful_ = true;
  596. };
  597. } // namespace ia10k
  598. #endif // WISEOS_IAE_MP_MEDIA_PROCESS_H_
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注