python模拟进程状态
代码
from transitions import Machine
from collections import namedtuple
class FsmState:
def enter(self,event,fsm):
pass
def exit(self, fsm):
pass
class New(FsmState):
def enter(self,event,fsm):
print('New')
def exit(self,fsm):
print('Ready')
class Ready(FsmState):
def enter(self,event,fsm):
print('Ready')
def exit(self,event,fsm):
print('Running')
class Running(FsmState):
def enter(self,event,fsm):
print('Running')
def exit_1(self,fsm):
print('Ready')
def exit_2(self,fsm):
print('Waiting')
def exit_3(self,fsm):
print('Terminated')
class Waiting(FsmState):
def enter(self,evevt,fsm):
print('Waiting')
def exit(self,fsm):
print('Ready')
class FsmFinalState(FsmState):
def enter(self,event,fsm):
print('Terminated')
class FsmEvent:
pass
class Admitted(FsmEvent):
pass
class Dispatch(FsmEvent):
pass
class Interrupt(FsmEvent):
pass
class IO_EW(FsmEvent):
pass
class IO_EC(FsmEvent):
pass
class Exit(FsmEvent):
pass
Transaction = namedtuple('Transaction',['prev_state','event'])
class FSM:
def __init__(self,context):
self.context = context
self.state_transaction_table = []
self.global_transaction_table = []
self.current_state = None
self.working_state = FsmState
def add_global_transaction(self,event,end_state):
if not issubclass(end_state,FsmFinalState):
raise FsmException('The state should be FsmFinalState')
self.global_transaction_table.append(Transaction(self.working_state,end_state))
#
def add_transaction(self,prev_state, event, next_state):
if issubclass(prev_state, FsmFinalState):
raise FsmException("It's not allowed to add transaction after Final State Node")
self.state_transaction_table.append(Transaction(prev_state, event, next_state))
def process_event(self, event):
for transaction in self.global_transaction_table:
if isinstance(event, transaction.event):
self.current_state = transaction.next_state()
self.current_state.enter(event, self)
self.clear_transaction_table()
return
for transaction in self.state_transaction_table:
if isinstance(self.current_state, transaction.prev_state) and isinstance(event, transaction.event):
self.current_state.exit(self.context)
self.current_state = transaction.next_state()
self.current_state.enter(event, self)
if isinstance(self.current_state, FsmFinalState):
self.clear_transaction_table()
return
raise FsmException("Transaction not found")
def clear_transaction_table(self):
self.global_transaction_table = []
self.state_transaction_table = []
self.current_state = None
def run(self):
if len(self.state_transaction_table) == 0: return
self.current_state = self.state_transaction_table[0].prev_state()
self.current_state.enter(None, self)
def isRunning(self):
return self.current_state is not None
def next_state(self, event):
for transaction in self.global_transaction_table:
if isinstance(event, transaction.event):
return transaction.next_state
for transaction in self.state_transaction_table:
if isinstance(self.current_state, transaction.prev_state) and isinstance(event, transaction.event):
return transaction.next_state
return None
class FsmException(Exception):
def __init__(self, description):
super().__init__(description)
class ZTJ(FSM):
def __init__(self):
super().__init__(None)
ztj = ZTJ();
ztj.add_transaction(New, Admitted, Ready)
ztj.add_transaction(Ready, Dispatch, Running)
ztj.add_transaction(Running, Interrupt, Ready)
ztj.add_transaction(Running, IO_EW,Waiting)
ztj.add_transaction(Waiting, IO_EC, Ready)
ztj.add_transaction(Running, Exit, FsmFinalState)
ztj.run()
ztj.process_event(Admitted())
ztj.process_event(Dispatch())
ztj.process_event(Interrupt())
ztj.process_event(IO_EW())
ztj.process_event(IO_EC())
ztj.process_event(Exit())
程序报错
不知道该怎么解决
原文地址:https://www.cnblogs.com/lxy2019/p/11961915.html
时间: 2024-10-13 00:55:02