[关闭]
@lupnfer 2017-05-17T13:39:05.000000Z 字数 13725 阅读 570

在此处输入标题

未分类


在此输入正文

  1. /*******************************************************************************
  2. Copyright (c) 2015, Uniview Tech, RD ISF. All rights reserved.
  3. --------------------------------------------------------------------------------
  4. $Id: media_process.h 3912 2016-10-24 11:57:31Z l03366 $
  5. Product: IA10K
  6. Project: IAE
  7. Created:
  8. Authors: jibin 00081
  9. Description: IA Engine Media Process
  10. --------------------------------------------------------------------------------
  11. ChangeLog
  12. DATE NAME DESCRIPTION
  13. --------------------------------------------------------------------------------
  14. 2016-09-12 jibin initial
  15. *******************************************************************************/
  16. #ifndef WISEOS_IAE_MP_MEDIA_PROCESS_H_
  17. #define WISEOS_IAE_MP_MEDIA_PROCESS_H_
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <fstream>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>
#include "boost/thread.hpp"
#include "boost/thread/condition_variable.hpp"
#include "boost/thread/mutex.hpp"
  29. namespace wiseos {
  30. namespace ia10k {
  31. class BaseMediaElement;
  32. using MediaProcessType = enum {
  33. MediaProcessTypePipe = 1,
  34. MediaProcessTypeJoin = 2,
  35. MediaProcessTypeSplit = 3,
  36. MediaProcessTypeMultiplex = 4,
  37. MediaProcessTypeProducer = 5,
  38. MediaProcessTypeConsumer = 6,
  39. MediaProcessTypeRunloop = 7,
  40. };
  41. class MediaProcessInterface {
  42. public:
  43. MediaProcessInterface() {}
  44. virtual const MediaProcessType getType() const = 0;
  45. virtual const size_t getInputCount() const = 0;
  46. virtual const size_t getOutputCount() const = 0;
  47. virtual void input(const size_t &index, const std::shared_ptr<BaseMediaElement> &mediaElement) = 0;
  48. virtual void setOutputHandler(const size_t &index,
  49. std::function<void(std::shared_ptr<BaseMediaElement>)> outputHandler) = 0;
  50. // for generator only, return continue flag, if true continue, else break.
  51. virtual bool produce() = 0;
  52. // stop current produce/input
  53. virtual void stop(bool graceful) = 0;
  54. // init start operation if needed
  55. virtual void start() = 0;
  56. // wait finish
  57. virtual void wait() = 0;
  58. // reset to start status if needed
  59. virtual void reset() = 0;
  60. // error handle for produce/input.
  61. virtual void setErrorHandler(std::function<bool(const std::exception &)> errorHandler) = 0;
  62. };
  63. class BaseMediaProcess : public MediaProcessInterface {
  64. public:
  65. BaseMediaProcess() : errorHandler_(nullptr), generator_(nullptr) {}
  66. template<typename...Args>
  67. explicit BaseMediaProcess(Args... args) :BaseMediaProcess() {
  68. init(0, args...);
  69. }
  70. virtual const size_t getInputCount() const {
  71. return inputCount_;
  72. }
  73. virtual const size_t getOutputCount() const {
  74. return outputCount_;
  75. }
  76. virtual void setErrorHandler(std::function<bool(const std::exception &)> errorHandler) {
  77. errorHandler_ = errorHandler;
  78. }
  79. virtual void input(const size_t &index, const std::shared_ptr<BaseMediaElement> &mediaElement) {
  80. return inputHandlers_[index](mediaElement);
  81. }
  82. virtual void setOutputHandler(const size_t &index,
  83. std::function<void(std::shared_ptr<BaseMediaElement>)> outputHandler) {
  84. outputHandlers_[index] = outputHandler;
  85. }
  86. virtual bool produce() {
  87. if (generator_) {
  88. return generator_();
  89. }
  90. else {
  91. throw std::runtime_error("not impl.");
  92. }
  93. }
  94. virtual void stop(bool graceful = true) {
  95. auto it = mps_.begin();
  96. auto itEnd = mps_.end();
  97. while (it != itEnd) {
  98. it->get()->stop(graceful);
  99. if (graceful) {
  100. it->get()->wait();
  101. }
  102. ++it;
  103. }
  104. }
  105. virtual void start() {
  106. auto it = mps_.rbegin();
  107. auto itEnd = mps_.rend();
  108. while (it != itEnd) {
  109. it->get()->start();
  110. ++it;
  111. }
  112. }
  113. virtual void wait() {
  114. auto it = mps_.rbegin();
  115. auto itEnd = mps_.rend();
  116. while (it != itEnd) {
  117. it->get()->wait();
  118. ++it;
  119. }
  120. }
  121. virtual void reset() {
  122. auto it = mps_.begin();
  123. auto itEnd = mps_.end();
  124. while (it != itEnd) {
  125. it->get()->reset();
  126. ++it;
  127. }
  128. }
  129. private:
  130. template<typename...Args>
  131. void init(size_t level, std::shared_ptr<BaseMediaProcess> mp, Args... args) {
  132. if (level == 0) {
  133. // check if start with a generator
  134. if (mp->getInputCount() == 0) {
  135. BaseMediaProcess *mpPtr = mp.get();
  136. generator_ = [mpPtr]() -> bool {
  137. return mpPtr->produce();
  138. };
  139. }
  140. }
  141. std::vector<std::shared_ptr<BaseMediaProcess> > mps({ mp });
  142. return init(level, mps, args ...);
  143. }
  144. template<typename...Args>
  145. void init(size_t level, std::vector<std::shared_ptr<BaseMediaProcess> > mps, Args... args) {
  146. if (level == 0) {
  147. // collect input
  148. for (auto mp : mps) {
  149. // hold it in memory.
  150. mps_.emplace_back(mp);
  151. BaseMediaProcess *mpPtr = mp.get();
  152. size_t count = mp->getInputCount();
  153. for (size_t i = 0; i < count; ++i) {
  154. inputHandlers_[inputCount_] = [mpPtr, i](std::shared_ptr<BaseMediaElement> me) -> void {
  155. return mpPtr->input(i, me);
  156. };
  157. ++inputCount_;
  158. }
  159. }
  160. }
  161. else {
  162. // prev.output -> curr.input
  163. std::vector<std::function<void(std::shared_ptr<BaseMediaElement>)>> funcs;
  164. for (auto mp : mps) {
  165. // hold it in memory
  166. mps_.emplace_back(mp);
  167. BaseMediaProcess *mpPtr = mp.get();
  168. size_t count = mp->getInputCount();
  169. for (size_t i = 0; i < count; ++i) {
  170. funcs.emplace_back([mpPtr, i](std::shared_ptr<BaseMediaElement> me) -> void {
  171. mpPtr->input(i, me);
  172. });
  173. }
  174. }
  175. if (funcs.size() != prevOutputCount_) {
  176. throw(std::runtime_error("previous output not match current input."));
  177. }
  178. size_t j = 0;
  179. for (auto mp : mpsPrev_) {
  180. size_t count = mp->getOutputCount();
  181. for (size_t i = 0; i < count; ++i) {
  182. mp->setOutputHandler(i, funcs[j]);
  183. ++j;
  184. }
  185. }
  186. }
  187. // save to prev
  188. mpsPrev_.clear();
  189. prevOutputCount_ = 0;
  190. for (auto mp : mps) {
  191. mpsPrev_.emplace_back(mp);
  192. prevOutputCount_ += mp->getOutputCount();
  193. }
  194. return init(level + 1, args...);
  195. }
  196. template<typename...Args>
  197. void init(size_t level) {
  198. // last
  199. size_t j = 0;
  200. for (auto mp : mpsPrev_) {
  201. size_t count = mp->getOutputCount();
  202. outputCount_ += count;
  203. for (size_t i = 0; i < count; ++i) {
  204. mp->setOutputHandler(i, [this, j](std::shared_ptr<BaseMediaElement> me) -> void {
  205. if (this->outputHandlers_.find(j) != this->outputHandlers_.end()) {
  206. this->outputHandlers_[j](me);
  207. }
  208. });
  209. ++j;
  210. }
  211. }
  212. }
  213. protected:
  214. size_t inputCount_ = 0;
  215. size_t outputCount_ = 0;
  216. std::vector<std::shared_ptr<BaseMediaProcess> > mpsPrev_;
  217. size_t prevOutputCount_ = 0;
  218. std::vector<std::shared_ptr<BaseMediaProcess> > mps_;
  219. std::function<bool(const std::exception &)> errorHandler_;
  220. std::map<size_t, std::function<void(std::shared_ptr<BaseMediaElement>)> > outputHandlers_;
  221. // input handler can only changed in self or derived class.
  222. std::map<size_t, std::function<void(std::shared_ptr<BaseMediaElement>)> > inputHandlers_;
  223. // generator proxy
  224. std::function<bool()> generator_;
  225. };
  226. class BaseMediaProcessPipe : public BaseMediaProcess {
  227. public:
  228. BaseMediaProcessPipe() {}
  229. template<typename...Args>
  230. explicit BaseMediaProcessPipe(Args...args) : BaseMediaProcess(args...) {
  231. assert(getOutputCount() == 1);
  232. assert(getInputCount() == 1);
  233. }
  234. virtual const MediaProcessType getType() const {
  235. return MediaProcessTypePipe;
  236. }
  237. virtual bool produce() {
  238. throw std::runtime_error("not support.");
  239. }
  240. };
  241. class BaseMediaProcessJoin : public BaseMediaProcess {
  242. public:
  243. BaseMediaProcessJoin() {}
  244. template<typename...Args>
  245. explicit BaseMediaProcessJoin(Args...args) : BaseMediaProcess(args...) {
  246. assert(getOutputCount() == 1);
  247. }
  248. virtual const MediaProcessType getType() const {
  249. return MediaProcessTypeJoin;
  250. }
  251. virtual bool produce() {
  252. throw std::runtime_error("not support.");
  253. }
  254. };
  255. class BaseMediaProcessSplit : public BaseMediaProcess {
  256. public:
  257. BaseMediaProcessSplit() {}
  258. template<typename...Args>
  259. explicit BaseMediaProcessSplit(Args...args) : BaseMediaProcess(args...) {
  260. assert(getInputCount() == 1);
  261. }
  262. virtual const MediaProcessType getType() const {
  263. return MediaProcessTypeSplit;
  264. }
  265. virtual bool produce() {
  266. throw std::runtime_error("not support.");
  267. }
  268. };
  269. class BaseMediaProcessMultiplex : public BaseMediaProcess {
  270. public:
  271. BaseMediaProcessMultiplex() {}
  272. template<typename...Args>
  273. explicit BaseMediaProcessMultiplex(Args...args) : BaseMediaProcess(args...) {
  274. }
  275. virtual const MediaProcessType getType() const {
  276. return MediaProcessTypeMultiplex;
  277. }
  278. virtual bool produce() {
  279. throw std::runtime_error("not support.");
  280. }
  281. };
  282. // base 1 output generator
  283. class BaseMediaProcessProducer : public BaseMediaProcess {
  284. public:
  285. BaseMediaProcessProducer() {}
  286. template<typename...Args>
  287. explicit BaseMediaProcessProducer(Args...args) : BaseMediaProcess(args...) {
  288. assert(getInputCount() == 0);
  289. }
  290. virtual const MediaProcessType getType() const {
  291. return MediaProcessTypeProducer;
  292. }
  293. virtual void input(const std::string &name, const std::shared_ptr<BaseMediaElement> &mediaElement) {
  294. throw std::runtime_error("not support.");
  295. }
  296. virtual bool produce() {
  297. return false;
  298. }
  299. };
  300. class BaseMediaProcessConsumer : public BaseMediaProcess {
  301. public:
  302. BaseMediaProcessConsumer() {}
  303. template<typename...Args>
  304. explicit BaseMediaProcessConsumer(Args...args) : BaseMediaProcess(args...) {
  305. assert(getOutputCount() == 0);
  306. }
  307. virtual const MediaProcessType getType() const {
  308. return MediaProcessTypeConsumer;
  309. }
  310. virtual bool produce() {
  311. throw std::runtime_error("not support.");
  312. }
  313. };
  314. class BaseMediaProcessRunloop : public BaseMediaProcess {
  315. public:
  316. BaseMediaProcessRunloop() {}
  317. template<typename...Args>
  318. explicit BaseMediaProcessRunloop(Args...args) : BaseMediaProcess(args...) {
  319. assert(getInputCount() == 0);
  320. assert(getOutputCount() == 0);
  321. }
  322. virtual const MediaProcessType getType() const {
  323. return MediaProcessTypeRunloop;
  324. }
  325. virtual void run() {
  326. // start sub mps
  327. BaseMediaProcess::start();
  328. {
  329. boost::unique_lock<boost::mutex> lock(mutex_);
  330. running_ = true;
  331. }
  332. while (running_ && produce()) {
  333. }
  334. {
  335. boost::unique_lock<boost::mutex> lock(mutex_);
  336. running_ = false;
  337. }
  338. // call sub mps stop
  339. BaseMediaProcess::stop(stopGraceful_);
  340. // wait sun mps go exited
  341. BaseMediaProcess::wait();
  342. }
  343. virtual void start() {
  344. boost::unique_lock<boost::mutex> lock(mutex_);
  345. if (!running_) {
  346. boost::thread t(&BaseMediaProcessRunloop::run, this);
  347. proc_.swap(t);
  348. }
  349. }
  350. virtual void stop(bool graceful = true) {
  351. // stop self thread.
  352. stopGraceful_ = graceful;
  353. {
  354. boost::unique_lock<boost::mutex> lock(mutex_);
  355. if (!running_) {
  356. return;
  357. }
  358. else {
  359. running_ = false;
  360. }
  361. }
  362. if (proc_.joinable()) {
  363. proc_.join();
  364. }
  365. }
  366. protected:
  367. bool stopGraceful_ = true;
  368. boost::mutex mutex_;
  369. boost::thread proc_;
  370. bool running_ = false;
  371. };
  372. class BaseMediaProcessThreadedPipe : public BaseMediaProcessPipe {
  373. public:
  374. explicit BaseMediaProcessThreadedPipe(const uint8_t count = 1) : count_(count) {
  375. }
  376. ~BaseMediaProcessThreadedPipe() {
  377. stop(true);
  378. wait();
  379. }
  380. virtual const size_t getInputCount() const {
  381. return 1;
  382. }
  383. virtual const size_t getOutputCount() const {
  384. return 1;
  385. }
  386. virtual void input(const size_t &index, const std::shared_ptr<BaseMediaElement> &mediaElement) {
  387. boost::unique_lock<boost::mutex> lock(mutex_);
  388. while (running_) {
  389. if (me_) {
  390. // wait out event
  391. meCondOut_.wait(lock);
  392. }
  393. if ((!me_) && running_) {
  394. me_ = mediaElement;
  395. meCondIn_.notify_one();
  396. return;
  397. }
  398. }
  399. }
  400. virtual void process(const std::shared_ptr<BaseMediaElement> &mediaElement) {
  401. throw std::runtime_error("not impl.");
  402. }
  403. virtual void start() {
  404. reset();
  405. boost::unique_lock<boost::mutex> lock(mutex_);
  406. running_ = true;
  407. for (uint8_t i = 0; i < count_; ++i) {
  408. threads_.emplace_back(&BaseMediaProcessThreadedPipe::run_, this);
  409. }
  410. }
  411. virtual void stop(bool graceful = true) {
  412. boost::unique_lock<boost::mutex> lock(mutex_);
  413. stopGraceful_ = graceful;
  414. running_ = false;
  415. cond_.notify_all();
  416. meCondOut_.notify_all();
  417. meCondIn_.notify_all();
  418. }
  419. virtual void wait() {
  420. std::vector<boost::thread> ts;
  421. {
  422. boost::unique_lock<boost::mutex> lock(mutex_);
  423. ts.swap(threads_);
  424. }
  425. std::vector<boost::thread>::iterator tEnd = ts.end();
  426. std::vector<boost::thread>::iterator t = ts.begin();
  427. while (t != tEnd) {
  428. if (t->joinable()) {
  429. t->join();
  430. }
  431. ++t;
  432. }
  433. }
  434. virtual void reset() {
  435. stop(true);
  436. wait();
  437. assert(threads_.empty());
  438. boost::unique_lock<boost::mutex> lock(mutex_);
  439. me_ = nullptr;
  440. meCondOut_.notify_one();
  441. }
  442. private:
  443. void run_() {
  444. while (running_) {
  445. std::shared_ptr<BaseMediaElement> currMe(nullptr);
  446. // try pick a media-element from input.
  447. {
  448. boost::unique_lock<boost::mutex> lock(mutex_);
  449. if (!me_) {
  450. meCondIn_.wait(lock);
  451. }
  452. if (running_ && me_) {
  453. me_.swap(currMe);
  454. meCondOut_.notify_one();
  455. }
  456. }
  457. // run with post operation
  458. if (running_ && currMe) {
  459. process(currMe);
  460. // post output operation is single-thread.
  461. std::unique_lock<std::mutex> lock(postRunMutex_);
  462. if (running_) {
  463. if (outputHandlers_.find(0) != outputHandlers_.end()) {
  464. outputHandlers_[0](currMe);
  465. }
  466. }
  467. }
  468. }
  469. // last call for graceful exit.
  470. std::unique_lock<std::mutex> lock(postRunMutex_);
  471. if (stopGraceful_) {
  472. if (me_) {
  473. if (outputHandlers_.find(0) != outputHandlers_.end()) {
  474. outputHandlers_[0](me_);
  475. }
  476. me_ = nullptr;
  477. }
  478. }
  479. }
  480. protected:
  481. bool running_ = false;
  482. // thread count
  483. uint8_t count_;
  484. // global mutex
  485. boost::mutex mutex_;
  486. // can using wait for interrupt
  487. boost::condition_variable cond_;
  488. private:
  489. boost::condition_variable meCondIn_;
  490. boost::condition_variable meCondOut_;
  491. std::shared_ptr<BaseMediaElement> me_ = nullptr;
  492. std::vector<boost::thread> threads_;
  493. std::mutex postRunMutex_;
  494. bool stopGraceful_ = true;
  495. };
  496. class BaseMediaProcessCachePipe : public BaseMediaProcessPipe {
  497. public:
  498. explicit BaseMediaProcessCachePipe(const size_t lowLevel = 0, const size_t highLevel = SIZE_MAX) :
  499. BaseMediaProcessPipe(), lowLevel_(lowLevel), highLevel_(highLevel) {
  500. }
  501. ~BaseMediaProcessCachePipe() {
  502. stop();
  503. wait();
  504. }
  505. virtual const size_t getInputCount() const {
  506. return 1;
  507. }
  508. virtual const size_t getOutputCount() const {
  509. return 1;
  510. }
  511. virtual bool dealHighLevel(const std::shared_ptr<BaseMediaElement> &mediaElement) {
  512. return false;
  513. };
  514. virtual void input(const size_t &index, const std::shared_ptr<BaseMediaElement> &mediaElement) {
  515. boost::unique_lock<boost::mutex> lock(mutex_);
  516. while (running_) {
  517. while (cache_.size() >= highLevel_) {
  518. // block here
  519. if (dealHighLevel(mediaElement)) {
  520. return;
  521. }
  522. enterLowCond_.wait(lock);
  523. }
  524. if (running_ && (cache_.size() < highLevel_)) {
  525. cache_.insert(mediaElement);
  526. if (cache_.size() == 1) {
  527. // first
  528. firstCond_.notify_one();
  529. }
  530. return;
  531. }
  532. }
  533. }
  534. virtual void start() {
  535. reset();
  536. boost::unique_lock<boost::mutex> lock(mutex_);
  537. running_ = true;
  538. boost::thread t(&BaseMediaProcessCachePipe::run_, this);
  539. proc_.swap(t);
  540. }
  541. virtual void stop(bool graceful = true) {
  542. boost::unique_lock<boost::mutex> lock(mutex_);
  543. stopGraceful_ = graceful;
  544. running_ = false;
  545. enterLowCond_.notify_all();
  546. enterLowCond_.notify_all();
  547. firstCond_.notify_all();
  548. }
  549. virtual void wait() {
  550. if (proc_.joinable()) {
  551. proc_.join();
  552. }
  553. }
  554. virtual void reset() {
  555. stop(true);
  556. wait();
  557. boost::unique_lock<boost::mutex> lock(mutex_);
  558. cache_.clear();
  559. }
  560. private:
  561. void run_() {
  562. while (running_) {
  563. std::shared_ptr<BaseMediaElement> me = nullptr;
  564. // pick out
  565. {
  566. boost::unique_lock<boost::mutex> lock(mutex_);
  567. if (cache_.size() > 0) {
  568. std::set<std::shared_ptr<BaseMediaElement> >::iterator x = cache_.begin();
  569. me = *x;
  570. cache_.erase(x);
  571. if (cache_.size() == lowLevel_) {
  572. enterLowCond_.notify_one();
  573. }
  574. }
  575. else {
  576. // wait new till 1 second.
  577. firstCond_.wait_for(lock, boost::chrono::seconds(1));
  578. }
  579. }
  580. if (me) {
  581. if (outputHandlers_.find(0) != outputHandlers_.end()) {
  582. outputHandlers_[0](me);
  583. }
  584. }
  585. }
  586. if (stopGraceful_) {
  587. // process remain data.
  588. boost::unique_lock<boost::mutex> lock(mutex_);
  589. if (outputHandlers_.find(0) != outputHandlers_.end()) {
  590. for (auto me : cache_) {
  591. outputHandlers_[0](me);
  592. }
  593. }
  594. cache_.clear();
  595. }
  596. running_ = false;
  597. }
  598. private:
  599. bool running_ = false;
  600. boost::mutex mutex_;
  601. boost::condition_variable enterLowCond_;
  602. boost::condition_variable enterHighCond_;
  603. boost::condition_variable firstCond_;
  604. boost::thread proc_;
  605. size_t lowLevel_;
  606. size_t highLevel_;
  607. std::set<std::shared_ptr<BaseMediaElement> > cache_;
  608. bool stopGraceful_ = true;
  609. };
  610. } // namespace ia10k
  611. }
  612. #endif // WISEOS_IAE_MP_MEDIA_PROCESS_H_
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注