天天看點

ASP.NET Core 3.x 并發限制

前言

Microsoft.AspNetCore.ConcurrencyLimiter AspNetCore3.0後增加的,用于傳入的請求進行排隊處理,避免線程池的不足.

我們日常開發中可能常做的給某web伺服器配置連接配接數以及,請求隊列大小,那麼今天我們看看如何在通過中間件形式實作一個并發量以及隊列長度限制.

Queue政策

添加Nuget

Install-Package Microsoft.AspNetCore.ConcurrencyLimiter

public void ConfigureServices(IServiceCollection services)
        {
            services.AddQueuePolicy(options =>
            {
                //最大并發請求數
                options.MaxConcurrentRequests = 2;
                //請求隊列長度限制
                options.RequestQueueLimit = 1;
            });
            services.AddControllers();
        }
        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            //添加并發限制中間件
            app.UseConcurrencyLimiter();
            app.Run(async context =>
            {
                Task.Delay(100).Wait(); // 100ms sync-over-async

                await context.Response.WriteAsync("Hello World!");
            });
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseHttpsRedirection();

            app.UseRouting();

            app.UseAuthorization();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapControllers();
            });
        }     
           

通過上面簡單的配置,我們就可以将他引入到我們的代碼中,進而做并發量限制,以及隊列的長度;那麼問題來了,他是怎麼實作的呢?

public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
{
        services.Configure(configure);
        services.AddSingleton<IQueuePolicy, QueuePolicy>();
        return services;
}
           

QueuePolicy采用的是SemaphoreSlim信号量設計,SemaphoreSlim、Semaphore(信号量)支援并發多線程進入被保護代碼,對象在初始化時會指定 最大任務數量,當線程請求通路資源,信号量遞減,而當他們釋放時,信号量計數又遞增。

/// <summary>
        ///     構造方法(初始化Queue政策)
        /// </summary>
        /// <param name="options"></param>
        public QueuePolicy(IOptions<QueuePolicyOptions> options)
        {
            _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
            if (_maxConcurrentRequests <= 0)
            {
                throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer.");
            }

            _requestQueueLimit = options.Value.RequestQueueLimit;
            if (_requestQueueLimit < 0)
            {
                throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number.");
            }
            //使用SemaphoreSlim來限制任務最大個數
            _serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests);
        }

           

ConcurrencyLimiterMiddleware中間件

/// <summary>
        /// Invokes the logic of the middleware.
        /// </summary>
        /// <param name="context">The <see cref="HttpContext"/>.</param>
        /// <returns>A <see cref="Task"/> that completes when the request leaves.</returns>
        public async Task Invoke(HttpContext context)
        {
            var waitInQueueTask = _queuePolicy.TryEnterAsync();

            // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
            bool result;

            if (waitInQueueTask.IsCompleted)
            {
                ConcurrencyLimiterEventSource.Log.QueueSkipped();
                result = waitInQueueTask.Result;
            }
            else
            {
                using (ConcurrencyLimiterEventSource.Log.QueueTimer())
                {
                    result = await waitInQueueTask;
                }
            }

            if (result)
            {
                try
                {
                    await _next(context);
                }
                finally
                {
                    _queuePolicy.OnExit();
                }
            }
            else
            {
                ConcurrencyLimiterEventSource.Log.RequestRejected();
                ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
                context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
                await _onRejected(context);
            }
        }
           

每次當我們請求的時候首先會調用

_queuePolicy.TryEnterAsync()

,進入該方法後先開啟一個私有lock鎖,再接着判斷總請求量是否≥(請求隊列限制的大小+最大并發請求數),如果目前數量超出了,那麼我直接抛出,送你個503狀态;

if (result)
  {
         try
         {
             await _next(context);
         }
         finally
        {
            _queuePolicy.OnExit();
        }
        }
        else
        {
            ConcurrencyLimiterEventSource.Log.RequestRejected();
            ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
            context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
            await _onRejected(context);
        }

           

問題來了,我這邊如果說還沒到你設定的大小呢,我這個請求沒有給你伺服器造不成壓力,那麼你給我處理一下吧.

await _serverSemaphore.WaitAsync();

異步等待進入信号量,如果沒有線程被授予對信号量的通路權限,則進入執行保護代碼;否則此線程将在此處等待,直到信号量被釋放為止

lock (_totalRequestsLock)
    {
        if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)
        {
             return false;
        }
            TotalRequests++;
        }
        //異步等待進入信号量,如果沒有線程被授予對信号量的通路權限,則進入執行保護代碼;否則此線程将在此處等待,直到信号量被釋放為止
        await _serverSemaphore.WaitAsync();
        return true;
    }
           

傳回成功後那麼中間件這邊再進行處理,

_queuePolicy.OnExit();

通過該調用進行調用

_serverSemaphore.Release();

釋放信号燈,再對總請求數遞減

Stack政策

再來看看另一種方法,棧政策,他是怎麼做的呢?一起來看看.再附加上如何使用的代碼.

public void ConfigureServices(IServiceCollection services)
        {
            services.AddStackPolicy(options =>
            {
                //最大并發請求數
                options.MaxConcurrentRequests = 2;
                //請求隊列長度限制
                options.RequestQueueLimit = 1;
            });
            services.AddControllers();
        }
           

通過上面的配置,我們便可以對我們的應用程式執行出相應的政策.下面再來看看他是怎麼實作的呢

public static IServiceCollection AddStackPolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
        {
            services.Configure(configure);
            services.AddSingleton<IQueuePolicy, StackPolicy>();
            return services;
        }

           

可以看到這次是通過

StackPolicy

類做的政策.來一起來看看主要的方法

/// <summary>
        ///     構造方法(初始化參數)
        /// </summary>
        /// <param name="options"></param>
        public StackPolicy(IOptions<QueuePolicyOptions> options)
        {
            //棧配置設定
            _buffer = new List<ResettableBooleanCompletionSource>();
            //隊列大小
            _maxQueueCapacity = options.Value.RequestQueueLimit;
            //最大并發請求數
            _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
            //剩餘可用空間
            _freeServerSpots = options.Value.MaxConcurrentRequests;
        }
           

當我們通過中間件請求調用,

_queuePolicy.TryEnterAsync()

時,首先會判斷我們是否還有通路請求次數,如果_freeServerSpots>0,那麼則直接給我們傳回true,讓中間件直接去執行下一步,如果目前隊列=我們設定的隊列大小的話,那我們需要取消先前請求;每次取消都是先取消之前的保留後面的請求;

public ValueTask<bool> TryEnterAsync()
        {
            lock (_bufferLock)
            {
                if (_freeServerSpots > 0)
                {
                    _freeServerSpots--;
                    return _trueTask;
                }
                // 如果隊列滿了,取消先前的請求
                if (_queueLength == _maxQueueCapacity)
                {
                    _hasReachedCapacity = true;
                    _buffer[_head].Complete(false);
                    _queueLength--;
                }
                var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this);
                _cachedResettableTCS = null;
                if (_hasReachedCapacity || _queueLength < _buffer.Count)
                {
                    _buffer[_head] = tcs;
                }
                else
                {
                    _buffer.Add(tcs);
                }
                _queueLength++;
                // increment _head for next time
                _head++;
                if (_head == _maxQueueCapacity)
                {
                    _head = 0;
                }
                return tcs.GetValueTask();
            }
        }
           

當我們請求後調用

_queuePolicy.OnExit();

出棧,再将請求長度遞減

public void OnExit()
        {
            lock (_bufferLock)
            {
                if (_queueLength == 0)
                {
                    _freeServerSpots++;

                    if (_freeServerSpots > _maxConcurrentRequests)
                    {
                        _freeServerSpots--;
                        throw new InvalidOperationException("OnExit must only be called once per successful call to TryEnterAsync");
                    }

                    return;
                }

                // step backwards and launch a new task
                if (_head == 0)
                {
                    _head = _maxQueueCapacity - 1;
                }
                else
                {
                    _head--;
                }
                //退出,出棧
                _buffer[_head].Complete(true);
                _queueLength--;
            }
        }

           

總結

基于棧結構的特點,在實際應用中,通常隻會對棧執行以下兩種操作:

  • 向棧中添加元素,此過程被稱為"進棧"(入棧或壓棧);
  • 從棧中提取出指定元素,此過程被稱為"出棧"(或彈棧);

隊列存儲結構的實作有以下兩種方式:

  • 順序隊列:在順序表的基礎上實作的隊列結構;
  • 鍊隊列:在連結清單的基礎上實作的隊列結構;