事件溯源【其他模式】

事件溯源

@Slf4j
public class EventSourcing {
    /**
     * 事件溯源模式:
     * Instead of storing just the current state of the data in a domain,
     * use an append-only store to record the full series of actions taken on that data.
     * The store acts as the system of record and can be used to materialize the domain objects.
     * This can simplify tasks in complex domains, by avoiding the need to synchronize the data model
     * and the business domain, while improving performance, scalability, and responsiveness.
     * It can also provide consistency for transactional data, and maintain full audit trails
     * and history that can enable compensating actions
     * 除了只存储领域模型的当前状态外,使用 append-only 存储来记录对该数据的所有修改操作。
     * 该存储充当记录系统,可用于实现域对象的具体化。
     * 这可以通过避免同步数据模型和业务领域模型,来简化复杂领域模型中的任务,同时提高性能、可伸缩性和响应能力。
     * 它还可以为事务数据提供一致性,并保持完整的审计跟踪和修改历史来执行补偿操作。
     */
    /**
     * The constant ACCOUNT OF DAENERYS.
     */
    public static final int ACCOUNT_OF_DAENERYS = 1;
    /**
     * The constant ACCOUNT OF JON.
     */
    public static final int ACCOUNT_OF_JON = 2;

    @Test
    public void all() {
        DomainEventProcessor eventProcessor = new DomainEventProcessor();

        log.info("Running the system first time............");
        eventProcessor.reset();

        log.info("Creating th accounts............");
        eventProcessor
        .process(new AccountCreateEvent(0, new Date().getTime(), ACCOUNT_OF_DAENERYS, "Daenerys Targaryen"));

        eventProcessor.process(new AccountCreateEvent(1, new Date().getTime(), ACCOUNT_OF_JON, "Jon Snow"));

        log.info("Do some money operations............");

        eventProcessor
        .process(new MoneyDepositEvent(2, new Date().getTime(), ACCOUNT_OF_DAENERYS, new BigDecimal("100000")));

        eventProcessor.process(new MoneyDepositEvent(3, new Date().getTime(), ACCOUNT_OF_JON, new BigDecimal("100")));

        eventProcessor.process(new MoneyTransferEvent(4, new Date().getTime(), new BigDecimal("10000"),
                ACCOUNT_OF_DAENERYS, ACCOUNT_OF_JON));

        log.info("...............State:............");
        log.info(AccountAggregate.getAccount(ACCOUNT_OF_DAENERYS).toString());
        log.info(AccountAggregate.getAccount(ACCOUNT_OF_JON).toString());

        log.info("At that point system had a shot down, state in memory is cleared............");
        AccountAggregate.resetState();

        log.info("Recover the system by the events in journal file............");

        eventProcessor = new DomainEventProcessor();
        eventProcessor.recover();

        log.info("...............Recovered State:............");
        log.info(AccountAggregate.getAccount(ACCOUNT_OF_DAENERYS).toString());
        log.info(AccountAggregate.getAccount(ACCOUNT_OF_JON).toString());
    }
}

/**
 * 1)需要持久化的事件抽象
 */
@Data
@RequiredArgsConstructor
abstract class DomainEvent implements Serializable {
    private static final long serialVersionUID = 5922894715338132042L;
    /**
     * 唯一序列号
     */
    private final long sequenceId;
    /**
     * 创建时间
     */
    private final long createdTime;
    /**
     * 实际的事件类型
     */
    private final String eventClassName;
    private boolean realTime = true;

    public abstract void process();
}

class AccountAggregate {
    private static Map<Integer, Account> accounts = new ConcurrentHashMap<>();

    private AccountAggregate() {
    }

    public static void putAccount(Account account) {
        accounts.put(account.getAccountNo(), account);
    }

    public static Account getAccount(int accountNo) {
        final Account account = accounts.get(accountNo);
        if (account == null) {
            return null;
        }
        return account.copy();
    }

    public static void resetState() {
        accounts = new ConcurrentHashMap<>();
    }
}

/**
 * 2)需要持久化的具体事件
 */
@Data
class AccountCreateEvent extends DomainEvent {
    private static final long serialVersionUID = -493304186114851718L;
    private final int accountNo;
    private final String owner;

    public AccountCreateEvent(long sequenceId, long createdTime, int accountNo, String owner) {
        super(sequenceId, createdTime, "AccountCreateEvent");
        this.accountNo = accountNo;
        this.owner = owner;
    }

    @Override
    public void process() {
        Account account = AccountAggregate.getAccount(accountNo);
        if (account != null) {
            throw new RuntimeException("Account already exists");
        }
        account = new Account(accountNo, owner);
        account.handleEvent(this);
    }
}

@Data
class MoneyDepositEvent extends DomainEvent {
    private final BigDecimal money;
    private final int accountNo;

    public MoneyDepositEvent(long sequenceId, long createdTime, int accountNo, BigDecimal money) {
        super(sequenceId, createdTime, "MoneyDepositEvent");
        this.money = money;
        this.accountNo = accountNo;
    }

    @Override
    public void process() {
        final Account account = AccountAggregate.getAccount(accountNo);
        if (account == null) {
            throw new RuntimeException("Account not found");
        }
        account.handleEvent(this);
    }
}

@Data
class MoneyTransferEvent extends DomainEvent {
    private static final long serialVersionUID = -5846383677434713494L;
    private final BigDecimal money;
    private final int accountNoFrom;
    private final int accountNoTo;

    public MoneyTransferEvent(long sequenceId, long createdTime, BigDecimal money, int accountNoFrom, int accountNoTo) {
        super(sequenceId, createdTime, "MoneyTransferEvent");
        this.money = money;
        this.accountNoFrom = accountNoFrom;
        this.accountNoTo = accountNoTo;
    }

    @Override
    public void process() {
        final Account accountFrom = AccountAggregate.getAccount(accountNoFrom);
        if (accountFrom == null) {
            throw new RuntimeException("Account not found " + accountNoFrom);
        }
        final Account accountTo = AccountAggregate.getAccount(accountNoTo);
        if (accountTo == null) {
            throw new RuntimeException("Account not found" + accountTo);
        }

        accountFrom.handleTransferFromEvent(this);
        accountTo.handleTransferToEvent(this);
    }
}

@Data
@Slf4j
class Account {
    private final int accountNo;
    private final String owner;
    private BigDecimal money;

    public Account(int accountNo, String owner) {
        this.accountNo = accountNo;
        this.owner = owner;
        money = BigDecimal.ZERO;
    }

    public Account copy() {
        final Account account = new Account(accountNo, owner);
        account.setMoney(money);
        return account;
    }

    private void depositMoney(BigDecimal money) {
        this.money = this.money.add(money);
    }

    private void withdrawMoney(BigDecimal money) {
        this.money = this.money.subtract(money);
    }

    private void handleDeposit(BigDecimal money, boolean realTime) {
        depositMoney(money);
        AccountAggregate.putAccount(this);
        if (realTime) {
            log.info("Some external api for only realtime execution could be called here.");
        }
    }

    private void handleWithdrawal(BigDecimal money, boolean realTime) {
        if (this.money.compareTo(money) == -1) {
            throw new RuntimeException("Insufficient Account Balance");
        }

        withdrawMoney(money);
        AccountAggregate.putAccount(this);
        if (realTime) {
            log.info("Some external api for only realtime execution could be called here.");
        }
    }

    public void handleEvent(AccountCreateEvent accountCreateEvent) {
        AccountAggregate.putAccount(this);
        if (accountCreateEvent.isRealTime()) {
            log.info("Some external api for only realtime execution could be called here.");
        }
    }

    public void handleEvent(MoneyDepositEvent moneyDepositEvent) {
        handleDeposit(moneyDepositEvent.getMoney(), moneyDepositEvent.isRealTime());
    }

    public void handleTransferFromEvent(MoneyTransferEvent moneyTransferEvent) {
        handleWithdrawal(moneyTransferEvent.getMoney(), moneyTransferEvent.isRealTime());
    }

    public void handleTransferToEvent(MoneyTransferEvent moneyTransferEvent) {
        handleDeposit(moneyTransferEvent.getMoney(), moneyTransferEvent.isRealTime());
    }
}

/**
 * 3)事件回溯
 */
class JsonFileJournal {
    private final File aFile;
    private List<String> events;
    private int index = 0;

    public JsonFileJournal() {
        aFile = new File("Journal.json");
        try {
            events = Files.lines(aFile.toPath()).collect(Collectors.toList());
        } catch (final IOException e) {
            events = Lists.newArrayList();
        }
    }

    public void write(DomainEvent domainEvent) {
        final Gson gson = new Gson();
        JsonElement jsonElement;
        if (domainEvent instanceof AccountCreateEvent) {
            jsonElement = gson.toJsonTree(domainEvent, AccountCreateEvent.class);
        } else if (domainEvent instanceof MoneyDepositEvent) {
            jsonElement = gson.toJsonTree(domainEvent, MoneyDepositEvent.class);
        } else if (domainEvent instanceof MoneyTransferEvent) {
            jsonElement = gson.toJsonTree(domainEvent, MoneyTransferEvent.class);
        } else {
            throw new RuntimeException("Journal Event not recegnized");
        }

        try (Writer output = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(aFile, true), "UTF-8"))) {
            final String eventString = jsonElement.toString();
            output.write(eventString + "\r\n");
        } catch (final IOException e) {
            throw new RuntimeException(e);
        }
    }

    public void reset() {
        aFile.delete();
    }

    public DomainEvent readNext() {
        if (index >= events.size()) {
            return null;
        }
        final String event = events.get(index);
        index++;

        final JsonParser parser = new JsonParser();
        final JsonElement jsonElement = parser.parse(event);
        final String eventClassName = jsonElement.getAsJsonObject().get("eventClassName").getAsString();
        final Gson gson = new Gson();
        DomainEvent domainEvent;
        if (eventClassName.equals("AccountCreateEvent")) {
            domainEvent = gson.fromJson(jsonElement, AccountCreateEvent.class);
        } else if (eventClassName.equals("MoneyDepositEvent")) {
            domainEvent = gson.fromJson(jsonElement, MoneyDepositEvent.class);
        } else if (eventClassName.equals("MoneyTransferEvent")) {
            domainEvent = gson.fromJson(jsonElement, MoneyTransferEvent.class);
        } else {
            throw new RuntimeException("Journal Event not recegnized");
        }

        domainEvent.setRealTime(false);
        return domainEvent;
    }
}

/**
 * 4)事件持久化和回溯的处理器
 */
class DomainEventProcessor {
    private final JsonFileJournal processorJournal = new JsonFileJournal();

    public void process(DomainEvent domainEvent) {
        domainEvent.process();
        processorJournal.write(domainEvent);
    }

    public void reset() {
        processorJournal.reset();
    }

    public void recover() {
        DomainEvent domainEvent;
        while (true) {
            domainEvent = processorJournal.readNext();
            if (domainEvent == null) {
                break;
            } else {
                domainEvent.process();
            }
        }
    }
}

原文地址:https://www.cnblogs.com/zhuxudong/p/10223630.html

时间: 2024-11-03 22:07:44

事件溯源【其他模式】的相关文章

基于Kafka构建事件溯源模式的微服务

概要 本文中我们将讨论如何借助Kafka实现分布式消息管理,使用事件溯源(Event Sourcing)模式实现原子化数据处理,使用CQRS模式(Command-Query Responsibility Segregation )实现查询职责分离,使用消费者群组解决单点故障问题,理解分布式协调框架Zookeeper的运行机制.整个应用的代码实现使用Go语言描述. 第一部分 引子.环境准备.整体设计及实现第二部分 消息消费者及其集群化第三部分 测试驱动开发.Docker部署和持续集成第一部分 引子

ENode框架Conference案例分析系列之 - 事件溯源如何处理重构问题

前言 本文可能对大多数不太了解ENode的朋友来说,理解起来比较费劲,这篇文章主要讲思路,而不是一上来就讲结果.我写文章,总是希望能把自己的思考过程尽量能表达出来,能让大家知道每一个设计背后的思考的东西.我觉得,任何设计的结果可能看起来很高大上,一张图即可,但背后的思考,才是更有价值的东西. 本篇文章想写一下ENode如何处理由于业务需求的变化而导致的模型重构的问题.DDD之所以能解决复杂的业务问题是因为DDD是一种模型驱动的软件设计方法.用领域模型来捕获业务需求,根据业务需求,抽象出满足需求的

MSDN搬运 之 [基于事件的异步模式]

基于事件的异步模式概述 那些同时执行多项任务.但仍能响应用户交互的应用程序通常需要实施一种使用多线程的设计方案.System.Threading 命名空间提供了创建高性能多线程应用程序所必需的所有工具,但要想有效地使用这些工具,需要有丰富的使用多线程软件工程的经验.对于相对简单的多线程应用程序,BackgroundWorker 组件提供了一个简单的解决方案.对于更复杂的异步应用程序,请考虑实现一个符合基于事件的异步模式的类. 基于事件的异步模式具有多线程应用程序的优点,同时隐匿了多线程设计中固有

Linux服务器应急事件溯源报告

Linux服务器应急事件溯源报告 小博博 · 2016/02/18 17:43 Author:Inn0team 0x00 目录 关于目标环境的中间进度检测报告 一:情况概述 二:取证情况 2.1 目标网络情况 2.2 针对xxx服务器中间件的检测 2.3 针对xxx服务器进程及端口的检测 2.4 发现攻击者的攻击操作 三:溯源操作 3.1 关于攻击者的反向检测 四:攻击源确定 4.1 确定攻击入口处 五:安全性建议 关于目标环境的中间进度检测报告 0x01 情况概述 监控软件监控到服务器存在异常

事件溯源如何处理重构问题

事件溯源如何处理重构问题 前言 本文可能对大多数不太了解ENode的朋友来说,理解起来比较费劲,这篇文章主要讲思路,而不是一上来就讲结果.我写文章,总是希望能把自己的思考过程尽量能表达出来,能让大家知道每一个设计背后的思考的东西.我觉得,任何设计的结果可能看起来很高大上,一张图即可,但背后的思考,才是更有价值的东西. 本篇文章想写一下ENode如何处理由于业务需求的变化而导致的模型重构的问题.DDD之所以能解决复杂的业务问题是因为DDD是一种模型驱动的软件设计方法.用领域模型来捕获业务需求,根据

BackgroundWorker实现事件的异步模式

最近在看C#的多线程,现把BackgroundWorker实现事件的异步模式这部分用代码注释的形式写出来了,个人理解,有什么不对的还望指正.. namespace LeranTest{ /// <summary> /// Interaction logic for Window1.xaml /// </summary> public partial class Window1 : Window { /// <summary> /// 使用BackgroundWorker

C#中的异步调用及异步设计模式(三)——基于事件的异步模式

四.基于事件的异步模式(设计层面) 基于事件的C#异步编程模式是比IAsyncResult模式更高级的一种异步编程模式,也被用在更多的场合.该异步模式具有以下优点: ·                  “在后台”执行耗时任务(例如下载和数据库操作),但不会中断您的应用程序. ·                  同时执行多个操作,每个操作完成时都会接到通知(在通知中可以区分是完成了哪个操作). ·                  等待资源变得可用,但不会停止(“挂起”)您的应用程序. ·  

基于事件的异步模式——BackgroundWorker

实现异步处理的方法很多,经常用的有基于委托的方式,今天记录的是基于事件的异步模式.利用BackgroundWorker组件可以很轻松的实现异步处理,并且该组件还支持事件的取消.进度报告等功能.本文以计算两个数X.Y的和为例. 程序界面如下图,其中三个文本框分别为两个加数和处理结果,两个按钮为计算和取消,按钮下方为进度条. 引入BackgroundWorker组件,为DoWork.ProgressChanged.RunWorkerCompleted三个事件指定方法.将WorkerReportsPr

基于事件的异步模式(EAP)

什么是EAP异步编程模式 EAP基于事件的异步模式是.net 2.0提出来的,实现了基于事件的异步模式的类将具有一个或者多个以Async为后缀的方法和对应的Completed事件,并且这些类都支持异步方法的取消.进度报告和报告结果.然而.net中并不是所有的类都支持EAP,总结起来有以下17个类支持EAP异步. System.Object的派生类型: System.Activies.WorkflowInvoke System.Deployment.Application.ApplicationD