[关闭]
@MiloXia 2015-09-09T02:12:10.000000Z 字数 16022 阅读 13205

Akka 学习笔记

akka


容错

容错:不是抓住系统所有的错误并恢复,而是将错误(崩溃)孤立出来,不会导致整个系统崩溃(隔离故障组件),备份组件可以替换崩溃组件(冗余)(可恢复性)
容错方式:Restart, Resume, Stop, Escalate
let it crash原则
崩溃原因:网络,第三方服务,硬件故障
Akka容错:分离业务逻辑(receive)和容错逻辑(supervisorStrategy)
父actor自动成为子actor的supervisor
supervisor不fix子actor,而是简单的呈现如何恢复的一个判断==>
List(Restart, //重启并替换原actor,mailbox消息可继续发送,
//但是接收会暂停至替换完成,重启默认重启所有子actor
Resume, //同一个actor不重启,忽略崩溃,继续处理下一个消息
Stop, //terminated 不再处理任何消息,剩余消息会进入死信信箱
Escalate//交给上层处理
)
Akka提供两种恢复策略:
OneForOneStrategy: 只针对单独actor

  1. override def supervisorStrategy = OneForOneStrategy() {
  2. case _:XXXException => Restart
  3. }

AllForOneStrategy: 对所有节点,一个节点重启,其它节点全部重启
不处理TheadDeath OOM等JVM Error,一直往上抛直到actorSystem顶层actor user guardian 会shutdown ,可通过配置使其优雅shutdown


扩展

scaling up 垂直扩展:某节点可以在单机(单JVM)上运行更多的数量
scaling out 水平扩展:某节点可以在多台机器上运行
Akka 按地址(local, remote)投递message, 可以方便实现scaling out
actor不知道是否是和remote actor通信,local, remote区别在于配置,所以Akka可以透明的从scaling up转向scaling out(仅通过修改几行代码) 所以可以像积木一样随意组装
/--
在单JVM中上锁访问的可变状态(hhtp-session) 在scaling out时,直接写入一个数据库是最简单的方式(同库集成)
但是这意味着修改大量代码;
Akka用不可变消息解决了以上问题
Akka actor 在dispatcher(底层调度执行pool)之上运行,dispatcher和配置直接映射(不同dispatcher类型可被选择),所以在scaling up时,只修改配置,不需要动代码,其次actor和线程不是一一映射导致其非常轻量(比线程占用空间更少)
/---
RPC 缺点 点对点通信不适合大规模集群(网络拓扑结构会很复杂,并伴随负载)
面向消息的中间件可以解决这个问题,but代价是应用层混入消息系统(消息中间件必须跟着应用演变)
Akka 使用分布式编程模型(Actor(抽象了本地和分布式环境))在scaling out时,顶层看起来是一样的(透明)
分布式术语:
节点:通过网络通信的应用
节点角色:不同的节点执行不同的任务
通信协议:消息被编码和解码为特定通信协议的报文用于节点的通信
序列化和反序列化:消息的编码和解码
membership:同一个分布式系统的节点(可动态可静态)
dynamic membership:节点数动态变化
Q:分布式环境难在哪里?
A:时延,内存访问,局部失败和并发性

  1. //Akka remote example
  2. object TestRemote extends App {
  3. //////////////backend
  4. val confBackend =
  5. """
  6. akka {
  7. actor {
  8. provider = "akka.remote.RemoteActorRefProvider"
  9. }
  10. remote {
  11. enabled-transports = ["akka.remote.netty.tcp"]
  12. netty.tcp {
  13. hostname = "127.0.0.1"
  14. port = 2551
  15. }
  16. }
  17. }
  18. """
  19. val configBackend = ConfigFactory parseString confBackend
  20. val backend = ActorSystem("backend", configBackend)//backend listens on address akka.tcp://backend@127.0.0.1:2551
  21. backend.actorOf(Props[ConsoleActor], "console")
  22. //////////////frontend
  23. val confFrontend =
  24. """
  25. akka {
  26. actor {
  27. provider = "akka.remote.RemoteActorRefProvider"
  28. }
  29. remote {
  30. enabled-transports = ["akka.remote.netty.tcp"]
  31. netty.tcp {
  32. hostname = "127.0.0.1"
  33. port = 2552
  34. }
  35. }
  36. }
  37. """
  38. val configFrontend = ConfigFactory parseString confFrontend
  39. val frontend = ActorSystem("frontend", configFrontend)//frontend listens on address akka.tcp://backend@127.0.0.1:2552
  40. val path = "akka.tcp://backend@127.0.0.1:2551/user/console"
  41. val console = frontend.actorSelection(path)
  42. console ! "Hello World"
  43. }
  44. //
  45. class ConsoleActor extends Actor {
  46. def receive = {
  47. case m => println(s"received $m")
  48. }
  49. }

actorSelection
actorSelection方法会在本地创建一个远程actor的代理,代理来处理和监听远端actor的重启消息接收等,可以有容错作用,通过actorSelection直接获取actorRef 也可直接通信,但是远端actor崩溃重启,actorRef不会自动定位(容错不好,性能不错)
Akka 还可以远程部署actor
可通过配置和代码两种方式,远端actor崩溃重启,actorRef不会自动定位
远程部署actor容错:需要自己watch actor, 并且在远端actor重启时,重新deploy and watch
采用非远程部署通过actorSelection获取远程actorRef无法实现watch
通过actorSelection获得远程actorRef方法:

val console = context.system.actorSelection(path)
val actorRef = Await.result(console.resolveOne(), 5.seconds)


Futures

函数式并发
Actor基于消息,他们是长久存在的对象,当某事(消息)发生时作出行为
Future用函数替代对象,是某函数异步执行的结果的占位符

  1. import scala.concurrent._
  2. import ExecutionContext.Implicits.global //提供执行线程池 隐转
  3. val res = future {
  4. 1
  5. } map { r =>
  6. "res=" + r
  7. } foreach { r =>
  8. println(r)
  9. }
  10. //Future[Int] ==> Future[String]
  11. //Future[T]和Option[T]类似

Future 错误处理
future {...}代码块内报异常 后续操作将直接不进行
要想获取错误值用onComplete方法

  1. val fres = future {
  2. throw new Exception("error")
  3. 1
  4. } map { r =>
  5. "res=" + r
  6. } onComplete {
  7. case Success(r) => println(r)
  8. case Failure(NonFatal(e)) => println(e)
  9. }//和ajax类似
  10. while(true){} //需要堵塞

Future 恢复(容错)
通过recover方法 可以定义在发生特定错误时的处理逻辑

  1. val f = future {
  2. throw new IllegalArgumentException("error")
  3. 1
  4. } map { r =>
  5. "res=" + r
  6. } recover {
  7. case e:IllegalArgumentException => 2 //返回某默认值
  8. } onComplete {
  9. case Success(r) => println(r)
  10. case Failure(NonFatal(e)) => e.printStackTrace()
  11. }
  12. while(true){}
  13. ////////
  14. val f1 = doAsyc1()
  15. val f2 = f1.flatMap { r =>
  16. doAsyc2(r).recover {
  17. case e:XXXException => r
  18. }
  19. }
  20. //onSuccess & onFailure
  21. val f: Future[Int] = future {
  22. val source = scala.io.Source.fromFile("myText.txt")
  23. source.toSeq.indexOfSlice("myKeyword")
  24. }
  25. f onSuccess {
  26. case idx => println("The keyword first appears at position: " + idx)
  27. }
  28. f onFailure {
  29. case t => println("Could not process file: " + t.getMessage)
  30. }

Future 组合

  1. //1. firstCompletedOf
  2. val f1 = future{1}
  3. val f2 = future{"res="}
  4. val f3 = Seq(f1, f2)
  5. val f4 = Future.firstCompletedOf(f3) //取先执行完的结果
  6. f4.onComplete {
  7. case Success(r) => println(r)
  8. case Failure(NonFatal(e)) => e.printStackTrace()
  9. }
  10. //2. zip
  11. //Future[Int] zip Future[String] ==> Future[(Int, String)]
  12. val f1 = future{1}
  13. val f2 = future{"res="}
  14. f1 zip f2 foreach(println) //等待一起执行完 并组合结果
  15. ===> (1,res=)
  16. val f3 = f1 zip f2 map { case(i,s) =>
  17. s+i
  18. }
  19. //3. sequence traverse
  20. //Seq[Future[Int]] ==> Future[Seq[Int]] 多值映射为单值
  21. val f = Seq(1,2,3) map { i =>
  22. future{i}
  23. }
  24. Future.sequence(f) foreach println
  25. /////////
  26. val f = Future.traverse(Seq(1,2,3)) { i =>
  27. future{i}
  28. }
  29. f foreach println
  30. //4. fold
  31. val f = Seq(f1, f2)
  32. Future.fold(f)(0) { (sum:Int, i:Int) =>
  33. ...
  34. }
  35. //5.andThen
  36. val allposts = mutable.Set[String]()
  37. future {
  38. session.getRecentPosts
  39. } andThen {
  40. posts => allposts ++= posts
  41. } andThen {
  42. posts =>
  43. clearAll()
  44. for (post <- allposts) render(post)
  45. }
  46. /**
  47. * 组合器andThen的用法是出于纯粹的side-effecting目的。
  48. * 经andThen返回的新Future无论原Future成功或失败都会返回与原Future一模一样
  49. * 的结果。一旦原Future完成并返回结果,andThen后跟的代码块就会被调用,
  50. * 且新Future将返回与原Future一样的结果,这确保了多个andThen调用的顺序执行
  51. */
  52. //6. for
  53. val f1 = future { connection.getCurrentValue(USD) }
  54. val f2 = future { connection.getCurrentValue(CHF) }
  55. val f3 = for {
  56. usd <- f1
  57. chf <- f2
  58. if isProfitable(usd, chf)
  59. } yield connection.buy(amount, chf)
  60. f3 onSuccess {
  61. case _ => println("Purchased " + amount + " CHF")
  62. }
  63. //f3只有当f1和f2都完成计算以后才能完成--
  64. //它以其他两个Future的计算值为前提所以它自己的计算不能更早的开始。

与Akka的结合 主要通过 akka.pattern包来实现
1.ask
actor.ask / ?

  1. import akka.pattern.ask
  2. implicit val timeout = Timeout(5 seconds)
  3. val f = (actor ? Message).mapTo[Int]
  4. f foreach println

2.pipe
用于actor 处理Future的结果

  1. val f = (actor1 ? Message).pipeTo(actor2)
  2. //把上一个结果传递给下一个actor; actor2 接收的消息是值 不是Future引用

Await, Promise with Future

  1. val f = future {
  2. 1
  3. } map { r =>
  4. "res=" + r
  5. } recover {
  6. case e:Exception => 2
  7. }
  8. val i = Await.result(f, 5.seconds) //同步等待 直到futrue执行结束 返回值
  9. //promise是一个可写的,可以实现一个future的单一赋值容器
  10. val p = promise[T]
  11. val f = p.future
  12. val prod = future { //生产者
  13. val r = calcResult()
  14. p success r //此时f被写入值
  15. //do other things 同时 下面第19行代码的回调可以执行了
  16. }
  17. val cons = future {
  18. //do something...
  19. f onSuccess { //获取写入值
  20. case r => doSomethingWithRes(r)
  21. }
  22. }

future help doc url: http://docs.scala-lang.org/overviews/core/futures.html
or zh: https://code.csdn.net/DOC_Scala/chinese_scala_offical_document/file/Futures-and-Promises-cn.md


Akka配置

--加载
----默认加载applaction.conf, 代码不写则加载akka-actor包下的reference.conf
----加载配置:val sys = ActorSystem("sys",ConfigFactory.load("sys"))
----获取配置:sys.setting.config.getString("myApp.name")

  1. """sys.conf"""
  2. myApp {
  3. name = "mySystem"
  4. }

--为不同子系统定义配置
----1.提取共享配置:baseConfig.conf
----2.子系统include "baseConfig" 并定义自己的部分 或覆盖公共部分

--akka应用日志
----1.在Actor中用 val log = Logging(context.system, this)
----或者:with ActorLogging
----2.配置

  1. akka {
  2. event-handlers = ["akka.event.Logging$DefaultLogger"] #仅仅输出到STDOUT
  3. loglevel = "DEBUG"
  4. }

----自定义日志handler

  1. class MyEventListener extends Actor {
  2. def receive = {
  3. case InistializeLogger(_) => //系统消息
  4. sender ! LoggerInitialized
  5. case Error(cause, logSource, logClass, message) =>
  6. println("ERROR " + message)
  7. case Warning(logSource, logClass, message) =>
  8. println("WARN " + message)
  9. case Info(logSource, logClass, message) =>
  10. println("INFO " + message)
  11. case Debug(logSource, logClass, message) =>
  12. println("DEBUG " + message)
  13. }
  14. }

使用slf4j的eventHandler

  1. akka {
  2. event-handlers = ["akka.event.slf4j.Slf4jEventHandler"]
  3. loglevel = "DEBUG"
  4. }

--配置akka日志

  1. akka {
  2. loglevel = DEBUG #*必须设置成DEBUG 下面的debug才可以用
  3. log-config-on-start = on #启动时显示用了哪个配置文件
  4. debug {
  5. receive = on #记录actor 接收的消息(user-level级)由akka.event.LoggingReceive处理
  6. autoreceive = on #记录所有自动接收的消息(Kill, PoisonPill
  7. lifecycle = on #记录actor lifecycle changes
  8. fsm = on #状态机相关
  9. event-stream = on #记录eventSteam (subscribe/unsubscribe)
  10. }
  11. remote {
  12. log-sent-messages = on #记录出站的消息
  13. log-received-messages = on #记录进站的消息
  14. }
  15. }
  1. class MyActor extends Actor with ActorLogging {
  2. def receive = LoggingReceive { //记录接收消息
  3. cae ... => ...
  4. }
  5. }

系统架构 与 Akka

1.管道和过滤器
--管道Pips:一个进程或线程将处理结果传给下一个进程做额外处理的能力
(Unix |符)
--Pipeline:多个管道组合,大都为串行,akka提供并行
--过滤器:传递给下一个proccess之前的验证逻辑
--Pip&Filter模式:input -pip-> check -pip-> check -> output
每个Filter包含3部分:进站pipe,处理器,出站pipe
进站消息和出站消息必须是一样的(交给下一个处理),所以过滤器的顺序是可以对调的,
--Akka的实现

  1. class Filter1(pipe: ActorRef) extend Actor {
  2. def receive = {
  3. case msg:Message =>
  4. if...cond1
  5. pipe ! msg
  6. }
  7. }
  8. class Filter2(pipe: ActorRef) extend Actor {
  9. def receive = {
  10. case msg:Message =>
  11. if...cond2
  12. pipe ! msg
  13. }
  14. }

2.发散聚合模式
并行的处理问题分 分发器和聚合器两部分 map reduce
Akka 实现 Scatter & Gather 模式

  1. class Filter1(pipe: ActorRef) extend Actor {
  2. def receive = {
  3. case msg:Message =>
  4. val res1 = proccess...
  5. pipe ! res1
  6. }
  7. }
  8. class Filter2(pipe: ActorRef) extend Actor {
  9. def receive = {
  10. case msg:Message =>
  11. val res2 = proccess...
  12. pipe ! res2
  13. }
  14. }
  15. //Scatter
  16. class RecipientList(recipientList: Seq[ActorRef]) extend Actor {
  17. def receive = { //分发给不同的收件人(pipe)
  18. case msg:AnyRef => recipientList.foreach(_ ! msg)
  19. }
  20. }
  21. //Gather 会缓存消息,当消息都接收完了之后再处理,并发给下一个process
  22. class Aggregator(timeout:Duration, pipe:ActorRef) extends Actor {
  23. val msgs = new ListBuffer[Mssage] //缓存消息
  24. def receive = {
  25. case msg:Message => {
  26. msgs.find(_.id == msg.id) match { //当有两条一样时发送
  27. case Some(existMsg) => {
  28. pipe ! existMsg
  29. msgs -= existMsg
  30. }
  31. case None => msgs += msg
  32. }
  33. }
  34. }
  35. }

Message Channels

1.点对点 Actor默认投递方式
单链式:sender -3-2-1-> p2pchannel -3-2-1-> receiver保证消息顺序不会乱
多接收者:sender -3-2-1-> p2pchannel 1->receiver1 | 2->receiver2 | 3->receiver3 每个接收者只处理一种消息
2.发布订阅
概述:发给多个接收者,并且不知道接收者是谁,channel负责跟踪接收者,多个接收者处理同个消息,接收者的数量可动态变动(unSubscribe/Subscribe) 消息有序
场景:发货和库存更新 需要同一份订单消息,并且可并行执行
EventStream: 支持发布订阅channel, 支持接收者动态变动, 可看成多个Publish-Subscribe channels的管理者
Actor接收任何来自EventStream的消息,不需要额外编码,只用receive方法就行
注册和注销自己:

  1. system.eventStream.subscribe(slef, classOf[Message])
  2. system.eventStream.unsubscribe(slef, classOf[Message])

发布消息

  1. system.eventStream.publish(msg)

可订阅多种消息,注销时需要一一注销
2.5. EventBus 一个概念而已
3.自定义EventBus
需要实现三种实体:
Event 消息
Subscriber (提供被注册的能力,EventStream里面就是ActorRef)
Classifier(选择订阅者分发事件,EventStream里面Classifier 就是消息的类型)
Akka有三种可组合的traits 可用来追踪订阅者
LookupClassification:基本分类器,维护了订阅者集合,实现classify方法来从事件中提取classifier
SubchannelClassification: 它希望监听者不只是叶子节点,实现有层级的结构
ScanningClassification:最复杂的一个
他们实现了unSubscribe/Subscribe方法,但是有别的方法需要实现
classify(event:Event):Classifier //从事件中提取分类器
compareSubscribers(a:Subscriber, b:Subscriber):Int //订阅者的比较器
publish(event:Event, subscriber:Subscriber) //
mapSize:Int //返回不同classifier的数目

  1. class MessageBus extends EventBus with LookupClassification {
  2. type Event = Message
  3. type Classifier = Boolean //自定义 这里以消息的number字段是不是>1
  4. def mapSize = 2 //true,false 两种
  5. //按类型来,就不需要自定义了
  6. protected def classify(event:MessageBus#Event) = {
  7. event.number > 1
  8. }
  9. }
  10. //test
  11. val bus = new MessageBus
  12. bus.subscribe(actorRef1, false) //监听 < =1
  13. bus.subscribe(actorRef2, true) //监听 > 1
  14. bus.publish(Message(1))
  15. bus.publish(Message(2))
  1. DeadLetter(死信) channel
    概念:只有失败的消息才会进入,监听这里可以帮助发现系统的问题,不可以通过它发送消息
    死信:所有不能被处理或者投递(达到)的消息
    Akka用EventStream去实现死信队列
  1. system.eventStream.subscribe(monitorRef, classOf[DeadKetter])
  2. actor1 ! PoisonPill //自杀
  3. actor1 ! Message //接收不到
  4. val dead = monitorRef.expectMsgType[DeadLetter]
  5. dead.message
  6. dead.sender //发送人
  7. dead.recipient //收信人
  8. //当Actor不知道怎么处理消息时可以显示发送给deadLetter
  9. system.deadLetters ! msg

5.Cuaranteed deliver channel (属于点对点channel)
概念:保证所有消息发送和投递 分几个等级,但是akka不保证消息100%投递成功
构建系统时,我们需要考虑怎样的程度的担保是足够的
一般规则:消息至多投递一次
Akka保证消息投递一次或者投递失败(不是很好) 但是有两种解决方式:
a. 发送本地消息不太会失败(单JVM发送消息只是方法调用,失败就是报异常了,此时会有别的策略去处理(容错),消息确实不需要到达了),所以单JVM消息投递是可靠的
b. remote actors, 非常可能丢失消息,尤其是在不可靠网络之上,ReliableProxy被用来解决这个问题,它使得发送消息如发送local message一样可靠。
唯一的顾虑是发送和接收者所在JVM
Q: How does the ReliableProxy work?
A: 当开启ReliableProxy(Actor) 在不同的节点间创建了一个隧道
{client-node: sender -> ReliableProxy} ---> {server-node: egress -> service }
Egress是一个被ReliableProxy启动的Actor 两个Actor都实现了check resend 功能去跟踪哪个消息被远程接收者接收;
当消息投递失败时,ReliableProxy会重试,直到Egress接收到,Egress转发给真正的接收者;
当目标actor终止时,本地ReliableProxy将会终止
ReliableProxy限制:只能正对一个接收者,并且单向(若要回发,会再启两个代理)

  1. val remoteActorRef = system.actorFor(path)
  2. val proxy = system.actorOf(Props(new ReliableProxy(remoteActorRef,500.millis)), "proxy")
  3. proxy ! Message
  4. //akka 2.3.8之后不可用system.actorFor方法,只能用actorSelection方法,而Selection底层就是用的Proxy

STM (借鉴Clojure的东西)

  1. val s = Ref(Set(1,2,3))
  2. val reservedS = atomic { implicit tx = {
  3. val head = s().head
  4. s() = s().tail
  5. head
  6. }}

源码分析

actors
address --> ActorRef 替换直接引用(内存引用)
mailboxes

ActorCell.newActor 会委托props.newActor创建,并将behaviorStack(为毛是栈?为了实现become和unbecome)head 设置为instance.receive。
actorRef ! Message ; 调用ActorCell.tell --> dispatcher.dispath(分发)-->将消息放入ActorCell的队列mbox.enqueue里,并executor.execute(mbox) -->执行mbox的run方法(mailbox被分发到某线程上)-->先处理系统消息(mailBox.processAllSystemMessages),再处理用户消息(mailBox.processMailbox) --> dequeue队列获得消息--> actor invoke next --> ActorCell.recevieMessage(msg) --> Actor.aroundReceive(behaviorStack.head, msg)执行receive方法
...

集群

目标:全自动的管理actor集群, 负载均衡,故障处理
目前的features:
--membership:容错的membership
--负载均衡:根据路由算法 路由actors的message
--节点分区:节点可以有自己的角色,使路由器可以配置为只给某角色的节点发消息
--Partition points(分区点):一个ActorSystem 可以有一部分子树在别的节点上(目前只有top level actors可以这样)
不提供: 状态冗余,重分区,重负载
最适用:单一目的的数据处理应用(图像处理,实时数据分析)

种子节点(是一种节点角色):
a.最基本的用于启动集群的节点(其实是冗余的控制节点); b.没有任何的actors,是个纯的节点;
c.是其它节点的中间联系人
d.要启动akka cluster 必须配置一系列的种子节点,并且第一个种子节点有特殊的角色,First seed Node必须最先启动;
e.其它种子节点可以和第一种子节点一起启动,它们会等待第一种子节点启动好
f.其它成功join启动集群后,第一种子节点可以安全的离开集群
注:种子节点不是必须的,你可以手动启动一个节点,让它自己加入自己编程集群,这样就是手动需要了解地址等

配置:

  1. akka {
  2. loglevel = INFO
  3. stdout-loglevel = INFO
  4. event-handlers = ["akka.event.Logging$DefaultLogger"]
  5. actor { #修改provider
  6. provider = "akka.cluster.ClusterActorRefProvider"
  7. }
  8. remote {
  9. enabled-transports = ["akka.remote.netty.tcp"]
  10. log-remote-lifecycle-events = off
  11. netty.tcp {
  12. hostname = ""
  13. host = ${HOST} #每个节点的主机
  14. port = ${PORT} #每个节点的监听端口
  15. }
  16. }
  17. cluster {
  18. seed-nodes = ["akka.tcp://words@127.0.0.1:2551",#First seed Node
  19. "akka.tcp://words@127.0.0.1:2552", #actorsystem名称必须相同
  20. "akka.tcp://words@127.0.0.1:2553"] #种子节点列表
  21. roles = ["seed"]
  22. }
  23. }

此时启动三个JVM配置不同的port,就创建三个seed node了,并且自动组成cluster

  1. //first seed node
  2. val seedConfig = ConfigFactory.load("seed")
  3. val seedSystem = ActorSystem("words", seedConfig)
  4. //启动后自己join自己
  5. //状态变化:Join -> Up
  6. //同样的代码启动其它两个种子节点
  7. //first seed node 离开集群
  8. val address = Cluster(seedSystem).selfAddress
  9. Cluster(seedSystem).leave(address)
  10. //状态变为Leaving -> Exiting -> Unreachable

在first seed node Exiting状态时,seed node2自动变成leader(处理Join请求)
节点的状态(Joining,Up...)变化通知,会在所有节点间传递

Gossip(闲聊) 协议:
在节点之间传递节点状态的协议,每个节点描述自己的状态和它看到的别的节点的状态,最后节点的状态会收敛成一致状态,每次收敛之后都可以确定Leader

first seed node退出后,不能Cluster(seedSystem).join(selfAddress),而是应该重启

  1. seedSystem.shutdown
  2. val seedSystem = ActorSystem("words", seedConfig)
  3. //一个actorsystem只能加入cluster一次,但是可以以相同的配置启动一个新actorsystem

Akka Cluster会探测Unreachable的节点,并卸载(down),leader节点在Unreachable无法正常工作,可以用down方法,卸载任意一个节点

  1. val address = Address("akka.tcp", "words", "127.0.0.1", 2551)
  2. Cluster(seedSystem).down(address)

节点状态转换:
-join-> [Joining] -leader action-> [Up] -leave-> [Leaving] -leader action->
[Exiting] -leader action-> [Unreachable] -down-> [Down] --> Removed

订阅Cluster Domain Events

  1. class ClusterDomainEventListener extends Actor with ActorLogging {
  2. Cluster(context.system).subscribe(self, classOf[ClusterDomainEvent])
  3. def receive ={
  4. case MemberUp(member) => log.info(s"$member UP.")
  5. case MemberExited(member) => log.info(s"$member EXITED.")
  6. case MemberRemoved(m, previousState) =>
  7. if(previousState == MemberStatus.Exiting) {
  8. log.info(s"Member $m gracefully exited, REMOVED.")
  9. } else {
  10. log.info(s"$m downed after unreachable, REMOVED.")
  11. }
  12. case UnreachableMember(m) => log.info(s"$m UNREACHABLE")
  13. case ReachableMember(m) => log.info(s"$m REACHABLE")
  14. case s: CurrentClusterState => log.info(s"cluster state: $s")
  15. }
  16. override def postStop(): Unit = {
  17. Cluster(context.system).unsubscribe(self)
  18. super.postStop()
  19. }
  20. }

为MemberUp事件配置最小数量节点

  1. #在master上配置
  2. role {
  3. worker.min-nr-of-members = 2 #当worker节点数超过两个时 处罚 memberup事件
  4. }
  1. object Main extends App {
  2. val config = ConfigFactory.load()
  3. val system = ActorSystem("words", config)
  4. println(s"Starting node with roles: ${Cluster(system).selfRoles}")
  5. val roles = system.settings.config
  6. .getStringList("akka.cluster.roles")
  7. if(roles.contains("master")) { //当是master节点时 注册事件
  8. Cluster(system).registerOnMemberUp { //worker节点2个时触发
  9. val receptionist = system.actorOf(Props[JobReceptionist],
  10. "receptionist")
  11. println("Master node is ready.")
  12. }
  13. }
  14. }

Routers 集群的路由
和用本地的Router是一样的,用路由去worker node上创建worker actor

  1. trait CreateWorkerRouter { this: Actor =>
  2. def createWorkerRouter: ActorRef = {
  3. context.actorOf(
  4. ClusterRouterPool(BroadcastPool(10), //用Pool来创建 广播路由
  5. ClusterRouterPoolSettings(
  6. totalInstances = 1000, //集群中最多多少个
  7. maxInstancesPerNode = 20, //每个节点最多多少个
  8. allowLocalRoutees = false, //不在本地节点创建, 只在worker节点上创建
  9. useRole = None
  10. )
  11. ).props(Props[JobWorker]),
  12. name = "worker-router")
  13. }
  14. }

编写master

  1. class JobMaster extends Actor
  2. with ActorLogging
  3. with CreateWorkerRouter {
  4. // inside the body of the JobMaster actor..
  5. val router = createWorkerRouter
  6. def receive = idle
  7. def idle: Receive = {
  8. case StartJob(jobName, text) =>
  9. textParts = text.grouped(10).toVector //分割文本
  10. val cancel = system.scheduler.schedule( 0 millis, #定期给路由发
  11. 1000 millis,
  12. router,
  13. Work(jobName, self))
  14. //发Work消息创建worker
  15. become(working(jobName, sender, cancel))
  16. }
  17. // more code

//TODO

Persistence

当有状态的actors崩溃时,做数据恢复
点对点消息保证一次投递

Scheduler

JDK ScheduledThreadPoolExecutor 通过DelayedWorkQueue 是一个PriorityQueue
(take 是会堵塞的)优先级为Delay时间, 每次添加task都会重排序,时间短排前面,调度时根据Delay时间判断是否执行
Akka Scheduler 则通过Netty HashedWheelTimer ,通过设置Tick Duration,扫一次Task队列,(会Thread.sleep(sleepMs))
哪个性能更好??? 不知道

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注