akka入门-热插拔(become和unbecome)

Akka支持在运行时对角色消息循环 (例如它的的实现)进行实时替换: 在角色中调用getContext.become 方法。 热替换的代码被存在一个栈中,可以被pushed(replacing 或 adding 在顶部)和popped。

become一个特别好的例子是用它来实现一个有限状态机。

使用Become/Unbecome特性还可以很方便的实现状态转换机。

1.动态替换方法

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.Procedure;

public class UntypedActorSwapper {

  public static class Swap {
    public static Swap SWAP = new Swap();

    private Swap() {}
  }

  public static class Swapper extends UntypedActor {
    LoggingAdapter log = Logging.getLogger(getContext().system(), this );

    public void onReceive(Object message ) {
      if ( message == Swap. SWAP ) {
        log.info( "Hi" );
        getContext().become( new Procedure<Object>() {
          public void apply(Object arg0) throws Exception {
            log.info( "Ho" );
            getContext().unbecome(); // resets the latest 'become'
          }
        }, false ); // this signals stacking of the new behavior
      } else {
        unhandled( message);
      }
    }
  }

  public static void main(String... args) {
    ActorSystem system = ActorSystem. create( "MySystem");
    ActorRef swap = system .actorOf(Props.create(Swapper. class ));
    swap.tell(Swap. SWAP , ActorRef.noSender()); // logs Hi
    swap.tell(Swap. SWAP , ActorRef.noSender()); // logs Ho
    swap.tell(Swap. SWAP , ActorRef.noSender()); // logs Hi
    swap.tell(Swap. SWAP , ActorRef.noSender()); // logs Ho
    swap.tell(Swap. SWAP , ActorRef.noSender()); // logs Hi
    swap.tell(Swap. SWAP , ActorRef.noSender()); // logs Ho
  }

}

输出结果:

[INFO] [05/18/2015 22:49:10.122] [MySystem-akka.actor.default-dispatcher-2] [akka://MySystem/user/$a] Hi
[INFO] [05/18/2015 22:49:10.123] [MySystem-akka.actor.default-dispatcher-2] [akka://MySystem/user/$a] Ho
[INFO] [05/18/2015 22:49:10.123] [MySystem-akka.actor.default-dispatcher-2] [akka://MySystem/user/$a] Hi
[INFO] [05/18/2015 22:49:10.123] [MySystem-akka.actor.default-dispatcher-2] [akka://MySystem/user/$a] Ho
[INFO] [05/18/2015 22:49:10.123] [MySystem-akka.actor.default-dispatcher-2] [akka://MySystem/user/$a] Hi
[INFO] [05/18/2015 22:49:10.123] [MySystem-akka.actor.default-dispatcher-2] [akka://MySystem/user/$a] Ho

2.实现经典的哲学见就餐问题

哲学家就餐问题可以这样表述,假设有五位哲学家围坐在一张圆形餐桌旁,做以下两件事情之一:吃饭,或者思考。吃东西的时候,他们就停止思考,思考的时候也停止吃东西。餐桌中间有一大碗意大利面,每两个哲学家之间有一只餐叉。因为用一只餐叉很难吃到意大利面,所以假设哲学家必须用两只餐叉吃东西。他们只能使用自己左右手边的那两只餐叉。哲学家就餐问题有时也用米饭和筷子而不是意大利面和餐叉来描述,因为很明显,吃米饭必须用两根筷子。

以下程序中黑客饰演哲学家,筷子扮演餐叉。

import static com.center.akka.become.Messages. Eat;
import static com.center.akka.become.Messages. Think;
import static java.util.concurrent.TimeUnit. MILLISECONDS;
import static java.util.concurrent.TimeUnit. SECONDS;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import scala.PartialFunction;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.japi.pf.FI.TypedPredicate;
import akka.japi.pf.FI.UnitApply;
import akka.japi.pf.ReceiveBuilder;

import com.center.akka.become.Messages.Busy;
import com.center.akka.become.Messages.Put;
import com.center.akka.become.Messages.Take;
import com.center.akka.become.Messages.Taken;

public class DiningHakkersOnBecome {

  /*
   * 筷子是一个Actor,它可以被拿到或者放下
   */
  public static class Chopstick extends AbstractActor {

    // 当一双筷子被一个黑客拿到后一根后,将会拒绝另外一个黑客拿另外一根,同时拥有筷子的黑客需要放回拿到的一根筷子
    PartialFunction<Object, BoxedUnit> takenBy( final ActorRef hakker ) {
      return ReceiveBuilder. match(Take. class, new UnitApply<Take>() {
        public void apply(Take t) throws Exception {
          t. hakker.tell( new Busy(self()), self());
        }
      }).match(Put. class , new TypedPredicate<Put>() {
        public boolean defined(Put p) {
          return p .hakker == hakker ;
        }
      }, new UnitApply<Put>() {
        public void apply(Put p) throws Exception {
          context().become( available );
        }
      }).build();
    }

    // 筷子可用状态
    PartialFunction<Object, BoxedUnit> available = ReceiveBuilder.match(Take. class, new UnitApply<Take>() {
      public void apply(Take t ) throws Exception {
        context().become(takenBy( t. hakker));
        t. hakker.tell( new Taken(self()), self());
      }
    }).build();

    // 设置初始状态可用
    public Chopstick() {
      receive( available );
    }
  }

  public static class Hakker extends AbstractActor {
    private String name;
    private ActorRef left;
    private ActorRef right;

    public Hakker( final String name , ActorRef left, ActorRef right ) {
      this. name = name;
      this. left = left;
      this. right = right;
      // 设置默认首先去思考
      receive(ReceiveBuilder. matchEquals( Think, new UnitApply<Object>() {
        public void apply(Object i) throws Exception {
          System. out .println(String.format( "%s starts to think", name));
          startThinking(Duration. create(5, SECONDS));
        }
      }).build());
    }

    public Hakker() {
      receive(ReceiveBuilder. matchEquals( Think, new UnitApply<Object>() {
        public void apply(Object i) throws Exception {
          System. out .println(String.format( "%s starts to think", name));
          startThinking(Duration. create(5, SECONDS));
        }
      }).build());
    }

    // 吃行为:当黑客吃饭时,他会决定开始思考,并且放下手中的筷子。
    PartialFunction<Object, BoxedUnit> eating = ReceiveBuilder.matchEquals( Think, new UnitApply<Object>() {
      public void apply(Object i ) throws Exception {
        left.tell( new Put(self()), self());
        right.tell( new Put(self()), self());
        System. out .println(String.format( "%s puts down his chopsticks and starts to think", name));
        startThinking(Duration. create(5, SECONDS));
      }
    }).build();

    // 等待另外筷子行为:当黑客等待另外一根筷子时他可以获得另外一根即可吃饭啦。如果另外一根筷子被占用,则他会放弃手中已有的筷子继续去思考怎么能拿到筷子去吃饭!!
    PartialFunction<Object, BoxedUnit> waitingFor( final ActorRef chopstickToWaitFor , final ActorRef otherChopstick ) {
      return ReceiveBuilder. match(Taken. class, new TypedPredicate<Taken>() {
        public boolean defined(Taken t) {
          return t .chopstick == chopstickToWaitFor ;
        }

      }, new UnitApply<Taken>() {
        public void apply(Taken i) throws Exception {
          System. out .println(String.format( "%s has picked up %s and %s and starts to eat", name, left .path().name(), right .path().name()));
          context().become( eating);
          context().system().scheduler().scheduleOnce(Duration.create(5,SECONDS ), self(), Think, context().system().dispatcher(), self());
        }
      }).match(Busy. class , new UnitApply<Busy>() {
        public void apply(Busy b) throws Exception {
          otherChopstick .tell(new Put(self()), self());
          startThinking(Duration. create(10, MILLISECONDS ));
        }
      }).build();
    }

    // 放弃筷子行为:如果抓筷子没抓到,则继续去思考
    PartialFunction<Object, BoxedUnit> deniedAChopstick = ReceiveBuilder.match(Taken. class, new UnitApply<Taken>() {
      public void apply(Taken t ) throws Exception {
        t. chopstick .tell(new Put(self()), self());
        startThinking(Duration. create(10, MILLISECONDS));
      }
    }).match(Busy. class, new UnitApply<Busy>() {
      public void apply(Busy b ) throws Exception {
        startThinking(Duration. create(10, MILLISECONDS));
      }
    }).build();

    // 饿行为:当一个黑客饿的时候,他会努力捡筷子去吃饭;当他拿到一根筷子时会等待去拿另外一根筷子;如果尝试拿一根筷子失败他会等待下一次抓筷子的响应。
    PartialFunction<Object, BoxedUnit> hungry = ReceiveBuilder.match(Taken. class, new TypedPredicate<Taken>() {

      public boolean defined(Taken t ) {
        return t .chopstick == left ;
      }
    }, new UnitApply<Taken>() {
      public void apply(Taken t ) throws Exception {
        context().become(waitingFor( right, left));
      }
    }).match(Taken. class, new TypedPredicate<Taken>() {
      public boolean defined(Taken t ) {
        return t .chopstick == right ;
      }
    }, new UnitApply<Taken>() {
      public void apply(Taken t ) throws Exception {
        context().become(waitingFor( left, right));
      }
    }).match(Busy. class, new UnitApply<Busy>() {
      public void apply(Busy b ) throws Exception {
        context().become( deniedAChopstick );
      }
    }).build();

    // 思考行为:当黑客去思考时,会饿,然后去捡筷子吃饭
    PartialFunction<Object, BoxedUnit> thinking = ReceiveBuilder.matchEquals( Eat, new UnitApply<Object>() {
      public void apply(Object i ) throws Exception {
        context().become( hungry);
        left.tell( new Take(self()), self());
        right.tell( new Take(self()), self());
      }
    }

    ).build();

    private void startThinking(FiniteDuration duration ) {
      // 设置当前行为是思考
      context().become( thinking);
      // 定时过一段时间后去吃饭
      context().system().scheduler().scheduleOnce(duration , self(), Eat , context().system().dispatcher(), self());
    }
  }

  public static void main(String[] args) {
    ActorSystem system = ActorSystem. create();
    // 创建5双筷子
    ActorRef[] chopsticks = new ActorRef[5];
    for ( int i = 0; i < 5; i++)
      chopsticks[ i] = system .actorOf(Props.create(Chopstick. class ), "Chopstick" + i );

    // 创建5个黑客,并分配筷子
    List<String> names = Arrays. asList( "Ghosh", "Boner" , "Klang" , "Krasser" , "Manie" );
    List<ActorRef> hakkers = new ArrayList<ActorRef>();
    int i = 0;
    for (String name : names) {
      hakkers .add(system .actorOf(Props. create(Hakker. class, name , chopsticks [i ], chopsticks [(i + 1) % 5])));
      i++;
    }

    // 启动,黑客的初始状态是去思考
    for (ActorRef ar : hakkers) {
      ar.tell( Think, ActorRef. noSender());
    }

  }

}

输出结果:

Ghosh starts to think
Manie starts to think
Krasser starts to think
Boner starts to think
Klang starts to think
Klang has picked up Chopstick2 and Chopstick3 and starts to eat
Manie has picked up Chopstick4 and Chopstick0 and starts to eat
Klang puts down his chopsticks and starts to think
Boner has picked up Chopstick1 and Chopstick2 and starts to eat
Manie puts down his chopsticks and starts to think
Krasser has picked up Chopstick3 and Chopstick4 and starts to eat
Boner puts down his chopsticks and starts to think
Ghosh has picked up Chopstick0 and Chopstick1 and starts to eat
Krasser puts down his chopsticks and starts to think
Klang has picked up Chopstick2 and Chopstick3 and starts to eat
......
时间: 2024-08-06 11:51:07

akka入门-热插拔(become和unbecome)的相关文章

akka入门-远程调用

akka远程调用有两种形式: 一种是查找远程Actors,一种是创建远程Actors. 公用的类: import java.io.Serializable; public class Op { public interface MathOp extends Serializable { } public interface MathResult extends Serializable { } static class Add implements MathOp { private static

akka入门-简介

为什么使用akka akka是Actor模型的实现.Actors为我们提供了以下优点: 1)对并发/并行程序的简单的.高级别的抽象. 2)异步.非阻塞.高性能的事件驱动编程模型. 3)非常轻量的事件驱动处理. akka提供了容错性 使用"let-it-crash"语义和监管者树形结构来实现容错.非常适合编写永不停机.自愈合的高容错系统.监管者树形结构可以跨多个JVM来提供真正的高容错系统. 位置透明性 Akka的所有元素都为分布式环境而设计:所有actor都仅通过发送消息进行互操作,所

akka入门-简单示例

以下程序演示了akka的一个简单的示例.创建Actor去处理一条命令,通过消息传递的方式进行交互. 我使用的akka版本和相关jar包参见pom文件: <project xmlns= "http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation= "http://maven.apache.org/

akka入门-调用子Actor处理消息

程序演示了父子结构的Actor处理消息.父Actor接收到消息后调用子Actor处理. 1.创建父子Actor import java.util.UUID; import com.center.akka.simple.command.Command; import com.center.akka.simple.event.Event; import akka.actor.ActorRef; import akka.actor.Props; import akka.actor.UntypedAct

akka入门-对处理器状态进行持久化

该话题涉及几个概念: 处理器.信道.事件源.日志.状态. 消息可以持久化,通常当前的状态保存在内存中(内存镜像),而事件源机制可以通过重播接收到的消息(在应用程序正常启动或崩溃后)恢复当前(或历史)的状态. Eventsourced 实现了预写日志(write-ahead log ,WAL)用于跟踪一个Actor 所接收消息,并通过回放记录的消息来恢复其状态. 处理器是一个有状态的Actor,它将收到的消息记录在日志里(持久化). 1.创建处理器状态对象 import java.io.Seria

akka入门-基于信道进行消息可靠传输

程序的演示场景是:处理器发送命令,接收者接收到消息后进行处理并且对发送方发送消息确认表明已经成功收到消息.如果没有发送确认则表明该消息没有被接收并正确处理.失败消息会到达死信箱,系统下次启动时后继续发送死信箱中的发送失败的消息. 1.创建信道回复命令对象 import com.center.akka.simple.command.Command; public class ChannelReply { private Command command; private long sequenceN

akka入门-消息派发器

Akka MessageDispatcher是维持Akka Actor "运作"的部分, 可以说它是整个机器的引擎. 在没有为 Actor作配置的情况下,一个 ActorSystem 将有一个缺省的派发器. 缺省派发器是可配置的,缺省情况下是一个使用"fork-join-executor"的 Dispatcher , 在大多数情况下拥有非常好的性能. 1.为 Actor 指定派发器 在application.conf文件中配置 my-dispatcher { # D

hello akka入门示例

AKKA 是一款基于actor模型实现的 并发处理框架.基于事件驱动的并发处理模型,每一个actor拥有自己的属性和操作,这样就避免了通常情况下因为多个线程之间要共享属性(数据)而是用锁机制的处理.这种机制在scala,cloure 语言中应用的很好,将操作和属性放在一个独立的单元中进行处理,从而提高并发处理的能力. 下面用一个最简单的helloword作为进入akka世界的开始. 功能描述: 实现通过一个actior发送消息到另一个actor然后将处理结果返回,感觉很简单类似两个类的方法调用,

Akka入门实例

Akka 是一个用 Scala 编写的库,用于简化编写容错的.高可伸缩性的 Java 和 Scala 的 Actor 模型应用. Actor模型并非什么新鲜事物,它由Carl Hewitt于上世纪70年代早期提出,目的是为了解决分布式编程中一系列的编程问题.其特点如下: 系统中的所有事物都可以扮演一个Actor Actor之间完全独立 在收到消息时Actor所采取的所有动作都是并行的,在一个方法中的动作没有明确的顺序 Actor由标识和当前行为描述 Actor可能被分成原始(primitive)