
OpenStack Masakari task flow source code analysis

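Masakari takes its name from the Japanese word for a broad axe. It is an open-source OpenStack project that provides high availability (HA) for VMs. Masakari currently supports recovery from the following three kinds of failure: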

  1. provisioning process down
  2. VM process down
  3. nova-compute host failure

This article analyzes in detail the code that handles each of these three failure types.

[Figure: Masakari architecture diagram]

Process failure handling flow

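The processes in question are the key processes and services on a compute node, such as libvirt, nova-compute, masakari-instancemonitor, masakari-hostmonitor, and sshd. Once one of these processes is detected to be abnormal, the following flow is executed.

The flow has two steps:

Step 1: disable the nova service on the compute node so that newly created VMs are not scheduled onto it.

Step 2: verify that the disable action from step 1 has actually taken effect.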

masakari\engine\drivers\taskflow\process_failure.py

class DisableComputeNodeTask(base.MasakariTask):
    def __init__(self, context, novaclient, **kwargs):
        kwargs['requires'] = ["process_name", "host_name"]
        super(DisableComputeNodeTask, self).__init__(context,
                                                     novaclient,
                                                     **kwargs)

    def execute(self, process_name, host_name):
        msg = "Disabling compute service on host: '%s'" % host_name
        self.update_details(msg)

        if not self.novaclient.is_service_down(self.context, host_name,
                                               process_name):
            # disable compute node on given host
            self.novaclient.enable_disable_service(self.context, host_name)
            msg = "Disabled compute service on host: '%s'" % host_name
            self.update_details(msg, 1.0)
        else:
            msg = ("Skipping recovery for process %(process_name)s as it is "
                   "already disabled") % {'process_name': process_name}
            LOG.info(msg)
            self.update_details(msg, 1.0)           


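After reporting the message "Disabling compute service on host <hostname>", the task checks whether the compute service on the host is already disabled. If it is not, the task calls enable_disable_service and, once the disable action has completed, reports "Disabled compute service on host <hostname>". If the service is already disabled, the task only reports "Skipping recovery for process %(process_name)s as it is already disabled".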
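The branch described above can be sketched in isolation. This is a minimal illustration, not Masakari's implementation: FakeNova and disable_compute_node are hypothetical stand-ins that keep the service status in a dictionary instead of calling Nova.

```python
class FakeNova:
    """Hypothetical in-memory stand-in for Masakari's Nova wrapper."""

    def __init__(self):
        # Maps host name -> service status ('enabled' or 'disabled').
        self.status = {}

    def is_service_down(self, host_name):
        # As in the excerpt, "down" here means status == 'disabled'.
        return self.status.get(host_name, "enabled") == "disabled"

    def enable_disable_service(self, host_name, enable=False):
        self.status[host_name] = "enabled" if enable else "disabled"


def disable_compute_node(nova, host_name, process_name):
    """Return the progress messages the task would report."""
    messages = ["Disabling compute service on host: '%s'" % host_name]
    if not nova.is_service_down(host_name):
        nova.enable_disable_service(host_name)
        messages.append("Disabled compute service on host: '%s'" % host_name)
    else:
        messages.append("Skipping recovery for process %s as it is "
                        "already disabled" % process_name)
    return messages


nova = FakeNova()
first = disable_compute_node(nova, "compute-1", "nova-compute")
second = disable_compute_node(nova, "compute-1", "nova-compute")
print(first[-1])
print(second[-1])
```

Running the task twice against the same host shows that the second run takes the "skip" branch, so the step is safe to repeat.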

The DisableComputeNodeTask flow above relies on two methods that call Nova: is_service_down and enable_disable_service.

masakari\compute\nova.py

@translate_nova_exception
def is_service_down(self, context, host_name, binary):
    """Check whether service is up or down on given host."""
    nova = novaclient(context)
    service = nova.services.list(host=host_name, binary=binary)[0]
    return service.status == 'disabled'           


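The is_service_down method calls services.list from the novaclient module to look up the service on host_name that matches the given binary, and returns whether service.status is disabled. Despite its name, it therefore reports whether the service has been disabled, not whether it is down.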
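One detail worth noting is that the excerpt indexes the lookup result with [0], which would raise IndexError if no matching service exists. The sketch below shows the same check with that case made explicit. The Service tuple, the is_service_disabled helper, and the choice to return None for a missing service are all assumptions made for illustration and are not part of Masakari.

```python
from collections import namedtuple

# Hypothetical record standing in for a novaclient service object.
Service = namedtuple("Service", ["host", "binary", "status"])


def is_service_disabled(services, host_name, binary):
    """Return True/False for a found service, or None if none matches."""
    matches = [s for s in services
               if s.host == host_name and s.binary == binary]
    if not matches:
        return None  # the original code would raise IndexError here
    return matches[0].status == "disabled"


services = [
    Service("compute-1", "nova-compute", "disabled"),
    Service("compute-2", "nova-compute", "enabled"),
]

print(is_service_disabled(services, "compute-1", "nova-compute"))
print(is_service_disabled(services, "compute-2", "nova-compute"))
print(is_service_disabled(services, "compute-9", "nova-compute"))
```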

masakari\compute\nova.py

def enable_disable_service(self, context, host_name, enable=False,
                           reason=None):
    """Enable or disable the service specified by nova service id."""
    nova = novaclient(context)
    service = nova.services.list(host=host_name, binary='nova-compute')[0]

    if not enable:
        LOG.info('Disable nova-compute on %s', host_name)
        if reason:
            nova.services.disable_log_reason(service.id, reason)
        else:
            nova.services.disable(service.id)
    else:
        LOG.info('Enable nova-compute on %s', host_name)
        nova.services.enable(service.id)           


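The enable_disable_service method calls services.list from the novaclient module to look up the nova-compute service on host_name. If the enable argument is False (the default), it calls services.disable to disable the service, or services.disable_log_reason when a reason is supplied. If enable is True, it calls services.enable to enable the service.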
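The three-way choice can be summarized as a small pure function. This is only a sketch: choose_service_call is a hypothetical helper that returns the name of the novaclient services method the excerpt would call, without contacting Nova.

```python
def choose_service_call(enable=False, reason=None):
    """Return (method_name, extra_args) for the services call to make."""
    if enable:
        return ("enable", ())
    if reason:
        # A reason is only used when disabling.
        return ("disable_log_reason", (reason,))
    return ("disable", ())


print(choose_service_call())
print(choose_service_call(reason="host failure"))
print(choose_service_call(enable=True))
```

The branch is driven by the enable argument rather than by the current status of the service, so calling it with the default arguments on an already disabled service simply disables it again.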

masakari\engine\drivers\taskflow\process_failure.py

class ConfirmComputeNodeDisabledTask(base.MasakariTask):
    def __init__(self, context, novaclient, **kwargs):
        kwargs['requires'] = ["process_name", "host_name"]
        super(ConfirmComputeNodeDisabledTask, self).__init__(context,
                                                             novaclient,
                                                             **kwargs)

    def execute(self, process_name, host_name):
        def _wait_for_disable():
            service_disabled = self.novaclient.is_service_down(
                self.context, host_name, process_name)
            if service_disabled:
                raise loopingcall.LoopingCallDone()

        periodic_call = loopingcall.FixedIntervalLoopingCall(
            _wait_for_disable)
        try:
            msg = "Confirming compute service is disabled on host: '%s'" % (
                host_name)
            self.update_details(msg)

            # add a timeout to the periodic call.
            periodic_call.start(interval=CONF.verify_interval)
            etimeout.with_timeout(
                CONF.wait_period_after_service_update,
                periodic_call.wait)

            msg = "Confirmed compute service is disabled on host: '%s'" % (
                host_name)
            self.update_details(msg, 1.0)
        except etimeout.Timeout:
            msg = "Failed to disable service %(process_name)s" % {
                'process_name': process_name
            }
            self.update_details(msg, 1.0)
            raise exception.ProcessRecoveryFailureException(
                message=msg)
        finally:
            # stop the periodic call, in case of exceptions or Timeout.
            periodic_call.stop()           


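This is the standard monitoring pattern. It uses the OpenStack common library oslo_service to check at a fixed interval (CONF.verify_interval) whether the earlier DisableComputeNodeTask action has taken effect, and raises ProcessRecoveryFailureException if that does not happen within CONF.wait_period_after_service_update. The monitoring code in the other flows below follows the same pattern.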
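The poll-with-timeout pattern can be reproduced with the standard library alone. The sketch below is an analogue written for illustration, not the oslo_service API: wait_until and PollTimeout are hypothetical names, and the loop is synchronous where the original runs a looping call bounded by a timeout.

```python
import time


class PollTimeout(Exception):
    """Raised when the condition does not become true in time."""


def wait_until(predicate, interval, timeout):
    """Call predicate every `interval` seconds until it returns True.

    Raises PollTimeout if `timeout` seconds pass first.
    """
    deadline = time.monotonic() + timeout
    while not predicate():
        if time.monotonic() >= deadline:
            raise PollTimeout("condition not met within %.2fs" % timeout)
        time.sleep(interval)


# Success case: the simulated service reports "disabled" on the third poll.
polls = iter([False, False, True])
wait_until(lambda: next(polls), interval=0.01, timeout=1.0)
confirmed = True

# Failure case: the simulated service never reports "disabled".
try:
    wait_until(lambda: False, interval=0.01, timeout=0.05)
    timed_out = False
except PollTimeout:
    timed_out = True

print(confirmed, timed_out)
```

Here interval plays the role of CONF.verify_interval and timeout the role of CONF.wait_period_after_service_update.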

Task names as defined in masakari/setup.cfg:

masakari.task_flow.tasks =
    disable_compute_node_task = masakari.engine.drivers.taskflow.process_failure:DisableComputeNodeTask
    confirm_compute_node_disabled_task = masakari.engine.drivers.taskflow.process_failure:ConfirmComputeNodeDisabledTask
           


The flow as defined in the configuration options:

cfg.Opt('process_failure_recovery_tasks',
            type=types.Dict(
                bounds=False,
                value_type=types.List(bounds=True,
                                      item_type=types.String(quotes=True))),
            default={'pre': ['disable_compute_node_task'],
                     'main': ['confirm_compute_node_disabled_task'],
                     'post': []},
            help=("""
This option allows operator to customize tasks to be executed for process
failure recovery workflow.

Provide list of strings reflecting to the task classes that should be included
to the process failure recovery workflow. The full classname path of all task
classes should be defined in the 'masakari.task_flow.tasks' of setup.cfg and
these classes may be implemented by OpenStack Masaskari project team, deployer
or third party.

By default below two tasks will be part of this config option:-
1. disable_compute_node_task
2. confirm_compute_node_disabled_task

The allowed values for this option is comma separated dictionary of object
names in between ``{`` and ``}``."""))
]
           

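To make the shape of this option concrete, the sketch below flattens the default dictionary into one ordered list of task names. It assumes the stages run in the order pre, main, post, which the key names suggest but the excerpt does not show. ordered_task_names is a hypothetical helper, not a Masakari function.

```python
# Default value copied from the option definition above.
DEFAULT_PROCESS_FAILURE_TASKS = {
    "pre": ["disable_compute_node_task"],
    "main": ["confirm_compute_node_disabled_task"],
    "post": [],
}


def ordered_task_names(flow_config):
    """Concatenate the task names stage by stage (assumed order)."""
    names = []
    for stage in ("pre", "main", "post"):
        names.extend(flow_config.get(stage, []))
    return names


print(ordered_task_names(DEFAULT_PROCESS_FAILURE_TASKS))
```

Each name in the result has to match an entry under masakari.task_flow.tasks in setup.cfg, which is how a name is mapped to its task class.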

VM instance failure handling flow

masakari\engine\drivers\taskflow\instance_failure.py

class StopInstanceTask(base.MasakariTask):
    def __init__(self, context, novaclient, **kwargs):
        kwargs['requires'] = ["instance_uuid"]
        super(StopInstanceTask, self).__init__(context,
                                               novaclient,
                                               **kwargs)

    def execute(self, instance_uuid):
        """Stop the instance for recovery."""
        instance = self.novaclient.get_server(self.context, instance_uuid)

        # If an instance is not HA_Enabled and "process_all_instances" config
        # option is also disabled, then there is no need to take any recovery
        # action.
        if not CONF.instance_failure.process_all_instances and not (
                strutils.bool_from_string(
                    instance.metadata.get('HA_Enabled', False))):
            msg = ("Skipping recovery for instance: %(instance_uuid)s as it is"
                   " not Ha_Enabled") % {'instance_uuid': instance_uuid}
            LOG.info(msg)
            self.update_details(msg, 1.0)
            raise exception.SkipInstanceRecoveryException()

        vm_state = getattr(instance, 'OS-EXT-STS:vm_state')
        if vm_state in ['paused', 'rescued']:
            msg = ("Recovery of instance '%(instance_uuid)s' is ignored as it "
                   "is in '%(vm_state)s' state.") % {
                'instance_uuid': instance_uuid, 'vm_state': vm_state
            }
            LOG.warning(msg)
            self.update_details(msg, 1.0)
            raise exception.IgnoreInstanceRecoveryException(msg)

        if vm_state != 'stopped':
            if vm_state == 'resized':
                self.novaclient.reset_instance_state(
                    self.context, instance.id, 'active')

            msg = "Stopping instance: %s" % instance_uuid
            self.update_details(msg)

            self.novaclient.stop_server(self.context, instance.id)

        def _wait_for_power_off():
            new_instance = self.novaclient.get_server(self.context,
                                                      instance_uuid)
            vm_state = getattr(new_instance, 'OS-EXT-STS:vm_state')
            if vm_state == 'stopped':
                raise loopingcall.LoopingCallDone()

        periodic_call = loopingcall.FixedIntervalLoopingCall(
            _wait_for_power_off)

        try:
            # add a timeout to the periodic call.
            periodic_call.start(interval=CONF.verify_interval)
            etimeout.with_timeout(CONF.wait_period_after_power_off,
                                  periodic_call.wait)
            msg = "Stopped instance: '%s'" % instance_uuid
            self.update_details(msg, 1.0)
        except etimeout.Timeout:
            msg = "Failed to stop instance %(instance)s" % {
                'instance': instance.id
            }
            self.update_details(msg, 1.0)
            raise exception.InstanceRecoveryFailureException(
                message=msg)
        finally:
            # stop the periodic call, in case of exceptions or Timeout.
            periodic_call.stop()           


When process_all_instances is true, recovery is performed for every VM instance. When it is false, an instance is recovered only if its metadata contains the key HA_Enabled=True.

instance_failure_options = [
    cfg.BoolOpt('process_all_instances',
                default=False,
                help="""
Operators can decide whether all instances or only those instances which
contain metadata key 'HA_Enabled=True' should be taken into account to
recover from instance failure events. When set to True, it will execute
instance failure recovery actions for an instance irrespective of whether
that particular instance contains metadata key 'HA_Enabled=True' or not.
When set to False, it will only execute instance failure recovery actions
for an instance which contain metadata key 'HA_Enabled=True'."""),
]
           


If process_all_instances is false and the instance's HA_Enabled metadata is false, that is, when

not CONF.instance_failure.process_all_instances and not (strutils.bool_from_string(instance.metadata.get('HA_Enabled', False)))

evaluates to true, recovery of that instance is skipped.
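The skip condition is easier to read in its positive form. The sketch below is an illustration under stated assumptions: should_recover is a hypothetical helper, and bool_from_string is a simplified stand-in for oslo_utils.strutils.bool_from_string that accepts the same set of true strings.

```python
def bool_from_string(value):
    """Simplified stand-in for oslo_utils.strutils.bool_from_string."""
    return str(value).strip().lower() in ("1", "t", "true", "on", "y", "yes")


def should_recover(process_all_instances, metadata):
    """Return True when instance recovery would proceed."""
    if process_all_instances:
        return True
    return bool_from_string(metadata.get("HA_Enabled", False))


print(should_recover(False, {}))
print(should_recover(False, {"HA_Enabled": "True"}))
print(should_recover(True, {}))
```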

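Next comes special handling for certain VM states. If the instance is in the 'paused' or 'rescued' state, recovery is ignored and IgnoreInstanceRecoveryException is raised. If the state is not 'stopped' and is 'resized', the instance state is first reset to 'active'.

The task then calls stop_server from the novaclient module to stop the VM instance, and uses the OpenStack common library oslo_service to call _wait_for_power_off at a fixed interval, checking whether the instance has reached the stopped state or the wait has timed out.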
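The state handling in StopInstanceTask amounts to a small decision table. The sketch below lists the actions that would be taken for a given vm_state. plan_stop_actions and the action labels are hypothetical names chosen for illustration.

```python
def plan_stop_actions(vm_state):
    """Return the ordered actions StopInstanceTask would take."""
    if vm_state in ("paused", "rescued"):
        # Recovery is ignored for these states.
        return ["ignore"]
    actions = []
    if vm_state != "stopped":
        if vm_state == "resized":
            actions.append("reset_state_to_active")
        actions.append("stop_server")
    # The task always polls until the instance is 'stopped'.
    actions.append("wait_for_stopped")
    return actions


for state in ("paused", "stopped", "resized", "active"):
    print(state, plan_stop_actions(state))
```

An instance that is already stopped skips the stop call but still goes through the wait, which completes on the first poll.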

masakari\engine\drivers\taskflow\instance_failure.py

class StartInstanceTask(base.MasakariTask):
    def __init__(self, context, novaclient, **kwargs):
        kwargs['requires'] = ["instance_uuid"]
        super(StartInstanceTask, self).__init__(context,
                                                novaclient,
                                                **kwargs)

    def execute(self, instance_uuid):
        """Start the instance."""
        msg = "Starting instance: '%s'" % instance_uuid
        self.update_details(msg)

        instance = self.novaclient.get_server(self.context, instance_uuid)
        vm_state = getattr(instance, 'OS-EXT-STS:vm_state')
        if vm_state == 'stopped':
            self.novaclient.start_server(self.context, instance.id)
            msg = "Instance started: '%s'" % instance_uuid
            self.update_details(msg, 1.0)
        else:
            msg = ("Invalid state for Instance %(instance)s. Expected state: "
                   "'STOPPED', Actual state: '%(actual_state)s'") % {
                'instance': instance_uuid,
                'actual_state': vm_state
            }
            self.update_details(msg, 1.0)
            raise exception.InstanceRecoveryFailureException(
                message=msg)           


If the VM instance is in the stopped state, the task calls start_server from the novaclient module to start it; otherwise it raises exception.InstanceRecoveryFailureException.

masakari\engine\drivers\taskflow\instance_failure.py

class ConfirmInstanceActiveTask(base.MasakariTask):
    def __init__(self, context, novaclient, **kwargs):
        kwargs['requires'] = ["instance_uuid"]
        super(ConfirmInstanceActiveTask, self).__init__(context,
                                                        novaclient,
                                                        **kwargs)

    def execute(self, instance_uuid):
        def _wait_for_active():
            new_instance = self.novaclient.get_server(self.context,
                                                      instance_uuid)
            vm_state = getattr(new_instance, 'OS-EXT-STS:vm_state')
            if vm_state == 'active':
                raise loopingcall.LoopingCallDone()

        periodic_call = loopingcall.FixedIntervalLoopingCall(
            _wait_for_active)
        try:
            msg = "Confirming instance '%s' vm_state is ACTIVE" % instance_uuid
            self.update_details(msg)

            # add a timeout to the periodic call.
            periodic_call.start(interval=CONF.verify_interval)
            etimeout.with_timeout(CONF.wait_period_after_power_on,
                                  periodic_call.wait)

            msg = "Confirmed instance '%s' vm_state is ACTIVE" % instance_uuid
            self.update_details(msg, 1.0)
        except etimeout.Timeout:
            msg = "Failed to start instance %(instance)s" % {
                'instance': instance_uuid
            }
            self.update_details(msg, 1.0)
            raise exception.InstanceRecoveryFailureException(
                message=msg)
        finally:
            # stop the periodic call, in case of exceptions or Timeout.
            periodic_call.stop()           


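This task uses the OpenStack common library oslo_service to call _wait_for_active at a fixed interval, checking whether the instance has reached the active state or the wait has timed out.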

Task names as defined in masakari/setup.cfg:

stop_instance_task = masakari.engine.drivers.taskflow.instance_failure:StopInstanceTask
start_instance_task = masakari.engine.drivers.taskflow.instance_failure:StartInstanceTask
confirm_instance_active_task = masakari.engine.drivers.taskflow.instance_failure:ConfirmInstanceActiveTask
           


The flow as defined in the configuration options:

cfg.Opt('instance_failure_recovery_tasks',
            type=types.Dict(
                bounds=False,
                value_type=types.List(bounds=True,
                                      item_type=types.String(quotes=True))),
            default={'pre': ['stop_instance_task'],
                     'main': ['start_instance_task'],
                     'post': ['confirm_instance_active_task']},
            help=("""
This option allows operator to customize tasks to be executed for instance
failure recovery workflow.

Provide list of strings reflecting to the task classes that should be included
to the instance failure recovery workflow. The full classname path of all task
classes should be defined in the 'masakari.task_flow.tasks' of setup.cfg and
these classes may be implemented by OpenStack Masaskari project team, deployer
or third party.

By default below three tasks will be part of this config option:-
1. stop_instance_task
2. start_instance_task
3. confirm_instance_active_task

The allowed values for this option is comma separated dictionary of object
names in between ``{`` and ``}``.""")),
]
           


Host failure handling flow

masakari\engine\drivers\taskflow\host_failure.py

class DisableComputeServiceTask(base.MasakariTask):
    def __init__(self, context, novaclient, **kwargs):
        kwargs['requires'] = ["host_name"]
        super(DisableComputeServiceTask, self).__init__(context, novaclient,
                                                        **kwargs)

    def execute(self, host_name):
        msg = "Disabling compute service on host: '%s'" % host_name
        self.update_details(msg)
        self.novaclient.enable_disable_service(self.context, host_name)
        # Sleep until nova-compute service is marked as disabled.
        log_msg = ("Sleeping %(wait)s sec before starting recovery "
               "thread until nova recognizes the node down.")
        LOG.info(log_msg, {'wait': CONF.wait_period_after_service_update})
        eventlet.sleep(CONF.wait_period_after_service_update)
        msg = "Disabled compute service on host: '%s'" % host_name
        self.update_details(msg, 1.0)           


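DisableComputeServiceTask disables the compute service on the failed compute node, then sleeps for CONF.wait_period_after_service_update seconds so that nova has time to recognize the node as down.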

masakari\engine\drivers\taskflow\host_failure.py

class PrepareHAEnabledInstancesTask(base.MasakariTask):
    """Get all HA_Enabled instances."""
    default_provides = set(["instance_list"])

    def __init__(self, context, novaclient, **kwargs):
        kwargs['requires'] = ["host_name"]
        super(PrepareHAEnabledInstancesTask, self).__init__(context,
                                                            novaclient,
                                                            **kwargs)

    def execute(self, host_name):
        def _filter_instances(instance_list):
            ha_enabled_instances = []
            non_ha_enabled_instances = []

            for instance in instance_list:
                is_instance_ha_enabled = strutils.bool_from_string(
                    instance.metadata.get('HA_Enabled', False))
                if CONF.host_failure.ignore_instances_in_error_state and (
                        getattr(instance, "OS-EXT-STS:vm_state") == "error"):
                    if is_instance_ha_enabled:
                        msg = ("Ignoring recovery of HA_Enabled instance "
                               "'%(instance_id)s' as it is in 'error' state."
                               ) % {'instance_id': instance.id}
                        LOG.info(msg)
                        self.update_details(msg, 0.4)
                    continue

                if is_instance_ha_enabled:
                    ha_enabled_instances.append(instance)
                else:
                    non_ha_enabled_instances.append(instance)

            msg = "Total HA Enabled instances count: '%d'" % len(
                ha_enabled_instances)
            self.update_details(msg, 0.6)

            if CONF.host_failure.evacuate_all_instances:
                msg = ("Total Non-HA Enabled instances count: '%d'" % len(
                    non_ha_enabled_instances))
                self.update_details(msg, 0.7)

                ha_enabled_instances.extend(non_ha_enabled_instances)

                msg = ("All instances (HA Enabled/Non-HA Enabled) should be "
                       "considered for evacuation. Total count is: '%d'") % (
                    len(ha_enabled_instances))
                self.update_details(msg, 0.8)

            return ha_enabled_instances

        msg = "Preparing instances for evacuation"
        self.update_details(msg)

        instance_list = self.novaclient.get_servers(self.context, host_name)

        msg = ("Total instances running on failed host '%(host_name)s' is "
               "%(instance_list)d") % {'host_name': host_name,
                                       'instance_list': len(instance_list)}
        self.update_details(msg, 0.3)

        instance_list = _filter_instances(instance_list)

        if not instance_list:
            msg = ("Skipped host '%s' recovery as no instances needs to be "
                   "evacuated" % host_name)
            self.update_details(msg, 1.0)
            LOG.info(msg)
            raise exception.SkipHostRecoveryException(message=msg)

        # List of instance UUID
        instance_list = [instance.id for instance in instance_list]

        msg = "Instances to be evacuated are: '%s'" % ','.join(instance_list)
        self.update_details(msg, 1.0)

        return {
            "instance_list": instance_list,
        }           


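PrepareHAEnabledInstancesTask prepares for evacuation by determining which instances need to be evacuated.

In detail: it calls get_servers from the novaclient module to fetch all VM instances on the failed host host_name and stores them in instance_list. It then passes the list through the filter method _filter_instances and stores the result back in instance_list. Finally it collects the id of each remaining instance and stores those ids in instance_list.

The _filter_instances method filters all instances on the host and returns the ones that need to be evacuated.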

In detail:

(1) The instances are first divided into three groups. Instances in the error state are skipped outright when CONF.host_failure.ignore_instances_in_error_state is true. Instances whose HA_Enabled metadata is true are stored in the ha_enabled_instances list. All remaining instances, whose metadata has no true HA_Enabled value, are stored in the non_ha_enabled_instances list.

(2) If CONF.host_failure.evacuate_all_instances is true, the method returns ha_enabled_instances and non_ha_enabled_instances merged into one list; if it is false, it returns only ha_enabled_instances.
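The two filtering steps can be sketched on plain dictionaries. This is an illustration, not the Masakari code: filter_instances and is_ha_enabled are hypothetical helpers, instances are modeled as dicts rather than novaclient server objects, and is_ha_enabled only treats the string "true" as true, which is narrower than strutils.bool_from_string.

```python
def is_ha_enabled(metadata):
    # Simplified stand-in for strutils.bool_from_string: only "true" counts.
    return str(metadata.get("HA_Enabled", False)).strip().lower() == "true"


def filter_instances(instances, ignore_error_state, evacuate_all):
    """Return the ids of the instances that would be evacuated."""
    ha_enabled, non_ha_enabled = [], []
    for inst in instances:
        if ignore_error_state and inst["vm_state"] == "error":
            continue  # skipped regardless of HA_Enabled
        if is_ha_enabled(inst["metadata"]):
            ha_enabled.append(inst["id"])
        else:
            non_ha_enabled.append(inst["id"])
    if evacuate_all:
        return ha_enabled + non_ha_enabled
    return ha_enabled


instances = [
    {"id": "vm-1", "vm_state": "active", "metadata": {"HA_Enabled": "True"}},
    {"id": "vm-2", "vm_state": "active", "metadata": {}},
    {"id": "vm-3", "vm_state": "error", "metadata": {"HA_Enabled": "True"}},
]

only_ha = filter_instances(instances, ignore_error_state=True,
                           evacuate_all=False)
everything = filter_instances(instances, ignore_error_state=False,
                              evacuate_all=True)
print(only_ha)
print(everything)
```

If the returned list is empty, the task in the excerpt raises SkipHostRecoveryException instead of continuing to evacuation.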

masakari\engine\drivers\taskflow\host_failure.py

class EvacuateInstancesTask(base.MasakariTask):

    def __init__(self, context, novaclient, **kwargs):
        kwargs['requires'] = ["host_name", "instance_list"]
        self.update_host_method = kwargs['update_host_method']
        super(EvacuateInstancesTask, self).__init__(context, novaclient,
                                                    **kwargs)

    def _get_state_and_host_of_instance(self, context, instance):
        new_instance = self.novaclient.get_server(context, instance.id)
        instance_host = getattr(new_instance,
                                "OS-EXT-SRV-ATTR:hypervisor_hostname")
        old_vm_state = getattr(instance, "OS-EXT-STS:vm_state")
        new_vm_state = getattr(new_instance, "OS-EXT-STS:vm_state")

        return (old_vm_state, new_vm_state, instance_host)

    def _stop_after_evacuation(self, context, instance):
        def _wait_for_stop_confirmation():
            old_vm_state, new_vm_state, instance_host = (
                self._get_state_and_host_of_instance(context, instance))

            if new_vm_state == 'stopped':
                raise loopingcall.LoopingCallDone()

        periodic_call_stopped = loopingcall.FixedIntervalLoopingCall(
            _wait_for_stop_confirmation)

        try:
            self.novaclient.stop_server(context, instance.id)
            # confirm instance is stopped after recovery
            periodic_call_stopped.start(interval=CONF.verify_interval)
            etimeout.with_timeout(
                CONF.wait_period_after_power_off,
                periodic_call_stopped.wait)
        except etimeout.Timeout:
            with excutils.save_and_reraise_exception():
                periodic_call_stopped.stop()
                msg = ("Instance '%(uuid)s' is successfully evacuated but "
                       "failed to stop.") % {'uuid': instance.id}
                LOG.warning(msg)
        else:
            periodic_call_stopped.stop()

    def _evacuate_and_confirm(self, context, instance, host_name,
                              failed_evacuation_instances,
                              reserved_host=None):
        # Before locking the instance check whether it is already locked
        # by user, if yes don't lock the instance
        instance_already_locked = self.novaclient.get_server(
            context, instance.id).locked

        if not instance_already_locked:
            # lock the instance so that until evacuation and confirmation
            # is not complete, user won't be able to perform any actions
            # on the instance.
            self.novaclient.lock_server(context, instance.id)

        def _wait_for_evacuation_confirmation():
            old_vm_state, new_vm_state, instance_host = (
                self._get_state_and_host_of_instance(context, instance))
            if instance_host != host_name:
                if ((old_vm_state == 'error' and
                    new_vm_state == 'active') or
                        old_vm_state == new_vm_state):
                    raise loopingcall.LoopingCallDone()

        def _wait_for_evacuation():
            periodic_call = loopingcall.FixedIntervalLoopingCall(
                _wait_for_evacuation_confirmation)

            try:
                # add a timeout to the periodic call.
                periodic_call.start(interval=CONF.verify_interval)
                etimeout.with_timeout(
                    CONF.wait_period_after_evacuation,
                    periodic_call.wait)
            except etimeout.Timeout:
                # Instance is not evacuated in the expected time_limit.
                failed_evacuation_instances.append(instance.id)
            else:
                # stop the periodic call, in case of exceptions or
                # Timeout.
                periodic_call.stop()

        try:
            vm_state = getattr(instance, "OS-EXT-STS:vm_state")
            task_state = getattr(instance, "OS-EXT-STS:task_state")

            # Nova evacuates an instance only when vm_state is in active,
            # stopped or error state. If an instance is in other than active,
            # error and stopped vm_state, masakari resets the instance state
            # to *error* so that the instance can be evacuated.
            stop_instance = True
            if vm_state not in ['active', 'error', 'stopped']:
                self.novaclient.reset_instance_state(context, instance.id)
                instance = self.novaclient.get_server(context, instance.id)
                power_state = getattr(instance, "OS-EXT-STS:power_state")
                if vm_state == 'resized' and power_state != SHUTDOWN:
                    stop_instance = False

            elif task_state is not None:
                # Nova fails evacuation when the instance's task_state is not
                # none. In this case, masakari resets the instance's vm_state
                # to 'error' and task_state to none.
                self.novaclient.reset_instance_state(context, instance.id)
                instance = self.novaclient.get_server(context, instance.id)
                if vm_state == 'active':
                    stop_instance = False

            # evacuate the instance
            self.novaclient.evacuate_instance(context, instance.id,
                                              target=reserved_host)

            _wait_for_evacuation()

            if vm_state != 'active':
                if stop_instance:
                    self._stop_after_evacuation(self.context, instance)
                    # If the instance was in 'error' state before failure
                    # it should be set to 'error' after recovery.
                    if vm_state == 'error':
                        self.novaclient.reset_instance_state(
                            context, instance.id)
        except etimeout.Timeout:
            # Instance is not stop in the expected time_limit.
            failed_evacuation_instances.append(instance.id)
        except Exception:
            # Exception is raised while resetting instance state or
            # evacuating the instance itself.
            failed_evacuation_instances.append(instance.id)
        finally:
            if not instance_already_locked:
                # Unlock the server after evacuation and confirmation
                self.novaclient.unlock_server(context, instance.id)

    def execute(self, host_name, instance_list, reserved_host=None):
        msg = ("Start evacuation of instances from failed host '%(host_name)s'"
               ", instance uuids are: '%(instance_list)s'") % {
            'host_name': host_name, 'instance_list': ','.join(instance_list)}
        self.update_details(msg)

        def _do_evacuate(context, host_name, instance_list,
                         reserved_host=None):
            failed_evacuation_instances = []

            if reserved_host:
                msg = "Enabling reserved host: '%s'" % reserved_host
                self.update_details(msg, 0.1)
                if CONF.host_failure.add_reserved_host_to_aggregate:
                    # Assign reserved_host to an aggregate to which the failed
                    # compute host belongs to.
                    aggregates = self.novaclient.get_aggregate_list(context)
                    for aggregate in aggregates:
                        if host_name in aggregate.hosts:
                            try:
                                msg = ("Add host %(reserved_host)s to "
                                       "aggregate %(aggregate)s") % {
                                    'reserved_host': reserved_host,
                                    'aggregate': aggregate.name}
                                self.update_details(msg, 0.2)

                                self.novaclient.add_host_to_aggregate(
                                    context, reserved_host, aggregate)
                                msg = ("Added host %(reserved_host)s to "
                                       "aggregate %(aggregate)s") % {
                                    'reserved_host': reserved_host,
                                    'aggregate': aggregate.name}
                                self.update_details(msg, 0.3)
                            except exception.Conflict:
                                msg = ("Host '%(reserved_host)s' already has "
                                       "been added to aggregate "
                                       "'%(aggregate)s'.") % {
                                    'reserved_host': reserved_host,
                                    'aggregate': aggregate.name}
                                self.update_details(msg, 1.0)
                                LOG.info(msg)

                            # A failed compute host can be associated with
                            # multiple aggregates but operators will not
                            # associate it with multiple aggregates in real
                            # deployment so adding reserved_host to the very
                            # first aggregate from the list.
                            break

                self.novaclient.enable_disable_service(
                    context, reserved_host, enable=True)

                # Set reserved property of reserved_host to False
                self.update_host_method(self.context, reserved_host)

            thread_pool = greenpool.GreenPool(
                CONF.host_failure_recovery_threads)

            for instance_id in instance_list:
                msg = "Evacuation of instance started: '%s'" % instance_id
                self.update_details(msg, 0.5)
                instance = self.novaclient.get_server(self.context,
                                                      instance_id)
                thread_pool.spawn_n(self._evacuate_and_confirm, context,
                                    instance, host_name,
                                    failed_evacuation_instances,
                                    reserved_host)

            thread_pool.waitall()

            evacuated_instances = list(set(instance_list).difference(set(
                failed_evacuation_instances)))

            if evacuated_instances:
                evacuated_instances.sort()
                msg = ("Successfully evacuate instances '%(instance_list)s' "
                       "from host '%(host_name)s'") % {
                    'instance_list': ','.join(evacuated_instances),
                    'host_name': host_name}
                self.update_details(msg, 0.7)

            if failed_evacuation_instances:
                msg = ("Failed to evacuate instances "
                       "'%(failed_evacuation_instances)s' from host "
                       "'%(host_name)s'") % {
                    'failed_evacuation_instances':
                        ','.join(failed_evacuation_instances),
                    'host_name': host_name}
                self.update_details(msg, 0.7)
                raise exception.HostRecoveryFailureException(
                    message=msg)

            msg = "Evacuation process completed!"
            self.update_details(msg, 1.0)

        lock_name = reserved_host if reserved_host else None

        @utils.synchronized(lock_name)
        def do_evacuate_with_reserved_host(context, host_name, instance_list,
                                           reserved_host):
            _do_evacuate(self.context, host_name, instance_list,
                         reserved_host=reserved_host)

        if lock_name:
            do_evacuate_with_reserved_host(self.context, host_name,
                                           instance_list, reserved_host)
        else:
            # No need to acquire lock on reserved_host when recovery_method is
            # 'auto' as the selection of compute host will be decided by nova.
            _do_evacuate(self.context, host_name, instance_list)           


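EvacuateInstancesTask evacuates the selected instances from the failed host. There are two cases, depending on whether a reserved_host is specified: if so, the instances are evacuated to that reserved_host; if not, nova-scheduler chooses the destination host.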

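In detail (steps (1) to (3) apply only when a reserved_host is specified):

(1) If CONF.host_failure.add_reserved_host_to_aggregate is true, reserved_host is added to the host aggregate that the failed host belongs to.

(2) novaclient.enable_disable_service(context, reserved_host, enable=True) is called to switch the compute service on reserved_host from disabled to enabled.

(3) update_host_method is called to set the reserved property of reserved_host to false.

(4) A pool of CONF.host_failure_recovery_threads threads is started to evacuate the instances in instance_list.

(5) The task waits for all evacuation threads to finish.

(6) The task reports which instances were evacuated and which were not, and raises HostRecoveryFailureException if any evacuation failed.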
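Steps (4) to (6) follow a common fan-out pattern: run one worker per instance in a bounded pool, collect the failures in a shared list, and derive the successes as a set difference. The sketch below illustrates that pattern with concurrent.futures from the standard library in place of the eventlet GreenPool used in the excerpt. evacuate_all and fake_evacuate are hypothetical names, and max_workers=3 is an arbitrary value chosen for the example.

```python
from concurrent.futures import ThreadPoolExecutor


def evacuate_all(instance_ids, evacuate_one, max_workers=3):
    """Evacuate every instance in a bounded pool; return (ok, failed)."""
    failed = []

    def worker(instance_id):
        try:
            evacuate_one(instance_id)
        except Exception:
            # Record the failure instead of aborting the other workers.
            failed.append(instance_id)

    # Leaving the with-block waits for all workers to finish.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        list(pool.map(worker, instance_ids))

    evacuated = sorted(set(instance_ids) - set(failed))
    return evacuated, sorted(failed)


def fake_evacuate(instance_id):
    # Simulated evacuation: one instance fails.
    if instance_id == "vm-2":
        raise RuntimeError("simulated evacuation failure")


evacuated, failed = evacuate_all(["vm-1", "vm-2", "vm-3"], fake_evacuate)
print(evacuated, failed)
```

Collecting failures rather than raising inside the worker is what allows the task to report both lists before it decides whether the host recovery as a whole has failed.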

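The per-instance evacuation method _evacuate_and_confirm, which runs in the threads started in step (4), works as follows:

(a) It reads whether the VM instance is already locked via novaclient.get_server(context, instance.id).locked.

(b) If the instance is not locked, it calls novaclient.lock_server(context, instance.id) to lock it, so that users cannot perform any action on the instance while it is being evacuated. The lock is released again once evacuation and confirmation have finished.

(c) Nova can only evacuate VMs in the error, active, or stopped state, so an instance in any other state is first reset to the error state. An instance in one of these three states whose task_state is not None is reset as well.

(d) It evacuates the VM instance.

(e) It waits for the evacuation to complete.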
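The conditions behind steps (b), (c), and (e) can be written as small functions. This is a sketch for illustration: needs_state_reset, evacuation_confirmed, and evacuate_with_lock are hypothetical helpers that mirror the conditions in the excerpt without calling Nova.

```python
def needs_state_reset(vm_state, task_state):
    """True when the instance would be reset before evacuation."""
    if vm_state not in ("active", "error", "stopped"):
        return True
    return task_state is not None


def evacuation_confirmed(failed_host, current_host,
                         old_vm_state, new_vm_state):
    """Condition checked on every poll after the evacuate call."""
    if current_host == failed_host:
        return False  # the instance has not moved yet
    return ((old_vm_state == "error" and new_vm_state == "active")
            or old_vm_state == new_vm_state)


def evacuate_with_lock(already_locked, evacuate):
    """Return the ordered list of actions taken around one evacuation."""
    actions = []
    if not already_locked:
        actions.append("lock")
    try:
        evacuate()
        actions.append("evacuated")
    except Exception:
        actions.append("recorded_as_failed")
    finally:
        # Only unlock an instance that this code locked itself.
        if not already_locked:
            actions.append("unlock")
    return actions


def failing_evacuation():
    raise RuntimeError("simulated evacuation error")


print(needs_state_reset("paused", None))
print(evacuation_confirmed("host-a", "host-b", "error", "active"))
print(evacuate_with_lock(False, failing_evacuation))
```

Placing the unlock in a finally block means a failed evacuation does not leave the instance locked, and an instance that the user had already locked is left untouched in both directions.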

Task names as defined in masakari/setup.cfg:

masakari.task_flow.tasks =
    disable_compute_service_task = masakari.engine.drivers.taskflow.host_failure:DisableComputeServiceTask
    prepare_HA_enabled_instances_task = masakari.engine.drivers.taskflow.host_failure:PrepareHAEnabledInstancesTask
    evacuate_instances_task = masakari.engine.drivers.taskflow.host_failure:EvacuateInstancesTask
           


The flow as defined in the configuration options:

cfg.Opt('host_auto_failure_recovery_tasks',
            type=types.Dict(
                bounds=False,
                value_type=types.List(bounds=True,
                                      item_type=types.String(quotes=True))),
            default={'pre': ['disable_compute_service_task'],
                     'main': ['prepare_HA_enabled_instances_task'],
                     'post': ['evacuate_instances_task']},
            help=("""
This option allows operator to customize tasks to be executed for host failure
auto recovery workflow.

Provide list of strings reflecting to the task classes that should be included
to the host failure recovery workflow. The full classname path of all task
classes should be defined in the 'masakari.task_flow.tasks' of setup.cfg and
these classes may be implemented by OpenStack Masaskari project team, deployer
or third party.

By default below three tasks will be part of this config option:-
1. disable_compute_service_task
2. prepare_HA_enabled_instances_task
3. evacuate_instances_task

The allowed values for this option is comma separated dictionary of object
names in between ``{`` and ``}``.""")),
]
           

