1、测试目标:证实Akka确保Actor的每个实例都运行在自己的轻量级线程里,并拥有自己的队列,保证先进先出的处理每一条消息。
2、akka中actor既是发射器又是接收器,发消息和收消息时必经过自己的onReceive方法。
3、如果使用从spring获取的同一实例测试,得出每个actor实例是自包含一个队列的,发和收的顺序一致。
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(); ctx.scan("com.anyvape.common.akka"); ctx.refresh(); // get hold of the actor system ActorSystem system = ctx.getBean(ActorSystem.class); // use the Spring Extension to create props for a named actor bean ActorRef greeter = system.actorOf( CountingProvider.get(system).props("CountingActor"), "counter"); ActorRef greetPrinter = system.actorOf( CountingProvider.get(system).props("GreetPrinter"), "greetPrinter"); for(int i=0;i<100;i++){ greeter.tell(new Greeting(String.valueOf(i)), greetPrinter); }
4、同一actor类的多个actor实例向同一actor实例发消息测试。测试时在接受方加一断点,等到发送方发送完毕,打开断点,发现接受方打印顺序和发送顺序一致。
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(); ctx.scan("com.anyvape.common.akka"); ctx.refresh(); // get hold of the actor system ActorSystem system = ctx.getBean(ActorSystem.class); // use the Spring Extension to create props for a named actor bean ActorRef greeter = system.actorOf( CountingProvider.get(system).props("CountingActor"), "counter"); ActorRef greetPrinter = system.actorOf( CountingProvider.get(system).props("GreetPrinter"), "greetPrinter"); for(int i=0;i<100;i++){ greeter = system.actorOf( CountingProvider.get(system).props("CountingActor"), "counter".concat(String.valueOf(i))); greeter.tell(new Greeting(String.valueOf(i)), greetPrinter); }
5、代码
spring的注解配置类
import akka.actor.ActorSystem; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import static com.anyvape.common.akka.producer.CountingActorFactory.CountingProvider; /** * The application configuration. */ @Configuration class AppConfiguration { // the application context is needed to initialize the Akka Spring Extension @Autowired private ApplicationContext applicationContext; /** * Actor system singleton for this application. */ @Bean public ActorSystem actorSystem() { ActorSystem system = ActorSystem.create("AkkaJavaSpring"); // initialize the application context in the Akka Spring Extension CountingProvider.get(system).initialize(applicationContext); return system; } }
从spring管理上下文获取actor的代理类
import akka.actor.Actor; import akka.actor.IndirectActorProducer; import org.springframework.context.ApplicationContext; /** * 一个代理类,从spring上下文获取对象,实例必须实现生产actor的接口方法和定义生产类类型的接口方法 * An actor producer that lets Spring create the Actor instances. */ public class ActorProducer implements IndirectActorProducer { final ApplicationContext applicationContext; final String actorBeanName; public ActorProducer(ApplicationContext applicationContext, String actorBeanName) { this.applicationContext = applicationContext; this.actorBeanName = actorBeanName; } @Override public Actor produce() { return (Actor) applicationContext.getBean(actorBeanName); } @Override public Class<? extends Actor> actorClass() { return (Class<? extends Actor>) applicationContext.getType(actorBeanName); } }
工厂类,使用上面的代理生成actor
import akka.actor.AbstractExtensionId; import akka.actor.ExtendedActorSystem; import akka.actor.Extension; import akka.actor.Props; import org.springframework.context.ApplicationContext; /** * 调用ActorProducer创建actor * An Akka Extension to provide access to Spring managed Actor Beans. */ public class CountingActorFactory extends AbstractExtensionId<CountingActorFactory.SpringExt> { /** * The identifier used to access the SpringExtension. */ public static CountingActorFactory CountingProvider = new CountingActorFactory(); /** * Is used by Akka to instantiate the Extension identified by this * ExtensionId, internal use only. */ @Override public SpringExt createExtension(ExtendedActorSystem system) { return new SpringExt(); } /** * The Extension implementation. */ public static class SpringExt implements Extension { private volatile ApplicationContext applicationContext; /** * Used to initialize the Spring application context for the extension. * @param applicationContext */ public void initialize(ApplicationContext applicationContext) { this.applicationContext = applicationContext; } /** * Create a Props for the specified actorBeanName using the * SpringActorProducer class. * * @param actorBeanName The name of the actor bean to create Props for * @return a Props that will create the named actor bean using Spring */ public Props props(String actorBeanName) { return Props.create(ActorProducer.class, applicationContext, actorBeanName); } } }
角色1
import java.util.Random; import akka.actor.UntypedActor; import javax.inject.Inject; import javax.inject.Named; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import com.anyvape.common.akka.service.CountingService; /** * 异步统计用户对这个关键词的使用频率 * * @note The scope here is prototype since we want to create a new actor * instance for use of this bean. */ @Component("CountingActor") public class CountingActor extends UntypedActor { public static class Count {} public static class Get {} @Autowired CountingService countingService; private int count = 0; @Override public void onReceive(Object message) throws Exception { if (message instanceof Count) { count = countingService.increment(count); System.out.println("--------"+count); //Thread.sleep(new Random().nextInt(400) + 1); } else if (message instanceof Get) { getSender().tell(count, getSelf()); } else { unhandled(message); } } }
角色2
import java.util.Random; import com.anyvape.common.akka.Hello2.Greeting; import akka.actor.UntypedActor; public class GreetPrinter extends UntypedActor { public void onReceive(Object message) { if (message instanceof Greeting) System.out.println("-------------re:"+((Greeting) message).message); try { Thread.sleep(new Random().nextInt(5*1000) + 1); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } getSender().tell(new com.anyvape.common.akka.Hello2.End(((Greeting) message).message), getSelf()); } }
时间: 2024-10-05 10:11:46