天天看點

redis使用pipeline通道大幅度提升redis的處理速度,節省成本redis使用pipeline通道大幅度提升redis的處理速度,節省成本

redis使用pipeline通道大幅度提升redis的處理速度,節省成本

最近在做項目的時候,遇到大量的讀寫,最開始都是set,get一條條的循環去取資料,當資料量大的時候,資料處理相當慢慢,就想到批處理資料的方式,最開始set資料的時候,想到的是mset 也算是批量插入資料,這個在資料量幾百的話甚至幾千的插入量,也是OK的,取資料的時候用mget  這個100的資料量以下,性能還可以保證,再大的話就是嚴重有問題,資料量越大取出的成本本越高 ,另外一個,在用mset批量插入的時候,遇到一個難道,不過這是對redis不太了解的原因,後面無意中發現,set的時候,可以直接在後面參加參數,利用redisTemplate模闆,

    redisTemplate.opsForValue().set("key_s2", maps2.toString(), 1000, TimeUnit.SECONDS); 輕松搞定 

  下面主要是針對 pipeline 管道方式批量插入,插入的資料量都 是在100000條資料,redis通過tcp來對外提供服務,client通過socket連接配接發起請求,每個請求在指令發出後會阻塞等待redis伺服器進行處理,處理完畢後将結果傳回給client。每一個指令都對應了發送、接收兩個網絡傳輸,假如一個流程需要0.1秒,那麼一秒最多隻能處理10個請求,将嚴重制約redis的性能。

當我們要處理上萬以上資料的時候,就不得不另想辦法,此時pipeline管道就是解決執行大量指令時、會産生大量資料來回次數而導緻延遲的技術。其實原理比較簡單,pipeline是把所有的指令一次發過去,避免頻繁的發送、接收帶來的網絡開銷,redis在打包接收到一堆指令後,依次執行,然後把結果再打包傳回給用戶端。

下面直接用代碼試下,當然還作了其它的一些改動,比如RedisConnection 和SessionCallback 參數性能上不太一樣,相差也不多,重點說一下, redisConnection.openPipeline();  這個方法在doInRedis内部其實已經有,但是在這裡首先打開和關閉,對性能有一定的提高,10萬次提升100毫秒左右

//加 redisConnection.openPipeline(); 打開資源  第一次耗時:470  第二次 耗時:468  毫秒

//不加redisConnection.openPipeline();打開和關閉 第一次 耗時:533  第二次 耗時:490,主要是第一次執行操作 其實是可以忽略了,并且這個的參數是 doInRedis重寫方法是 RedisConnection 參數,後續對set的的操作必須是以redisConnection 參數開始,應該是建立一個接連去處理後續的所有操作,再到通道關閉,這樣麻煩一些,所有的序列化都要自己去處理

public void pipelinedTest(){

        Long time = System.currentTimeMillis();

        for (int i = 0; i < 10000; i++) {

            redisTemplate.opsForValue().increment("pipline", 1);

        }

        System.out.println("耗時:" + (System.currentTimeMillis() - time));

        time = System.currentTimeMillis();

        Map maps = new HashMap<>();

        maps.put("name", "lmc");

        maps.put("姓名", "李四");

        redisTemplate.executePipelined(new RedisCallback<Object>() {

            @Override  

            public Object doInRedis(RedisConnection redisConnection) throws DataAccessException {  

                redisConnection.openPipeline();  

                for (int i = 0;i < 100000;i++){

                    redisConnection.incrBy("a".getBytes(),1L);

                }

                redisConnection.set("keys".getBytes(), maps.toString().getBytes());

                redisConnection.closePipeline();  

                return null;  

            }

        });

        System.out.println("*****************"+redisTemplate.opsForValue().get("keys"));

    }

第二種方式 execute 重寫方法,參數RedisOperations 這種方式比較省心,不需要參數來操作, 用redisTemplate模闆操作直接省去了很多資料封裝的問題,但是性能上稍微差一點

// 這是第一次執行耗時:596    第二次執行耗時:589  毫秒

        Map maps2 = new HashMap<>();

        maps2.put("name", "lmc");

        maps2.put("姓名", "李四");

        redisTemplate.executePipelined(new SessionCallback<Object>() {

            @Override

            public <K, V> Object execute(RedisOperations<K, V> redisOperations) throws DataAccessException {

                for (int i = 0; i < 100000; i++) {

                    redisTemplate.opsForValue().increment("piplineddd", 1L);

                }

                redisTemplate.opsForValue().set("key_s2", maps2.toString(), 1000, TimeUnit.SECONDS);

                return null;

            }

        });

        System.out.println("

: "+redisTemplate.opsForValue().get("key_s2"));

    直接一次性送出,是以executePipelined源碼提供了多種方式供我們選擇,底層送出都 一樣的,隻是在對外提供的方式為了滿足更多的需求,

還在慢慢研究當中,如有描述不恰當的,還請各位指教

原文位址https://blog.csdn.net/limingcai168/article/details/81170399

繼續閱讀