天天看点

STS Python_SDK授权临时用户读写OSS资源

名词解释

RAM (Resource Access Management)

STS(Security Token Service)

是阿里云提供的

权限管理系统

RAM主要的作用是控制账号系统的权限。您可以使用RAM在主账号的权限范围内创建子用户,给不同的子用户分配不同的权限从而达到授权管理的目的。STS是一个安全凭证(Token)的管理系统。您可以使用STS来完成对于临时用户的访问授权。

  • 子账号(RAM account) :从阿里云的主账号中创建出来的子账号,在创建的时候可以分配独立的密码和权限,每个子账号拥有自己AccessKey,可以和阿里云主账号一样正常的完成有权限的操作。一般来说,这里的子账号可以理解为具有某种权限的用户,可以被认为是一个具有某些权限的操作发起者。
  • 角色(Role)

    :表示某种操作权限的虚拟概念,但是没有独立的登录密码和AccessKey。

    说明 子账号可以扮演角色,扮演角色时候的权限是该角色自身的权限。

  • 授权策略(Policy) :用来定义权限的规则,比如允许用户读取或写入某些资源。
  • 资源(Resource):代表用户可访问的云资源,比如OSS所有的Bucket、OSS的某个Bucket,或OSS的某个Bucket下面的某个Object等。
  • 扮演角色(Assume role)扮演角色是实体用户获取角色身份的安全令牌的 方法。一个实体用户调用STS API AssumeRole可以获得角色的安全令牌,使用安全令牌可以访问云服务API。

这里将手动定义 授权策略(Policy),将 授权策略 授权给 角色 ,然后子账号(RAM account)通过 扮演角色_方法 _获取 角色 的 安全令牌 对 资源 进行操作.

RAM 用户 可以使用 API 扮演 RAM 角色。当 RAM 用户被授予

 AliyunSTSAssumeRoleAccess

 权限策略 之后,可以使用其访问密钥调用 STS API 

AssumeRole

 接口,以获取某个角色的 安全令牌,从而使用安全令牌访问资源。

新建用户,角色,授权策略

新建RAM用户

  1. 登录  RAM 控制台
  2. 在左侧导航栏的人员管理菜单下,单击用户。
  3. 单击新建用户,输入登录名称和显示名称。

    说明 单击添加用户,可一次性创建多个 RAM 用户。

  4. 在访问方式区域下,选择控制台密码登录或编程访问。
    • 控制台密码登录:可以完成对登录安全的基本设置,包括自动生成或自定义登录密码、是否要求下次登录时重置密码以及是否要求开启多因素认证。
    • 编程访问:将会自动为 RAM 用户创建访问密钥(AccessKey)。RAM 用户可以通过 API 或其他开发工具访问阿里云。
  5. 说明 为了保障账号安全,建议仅为 RAM 用户选择一种登录方式。避免 RAM 用户离开组织后仍可以通过访问密钥访问阿里云资源。
  6. 单击确认。

授权RAM用户AssumeRole接口权限

  1. 在用户列表中找到刚才创建的用户,单击列表【操作】栏下的【添加权限】
  2. 在弹出对话框中的【系统权限策略】搜索并添加 _AliyunSTSAssumeRoleAccess_

新建自定义权限策略test_policy

  1. 在左侧导航栏的权限管理菜单下,单击权限策略管理。
  2. 单击新建权限策略。
  3. 填写策略名称和备注。
  4. 配置模式选择可视化配置或脚本配置。
    • 若选择可视化配置:单击添加授权语句,根据界面提示,对权限效力、操作名称和资源等进行配置。
    • 若选择脚本配置,请参考 语法结构 编辑策略内容。

下面为OSS所有权限语法示例

{
    "Statement": [
        {
            "Action": "oss:*",
            "Effect": "Allow",
            "Resource": "*"
        }
    ],
    "Version": "1"
}           

新建角色testRole

注:这里角色有三种类型,创建可信实体为阿里云账号的RAM角色
  1. 云账号登录 RAM控制台
  2. 在左侧导航栏,单击RAM角色管理。
  3. 单击新建RAM角色。
  4. 选择可信实体类型为阿里云账号,单击下一步。
  5. 输入角色名称和备注。
  6. 选择云账号后,单击完成。

将自定义权限策略授权给角色

  1. 在左侧导航栏的权限管理菜单下,单击授权。
  2. 单击新增授权。
  3. 在被授权主体区域下,输入 RAM 角色名称后,单击需要授权的 RAM 角色。
  4. 在左侧权限策略名称列表下,单击需要授予 RAM 角色的权限策略。

    说明 在右侧区域框,选择某条策略并单击 ×,可撤销该策略。

  5. 单击确定。

完成授权后可在角色管理中查看授权详情

STS Python_SDK授权临时用户读写OSS资源
注:这里策略主体类型有两种,权限策略也可直接授权给用户    
STS Python_SDK授权临时用户读写OSS资源

通过STS完成临时授权

STS基本概念

阿里云临时安全令牌

(Security Token Service,STS)

是阿里云提供的一种临时访问权限管理服务。

  • RAM角色(RAM role)一种虚拟的RAM用户。RAM角色的全局资源描述符(Role ARN)Role ARN是角色的全局资源描述符(Aliyun Resource Name,简称ARN),用来指定具体角色。每个角色都有一个唯一的全局资源描述符。格式:

    acs:ram::$accountID:role/$roleName

  • 可信实体(Trusted entity)角色的可信实体是指可以扮演角色的实体用户身份。创建角色时必须指定可信实体,角色只能被受信的主体扮演。可信实体可以是受信的阿里云账号、受信的阿里云服务或身份提供商。
  • 扮演角色(Assume role)扮演角色是实体用户获取角色身份的安全令牌的方法。一个实体用户调用STS API AssumeRole可以获得角色的安全令牌,使用安全令牌可以访问云服务API。

注:这里需要区分下RAM角色与扮演角色的区别,可信实体 通过 扮演角色 _方法__  _获取 RAM角色 的安全令牌

扮演角色的API接口概览

STS提供API调用接口每个请求都需要指定如下信息:

  • 要执行的操作:Action参数。
  • 每个操作接口都需要包含的公共请求参数。
  • 操作接口所特有的请求参数。

调用AssumeRole接口获取一个临时身份。参考

API
  • RoleArn表示的是需要扮演的角色ID,角色的ID可以在角色管理 > 角色详情中找到。
  • RoleSessionName是一个用来标示临时凭证的名称,一般来说建议使用不同的应用程序用户来区分。
  • Policy表示的是在扮演角色的时候额外加上的一个权限限制。此参数可以限制生成的STS token的权限,若不指定则返回的token拥有指定角色的所有权限。
  • DurationSeconds指的是临时凭证的有效期,单位是s,最小为900,最大为3600。
  • id和secret表示的是需要扮演角色的子账号的AccessKey。

AssumeRole接口请求示例

https://sts.aliyuncs.com?Action=AssumeRole
&RoleArn=acs:ram::123456789012****:role/adminrole
&RoleSessionName=alice
&DurationSeconds=3600
&Policy=<url_encoded_policy>
&<公共请求参数>           

AssumeRole接口返回格式(json)

{
    "Credentials": {
        "AccessKeyId": "STS.L4aBSCSJVMuKg5U1****",
        "AccessKeySecret": "wyLTSmsyPGP1ohvvw8xYgB29dlGI8KMiH2pK****",
        "Expiration": "2015-04-09T11:52:19Z",
        "SecurityToken": "********"
    },
    "AssumedRoleUser": {
        "arn": "acs:sts::123456789012****:assumed-role/AdminRole/alice",
        "AssumedRoleUserId":"34458433936495****:alice"
        },
    "RequestId": "6894B13B-6D71-4EF5-88FA-F32781734A7F"
}           

通过STS_SDK完成角色扮演

这里将通过官方提供的SDK模块进行具体的角色扮演

 安装相关SDK包

pip install oss2

pip install aliyun-python-sdk-core

pip install aliyun-python-sdk-sts

sts

sdk

 github 地址

 获取临时身份信息

# 在控制台将 AliyunSTSAssumeRoleAccess 权限授权给子用户testRole,testRole操作AssumeRole接口,获取临时身份
def fetch_sts_info(access_key_id, access_key_secret, sts_role_arn):
    """子用户角色扮演获取临时身份的信息
    :param access_key_id: 子用户的 access key id
    :param access_key_secret: 子用户的 access key secret
    :param sts_role_arn: STS角色的Arn
    :return StsInfo 返回临时身份信息对象
    """
    # 配置要访问的STS endpoint
    REGIONID = 'cn-hongkong'
    ENDPOINT = 'sts.cn-hongkong.aliyuncs.com'
    region_provider.add_endpoint('Sts', REGIONID, ENDPOINT)

    clt = client.AcsClient(access_key_id, access_key_secret, 'cn-hongkong')
    request = AssumeRoleRequest.AssumeRoleRequest()

    #request.set_accept_format('json')
    #指定角色ARN
    request.set_RoleArn(sts_role_arn)
    #设置会话名称,审计服务使用此名称区分调用者
    request.set_RoleSessionName('oss-python-sdk-example')
    #发起请求,并得到response
    response = clt.do_action_with_exception(request)
    #格式化输出返回结果,将字符串结果转化为字典类型
    i = json.loads(oss2.to_unicode(response))
    #实例化StsInfo类,并将通过sts获取的临时身份信息存入
    global StsInfo
    StsInfo = StsInfo()
    StsInfo.access_key_id = i['Credentials']['AccessKeyId']
    StsInfo.access_key_secret = i['Credentials']['AccessKeySecret']
    StsInfo.security_token = i['Credentials']['SecurityToken']
    StsInfo.request_id = i['RequestId']
    StsInfo.expiration = oss2.utils.to_unixtime(i['Credentials']['Expiration'], '%Y-%m-%dT%H:%M:%SZ')

    #返回StsInfo对象
    return StsInfo
               

将临时身份信息存入json文件

根据需求,可将临时身份信息存储到json文件中,等到临时身份过期后再重新请求,避免重复请求,造成临时身份泛滥,引发资源匮乏与安全隐患

def save_info():
    #存储临时身份信息
    with open('StsInfo.json','w',encoding='utf-8') as f:
        data = {'sts_key_id':StsInfo.access_key_id,'sts_key_secret':StsInfo.access_key_secret,'sts_secrity_token':StsInfo.security_token,'sts_expire_date':StsInfo.expiration,'sts_reques_id':StsInfo.request_id}
        json.dump(data,f,ensure_ascii=False)
        f.close
        
def open_info():
    #读取临时身份信息
    with open('./StsInfo.json','r',encoding='utf-8') as f:
        global STSINFO
        STSINFO = json.load(f)
        return STSINFO           

通过临时身份操作Bucket资源

def buck_put_object():
    #打印验证临时身份信息
    print(StsInfo)
    print('key id:',StsInfo.access_key_id)
    print("key_secret:",StsInfo.access_key_secret)
    print("secrity_token:",StsInfo.security_token)
    print("request_id:",StsInfo.request_id)
    print("expiration:",StsInfo.expiration)
    
    #实例化Bucket对象,并上传字符串
    auth = oss2.StsAuth(StsInfo.access_key_id, StsInfo.access_key_secret, StsInfo.security_token)
    bucket = oss2.Bucket(auth,endpoint,'fralychen')
    result = bucket.put_object('fralychen','good good study day day up')           

全部代码

# -*- conding:utf-8 -*-
import json
import os

import oss2

from aliyunsdkcore import client
from aliyunsdkcore.profile import region_provider
from aliyunsdksts.request.v20150401 import AssumeRoleRequest

##定义一些变量
access_key_id = 'LTAI4FoMe6umpCSFQdEC9neg' 
access_key_secret = 'jgEvtFOqIAGqKZve7zMvg8dJhSZv9J'
bucket_name = 'fralychen'
endpoint = 'oss-cn-hongkong.aliyuncs.com'
sts_role_arn = 'acs:ram::1149877324567510:role/testrole'



# 确认上面的参数都填写正确了
for param in (access_key_id, access_key_secret, bucket_name, endpoint, sts_role_arn):
    assert '<' not in param, '请设置参数:' + param

#创建StsToken类方便用来存储临时身份信息
class StsInfo(object):
    """AssumeRole返回的临时身份信息
    :param str access_key_id: 临时身份的access key id
    :param str access_key_secret: 临时身份的access key secret
    :param int expiration: 过期时间,UNIX时间,自1970年1月1日UTC零点的秒数
    :param str security_token: 临时身份Token
    :param str request_id: 请求ID
    """
    def __init__(self):
        self.access_key_id = ''
        self.access_key_secret = ''
        self.expiration = 0
        self.security_token = ''
        self.request_id = ''

# 在控制台将 AliyunSTSAssumeRoleAccess 权限授权给RAM子用户之后才能通过RAM用户获取临时身份信息
def fetch_sts_info(access_key_id, access_key_secret, sts_role_arn):
    """子用户角色扮演获取临时身份的信息
    :param access_key_id: 子用户的 access key id
    :param access_key_secret: 子用户的 access key secret
    :param sts_role_arn: STS角色的Arn
    :return StsInfo 返回临时身份信息对象
    """
    # 配置要访问的STS endpoint
    _REGIONID = 'cn-hongkong'
    _ENDPOINT = 'sts.cn-hongkong.aliyuncs.com'
    region_provider.add_endpoint('Sts', _REGIONID, _ENDPOINT)

    clt = client.AcsClient(access_key_id, access_key_secret, 'cn-hongkong')
    request = AssumeRoleRequest.AssumeRoleRequest()

    #request.set_accept_format('json')
    #指定角色ARN
    request.set_RoleArn(sts_role_arn)
    #设置会话名称,审计服务使用此名称区分调用者
    request.set_RoleSessionName('oss-python-sdk-example')
    #设置临时身份过期时间
    request.set_DurationSeconds(DurationSeconds)
    #发起请求,并得到response
    response = clt.do_action_with_exception(request)
    #格式化输出返回结果,将字符串结果转化为字典类型
    i = json.loads(oss2.to_unicode(response))
    #实例化StsInfo类并将临时身份信息存入对象
    global StsInfo
    StsInfo = StsInfo()
    StsInfo.access_key_id = i['Credentials']['AccessKeyId']
    StsInfo.access_key_secret = i['Credentials']['AccessKeySecret']
    StsInfo.security_token = i['Credentials']['SecurityToken']
    StsInfo.request_id = i['RequestId']
    StsInfo.expiration = oss2.utils.to_unixtime(i['Credentials']['Expiration'], '%Y-%m-%dT%H:%M:%SZ')
    

    #存储临时身份信息
    save_info()


#使用sts授权的临时身份上传文件到bucket
def buck_put_object(sts_key_id, sts_key_secret, sts_secrity_token):
    """上传字符串到资源
    :param sts_key_id: 临时身份的 access key id
    :param sts_key_secret: 临时身份的 access key secret
    :param sts_secrity_token: 临时身份的 secrity token
    :retu
    """ 
    #实例化Bucket对象,并上传字符串
    auth = oss2.StsAuth(sts_key_id, sts_key_secret, sts_secrity_token)
    bucket = oss2.Bucket(auth,endpoint,'fralychen')
    result = bucket.put_object('fralychen','good good study day day up')

#根据需求,可将临时身份信息存储到json文件中,等到临时身份过期后再重新请求,避免重复请求,用户泛滥
def save_info():
    #存储临时身份信息
    with open('StsInfo.json','w',encoding='utf-8') as f:
        data = {'sts_key_id':StsInfo.access_key_id,'sts_key_secret':StsInfo.access_key_secret,'sts_secrity_token':StsInfo.security_token,'sts_expire_date':StsInfo.expiration,'sts_reques_id':StsInfo.request_id}
        json.dump(data,f,ensure_ascii=False)
        f.close


def open_info():
    #读取临时身份信息
    with open('./StsInfo.json','r',encoding='utf-8') as f:
        global STSINFO
        STSINFO = json.load(f)
        return STSINFO


#定义临时身份过期时间
DurationSeconds = 900

try:
    open_info()
except IOError:
    print("Error: 没有用户信息文件或文件读取失败")
    print("初始化身份信息,临时身份信息存储中....")
    fetch_sts_info(access_key_id, access_key_secret, sts_role_arn)
    print("临时身份信息存储完毕,当前目录下StsInfo.json")
else:
    if oss2.utils.http_to_unixtime(oss2.utils.http_date()) + DurationSeconds > STSINFO["sts_expire_date"]:
        buck_put_object(sts_key_id = STSINFO["sts_key_id"],sts_key_secret = STSINFO["sts_key_secret"], sts_secrity_token = STSINFO["sts_secrity_token"])
        print("上传成功,good_lucky")
    else:
        print("更新临时用户信息,请稍后")
        fetch_sts_info(access_key_id, access_key_secret, sts_role_arn)
        buck_put_object(sts_key_id = STSINFO["sts_key_id"],sts_key_secret = STSINFO["sts_key_secret"], sts_secrity_token = STSINFO["sts_secrity_token"])
        print("上传成功,good_lucky")