@MiloXia 2015-03-23

The Java Fork/Join Framework

Concurrency


Paper

Original Fork/Join framework paper: http://gee.cs.oswego.edu/dl/papers/fj.pdf
Chinese translation of the paper: http://ifeve.com/a-java-fork-join-framework/

Work-Stealing

Each worker thread in the pool keeps its own deque of tasks: it pushes and pops its own work at one end, and an idle thread steals tasks from the other end of another thread's deque, which keeps all workers busy with little contention.

Limitations

Tasks executed with the Fork/Join framework have the following limitations (the usual guidance for the framework):

Tasks should do pure in-memory computation and should not perform blocking I/O operations.
fork() and join() should be the only synchronization a task uses; other blocking synchronization ties up the worker threads.
Tasks cannot throw checked exceptions; they have to handle them internally.

Core classes

The core of the Fork/Join framework consists of the following two classes:

ForkJoinPool: implements the ExecutorService interface and the work-stealing algorithm. It manages the worker threads and provides information about the status of the tasks and their execution.
ForkJoinTask: the base class for the tasks that will run inside a ForkJoinPool. It provides the mechanisms to perform the fork() and join() operations inside a task, and these two methods control the task's status. Normally, to implement your own Fork/Join tasks you extend one of its two subclasses: RecursiveAction for tasks that return no result, and RecursiveTask for tasks that return a result.

Basic usage

The first step in using the fork/join framework is to write the code that performs a portion of the work. Its structure should look similar to the following pseudocode:

    if (the piece of work is small enough)
        do the work directly
    else
        split the work into two pieces
        invoke the two pieces and wait for the results

You wrap this code in a ForkJoinTask subclass. In practice you usually use one of the more specific types: RecursiveTask (which returns a result) or RecursiveAction (which does not).
Once your ForkJoinTask subclass is ready, create one object representing all of the work to be done and pass it to the invoke() method of a ForkJoinPool instance.
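
For illustration, here is a minimal sketch of such a task (the class name, threshold and data are made up for the example): a RecursiveTask that sums a slice of an array, splitting the slice in half until it is small enough to add up directly.

    import java.util.concurrent.{ForkJoinPool, RecursiveTask}

    // Minimal sketch: sum a slice of an array, splitting it in half
    // until the slice is small enough to process directly.
    class SumTask(data: Array[Long], from: Int, until: Int) extends RecursiveTask[Long] {
      private val threshold = 1000

      override def compute(): Long =
        if (until - from <= threshold) {
          // small enough: do the work directly
          var sum = 0L
          var i = from
          while (i < until) { sum += data(i); i += 1 }
          sum
        } else {
          // split the work into two subtasks
          val mid = (from + until) / 2
          val left = new SumTask(data, from, mid)
          val right = new SumTask(data, mid, until)
          left.fork()                   // run the left half asynchronously in the pool
          right.compute() + left.join() // compute the right half here, then wait for the left
        }
    }

    object SumTask {
      def main(args: Array[String]): Unit = {
        val data = Array.tabulate(1000000)(_.toLong)
        val pool = new ForkJoinPool()
        // pass the object representing all of the work to invoke()
        println(pool.invoke(new SumTask(data, 0, data.length)))
      }
    }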

详细例子

Fork/Join框架(二)创建一个Fork/Join池 http://ifeve.com/fork-join-2/
Fork/Join框架(四)异步运行任务 http://ifeve.com/fork-join-4/

Running tasks asynchronously

When you execute ForkJoinTasks in a ForkJoinPool, you can do so synchronously or asynchronously.
When you use the synchronous methods (for example, invokeAll()), the task that calls them is suspended until the tasks it submitted to the pool finish executing. This allows the ForkJoinPool class to use the work-stealing algorithm to assign new tasks to the worker thread whose task is sleeping. In contrast, when you use the asynchronous methods (for example, fork()), the task continues executing, so the ForkJoinPool class cannot use work-stealing to improve performance at that point. Only when you call join() or get() to wait for a task's completion can the ForkJoinPool apply the work-stealing algorithm.
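
A minimal sketch of the two styles inside a task's compute() method (the FolderTask class and its depth parameter are made up for the example):

    import java.util.concurrent.{ForkJoinTask, RecursiveAction}

    // Hypothetical task used only to contrast the two submission styles.
    class FolderTask(depth: Int) extends RecursiveAction {
      override def compute(): Unit =
        if (depth == 0) {
          println(s"leaf processed on ${Thread.currentThread.getName}")
        } else {
          val left  = new FolderTask(depth - 1)
          val right = new FolderTask(depth - 1)

          // Synchronous style: invokeAll() blocks this task until both subtasks
          // finish; meanwhile the worker thread can steal and run other tasks.
          ForkJoinTask.invokeAll(left, right)

          // Asynchronous style would instead be:
          //   left.fork(); right.fork()     // returns immediately, no stealing benefit yet
          //   ...do other work here...
          //   left.join(); right.join()     // only now can work-stealing kick in
        }
    }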

JDK source analysis

http://www.molotang.com/articles/696.html
http://ifeve.com/talk-concurrency-forkjoin/

Akka ActorSystem and ForkJoinPool

When an ActorSystem is initialized:

    // read the config and create the dispatchers (the default config has only one dispatcher)
    val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
      threadFactory, eventStream, scheduler, dynamicAccess, settings, mailboxes, defaultExecutionContext))
    // return the default dispatcher (can be changed in the config)
    val dispatcher: ExecutionContextExecutor = dispatchers.defaultGlobalDispatcher

Inside new Dispatchers(...):

    // read the configuration
    val defaultDispatcherConfig: Config =
      idConfig(DefaultDispatcherId).withFallback(settings.config.getConfig(DefaultDispatcherId))

The relevant configuration is:

    default-dispatcher {
      type = "Dispatcher"
      # executor can also be set to "fork-join-executor" or "thread-pool-executor"
      executor = "default-executor"
      default-executor {
        # falls back to fork-join-executor (ForkJoinPool), which requires JDK 7+
        fallback = "fork-join-executor"
      }
      # default ForkJoinPool size: max(min(number of CPU threads * 3.0, 64), 8)
      fork-join-executor {
        parallelism-min = 8
        parallelism-factor = 3.0
        parallelism-max = 64
      }
      ......
    }
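
As a sketch, these defaults can be overridden when building the ActorSystem; the keys below are the ones from the excerpt above, while the system name is made up:

    import akka.actor.ActorSystem
    import com.typesafe.config.ConfigFactory

    // Override the default dispatcher's ForkJoinPool sizing and fall back to the
    // regular configuration for everything else.
    val tuned = ConfigFactory.parseString(
      """
      akka.actor.default-dispatcher.fork-join-executor {
        parallelism-min = 4
        parallelism-factor = 2.0
        parallelism-max = 32
      }
      """).withFallback(ConfigFactory.load())

    val system = ActorSystem("tuned-system", tuned)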

In ActorSystem, the line
val dispatcher: ExecutionContextExecutor = dispatchers.defaultGlobalDispatcher
calls the defaultGlobalDispatcher method:

    def defaultGlobalDispatcher: MessageDispatcher = lookup(DefaultDispatcherId)

lookup calls lookupConfigurator(id).dispatcher():

    private def lookupConfigurator(id: String): MessageDispatcherConfigurator = {
      dispatcherConfigurators.get(id) match {
        case null ⇒
          // It doesn't matter if we create a dispatcher configurator that isn't used due to concurrent lookup.
          // That shouldn't happen often and in case it does the actual ExecutorService isn't
          // created until used, i.e. cheap.
          val newConfigurator =
            if (cachingConfig.hasPath(id)) configuratorFrom(config(id))
            else throw new ConfigurationException(s"Dispatcher [$id] not configured")
          dispatcherConfigurators.putIfAbsent(id, newConfigurator) match {
            case null     ⇒ newConfigurator
            case existing ⇒ existing
          }
        case existing ⇒ existing
      }
    }

The key part is the configuratorFrom method:

    private def configuratorFrom(cfg: Config): MessageDispatcherConfigurator = {
      if (!cfg.hasPath("id")) throw new ConfigurationException("Missing dispatcher 'id' property in config: " + cfg.root.render)
      cfg.getString("type") match {
        case "Dispatcher" ⇒ new DispatcherConfigurator(cfg, prerequisites)
        case "BalancingDispatcher" ⇒
          // FIXME remove this case in 2.4
          throw new IllegalArgumentException("BalancingDispatcher is deprecated, use a BalancingPool instead. " +
            "During a migration period you can still use BalancingDispatcher by specifying the full class name: " +
            classOf[BalancingDispatcherConfigurator].getName)
        case "PinnedDispatcher" ⇒ new PinnedDispatcherConfigurator(cfg, prerequisites)
        case fqn ⇒
          val args = List(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites)
          prerequisites.dynamicAccess.createInstanceFor[MessageDispatcherConfigurator](fqn, args).recover({
            case exception ⇒
              throw new ConfigurationException(
                ("Cannot instantiate MessageDispatcherConfigurator type [%s], defined in [%s], " +
                  "make sure it has constructor with [com.typesafe.config.Config] and " +
                  "[akka.dispatch.DispatcherPrerequisites] parameters")
                  .format(fqn, cfg.getString("id")), exception)
          }).get
      }
    }

Since the configured "type" is "Dispatcher", this takes the new DispatcherConfigurator branch. The DispatcherConfigurator class creates:

    private val instance = new Dispatcher(
      this,
      config.getString("id"),
      config.getInt("throughput"),
      config.getNanosDuration("throughput-deadline-time"),
      configureExecutor(),
      config.getMillisDuration("shutdown-timeout"))

    /**
     * Returns the same dispatcher instance for each invocation
     */
    override def dispatcher(): MessageDispatcher = instance

Now look at the Dispatcher class (what "throughput" and "throughput-deadline-time" are for is explained later).
This class defines the dispatch method that runs a mailbox (covered below) and createMailbox, which creates the mailbox.
Its parent class MessageDispatcher defines the attach method that attaches an actor to the dispatcher.
The configureExecutor() call made while constructing the Dispatcher is what creates the ForkJoinPool:

    def configureExecutor(): ExecutorServiceConfigurator = {
      def configurator(executor: String): ExecutorServiceConfigurator = executor match {
        case null | "" | "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
        case "thread-pool-executor" ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
        case fqcn ⇒
          val args = List(
            classOf[Config] -> config,
            classOf[DispatcherPrerequisites] -> prerequisites)
          prerequisites.dynamicAccess.createInstanceFor[ExecutorServiceConfigurator](fqcn, args).recover({
            case exception ⇒ throw new IllegalArgumentException(
              ("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s],
    make sure it has an accessible constructor with a [%s,%s] signature""")
                .format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception)
          }).get
      }

      config.getString("executor") match {
        case "default-executor" ⇒ new DefaultExecutorServiceConfigurator(config.getConfig("default-executor"), prerequisites, configurator(config.getString("default-executor.fallback")))
        case other ⇒ configurator(other)
      }
    }

With this configuration we go straight to new ForkJoinExecutorConfigurator; let's look at that class:

    class ForkJoinExecutorServiceFactory(val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
                                         val parallelism: Int) extends ExecutorServiceFactory {
      def createExecutorService: ExecutorService = new AkkaForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing)
    }

    final def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
      val tf = threadFactory match {
        case m: MonitorableThreadFactory ⇒
          // add the dispatcher id to the thread names
          m.withName(m.name + "-" + id)
        case other ⇒ other
      }
      new ForkJoinExecutorServiceFactory(
        validate(tf),
        ThreadPoolConfig.scaledPoolSize(
          config.getInt("parallelism-min"),
          config.getDouble("parallelism-factor"),
          config.getInt("parallelism-max")))
    }

new ForkJoinExecutorServiceFactory provides the factory for the thread pool: it reads the three parallelism parameters used to compute the initial pool size and creates an AkkaForkJoinPool.
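
As a sketch (not the exact Akka source), ThreadPoolConfig.scaledPoolSize clamps availableProcessors * parallelism-factor between parallelism-min and parallelism-max:

    // Sketch of the sizing rule: max(min(availableProcessors * factor, ceiling), floor)
    def scaledPoolSize(floor: Int, multiplier: Double, ceiling: Int): Int =
      math.min(math.max((Runtime.getRuntime.availableProcessors * multiplier).ceil.toInt, floor), ceiling)

    // With the defaults from the config above (8, 3.0, 64):
    // on an 8-core machine: min(max(ceil(8 * 3.0), 8), 64) = 24 worker threads.
    val parallelism = scaledPoolSize(8, 3.0, 64)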
Back in configureExecutor(), the branch
case "default-executor" ⇒ new DefaultExecutorServiceConfigurator
leads to DefaultExecutorServiceConfigurator:

    val provider: ExecutorServiceFactoryProvider =
      prerequisites.defaultExecutionContext match {
        case Some(ec) ⇒
          prerequisites.eventStream.publish(....)
          new AbstractExecutorService with ExecutorServiceFactory with ExecutorServiceFactoryProvider {
            def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = this
            def createExecutorService: ExecutorService = this
            def shutdown(): Unit = ()
            def isTerminated: Boolean = false
            def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = false
            def shutdownNow(): ju.List[Runnable] = ju.Collections.emptyList()
            def execute(command: Runnable): Unit = ec.execute(command)
            def isShutdown: Boolean = false
          }
        case None ⇒ fallback
      }

So provider is either an anonymous AbstractExecutorService wrapping the supplied ExecutionContext, or (with the default configuration, where none is supplied) the fallback, which is the ForkJoinPool configurator.

Next, let's look at what happens when an actor is created with actorOf:

    try {
      val dispatcher = system.dispatchers.lookup(props2.dispatcher)
      val mailboxType = system.mailboxes.getMailboxType(props2, dispatcher.configurator.config)
      if (async) new RepointableActorRef(system, props2, dispatcher, mailboxType, supervisor, path).initialize(async)
      else new LocalActorRef(system, props2, dispatcher, mailboxType, supervisor, path)
    } catch {
      case NonFatal(e) ⇒ throw new ConfigurationException(
        s"configuration problem while creating [$path] with dispatcher [${props2.dispatcher}] and mailbox [${props2.mailbox}]", e)
    }

Following either kind of ref, let's look at RepointableActorRef:

    def newCell(old: UnstartedCell): Cell =
      new ActorCell(system, this, props, dispatcher, supervisor).init(sendSupervise = false, mailboxType)

ActorCell's init method:

    final def init(sendSupervise: Boolean, mailboxType: MailboxType): this.type = {
      /*
       * Create the mailbox and enqueue the Create() message to ensure that
       * this is processed before anything else.
       */
      val mbox = dispatcher.createMailbox(this, mailboxType)
      /*
       * The mailboxType was calculated taking into account what the MailboxType
       * has promised to produce. If that was more than the default, then we need
       * to reverify here because the dispatcher may well have screwed it up.
       */
      // we need to delay the failure to the point of actor creation so we can handle
      // it properly in the normal way
      val actorClass = props.actorClass
      val createMessage = mailboxType match {
        case _: ProducesMessageQueue[_] if system.mailboxes.hasRequiredType(actorClass) ⇒
          val req = system.mailboxes.getRequiredType(actorClass)
          if (req isInstance mbox.messageQueue) Create(None)
          else {
            val gotType = if (mbox.messageQueue == null) "null" else mbox.messageQueue.getClass.getName
            Create(Some(ActorInitializationException(self,
              s"Actor [$self] requires mailbox type [$req] got [$gotType]")))
          }
        case _ ⇒ Create(None)
      }
      swapMailbox(mbox)
      mailbox.setActor(this)
      // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
      mailbox.systemEnqueue(self, createMessage)
      if (sendSupervise) {
        // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
        parent.sendSystemMessage(akka.dispatch.sysmsg.Supervise(self, async = false))
      }
      this
    }

dispatcher.createMailbox(this, mailboxType) creates the mailbox.
mailbox.setActor(this) registers the mailbox with the current ActorCell.
When the cell is started:

    def start(): this.type = {
      // This call is expected to start off the actor by scheduling its mailbox.
      dispatcher.attach(this)
      this
    }

dispatcher.attach attaches this actor to the dispatcher.
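
Everything above happens behind a couple of ordinary API calls. A minimal usage sketch (the actor class and names are made up for the example): with the default configuration, this actor's messages end up being processed by worker threads of an AkkaForkJoinPool.

    import akka.actor.{Actor, ActorSystem, Props}

    class Echo extends Actor {
      def receive = {
        case msg => println(s"$msg handled on ${Thread.currentThread.getName}")
      }
    }

    object Demo extends App {
      val system = ActorSystem("demo")                 // builds the dispatchers described above
      val echo = system.actorOf(Props[Echo], "echo")   // lookup(dispatcher), createMailbox, attach
      echo ! "hello"                                   // enqueue + schedule the mailbox on the pool
    }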

Now let's look at Mailbox's run method:

    override final def run(): Unit = {
      try {
        if (!isClosed) { //Volatile read, needed here
          processAllSystemMessages() //First, deal with any system messages
          processMailbox() //Then deal with messages
        }
      } finally {
        setAsIdle() //Volatile write, needed here
        dispatcher.registerForExecution(this, false, false)
      }
    }

Focusing on processMailbox():

    @tailrec private final def processMailbox(
        left: Int = java.lang.Math.max(dispatcher.throughput, 1),
        deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined == true) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit =
      if (shouldProcessMessage) {
        val next = dequeue()
        if (next ne null) {
          if (Mailbox.debug) println(actor.self + " processing message " + next)
          actor invoke next
          if (Thread.interrupted())
            throw new InterruptedException("Interrupted while processing actor messages")
          processAllSystemMessages()
          if ((left > 1) && ((dispatcher.isThroughputDeadlineTimeDefined == false) || (System.nanoTime - deadlineNs) < 0))
            processMailbox(left - 1, deadlineNs)
        }
      }

This is where throughput and throughputDeadlineTime are used: throughput caps how many messages one mailbox run processes before the mailbox gives the thread back (for example, with throughput = 5 and 12 queued messages, a run handles 5 messages and then reschedules the mailbox), and throughput-deadline-time, if defined, additionally bounds the time spent in a single run.
The sendMessage method in Dispatch, a parent trait of ActorCell, defines how a mailbox gets executed:

    def sendMessage(msg: Envelope): Unit =
      try {
        if (system.settings.SerializeAllMessages) {
          val unwrapped = (msg.message match {
            case DeadLetter(wrapped, _, _) ⇒ wrapped
            case other ⇒ other
          }).asInstanceOf[AnyRef]
          if (!unwrapped.isInstanceOf[NoSerializationVerificationNeeded]) {
            val s = SerializationExtension(system)
            s.deserialize(s.serialize(unwrapped).get, unwrapped.getClass).get
          }
        }
        dispatcher.dispatch(this, msg)
      } catch handleException

This brings us back to dispatcher.dispatch(this, msg):

    protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit = {
      val mbox = receiver.mailbox
      mbox.enqueue(receiver.self, invocation)
      registerForExecution(mbox, true, false)
    }

And registerForExecution:

    protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
      if (mbox.canBeScheduledForExecution(hasMessageHint, hasSystemMessageHint)) { //This needs to be here to ensure thread safety and no races
        if (mbox.setAsScheduled()) {
          try {
            executorService execute mbox
            true
          } catch {
            ......

Note the executorService execute mbox line. sendMessage is exactly the ! (tell) operation, so when you send a message to an actor from inside another actor's receive method, it is a ForkJoinWorkerThread that ends up running executorService execute mbox.
With that in mind, look at the pool itself:

    final class AkkaForkJoinPool(parallelism: Int,
                                 threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
                                 unhandledExceptionHandler: Thread.UncaughtExceptionHandler)
      extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, true) with LoadMetrics {

      override def execute(r: Runnable): Unit = {
        if (r eq null) throw new NullPointerException("The Runnable must not be null")
        val task =
          if (r.isInstanceOf[ForkJoinTask[_]]) r.asInstanceOf[ForkJoinTask[Any]]
          else new AkkaForkJoinTask(r)
        Thread.currentThread match {
          case worker: ForkJoinWorkerThread if worker.getPool eq this ⇒ task.fork()
          case _ ⇒ super.execute(task)
        }
      }

      def atFullThrottle(): Boolean = this.getActiveThreadCount() >= this.getParallelism()
    }

The mailbox gets wrapped into an AkkaForkJoinTask.
task.fork() pushes that task onto the current worker thread's own work queue (from which idle workers can steal), while external submissions go through super.execute(task).
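
The same "fork if already on a worker thread" trick can be sketched with a plain ForkJoinPool (the helper below is hypothetical, not Akka code):

    import java.util.concurrent.{ForkJoinPool, ForkJoinTask, ForkJoinWorkerThread, TimeUnit}

    object LocalForkSketch {
      val pool = new ForkJoinPool()

      // Hypothetical helper mirroring AkkaForkJoinPool.execute: if we are already on
      // one of this pool's worker threads, fork() pushes the task onto that worker's
      // own queue; otherwise it is submitted externally.
      def execute(r: Runnable): Unit = {
        val task = ForkJoinTask.adapt(r) // wrap the Runnable, much like AkkaForkJoinTask
        Thread.currentThread match {
          case w: ForkJoinWorkerThread if w.getPool eq pool => task.fork()
          case _                                            => pool.execute(task)
        }
      }

      def main(args: Array[String]): Unit = {
        // External submission that, once on a worker thread, schedules more work via fork().
        execute(new Runnable {
          def run(): Unit = execute(new Runnable {
            def run(): Unit = println("ran on " + Thread.currentThread.getName)
          })
        })
        pool.awaitQuiescence(1, TimeUnit.SECONDS)
      }
    }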
