天天看点

Netty源码实战(十) - 性能优化(下)参考

1.2 轻量级对象池 Recycler

1.2.1 Recycler的使用

Netty源码实战(十) - 性能优化(下)参考
Netty源码实战(十) - 性能优化(下)参考

所以不使用 new 而是直接复用

Netty源码实战(十) - 性能优化(下)参考

Netty使用

Netty源码实战(十) - 性能优化(下)参考

1.2.2 Recycler的创建

Netty源码实战(十) - 性能优化(下)参考
Netty源码实战(十) - 性能优化(下)参考
Netty源码实战(十) - 性能优化(下)参考
Netty源码实战(十) - 性能优化(下)参考
Netty源码实战(十) - 性能优化(下)参考

下面回到Recycler的构造方法,看其入参.

Netty源码实战(十) - 性能优化(下)参考
Netty源码实战(十) - 性能优化(下)参考
Netty源码实战(十) - 性能优化(下)参考
Netty源码实战(十) - 性能优化(下)参考
Netty源码实战(十) - 性能优化(下)参考
Netty源码实战(十) - 性能优化(下)参考
Netty源码实战(十) - 性能优化(下)参考

1.2.3 回收对象到 Recycler

1.2.3.1 同线程回收

客户端开始调用

Netty源码实战(十) - 性能优化(下)参考
Netty源码实战(十) - 性能优化(下)参考

Recycler抽象类

Netty源码实战(十) - 性能优化(下)参考

将当前对象压栈

Netty源码实战(十) - 性能优化(下)参考
  • 如下,首先判断当前线程,thread 即为S tack 对象中保存的成员变量,若是创建该 stack 的线程,则直接压栈Stack 中,若不是再 pushlater.先分析 pushnow.
Netty源码实战(十) - 性能优化(下)参考
Netty源码实战(十) - 性能优化(下)参考

首先验证两个 id,由于默认初始值为0,所以通过判断.

Netty源码实战(十) - 性能优化(下)参考

接下来将其都赋值为第三个 id,该值在整个Recycler 中都是唯一确定值

Netty源码实战(十) - 性能优化(下)参考

紧接着判断当前 size是否已到 maxcapacity,若达到上限,直接丢弃该对象即直接 return;否则继续判断 drop 处理器

Netty源码实战(十) - 性能优化(下)参考

首先判断,若该对象之前未被回收过,继续判断;

至今已经回收了多少个对象,其中 rm 为7,即111(二进制表示),即每隔8个对象,就会来此判断一次,将其与7进行与运算后,若不为0,则返回 true,表示只回收八分之一的对象.

继续回到 pushnow 的流程,接下来判断 size 是否等于数组的容量.

因为 element 是一个数组,并不是一开始就创建maxcapacity 容量大小,若容量不够了,则进行两倍大小扩容,再将其加入数组.

1.2.3.2 异线程回收对象

本节食用指南

Netty源码实战(十) - 性能优化(下)参考
1.2.3.2.1 获取 WeakOrderQueue(以下简称WOQ)

由前面 pushnow 进入同线程回收, pushlater 即进入异线程回收过程.

Netty源码实战(十) - 性能优化(下)参考
Netty源码实战(十) - 性能优化(下)参考

先看看这么个东西是啥

Netty源码实战(十) - 性能优化(下)参考

其类型就很神奇了,首先是个FTL,即每个线程都有一个 map,map 的key=stack 表示对于不同线程,对不同 stack 来说对应于不同的WOQ.

那么它为何要定义成一个 map 结构呢,假设有3个线程T1/2/3;

T1创建的对象肯可能跑到T3中回收,T2中创建的对象也可能到了T3回收.

那么元素就是T1以及T2的WOQ.

Netty源码实战(十) - 性能优化(下)参考

假设当前在T2中,接下呢就通过get(this)拿到T1的WOQ,其中的 this 指的就是T1的 Stack.

然后若 queue==null,即表示T2从未回收过T1的对象,接下来开始判断

当前的即T2已经回收过的线程数 size,若不小于 mDQ,说明已经不能再回收其他线程的对象了!

给WOQ设置 dummy 标志,即对应下面的若下次看到一个线程标志了 dummy 直接return;什么也不做.

以上即为第一个过程,从FTL中拿一个Stack 对应的WOQ.

1.2.3.2.2 创建 WeakOrderQueue

若之前没拿到呢,那就直接创建一个WOQ吧!

接下来让我们看看一个线程创建WOQ是如何与待回收对象的Stack 进行绑定的.

Netty源码实战(十) - 性能优化(下)参考

其中的 this 即为 stack,是在T1中维护的,thread 即表示当前线程T2.

allocat就是为了给当前线程T2分配一个在T1中的Stack 对应的一个WOQ.

Netty源码实战(十) - 性能优化(下)参考

首先判断,T1中的 Stack还能否再分配LINK_capacity 个内存,若不能直接返回 null;

若可以,就直接 new 一个WOQ.

让我们具体看看其实现.

Netty源码实战(十) - 性能优化(下)参考
  • 此函数意义为:该 Stack 允许外部线程给它缓存多少个对象

    经过CAS操作设置该值为Stack 可为其他线程共享的回收对象的个数.

容量足够,则直接创建一个WOQ,下面来看看其数据结构.

Netty源码实战(十) - 性能优化(下)参考

一个链表结构.将其 handle与Link 进行分离,极大地提升了性能,

因为不必判断当前T2能否回收T1的对象,而只需判断当前的L ink 中是否有空的,则可直接将 hande 塞进去.因为在前面一次性的判断过,从T1中是否能批量分配这么多对象(以减少很多操作的频率).

Netty源码实战(十) - 性能优化(下)参考

使用同步,将WOQ插到Stack 的头部.

Netty源码实战(十) - 性能优化(下)参考
1.2.3.2.3 将对象追加到 WeakOrderQueue

一开始呢,就是这么创建一个WOQ,默认有16个 handle

Netty源码实战(十) - 性能优化(下)参考

T2已经拿到queue,接着就是添加元素.

Netty源码实战(十) - 性能优化(下)参考
Netty源码实战(十) - 性能优化(下)参考

首先设置 

上次回收 id

.

Netty源码实战(十) - 性能优化(下)参考

该 id 为WOQ的 id,所以是以WOQ为基础的

Netty源码实战(十) - 性能优化(下)参考

然后拿到尾指针,获取Link 的长度,若已经等于 link_capacity,说明已经不可写了;

继续判断 想办法看看T1是否还能再分配一个Link来保存待回收的对象.

不允许,则直接丢弃;

允许,则直接创建Link并重新赋值 tail 节点.

Netty源码实战(十) - 性能优化(下)参考

创建完后,拿到其写指针,即 tail 的长度(0).所以 tail 节点也已经又有了足够的存储空间,将 handle 追加进去.再将该 handle 的 stack 指针重置为 null,因为已经不属于原来的 stack 了.

最后,写指针+1.

1.2.4 从 Recycler 获取对象

Netty源码实战(十) - 性能优化(下)参考

本节分析若当前 stack 为空

Netty源码实战(十) - 性能优化(下)参考

若当前线程T1去获取对象,若 stack 中有对象,则直接拿出.T1所拥有的对象即为T1拥有的 stack 中的对象,若发现其中为空,会尝试与 和T1的 stack 关联的WOQ中的 T1创建的,但是在其他线程中去回收的对象.那么,T1中对象不足,就需要在其他线程中去回收.

其中的 cusor 指针即当前所需要回收的对象

弹栈获取元素

Netty源码实战(十) - 性能优化(下)参考

若 size 为0,则从其他线程回收

Netty源码实战(十) - 性能优化(下)参考

若已经回收到则直接 return true.没有则重置两个指针,将 cusor 指向头结点,意味着准备从头开始回收.

Netty源码实战(十) - 性能优化(下)参考

接下来具体分析这段长代码

boolean scavengeSome() {
            WeakOrderQueue prev;
            // 先拿到 cusor
            WeakOrderQueue cursor = this.cursor;
            // cusor 节点无对象
            if (cursor == null) {
                prev = null;
                // 指向头结点
                cursor = head;
                // 头结点依旧为空,已经没有与之关联的WOQ,直接返回 false.
                if (cursor == null) {
                    return false;
                }
            } else {
                prev = this.prev;
            }

            boolean success = false;
            // 此处 do/while 循环只为去寻找与 stack 关联的WOQ,看看到底能不能碰到一个对象.
            do {
                // transfer 即为了将WOQ中的对象传输到 stack 中.成功获取则结束循环!
                if (cursor.transfer(this)) {
                    success = true;
                    break;
                }
                
                // 没有回收成功,则看往 cusor 的下一个节点
                WeakOrderQueue next = cursor.next;
                // owner 为与当前WOQ关联的一个线程(对应图中的T4)
                // 为空,说明T4已经不存在!随后即,做一些善后清理工作
                if (cursor.owner.get() == null) {
                    // If the thread associated with the queue is gone, unlink it, after
                    // performing a volatile read to confirm there is no data left to collect.
                    // We never unlink the first queue, as we don't want to synchronize on updating the head.
                    // 判断节点中是否还有数据
                    if (cursor.hasFinalData()) {
                        // 就需要想办法将数据传输到 stack 中
                        for (;;) {
                            if (cursor.transfer(this)) {
                                success = true;
                            } else {
                                break;
                            }
                        }
                    }
                    // 处理完该节点后,即将其删除,通过传统的指针的删除方法
                    if (prev != null) {
                        prev.setNext(next);
                    }
                  //  T4还存活,继续看后继节点.
                } else {
                    prev = cursor;
                }

                cursor = next;
            // cusor 为空时,诶就结束循环啦!
            } while (cursor != null && !success);

            this.prev = prev;
            this.cursor = cursor;
            return success;
        }
      
  • 下面看传输方法
// transfer as many items as we can from this queue to the stack, returning true if any were transferred
        @SuppressWarnings("rawtypes")
        boolean transfer(Stack<?> dst) {
            // 先找到头结点
            Link head = this.head;
            if (head == null) {
                // WOQ无数据,直接返回 false
                return false;
            }

            // 说明当前Link 的所有数据已被取走.
            if (head.readIndex == LINK_CAPACITY) {
                if (head.next == null) {
                    return false;
                }
                this.head = head = head.next;
            }
            
            // 从该索引开始取对象
            final int srcStart = head.readIndex;
            int srcEnd = head.get();
           // 当前Link 要传输到 Stack 的对象个数.
            final int srcSize = srcEnd - srcStart;
            if (srcSize == 0) {
                return false;
            }
      
Netty源码实战(十) - 性能优化(下)参考
// dst 为当前Stack
            final int dstSize = dst.size;
            // 将对象都传输到Stack 所需容量.
            final int expectedCapacity = dstSize + srcSize;

             // 所需容量大于Stack 的大小,
            if (expectedCapacity > dst.elements.length) {
                // 因此进行扩容
                final int actualCapacity = dst.increaseCapacity(expectedCapacity);
                // 可传输的最后一个对象
                srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
            }

            if (srcStart != srcEnd) {
                final DefaultHandle[] srcElems = head.elements;
                final DefaultHandle[] dstElems = dst.elements;
                int newDstSize = dstSize;
                for (int i = srcStart; i < srcEnd; i++) {
                    DefaultHandle element = srcElems[i];
                    //为0说明未被回收过
                    if (element.recycleId == 0) {
                        element.recycleId = element.lastRecycledId;
                    } else if (element.recycleId != element.lastRecycledId) {
                        throw new IllegalStateException("recycled already");
                    }
                    srcElems[i] = null;

                    if (dst.dropHandle(element)) {
                        // Drop the object.
                        continue;
                    }
                    element.stack = dst;
                    dstElems[newDstSize ++] = element;
                }

                if (srcEnd == LINK_CAPACITY && head.next != null) {
                    // Add capacity back as the Link is GCed.
                    reclaimSpace(LINK_CAPACITY);

                    this.head = head.next;
                }

                head.readIndex = srcEnd;
                if (dst.size == newDstSize) {
                    return false;
                }
                dst.size = newDstSize;
                return true;
            } else {
                // The destination stack is full already.
                return false;
            }
        }
      

1.3 小结

Netty源码实战(十) - 性能优化(下)参考
Netty源码实战(十) - 性能优化(下)参考

参考

Java读源码之Netty深入剖析