天天看点

Dapr实战(三)状态管理

状态管理解决了什么

分布式应用程序中的状态可能很有挑战性。 例如:

  • 应用程序可能需要不同类型的数据存储。
  • 访问和更新数据可能需要不同的一致性级别。
  • 多个用户可以同时更新数据,这需要解决冲突。
  • 服务必须重试 与数据存储交互 时发生的任何短期暂时性错误。

Dapr 状态管理解决了这些难题。 它简化了跟踪状态,而无需依赖关系或第三方存储 SDK 上的学习曲线。

工作原理

Dapr实战(三)状态管理

应用程序与 Dapr sidecar 交互,以存储和检索键/值数据。 在底层,sidecar API 使用可配置的状态存储组件来保存数据。 开发人员可以从不断增长的受支持状态存储集合中选择,其中包括 Azure Cosmos DB、SQL Server 和 Cassandra。

可以使用 HTTP 或 gRPC 调用 API。 使用以下 URL 调用 HTTP API:

http://localhost:<dapr-port>/v1.0/state/<store-name>/      
  • <dapr-port>

    :Dapr 侦听的 HTTP 端口。
  • <store-name>

    :使用的状态存储组件的名称。

状态组件

Dapr支持的组件

为本地自承载开发初始化时,Dapr 将 Redis 注册为默认状态存储。 下面是默认状态存储配置的示例,配置文件位置为C:\Users\<username>\.dapr\components。 记下默认名称 

statestore

 :

apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
spec:
  type: state.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
  - name: actorStateStore
    value: "true"      

项目演示

仍然使用 上一篇服务调用 的FrontEnd项目,新建StateController

using Dapr;
using Dapr.Client;

using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;

using System;
using System.Collections.Generic;
using System.Threading.Tasks;

namespace FrontEnd.Controllers
{
    [Route("[controller]")]
    [ApiController]
    public class StateController : ControllerBase
    {
        private readonly ILogger<StateController> _logger;
        private readonly DaprClient _daprClient;
        public StateController(ILogger<StateController> logger, DaprClient daprClient)
        {
            _logger = logger;
            _daprClient = daprClient;
        }

        // 获取一个值
        [HttpGet]
        public async Task<ActionResult> GetAsync()
        {
            var result = await _daprClient.GetStateAsync<string>("statestore", "guid");
            return Ok(result);
        }

        //保存一个值
        [HttpPost]
        public async Task<ActionResult> PostAsync()
        {
            await _daprClient.SaveStateAsync<string>("statestore", "guid", Guid.NewGuid().ToString(), new StateOptions() { Consistency = ConsistencyMode.Strong });
            return Ok("done");
        }

        //删除一个值
        [HttpDelete]
        public async Task<ActionResult> DeleteAsync()
        {
            await _daprClient.DeleteStateAsync("statestore", "guid");
            return Ok("done");
        }

        //通过tag防止并发冲突,保存一个值
        [HttpPost("withtag")]
        public async Task<ActionResult> PostWithTagAsync()
        {
            var (value, etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid");
            await _daprClient.TrySaveStateAsync<string>("statestore", "guid", Guid.NewGuid().ToString(), etag);
            return Ok("done");
        }

        //通过tag防止并发冲突,删除一个值
        [HttpDelete("withtag")]
        public async Task<ActionResult> DeleteWithTagAsync()
        {
            var (value, etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid");
            return Ok(await _daprClient.TryDeleteStateAsync("statestore", "guid", etag));
        }


        // 从绑定获取一个值,健值name从路由模板获取
        [HttpGet("frombinding/{name}")]
        public async Task<ActionResult> GetFromBindingAsync([FromState("statestore", "name")] StateEntry<string> state)
        {
            return Ok(state.Value);
        }


        // 根据绑定获取并修改值,健值name从路由模板获取
        [HttpPost("withbinding/{name}")]
        public async Task<ActionResult> PostWithBindingAsync([FromState("statestore", "name")] StateEntry<string> state)
        {
            state.Value = Guid.NewGuid().ToString();
            return Ok(await state.TrySaveAsync());
        }


        // 获取多个个值
        [HttpGet("list")]
        public async Task<ActionResult> GetListAsync()
        {
            var result = await _daprClient.GetBulkStateAsync("statestore", new List<string> { "guid" }, 10);
            return Ok(result);
        }

        // 删除多个个值
        [HttpDelete("list")]
        public async Task<ActionResult> DeleteListAsync()
        {
            var data = await _daprClient.GetBulkStateAsync("statestore", new List<string> { "guid" }, 10);
            var removeList = new List<BulkDeleteStateItem>();
            foreach (var item in data)
            {
                removeList.Add(new BulkDeleteStateItem(item.Key, item.ETag));
            }
            await _daprClient.DeleteBulkStateAsync("statestore", removeList);
            return Ok("done");
        }
    }
}      

cmd运行

dapr run --dapr-http-port 3501 --app-port 5001  --app-id frontend dotnet  .\FrontEnd\bin\Debug\net5.0\FrontEnd.dll      

可通过postman调用sidecar的endpoint

Dapr实战(三)状态管理
Dapr实战(三)状态管理

 查看store存储中的内容

进入容器内部

docker exec -it dapr_redis /bin/sh      

调用redis-cli

redis-cli      

查看所有key

keys *      

可以看到有"frontend||guid"这个key,所以状态在redis中存储中Name的规则是appName||keyName,这样可以防止不同app的键冲突

我们通过type key查看下这个键的类型,可以发现他是一个hash

127.0.0.1:6379> type frontend||guid
hash      

再通过hgetall key查看他的数据,发现有两个键,一个data,一个version

127.0.0.1:6379> hgetall  frontend||guid
1) "data"
2) "\"e17b3e06-ba30-42c5-8960-48511c70b496\""
3) "version"
4) "1"      

data很明显是存入的数据,version呢?现在猜测是防止并发冲突的etag,我们下面来验证一下

在StateController中新增接口

// 获取一个值和etag
        [HttpGet("withetag")]
        public async Task<ActionResult> GetWithEtagAsync()
        {
            var (value,etag) = await _daprClient.GetStateAndETagAsync<string>("statestore", "guid");
            return Ok($"value is {value}, etag is {etag}");
        }      

通过dapr重启这个app,并调用withetag api,很明显redis中version与etag相等,初步印证我们的猜测

Dapr实战(三)状态管理

 我们可以通过post方法修改一下guid这个key,修改后etag会变更,再来看一下redis中version和etag是不是一个东西

首先调用POST方法修改值

Dapr实战(三)状态管理

 再调用withetag方法,看下etag,发现etag变成了2

Dapr实战(三)状态管理

 在比较一下redis中的version

127.0.0.1:6379> hgetall  frontend||guid
1) "data"
2) "\"36a55558-35c3-402c-ac9e-615014eb6904\""
3) "version"
4) "2"      

现在可以确定etag就是redis中的version了

Dapr实战(三)状态管理