python分布式事务方案(二)基于消息最终一致性

2019-08-02 11:08:43 浏览数 (1)

python分布式事务方案(二)基于消息最终一致性

上一章采用的是tcc方案,但是在进行批量操作时,比如说几百台主机一起分配策略时,会执行很长时间,这时体验比较差。 由于zabbix隐藏域后台,而这个慢主要是集中在调用zabbix接口,这里我们就基于消息最终一致性来进行优化 消息一致性方案是通过消息中间件保证上、下游应用数据操作的一致性。基本思路是将本地操作和发送消息放在一个事务中,保证本地操作和消息发送要么两者都成功或者都失败。下游应用向消息系统订阅该消息,收到消息后执行相应操作。

本地消息表是一种简化版的方案,将数据库中的表来作为消息中间件。 本地消息表这种实现方式应该是业界使用最多的,其核心思想是将分布式事务拆分成本地事务进行处理,这种思路是来源于ebay。我们可以从下面的流程图中看出其中的一些细节:

基本思路就是:

消息生产方,需要额外建一个消息表,并记录消息发送状态。消息表和业务数据要在一个事务里提交,也就是说他们要在一个数据库里面。然后消息会经过MQ发送到消息的消费方。如果消息发送失败,会进行重试发送。

消息消费方,需要处理这个消息,并完成自己的业务逻辑。此时如果本地事务处理成功,表明已经处理成功了,如果处理失败,那么就会重试执行。如果是业务上面的失败,可以给生产方发送一个业务补偿消息,通知生产方进行回滚等操作。

生产方和消费方定时扫描本地消息表,把还没处理完成的消息或者失败的消息再发送一遍。如果有靠谱的自动对账补账逻辑,这种方案还是非常实用的。

这种方案遵循BASE理论,采用的是最终一致性,笔者认为是这几种方案里面比较适合实际业务场景的,即不会出现像2PC那样复杂的实现(当调用链很长的时候,2PC的可用性是非常低的),也不会像TCC那样可能出现确认或者回滚不了的情况。

  • 优点: 一种非常经典的实现,避免了分布式事务,实现了最终一致性。在 .NET中 有现成的解决方案。
  • 缺点: 消息表会耦合到业务系统中,如果没有封装好的解决方案,会有很多杂活需要处理。

下面是实现步骤:

1、先创建本地消息表

代码语言:javascript复制
MESSAGE_STATUS={
    'active':'active',
    'fail':'fail',
    'success':'success'
}

class Message(models.Model):
    topic = models.CharField(max_length=50, blank=True)
    event_module = models.CharField(max_length=50, blank=True,null=True)
    event_fun= models.CharField(max_length=30, blank=True,null=True)
    params=models.TextField(null=True)
    remark=models.CharField(max_length=300, blank=True,null=True)
    status = models.CharField(max_length=20, blank=True)
    exec_count=models.SmallIntegerField(null=True)
    error_msg = models.TextField(null=True)
    created_at = models.DateTimeField(auto_now_add=True)
    updated_at = models.DateTimeField(auto_now=True)

    index_together = ('status','exec_count') #联合索引

    def __unicode__(self):
        return '%s' % self.remark

    def __str__(self):
        return '%s' % self.remark

2、定义生产者api 这里提前定义了mysql和rabbix两种消息存储方式

代码语言:javascript复制
from models import Message
from serializers import MessageSerializer

def event_add(message):
    MysqlQueue().add(message)

class MessageQueue():
    def __init__(self):
        pass

    def add(self,message):
        pass


class MysqlQueue(MessageQueue):

    def add(self,message):
        message["status"]="active"
        message["exec_count"]=0
        serializer=MessageSerializer(data=message)
        serializer.is_valid(raise_exception=True)
        serializer.save()

class RabbitQueue(MessageQueue):

    def add(self,message):
        pass

3、定义消费者api,这里使用定时任务框架celery

代码语言:javascript复制
import json
from models import Message
from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)


class MessageConsumer:
    def receive(self, topic=None):
        queryset = Message.objects.filter(exec_count__lt=5).exclude(status='success')
        if topic:
            queryset = queryset.filter(topic=topic)
        messages = queryset.order_by('id').all()
        for message in messages:
            try:
                m = __import__(message.event_module, fromlist=True)
                if hasattr(m, message.event_fun):
                    target_func = getattr(m, message.event_fun)
                    logger.info(message.params)
                    target_func(json.loads(message.params))
                    message.status='success'
                    message.exec_count=message.exec_count   1
                    message.save()
                else:
                    logger.error("can not find function "   message.event_fun)
                    message.status='fail'
                    message.exec_count=message.exec_count   1
                    message.error_msg="can not find function"   message.event_fun
                    message.save()
            except Exception ,e:
                logger.error("exec message fail,id:"   str(message.id) ",cause by " e.message)
                logger.exception(e)
                message.status='fail'
                message.exec_count=message.exec_count   1
                message.error_msg=e.message
                message.save()

4、定义定时任务,这里如果已经有一个定时任务在跑,则直接跳过

代码语言:javascript复制
exec_flag=False
@shared_task(ignore_result=True)
def reveive_event_message():
    global exec_flag
    if exec_flag:
        logger.warning("exists a tast exec reveive_event_message")
        return
    exec_flag=True
    logger.info("reveive_event_message start")
    MessageConsumer().receive()
    logger.info("reveive_event_message end")
    exec_flag=False

5、下面定义业务调用

代码语言:javascript复制
def add_message(event_fun,params,remark):
    event_message = dict()
    event_message["topic"] = "topci"
    event_message["event_module"] = "callback path"
    event_message["event_fun"] =event_fun
    event_message["params"] = json.dumps(params)
    event_message["remark"] =remark
    logger.debug(event_message)
    event_add(event_message)

def create(self, request, *args, **kwargs):
    '''
    policy add
    '''
    assets = request.data["data"]
    try:
        with transaction.atomic():
            #save policy
            for  ;;
               #发送消息
               add_message("async_update_zabbix_trigger",params,"update trigger ")
    except rest_framework_serializers.ValidationError, e:
        logger.exception(e)
        raise

6、定义回调方法,这里由于是使用python可以直接传方法名,就可以进行回调 比如说创建定时器

代码语言:javascript复制
def async_create_zabbix_trigger(params):
    client = ZabbixClientProxy()
    host_id = get_zabbix_host_by_uuid(uuid)
    zabbix_items = get_zabbix_items(host_id)
    if zabbix_items is None or len(zabbix_items) == 0:
        return
    condition = alert_models.ConditionItem.objects.get(id=params["condition_id"])
    condition.alert_duration = params["alert_duration"]
    condition.item_threshold = params["item_threshold"]
    triggers = create_zabbix_trigger(client, asset, zabbix_items, condition, uuid)
    serializer = policy_serializers.ConditionTriggerSerializer(data=triggers, many=True)
    serializer.is_valid(raise_exception=True)
    serializer.save()

这里可以配置一个最大重试次数,如果超过就不会进行重试,这时就会发送邮件通知管理员进行手工重试,来达到最终一致性

0 人点赞