工作流Activiti框架的事务和并发!流程引擎中异步和排他操作详细解析

2022-01-22 10:01:55 浏览数 (1)

事务和并发

异步操作

  • Activiti通过事务方式执行流程,可以根据需求定制
  • Activiti处理事务:
    • 如果触发了Activiti的操作(开始流程,完成任务,触发流程继续执行),activiti会推进流程,直到每个分支都进入等待状态
    • 抽象的说,会从流程图执行深度优先搜索,如果每个分支都遇到等待状态,就会返回
    • 等待状态是稍后需要执行任务,Activiti会把当前状态保存到数据库中,然后等待下一次触发
    • 触发可能来自外部,比如用户任务或接收到一个消息,也可能来自Activiti本身(定时器事件)

    流程包含用户任务,服务任务和定时器事件 完成用户任务和校验地址是在同一个工作单元中,两者的成功和失败是原子性的.意味着如果服务任务抛出异常,要回滚当前事务,这样流程会退回到用户任务,用户任务就依然在数据库里 这就是activiti默认的行为.在(1)中应用或客户端线程完成任务.这会执行服务,流程推进,直到遇到一个等待状态,就是定时器(2),然后它会返回给调用者(3),并提交事务(如果事务是由Activiti开启的)

  • 有时需要自定义控制流程中事务的边界,把业务逻辑包裹在一起.这就需要使用异步执行:

完成了用户任务,生成一个发票,把发票发送给客户 生成发票不在同一个工作单元内了.如果生成发票出错不需要对用户任务进行回滚 Activiti实现的是完成用户任务(1),提交事务,返回给调用者应用.然后在后台的线程中,异步执行生成发票. 后台线程就是Activiti的Job执行器(一个线程池)周期对数据库的Job进行扫描:当到达"generate invoice"任务,为Activiti创建一个稍后执行的Job"消息",并保存到数据库.Job会被Job执行器获取并执行.也会给本地Job执行器一个提醒,告诉有一个新Job,来增加性能

  • 要想使用这个特性,要使用activiti:async="true" 扩展
代码语言:javascript复制
<serviceTask id="service1" name="Generate Invoice" activiti:class="my.custom.Delegate" activiti:async="true" />
  • activiti:async可以使用到以下BPMN任务类型中:
    • task
    • serviceTask,
    • scriptTask
    • businessRuleTask
    • sendTask
    • receiveTask
    • userTask
    • subProcess
    • callActivity
  • 对于userTask,receiveTask和其他等待状态,异步执行的作用是让开始流程监听器运行在一个单独的线程或者事务中

排他任务

  • 从Activiti 5.9开始 ,JobExecutor能保证同一个流程实例中的Job不会并发执行
排他任务的产生背景
  • 一个并行网关,后面有三个服务任务,都设置为异步执行:
    • 这样会添加三个job到数据库里.一旦job进入数据库,就可以被jobExecutor执行了.JobExecutor会获取job,代理到工作线程的线程池中,在那里真正执行job
    • 就是说,使用异步执行,可以把任务分配给这个线程池(在集群环境,可能会使用多个线程池)
    • 产生一致性问题:
      • 考虑一下服务任务后的汇聚:当服务任务完成后,到达并发汇聚节点,需要决定是等待其他分支,还是继续向下执行
      • 就是说,对每个到达并行汇聚的分支,都需要判断是继续还是等待其他分支的一个或多个分支
  • 为什么会产生这样的问题:
    • 因为服务任务配置成使用异步执行,可能相关的job都在同一时间被获取,被JobExecutor分配给不同的工作线程执行
    • 结果是,三个单独的服务执行使用的事务在到达并发汇聚时可能重叠:
      • 如果出现了这个问题,这些事务是互相不可见的,其他事务同时到达了相同的并发汇聚,假设都在等待其他分支
      • 然而,每个事务都假设在等待其他分支,所以没有分支会越过并发汇聚继续执行,流程实例会一直在等待状态,无法继续执行
  • Activiti解决这个问题方式:
    • Activiti使用了乐观锁:
      • 当基于判断的数据看起来不是最新的时候 (因为其他事务可能在提交之前进行了修改,会在每个事务里增加数据库同一行的版本),这个时候,第一个提交的事务会成功,其他会因为乐观锁异常导致失败
      • 这就解决了上面流程的问题:
        • 如果多个分支同步到达并行汇聚,会假设都在登录,并增加父流程的版本号(流程实例)然后尝试提交
        • 第一个分支会成功提交,其他分支会因为乐观锁导致失败
        • 因为流程是被job触发的,Activiti会尝试在等待一段时间后尝试执行同一个job,这段时间可以同步网关的状态
  • Activiti乐观锁是一个很好的解决方案吗?
    • 乐观锁允许Activiti避免非一致性,确定流程不会"堵在汇聚网关": 或者所有分支都通过网关,或者数据库中的job正在尝试通过
    • 虽然这是一个对于持久性和一致性的完美解决方案,但对于上层来说不一定是期望的行为:
      • Activiti只会对同一个job重试估计次数(默认配置为3).之后,job还会在数据库里,但是不会再重试了.意味着这个操作必须手工执行job的触发
      • 如果job有非事务方面的效果,不会因为失败的事务回滚:如果“预定演唱会门票”服务没有与Activiti共享事务,重试job可能导致我们预定了过多门票
  • 针对这些问题,在Activiti中推出了新的概念:排他job
排他Job
  • 对于一个流程实例,排他任务不能同时执行两个
  • 考虑上面的流程:如果我们把服务任务申请为排他任务,JobExecutor会保证对应的job不会并发执行.
  • 会保证无论什么时候获取一个流程实例的排他任务,都会把同一个流程实例的其他任务都取出来,放在同一个工作线程中执行.保证job是顺序执行的
  • 从activiti 5.9开始,排他任务已经是默认配置.所以异步执行和定时器事件默认都是排他任务
  • 如果你想把job设置为非排他,可以使用activiti:exclusive="false" 进行配置:
代码语言:javascript复制
<serviceTask id="service" activiti:expression="${myService.performBooking(hotel, dates)}" activiti:async="true" activiti:exclusive="false" />
  • 排他任务没有性能问题:
    • 在高负载的情况下性能是个问题,高负载意味着JobExecutor的所有工作线程都一直在忙碌着
    • 使用排他任务,Activiti可以简单的分布不同的负载.排他任务意味着同一个流程实例的异步执行会由相同的线程顺序执行
    • 但是要考虑:如果有多个流程实例时.所有其他流程实例的job也会分配给其他线程同步执行
    • 意味着虽然Activiti不会同时执行一个流程实例的排他job,但是还会同步执行多个流程实例的异步执行
    • 通过一个总体的预测,在大多数场景下,排他任务都会让单独的实例运行的更迅速.而且,对于同一流程实例中的job,需要用到的数据也会利用执行的集群节点的缓存.如果任务没有在同一个节点执行,数据就必须每次从数据库重新读取了

流程实例授权

  • 默认所有人在部署的流程定义上启动一个新流程实例,通过流程初始化授权功能定义的用户和组,web客户端可以限制哪些用户可以启动一个新流程实例
  • Activiti引擎不会校验授权定义: 这个功能只是为减轻web客户端开发者实现校验规则的难度
  • 设置方法与用户任务用户分配类似,用户或组可以使用activiti:potentialStarter标签分配为流程的默认启动者:
代码语言:javascript复制
 <process id="potentialStarter">
     <extensionElements>
       <activiti:potentialStarter>
         <resourceAssignmentExpression>
           <formalExpression>group2, group(group3), user(user3)</formalExpression>
         </resourceAssignmentExpression>
       </activiti:potentialStarter>
     </extensionElements>
   <startEvent id="theStart"/>
   ...

user(user3)是直接引用了用户user3,group(group3)是引用了组group3.如果没显示设置,默为群组

  • 也可以使用process标签的属性activiti:candidateStarterUsersactiviti:candidateStarterGroups
代码语言:javascript复制
 <process id="potentialStarter" activiti:candidateStarterUsers="user1, user2"
                                activiti:candidateStarterGroups="group1">
      ...

可以同时使用这两个属性

  • 定义流程初始化授权后,开发者可以使用如下方法获得授权定义.可以获得给定的用户能够启动哪些流程定义:
代码语言:javascript复制
 processDefinitions = repositoryService.createProcessDefinitionQuery().startableByUser("userxxx").list();
  • 可以获得指定流程定义设置的潜在启动者对应的IdentityLink:
代码语言:javascript复制
 identityLinks = repositoryService.getIdentityLinksForProcessDefinition("processDefinitionId"); 
  • 获得可以启动给定流程的用户列表的示例:
代码语言:javascript复制
  List<User> authorizedUsers =  identityService().createUserQuery().potentialStarter("processDefinitionId").list(); 
  • 获得可以启动给定流程配置的群组的示例:
代码语言:javascript复制
 List<Group> authorizedGroups =  identityService().createGroupQuery().potentialStarter("processDefinitionId").list();  

数据对象

  • BPMN提供了一种功能,可以在流程定义或子流程中定义数据对象
  • 根据BPMN规范,流程定义可以包含复杂XML结构,可以导入XSD定义
  • 对于Activiti来说 ,作为Activiti首次支持的数据对象, 可以支持如下的XSD类型
代码语言:javascript复制
	<dataObject id="dObj1" name="StringTest" itemSubjectRef="xsd:string"/>
代码语言:javascript复制
	<dataObject id="dObj2" name="BooleanTest" itemSubjectRef="xsd:boolean"/>
代码语言:javascript复制
	<dataObject id="dObj3" name="DateTest" itemSubjectRef="xsd:datetime"/>
代码语言:javascript复制
	<dataObject id="dObj4" name="DoubleTest" itemSubjectRef="xsd:double"/>
代码语言:javascript复制
	<dataObject id="dObj5" name="IntegerTest" itemSubjectRef="xsd:int"/>
代码语言:javascript复制
	<dataObject id="dObj6" name="LongTest" itemSubjectRef="xsd:long"/>
  • 数据对象定义会自动转换为流程变量,名称与name属性对应
  • 除了数据对象的定义之外,Activiti支持使用扩展元素来为这个变量赋予默认值:
代码语言:javascript复制
<process id="dataObjectScope" name="Data Object Scope" isExecutable="true">
          <dataObject id="dObj123" name="StringTest123" itemSubjectRef="xsd:string">
            <extensionElements>
              <activiti:value>Testing123</activiti:value>
            </extensionElements>
          </dataObject>

0 人点赞