@MiloXia
2015-03-23T10:38:54.000000Z
Concurrency
The Fork/Join framework paper: http://gee.cs.oswego.edu/dl/papers/fj.pdf
Chinese translation of the paper: http://ifeve.com/a-java-fork-join-framework/
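Tasks executed by the Fork/Join framework have the following limitations:
The core of the Fork/Join framework consists of the following two classes:
ForkJoinPool: implements the ExecutorService interface and the work-stealing algorithm. It manages the worker threads and provides information about the status of tasks and their execution.
ForkJoinTask: the base class of the tasks that execute in a ForkJoinPool. It provides the mechanisms to perform fork() and join() operations inside a task, along with methods that control the task's status. To implement your own Fork/Join tasks, you usually extend one of its two subclasses: RecursiveAction for tasks that return no result, and RecursiveTask for tasks that return one.
The first step in using the fork/join framework is to write the code that performs a portion of the work. Its structure should resemble the following pseudocode: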
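    if (the current task's workload is small enough)
      do the work directly
    else
      split the task, or this portion of the work, into two parts
      invoke the two subtasks and wait for the results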
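Wrap this code in a ForkJoinTask subclass. In practice you normally use one of the more specific types: RecursiveTask (which returns a result) or RecursiveAction.
Once your ForkJoinTask subclass is ready, create an object that represents all the work to be done and pass it to the invoke() method of a ForkJoinPool instance.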
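The pseudocode above can be sketched as a small RecursiveTask. This is only an illustration: the class name SumTask, the THRESHOLD value, and the parallelSum helper are assumptions made for the example, not part of the JDK or of the articles linked here.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical example task: sums a slice of a long[] by recursive splitting.
class SumTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 1_000; // assumed cut-off for "small enough"
    private final long[] data;
    private final int from;
    private final int to;

    SumTask(long[] data, int from, int to) {
        this.data = data;
        this.from = from;
        this.to = to;
    }

    @Override
    protected Long compute() {
        if (to - from <= THRESHOLD) {
            // Small enough: do the work directly.
            long sum = 0;
            for (int i = from; i < to; i++) sum += data[i];
            return sum;
        }
        // Otherwise split into two parts, run both, and wait for the results.
        int mid = (from + to) >>> 1;
        SumTask left = new SumTask(data, from, mid);
        SumTask right = new SumTask(data, mid, to);
        invokeAll(left, right);
        return left.join() + right.join();
    }

    // Creates the object representing all the work and hands it to invoke().
    static long parallelSum(long[] data) {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.invoke(new SumTask(data, 0, data.length));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        long[] data = new long[10_000];
        for (int i = 0; i < data.length; i++) data[i] = i + 1;
        System.out.println(parallelSum(data)); // prints 50005000
    }
}
```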
Fork/Join Framework (2): Creating a Fork/Join pool http://ifeve.com/fork-join-2/
Fork/Join Framework (4): Running tasks asynchronously http://ifeve.com/fork-join-4/
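When you execute a ForkJoinTask in a ForkJoinPool, you can do so synchronously or asynchronously.
With the synchronous methods (for example, invokeAll()), the task that calls them is suspended until the tasks it handed to the pool have finished. This lets ForkJoinPool apply the work-stealing algorithm and assign a new task to the worker thread that was running the now-sleeping task. With the asynchronous methods (for example, fork()), the calling task keeps running, so ForkJoinPool cannot use work-stealing at that point to improve the application's performance. In that case, ForkJoinPool can only apply work-stealing once you call join() or get() to wait for the task to complete.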
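As a rough illustration of the two styles, the sketch below runs the same divide-and-conquer job once with invokeAll() and once with fork() followed by join(). The class name DoubleAction, the async flag, and the THRESHOLD value are made up for this example.

```java
import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

// Hypothetical example action: doubles every element of a slice in place.
class DoubleAction extends RecursiveAction {
    private static final int THRESHOLD = 100; // assumed cut-off
    private final int[] data;
    private final int from;
    private final int to;
    private final boolean async;

    DoubleAction(int[] data, int from, int to, boolean async) {
        this.data = data;
        this.from = from;
        this.to = to;
        this.async = async;
    }

    @Override
    protected void compute() {
        if (to - from <= THRESHOLD) {
            for (int i = from; i < to; i++) data[i] *= 2;
            return;
        }
        int mid = (from + to) >>> 1;
        DoubleAction left = new DoubleAction(data, from, mid, async);
        DoubleAction right = new DoubleAction(data, mid, to, async);
        if (async) {
            left.fork();     // asynchronous: returns at once, left is queued for later
            right.compute(); // the current task keeps running
            left.join();     // only here does the current task wait for the forked half
        } else {
            invokeAll(left, right); // synchronous: returns after both halves are done
        }
    }

    static int[] run(int size, boolean async) {
        int[] data = new int[size];
        Arrays.fill(data, 1);
        ForkJoinPool pool = new ForkJoinPool();
        try {
            pool.invoke(new DoubleAction(data, 0, size, async));
        } finally {
            pool.shutdown();
        }
        return data;
    }

    public static void main(String[] args) {
        // Both styles produce the same array; only the waiting behaviour differs.
        System.out.println(Arrays.equals(run(1_000, false), run(1_000, true)));
    }
}
```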
http://www.molotang.com/articles/696.html
http://ifeve.com/talk-concurrency-forkjoin/
When the ActorSystem is initialized:
    // Read the configuration and create the dispatchers (the default configuration defines only one dispatcher)
    val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
      threadFactory, eventStream, scheduler, dynamicAccess, settings, mailboxes, defaultExecutionContext))

    // Return the default dispatcher (can be changed in the configuration)
    val dispatcher: ExecutionContextExecutor = dispatchers.defaultGlobalDispatcher
Inside new Dispatchers(...):
    // Read the configuration
    val defaultDispatcherConfig: Config =
      idConfig(DefaultDispatcherId).withFallback(settings.config.getConfig(DefaultDispatcherId))
The configuration is:
    default-dispatcher {
      # ... executor=thread-pool-executor.
      type = "Dispatcher"
      executor = "default-executor"
      default-executor {
        # Defaults to ForkJoinPool, which requires JDK 7
        fallback = "fork-join-executor"
      }
      # Default ForkJoinPool thread count: max(min(CPU threads * 3.0, 64), 8)
      fork-join-executor {
        parallelism-min = 8
        parallelism-factor = 3.0
        parallelism-max = 64
      }
      ......
    }
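To make the thread-count formula in the configuration comment concrete, here is a small sketch of the arithmetic. The class PoolSizeSketch and its explicit cores parameter are assumptions for illustration (the real value comes from the machine), and rounding the product up is also an assumption of this sketch.

```java
// Hypothetical helper reproducing max(min(cores * factor, max), min) from the config comment.
class PoolSizeSketch {
    static int poolSize(int parallelismMin, double parallelismFactor, int parallelismMax, int cores) {
        int scaled = (int) Math.ceil(cores * parallelismFactor); // assumed to round up
        return Math.max(Math.min(scaled, parallelismMax), parallelismMin);
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // With the defaults above: parallelism-min = 8, parallelism-factor = 3.0, parallelism-max = 64.
        System.out.println(poolSize(8, 3.0, 64, cores));
    }
}
```

With these defaults, a 4-core machine would get 12 threads, a single-core machine is raised to the minimum of 8, and a 32-core machine is capped at 64.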
In ActorSystem, the line
    val dispatcher: ExecutionContextExecutor = dispatchers.defaultGlobalDispatcher
uses the defaultGlobalDispatcher method:
    def defaultGlobalDispatcher: MessageDispatcher = lookup(DefaultDispatcherId)
lookup calls lookupConfigurator(id).dispatcher():
    private def lookupConfigurator(id: String): MessageDispatcherConfigurator = {
      dispatcherConfigurators.get(id) match {
        case null ⇒
          // It doesn't matter if we create a dispatcher configurator that isn't used due to concurrent lookup.
          // That shouldn't happen often and in case it does the actual ExecutorService isn't
          // created until used, i.e. cheap.
          val newConfigurator =
            if (cachingConfig.hasPath(id)) configuratorFrom(config(id))
            else throw new ConfigurationException(s"Dispatcher [$id] not configured")

          dispatcherConfigurators.putIfAbsent(id, newConfigurator) match {
            case null ⇒ newConfigurator
            case existing ⇒ existing
          }

        case existing ⇒ existing
      }
    }
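The get / putIfAbsent pattern in lookupConfigurator can be sketched in Java as follows. ConfiguratorCacheSketch and its factory function are hypothetical names for this example; the point is only that a racing thread may build a value that is then discarded, which is acceptable when construction is cheap.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical lock-free cache: at most one value per id is ever published.
class ConfiguratorCacheSketch<V> {
    private final ConcurrentHashMap<String, V> cache = new ConcurrentHashMap<>();
    private final Function<String, V> factory;

    ConfiguratorCacheSketch(Function<String, V> factory) {
        this.factory = factory;
    }

    V lookup(String id) {
        V existing = cache.get(id);
        if (existing != null) return existing;
        // Several threads may reach this point at once and each build a value.
        V created = factory.apply(id);
        // Only the first putIfAbsent wins; everyone returns the winner's value.
        V winner = cache.putIfAbsent(id, created);
        return winner != null ? winner : created;
    }

    public static void main(String[] args) {
        ConfiguratorCacheSketch<Object> cache = new ConfiguratorCacheSketch<>(id -> new Object());
        System.out.println(cache.lookup("my-dispatcher") == cache.lookup("my-dispatcher"));
    }
}
```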
The key part is the configuratorFrom method:
    private def configuratorFrom(cfg: Config): MessageDispatcherConfigurator = {
      if (!cfg.hasPath("id")) throw new ConfigurationException("Missing dispatcher 'id' property in config: " + cfg.root.render)

      cfg.getString("type") match {
        case "Dispatcher" ⇒ new DispatcherConfigurator(cfg, prerequisites)
        case "BalancingDispatcher" ⇒
          // FIXME remove this case in 2.4
          throw new IllegalArgumentException("BalancingDispatcher is deprecated, use a BalancingPool instead. " +
            "During a migration period you can still use BalancingDispatcher by specifying the full class name: " +
            classOf[BalancingDispatcherConfigurator].getName)
        case "PinnedDispatcher" ⇒ new PinnedDispatcherConfigurator(cfg, prerequisites)
        case fqn ⇒
          val args = List(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites)
          prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args).recover({
            case exception ⇒
              throw new ConfigurationException(
                ("Cannot instantiate MessageDispatcherConfigurator type [%s], defined in [%s], " +
                  "make sure it has constructor with [com.typesafe.config.Config] and " +
                  "[akka.dispatch.DispatcherPrerequisites] parameters")
                  .format(fqn, cfg.getString("id")), exception)
          }).get
      }
    }
Because the configured "type" is "Dispatcher", the result is new DispatcherConfigurator.
The DispatcherConfigurator class creates:
    private val instance = new Dispatcher(
      this,
      config.getString("id"),
      config.getInt("throughput"),
      config.getNanosDuration("throughput-deadline-time"),
      configureExecutor(),
      config.getMillisDuration("shutdown-timeout"))

    /**
     * Returns the same dispatcher instance for each invocation
     */
    override def dispatcher(): MessageDispatcher = instance
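Now for the Dispatcher class. What "throughput" and "throughput-deadline-time" are for is covered later.
This class defines the dispatch method, which gets a mailbox executed (covered later), and createMailbox, which creates a mailbox.
Its parent class MessageDispatcher defines the attach method, which adds an actor to the dispatcher.
The configureExecutor() call made while constructing the Dispatcher is what sets up the ForkJoinPool: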
    def configureExecutor(): ExecutorServiceConfigurator = {
      def configurator(executor: String): ExecutorServiceConfigurator = executor match {
        case null | "" | "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
        case "thread-pool-executor" ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
        case fqcn ⇒
          val args = List(
            classOf[Config] -> config,
            classOf[DispatcherPrerequisites] -> prerequisites)
          prerequisites.dynamicAccess.createInstanceFor[ExecutorServiceConfigurator](fqcn, args).recover({
            case exception ⇒ throw new IllegalArgumentException(
              ("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s],
                make sure it has an accessible constructor with a [%s,%s] signature""")
                .format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception)
          }).get
      }

      config.getString("executor") match {
        case "default-executor" ⇒ new DefaultExecutorServiceConfigurator(config.getConfig("default-executor"), prerequisites, configurator(config.getString("default-executor.fallback")))
        case other ⇒ configurator(other)
      }
    }
With this configuration, the "fork-join-executor" fallback resolves to new ForkJoinExecutorConfigurator. Here is that class:
    class ForkJoinExecutorServiceFactory(val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
                                         val parallelism: Int) extends ExecutorServiceFactory {
      def createExecutorService: ExecutorService = new AkkaForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing)
    }

    final def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
      val tf = threadFactory match {
        case m: MonitorableThreadFactory ⇒
          // add the dispatcher id to the thread names
          m.withName(m.name + "-" + id)
        case other ⇒ other
      }
      new ForkJoinExecutorServiceFactory(
        validate(tf),
        ThreadPoolConfig.scaledPoolSize(
          config.getInt("parallelism-min"),
          config.getDouble("parallelism-factor"),
          config.getInt("parallelism-max")))
    }
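new ForkJoinExecutorServiceFactory provides the factory for the thread pool.
It reads the parameters used to compute the initial pool size and will create an AkkaForkJoinPool.
Back in configureExecutor(), at the branch
    case "default-executor" ⇒ new DefaultExecutorServiceConfigurator
let's look at DefaultExecutorServiceConfigurator: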
    val provider: ExecutorServiceFactoryProvider =
      prerequisites.defaultExecutionContext match {
        case Some(ec) ⇒
          prerequisites.eventStream.publish(....)

          new AbstractExecutorService with ExecutorServiceFactory with ExecutorServiceFactoryProvider {
            def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = this
            def createExecutorService: ExecutorService = this
            def shutdown(): Unit = ()
            def isTerminated: Boolean = false
            def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = false
            def shutdownNow(): ju.List[Runnable] = ju.Collections.emptyList()
            def execute(command: Runnable): Unit = ec.execute(command)
            def isShutdown: Boolean = false
          }
        case None ⇒ fallback
      }
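If a default ExecutionContext was supplied, provider is initialized to an anonymous AbstractExecutorService that delegates execute to it. Otherwise provider is the fallback, which under this configuration is the ForkJoinPool-based one.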
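A rough Java analogue of that choice is sketched below: an externally supplied Executor is wrapped in an AbstractExecutorService whose lifecycle methods do nothing, and a fallback is used when nothing was supplied. The names ProviderSketch, wrap, choose, and demo are invented for this illustration.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class ProviderSketch {
    // Wraps a borrowed Executor; shutdown requests are ignored because we do not own it.
    static ExecutorService wrap(Executor ec) {
        return new AbstractExecutorService() {
            @Override public void shutdown() { }
            @Override public List<Runnable> shutdownNow() { return Collections.emptyList(); }
            @Override public boolean isShutdown() { return false; }
            @Override public boolean isTerminated() { return false; }
            @Override public boolean awaitTermination(long timeout, TimeUnit unit) { return false; }
            @Override public void execute(Runnable command) { ec.execute(command); }
        };
    }

    // Some(ec) => wrapper around ec; None => whatever the fallback supplies.
    static ExecutorService choose(Optional<Executor> supplied, Supplier<ExecutorService> fallback) {
        return supplied.map(ProviderSketch::wrap).orElseGet(fallback);
    }

    static int demo() {
        Executor direct = Runnable::run; // runs each command on the calling thread
        ExecutorService service = choose(Optional.of(direct), ForkJoinPool::commonPool);
        service.shutdown(); // no-op: the wrapper still accepts work afterwards
        try {
            return service.submit(() -> 41 + 1).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 42
    }
}
```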
Next, the logic that runs when actorOf creates an actor:
    try {
      val dispatcher = system.dispatchers.lookup(props2.dispatcher)
      val mailboxType = system.mailboxes.getMailboxType(props2, dispatcher.configurator.config)

      if (async) new RepointableActorRef(system, props2, dispatcher, mailboxType, supervisor, path).initialize(async)
      else new LocalActorRef(system, props2, dispatcher, mailboxType, supervisor, path)
    } catch {
      case NonFatal(e) ⇒ throw new ConfigurationException(
        s"configuration problem while creating [$path] with dispatcher [${props2.dispatcher}] and mailbox [${props2.mailbox}]", e)
    }
Either kind of ref can be followed from here. Take RepointableActorRef:
    def newCell(old: UnstartedCell): Cell =
      new ActorCell(system, this, props, dispatcher, supervisor).init(sendSupervise = false, mailboxType)
ActorCell's init:
    final def init(sendSupervise: Boolean, mailboxType: MailboxType): this.type = {
      /*
       * Create the mailbox and enqueue the Create() message to ensure that
       * this is processed before anything else.
       */
      val mbox = dispatcher.createMailbox(this, mailboxType)

      /*
       * The mailboxType was calculated taking into account what the MailboxType
       * has promised to produce. If that was more than the default, then we need
       * to reverify here because the dispatcher may well have screwed it up.
       */
      // we need to delay the failure to the point of actor creation so we can handle
      // it properly in the normal way
      val actorClass = props.actorClass
      val createMessage = mailboxType match {
        case _: ProducesMessageQueue[_] if system.mailboxes.hasRequiredType(actorClass) ⇒
          val req = system.mailboxes.getRequiredType(actorClass)
          if (req isInstance mbox.messageQueue) Create(None)
          else {
            val gotType = if (mbox.messageQueue == null) "null" else mbox.messageQueue.getClass.getName
            Create(Some(ActorInitializationException(self,
              s"Actor [$self] requires mailbox type [$req] got [$gotType]")))
          }
        case _ ⇒ Create(None)
      }

      swapMailbox(mbox)
      mailbox.setActor(this)

      // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
      mailbox.systemEnqueue(self, createMessage)

      if (sendSupervise) {
        // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
        parent.sendSystemMessage(akka.dispatch.sysmsg.Supervise(self, async = false))
      }
      this
    }
dispatcher.createMailbox(this, mailboxType) creates the mailbox.
mailbox.setActor(this) ties the mailbox to the current ActorCell.
When the cell is started:
    def start(): this.type = {
      // This call is expected to start off the actor by scheduling its mailbox.
      dispatcher.attach(this)
      this
    }
dispatcher.attach attaches this actor to the dispatcher.
Now look at Mailbox's run method:
    override final def run(): Unit = {
      try {
        if (!isClosed) { //Volatile read, needed here
          processAllSystemMessages() //First, deal with any system messages
          processMailbox() //Then deal with messages
        }
      } finally {
        setAsIdle() //Volatile write, needed here
        dispatcher.registerForExecution(this, false, false)
      }
    }
Focus on processMailbox():
    @tailrec private final def processMailbox(
      left: Int = java.lang.Math.max(dispatcher.throughput, 1),
      deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit =
      if (shouldProcessMessage) {
        val next = dequeue()
        if (next ne null) {
          if (Mailbox.debug) println(actor.self + " processing message " + next)
          actor invoke next
          if (Thread.interrupted())
            throw new InterruptedException("Interrupted while processing actor messages")
          processAllSystemMessages()
          if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
            processMailbox(left - 1, deadlineNs)
        }
      }
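This is where throughput and throughputDeadlineTime are used: throughput caps how many messages a mailbox processes in a single run, and throughputDeadlineTime, when defined, caps how long that run may last.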
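The batching rule in processMailbox can be sketched as a loop in Java. This is a simplified model under stated assumptions: ThroughputSketch and processBatch are hypothetical names, system messages are left out, and a deadline of 0 or less is taken to mean "no deadline defined".

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Consumer;

class ThroughputSketch {
    // Processes at most max(throughput, 1) messages, stopping early once the deadline has passed.
    static <T> int processBatch(Queue<T> queue, int throughput, long deadlineNanos, Consumer<T> handler) {
        int left = Math.max(throughput, 1);
        boolean hasDeadline = deadlineNanos > 0;
        long deadline = hasDeadline ? System.nanoTime() + deadlineNanos : 0L;
        int processed = 0;
        while (true) {
            T next = queue.poll();
            if (next == null) break; // mailbox is empty
            handler.accept(next);
            processed++;
            boolean inTime = !hasDeadline || System.nanoTime() - deadline < 0;
            if (left > 1 && inTime) left--; // same condition as the recursive call above
            else break;                     // yield the thread; the rest waits for the next run
        }
        return processed;
    }

    public static void main(String[] args) {
        Queue<Integer> mailbox = new ArrayDeque<>();
        for (int i = 0; i < 12; i++) mailbox.add(i);
        // With throughput = 5 and no deadline, 12 messages take three runs: 5, 5, 2.
        System.out.println(processBatch(mailbox, 5, 0L, m -> { }));
        System.out.println(processBatch(mailbox, 5, 0L, m -> { }));
        System.out.println(processBatch(mailbox, 5, 0L, m -> { }));
    }
}
```

A small throughput gives other actors on the same dispatcher a turn sooner, at the price of rescheduling the mailbox more often.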
The sendMessage method of Dispatch, one of ActorCell's parent traits, is what gets the mailbox executed:
    def sendMessage(msg: Envelope): Unit =
      try {
        if (system.settings.SerializeAllMessages) {
          val unwrapped = (msg.message match {
            case DeadLetter(wrapped, _, _) ⇒ wrapped
            case other ⇒ other
          }).asInstanceOf[AnyRef]
          if (!unwrapped.isInstanceOf[NoSerializationVerificationNeeded]) {
            val s = SerializationExtension(system)
            s.deserialize(s.serialize(unwrapped).get, unwrapped.getClass).get
          }
        }
        dispatcher.dispatch(this, msg)
      } catch handleException
This leads back to dispatcher.dispatch(this, msg):
    protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
      val mbox = receiver.mailbox
      mbox.enqueue(receiver.self, invocation)
      registerForExecution(mbox, true, false)
    }
Take a look at registerForExecution:
    protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
      if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
        if (mbox.setAsScheduled()) {
          try {
            executorService execute mbox
            true
          } catch {
            ......
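Note the call executorService execute mbox.
And sendMessage is exactly what ! does.
Suppose a message is sent to an actor from inside another actor's receive method.
In that case it is effectively a ForkJoinWorkerThread that runs executorService execute mbox.
Now look at this code: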
    final class AkkaForkJoinPool(parallelism: Int,
                                 threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
                                 unhandledExceptionHandler: Thread.UncaughtExceptionHandler)
      extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) with LoadMetrics {
      override def execute(r: Runnable): Unit = {
        if (r eq null) throw new NullPointerException("The Runnable must not be null")
        val task =
          if (r.isInstanceOf[ForkJoinTask[_]]) r.asInstanceOf[ForkJoinTask[Any]]
          else new AkkaForkJoinTask(r)
        Thread.currentThread match {
          case worker: ForkJoinWorkerThread if worker.getPool eq this ⇒ task.fork()
          case _ ⇒ super.execute(task)
        }
      }

      def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism()
    }
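If the mailbox is not already a ForkJoinTask, it is wrapped in an AkkaForkJoinTask.
task.fork() means the task is added to the tail of the current worker thread's own work queue.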
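The same dispatch rule can be reproduced with the plain JDK classes. The sketch below is a hypothetical Java analogue, not Akka's code: ForkingPool, its two counters, and demo are invented for the example, and ForkJoinTask.adapt stands in for AkkaForkJoinTask.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ForkingPoolSketch {
    // Forks when called from one of its own workers, submits externally otherwise.
    static final class ForkingPool extends ForkJoinPool {
        final AtomicInteger forked = new AtomicInteger();
        final AtomicInteger external = new AtomicInteger();

        ForkingPool(int parallelism) {
            // asyncMode = true, like the last constructor argument of AkkaForkJoinPool.
            super(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true);
        }

        @Override
        public void execute(Runnable r) {
            if (r == null) throw new NullPointerException("The Runnable must not be null");
            ForkJoinTask<?> task =
                (r instanceof ForkJoinTask) ? (ForkJoinTask<?>) r : ForkJoinTask.adapt(r);
            Thread current = Thread.currentThread();
            if (current instanceof ForkJoinWorkerThread
                    && ((ForkJoinWorkerThread) current).getPool() == this) {
                forked.incrementAndGet();
                task.fork();         // goes onto the calling worker's own queue
            } else {
                external.incrementAndGet();
                super.execute(task); // goes through the pool's external submission path
            }
        }
    }

    // Returns {forked, external} after one outside submission that submits again from a worker.
    static int[] demo() {
        ForkingPool pool = new ForkingPool(2);
        CountDownLatch done = new CountDownLatch(1);
        try {
            // The outer task plays the role of an actor sending a message inside receive.
            pool.execute(() -> pool.execute(done::countDown));
            if (!done.await(5, TimeUnit.SECONDS)) throw new IllegalStateException("inner task did not run");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new int[] { pool.forked.get(), pool.external.get() };
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println("forked=" + counts[0] + ", external=" + counts[1]);
    }
}
```

The first execute comes from the main thread and takes the external path; the nested one runs on a worker of the same pool and is forked locally, which avoids the shared submission queue.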