@lupnfer
2017-05-17T13:39:05.000000Z
字数 13725
阅读 643
未分类
在此输入正文
/*******************************************************************************Copyright (c) 2015, Uniview Tech, RD ISF. All rights reserved.--------------------------------------------------------------------------------$Id: media_process.h 3912 2016-10-24 11:57:31Z l03366 $Product: IA10KProject: IAECreated:Authors: jibin 00081Description: IA Engine Media Process--------------------------------------------------------------------------------ChangeLogDATE NAME DESCRIPTION--------------------------------------------------------------------------------2016-09-12 jibin initial*******************************************************************************/#ifndef WISEOS_IAE_MP_MEDIA_PROCESS_H_#define WISEOS_IAE_MP_MEDIA_PROCESS_H_#include <string>#include <set>#include <vector>#include <map>#include <cstddef>#include <iostream>#include <fstream>#include <mutex>#include "boost/thread.hpp"#include "boost/thread/condition_variable.hpp"#include "boost/thread/mutex.hpp"namespace wiseos {namespace ia10k {class BaseMediaElement;using MediaProcessType = enum {MediaProcessTypePipe = 1,MediaProcessTypeJoin = 2,MediaProcessTypeSplit = 3,MediaProcessTypeMultiplex = 4,MediaProcessTypeProducer = 5,MediaProcessTypeConsumer = 6,MediaProcessTypeRunloop = 7,};class MediaProcessInterface {public:MediaProcessInterface() {}virtual const MediaProcessType getType() const = 0;virtual const size_t getInputCount() const = 0;virtual const size_t getOutputCount() const = 0;virtual void input(const size_t &index, const std::shared_ptr<BaseMediaElement> &mediaElement) = 0;virtual void setOutputHandler(const size_t &index,std::function<void(std::shared_ptr<BaseMediaElement>)> outputHandler) = 0;// for generator only, return continue flag, if true continue, else break.virtual bool produce() = 0;// stop current produce/inputvirtual void stop(bool graceful) = 0;// init start operation if neededvirtual void start() = 0;// wait finishvirtual void wait() = 0;// reset to start status if neededvirtual void reset() = 0;// error handle for produce/input.virtual void setErrorHandler(std::function<bool(const std::exception &)> errorHandler) = 0;};class BaseMediaProcess : public MediaProcessInterface {public:BaseMediaProcess() : errorHandler_(nullptr), generator_(nullptr) {}template<typename...Args>explicit BaseMediaProcess(Args... args) :BaseMediaProcess() {init(0, args...);}virtual const size_t getInputCount() const {return inputCount_;}virtual const size_t getOutputCount() const {return outputCount_;}virtual void setErrorHandler(std::function<bool(const std::exception &)> errorHandler) {errorHandler_ = errorHandler;}virtual void input(const size_t &index, const std::shared_ptr<BaseMediaElement> &mediaElement) {return inputHandlers_[index](mediaElement);}virtual void setOutputHandler(const size_t &index,std::function<void(std::shared_ptr<BaseMediaElement>)> outputHandler) {outputHandlers_[index] = outputHandler;}virtual bool produce() {if (generator_) {return generator_();}else {throw std::runtime_error("not impl.");}}virtual void stop(bool graceful = true) {auto it = mps_.begin();auto itEnd = mps_.end();while (it != itEnd) {it->get()->stop(graceful);if (graceful) {it->get()->wait();}++it;}}virtual void start() {auto it = mps_.rbegin();auto itEnd = mps_.rend();while (it != itEnd) {it->get()->start();++it;}}virtual void wait() {auto it = mps_.rbegin();auto itEnd = mps_.rend();while (it != itEnd) {it->get()->wait();++it;}}virtual void reset() {auto it = mps_.begin();auto itEnd = mps_.end();while (it != itEnd) {it->get()->reset();++it;}}private:template<typename...Args>void init(size_t level, std::shared_ptr<BaseMediaProcess> mp, Args... args) {if (level == 0) {// check if start with a generatorif (mp->getInputCount() == 0) {BaseMediaProcess *mpPtr = mp.get();generator_ = [mpPtr]() -> bool {return mpPtr->produce();};}}std::vector<std::shared_ptr<BaseMediaProcess> > mps({ mp });return init(level, mps, args ...);}template<typename...Args>void init(size_t level, std::vector<std::shared_ptr<BaseMediaProcess> > mps, Args... args) {if (level == 0) {// collect inputfor (auto mp : mps) {// hold it in memory.mps_.emplace_back(mp);BaseMediaProcess *mpPtr = mp.get();size_t count = mp->getInputCount();for (size_t i = 0; i < count; ++i) {inputHandlers_[inputCount_] = [mpPtr, i](std::shared_ptr<BaseMediaElement> me) -> void {return mpPtr->input(i, me);};++inputCount_;}}}else {// prev.output -> curr.inputstd::vector<std::function<void(std::shared_ptr<BaseMediaElement>)>> funcs;for (auto mp : mps) {// hold it in memorymps_.emplace_back(mp);BaseMediaProcess *mpPtr = mp.get();size_t count = mp->getInputCount();for (size_t i = 0; i < count; ++i) {funcs.emplace_back([mpPtr, i](std::shared_ptr<BaseMediaElement> me) -> void {mpPtr->input(i, me);});}}if (funcs.size() != prevOutputCount_) {throw(std::runtime_error("previous output not match current input."));}size_t j = 0;for (auto mp : mpsPrev_) {size_t count = mp->getOutputCount();for (size_t i = 0; i < count; ++i) {mp->setOutputHandler(i, funcs[j]);++j;}}}// save to prevmpsPrev_.clear();prevOutputCount_ = 0;for (auto mp : mps) {mpsPrev_.emplace_back(mp);prevOutputCount_ += mp->getOutputCount();}return init(level + 1, args...);}template<typename...Args>void init(size_t level) {// lastsize_t j = 0;for (auto mp : mpsPrev_) {size_t count = mp->getOutputCount();outputCount_ += count;for (size_t i = 0; i < count; ++i) {mp->setOutputHandler(i, [this, j](std::shared_ptr<BaseMediaElement> me) -> void {if (this->outputHandlers_.find(j) != this->outputHandlers_.end()) {this->outputHandlers_[j](me);}});++j;}}}protected:size_t inputCount_ = 0;size_t outputCount_ = 0;std::vector<std::shared_ptr<BaseMediaProcess> > mpsPrev_;size_t prevOutputCount_ = 0;std::vector<std::shared_ptr<BaseMediaProcess> > mps_;std::function<bool(const std::exception &)> errorHandler_;std::map<size_t, std::function<void(std::shared_ptr<BaseMediaElement>)> > outputHandlers_;// input handler can only changed in self or derived class.std::map<size_t, std::function<void(std::shared_ptr<BaseMediaElement>)> > inputHandlers_;// generator proxystd::function<bool()> generator_;};class BaseMediaProcessPipe : public BaseMediaProcess {public:BaseMediaProcessPipe() {}template<typename...Args>explicit BaseMediaProcessPipe(Args...args) : BaseMediaProcess(args...) {assert(getOutputCount() == 1);assert(getInputCount() == 1);}virtual const MediaProcessType getType() const {return MediaProcessTypePipe;}virtual bool produce() {throw std::runtime_error("not support.");}};class BaseMediaProcessJoin : public BaseMediaProcess {public:BaseMediaProcessJoin() {}template<typename...Args>explicit BaseMediaProcessJoin(Args...args) : BaseMediaProcess(args...) {assert(getOutputCount() == 1);}virtual const MediaProcessType getType() const {return MediaProcessTypeJoin;}virtual bool produce() {throw std::runtime_error("not support.");}};class BaseMediaProcessSplit : public BaseMediaProcess {public:BaseMediaProcessSplit() {}template<typename...Args>explicit BaseMediaProcessSplit(Args...args) : BaseMediaProcess(args...) {assert(getInputCount() == 1);}virtual const MediaProcessType getType() const {return MediaProcessTypeSplit;}virtual bool produce() {throw std::runtime_error("not support.");}};class BaseMediaProcessMultiplex : public BaseMediaProcess {public:BaseMediaProcessMultiplex() {}template<typename...Args>explicit BaseMediaProcessMultiplex(Args...args) : BaseMediaProcess(args...) {}virtual const MediaProcessType getType() const {return MediaProcessTypeMultiplex;}virtual bool produce() {throw std::runtime_error("not support.");}};// base 1 output generatorclass BaseMediaProcessProducer : public BaseMediaProcess {public:BaseMediaProcessProducer() {}template<typename...Args>explicit BaseMediaProcessProducer(Args...args) : BaseMediaProcess(args...) {assert(getInputCount() == 0);}virtual const MediaProcessType getType() const {return MediaProcessTypeProducer;}virtual void input(const std::string &name, const std::shared_ptr<BaseMediaElement> &mediaElement) {throw std::runtime_error("not support.");}virtual bool produce() {return false;}};class BaseMediaProcessConsumer : public BaseMediaProcess {public:BaseMediaProcessConsumer() {}template<typename...Args>explicit BaseMediaProcessConsumer(Args...args) : BaseMediaProcess(args...) {assert(getOutputCount() == 0);}virtual const MediaProcessType getType() const {return MediaProcessTypeConsumer;}virtual bool produce() {throw std::runtime_error("not support.");}};class BaseMediaProcessRunloop : public BaseMediaProcess {public:BaseMediaProcessRunloop() {}template<typename...Args>explicit BaseMediaProcessRunloop(Args...args) : BaseMediaProcess(args...) {assert(getInputCount() == 0);assert(getOutputCount() == 0);}virtual const MediaProcessType getType() const {return MediaProcessTypeRunloop;}virtual void run() {// start sub mpsBaseMediaProcess::start();{boost::unique_lock<boost::mutex> lock(mutex_);running_ = true;}while (running_ && produce()) {}{boost::unique_lock<boost::mutex> lock(mutex_);running_ = false;}// call sub mps stopBaseMediaProcess::stop(stopGraceful_);// wait sun mps go exitedBaseMediaProcess::wait();}virtual void start() {boost::unique_lock<boost::mutex> lock(mutex_);if (!running_) {boost::thread t(&BaseMediaProcessRunloop::run, this);proc_.swap(t);}}virtual void stop(bool graceful = true) {// stop self thread.stopGraceful_ = graceful;{boost::unique_lock<boost::mutex> lock(mutex_);if (!running_) {return;}else {running_ = false;}}if (proc_.joinable()) {proc_.join();}}protected:bool stopGraceful_ = true;boost::mutex mutex_;boost::thread proc_;bool running_ = false;};class BaseMediaProcessThreadedPipe : public BaseMediaProcessPipe {public:explicit BaseMediaProcessThreadedPipe(const uint8_t count = 1) : count_(count) {}~BaseMediaProcessThreadedPipe() {stop(true);wait();}virtual const size_t getInputCount() const {return 1;}virtual const size_t getOutputCount() const {return 1;}virtual void input(const size_t &index, const std::shared_ptr<BaseMediaElement> &mediaElement) {boost::unique_lock<boost::mutex> lock(mutex_);while (running_) {if (me_) {// wait out eventmeCondOut_.wait(lock);}if ((!me_) && running_) {me_ = mediaElement;meCondIn_.notify_one();return;}}}virtual void process(const std::shared_ptr<BaseMediaElement> &mediaElement) {throw std::runtime_error("not impl.");}virtual void start() {reset();boost::unique_lock<boost::mutex> lock(mutex_);running_ = true;for (uint8_t i = 0; i < count_; ++i) {threads_.emplace_back(&BaseMediaProcessThreadedPipe::run_, this);}}virtual void stop(bool graceful = true) {boost::unique_lock<boost::mutex> lock(mutex_);stopGraceful_ = graceful;running_ = false;cond_.notify_all();meCondOut_.notify_all();meCondIn_.notify_all();}virtual void wait() {std::vector<boost::thread> ts;{boost::unique_lock<boost::mutex> lock(mutex_);ts.swap(threads_);}std::vector<boost::thread>::iterator tEnd = ts.end();std::vector<boost::thread>::iterator t = ts.begin();while (t != tEnd) {if (t->joinable()) {t->join();}++t;}}virtual void reset() {stop(true);wait();assert(threads_.empty());boost::unique_lock<boost::mutex> lock(mutex_);me_ = nullptr;meCondOut_.notify_one();}private:void run_() {while (running_) {std::shared_ptr<BaseMediaElement> currMe(nullptr);// try pick a media-element from input.{boost::unique_lock<boost::mutex> lock(mutex_);if (!me_) {meCondIn_.wait(lock);}if (running_ && me_) {me_.swap(currMe);meCondOut_.notify_one();}}// run with post operationif (running_ && currMe) {process(currMe);// post output operation is single-thread.std::unique_lock<std::mutex> lock(postRunMutex_);if (running_) {if (outputHandlers_.find(0) != outputHandlers_.end()) {outputHandlers_[0](currMe);}}}}// last call for graceful exit.std::unique_lock<std::mutex> lock(postRunMutex_);if (stopGraceful_) {if (me_) {if (outputHandlers_.find(0) != outputHandlers_.end()) {outputHandlers_[0](me_);}me_ = nullptr;}}}protected:bool running_ = false;// thread countuint8_t count_;// global mutexboost::mutex mutex_;// can using wait for interruptboost::condition_variable cond_;private:boost::condition_variable meCondIn_;boost::condition_variable meCondOut_;std::shared_ptr<BaseMediaElement> me_ = nullptr;std::vector<boost::thread> threads_;std::mutex postRunMutex_;bool stopGraceful_ = true;};class BaseMediaProcessCachePipe : public BaseMediaProcessPipe {public:explicit BaseMediaProcessCachePipe(const size_t lowLevel = 0, const size_t highLevel = SIZE_MAX) :BaseMediaProcessPipe(), lowLevel_(lowLevel), highLevel_(highLevel) {}~BaseMediaProcessCachePipe() {stop();wait();}virtual const size_t getInputCount() const {return 1;}virtual const size_t getOutputCount() const {return 1;}virtual bool dealHighLevel(const std::shared_ptr<BaseMediaElement> &mediaElement) {return false;};virtual void input(const size_t &index, const std::shared_ptr<BaseMediaElement> &mediaElement) {boost::unique_lock<boost::mutex> lock(mutex_);while (running_) {while (cache_.size() >= highLevel_) {// block hereif (dealHighLevel(mediaElement)) {return;}enterLowCond_.wait(lock);}if (running_ && (cache_.size() < highLevel_)) {cache_.insert(mediaElement);if (cache_.size() == 1) {// firstfirstCond_.notify_one();}return;}}}virtual void start() {reset();boost::unique_lock<boost::mutex> lock(mutex_);running_ = true;boost::thread t(&BaseMediaProcessCachePipe::run_, this);proc_.swap(t);}virtual void stop(bool graceful = true) {boost::unique_lock<boost::mutex> lock(mutex_);stopGraceful_ = graceful;running_ = false;enterLowCond_.notify_all();enterLowCond_.notify_all();firstCond_.notify_all();}virtual void wait() {if (proc_.joinable()) {proc_.join();}}virtual void reset() {stop(true);wait();boost::unique_lock<boost::mutex> lock(mutex_);cache_.clear();}private:void run_() {while (running_) {std::shared_ptr<BaseMediaElement> me = nullptr;// pick out{boost::unique_lock<boost::mutex> lock(mutex_);if (cache_.size() > 0) {std::set<std::shared_ptr<BaseMediaElement> >::iterator x = cache_.begin();me = *x;cache_.erase(x);if (cache_.size() == lowLevel_) {enterLowCond_.notify_one();}}else {// wait new till 1 second.firstCond_.wait_for(lock, boost::chrono::seconds(1));}}if (me) {if (outputHandlers_.find(0) != outputHandlers_.end()) {outputHandlers_[0](me);}}}if (stopGraceful_) {// process remain data.boost::unique_lock<boost::mutex> lock(mutex_);if (outputHandlers_.find(0) != outputHandlers_.end()) {for (auto me : cache_) {outputHandlers_[0](me);}}cache_.clear();}running_ = false;}private:bool running_ = false;boost::mutex mutex_;boost::condition_variable enterLowCond_;boost::condition_variable enterHighCond_;boost::condition_variable firstCond_;boost::thread proc_;size_t lowLevel_;size_t highLevel_;std::set<std::shared_ptr<BaseMediaElement> > cache_;bool stopGraceful_ = true;};} // namespace ia10k}#endif // WISEOS_IAE_MP_MEDIA_PROCESS_H_