天天看點

elasticsearch裡面bulk的用法

elasticsearch裡面bulk的用法

上篇文章介紹了在es裡面批量讀取資料的方法mget,本篇我們來看下關于批量寫入的方法bulk。

bulk api可以在單個請求中一次執行多個索引或者删除操作,使用這種方式可以極大的提升索引性能。

bulk的文法格式是:

action and meta_data \n
optional source \n

action and meta_data \n
optional source \n

action and meta_data \n
optional source \n           

複制

`

從上面能夠看到,兩行資料構成了一次操作,第一行是操作類型可以index,create,update,或者delete,第二行就是我們的可選的資料體,使用這種方式批量插入的時候,我們需要設定的它的Content-Type為application/json。

針對不同的操作類型,第二行裡面的可選的資料體是不一樣的,如下:

(1)index 和 create  第二行是source資料體
(2)delete 沒有第二行
(3)update 第二行可以是partial doc,upsert或者是script           

複制

`

我們可以将我們的操作直接寫入到一個文本檔案中,然後使用curl指令把它發送到服務端:

一個requests檔案内容如下:

{ "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1" } }
{ "field1" : "value1" }           

複制

`

發送指令如下:

curl -s -H "Content-Type: application/x-ndjson" -XPOST localhost:9200/_bulk --data-binary "@requests"; echo           

複制

` 響應結果如下:

{"took":7, "errors": false, "items":[{"index":{"_index":"test","_type":"_doc","_id":"1","_version":1,"result":"created","forced_refresh":false}}]}           

複制

`

注意由于我們每行必須有一個換行符,是以json格式隻能在一行裡面而不能使用格式化後的内容,下面看一個正确的post bulk的請求資料體:

{ "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1" } }
{ "field1" : "value1" }
{ "delete" : { "_index" : "test", "_type" : "_doc", "_id" : "2" } }
{ "create" : { "_index" : "test", "_type" : "_doc", "_id" : "3" } }
{ "field1" : "value3" }
{ "update" : {"_id" : "1", "_type" : "_doc", "_index" : "test"} }
{ "doc" : {"field2" : "value2"} }           

複制

`

bulk請求的傳回操作的結果也是批量的,每一個action都會有具體的應答體,來告訴你目前action是成功執行還是失敗 :

{
   "took": 30,
   "errors": false,
   "items": [
      {
         "index": {
            "_index": "test",
            "_type": "_doc",
            "_id": "1",
            "_version": 1,
            "result": "created",
            "_shards": {
               "total": 2,
               "successful": 1,
               "failed": 0
            },
            "status": 201,
            "_seq_no" : 0,
            "_primary_term": 1
         }
      },
      {
         "delete": {
            "_index": "test",
            "_type": "_doc",
            "_id": "2",
            "_version": 1,
            "result": "not_found",
            "_shards": {
               "total": 2,
               "successful": 1,
               "failed": 0
            },
            "status": 404,
            "_seq_no" : 1,
            "_primary_term" : 2
         }
      },
      {
         "create": {
            "_index": "test",
            "_type": "_doc",
            "_id": "3",
            "_version": 1,
            "result": "created",
            "_shards": {
               "total": 2,
               "successful": 1,
               "failed": 0
            },
            "status": 201,
            "_seq_no" : 2,
            "_primary_term" : 3
         }
      },
      {
         "update": {
            "_index": "test",
            "_type": "_doc",
            "_id": "1",
            "_version": 2,
            "result": "updated",
            "_shards": {
                "total": 2,
                "successful": 1,
                "failed": 0
            },
            "status": 200,
            "_seq_no" : 3,
            "_primary_term" : 4
         }
      }
   ]
}           

複制

`

bulk請求的路徑有三種和前面的mget的請求類似:

(1) /_bulk  

(2)/{index}/_bulk

(3)/{index}/{type}/_bulk           

複制

`

上面的三種格式,如果提供了index和type那麼在資料體裡面的action就可以不提供,同理提供了index但沒有type,那麼就需要在資料體裡面自己添加type。

此外,還有幾個參數可以用來控制一些操作:

(1)資料體裡面可以使用_version字段

(2)資料體裡面可以使用_routing字段

(3)可以設定waitforactive_shards參數,資料拷貝到多個shard之後才進行bulk操作

(4)refresh控制多久間隔多搜尋可見

最後重點介紹下update操作,update操作在前面的文章也介紹過,es裡面提供了多種更新資料的方法如:

(1)doc
(2)upsert
(3)doc_as_upsert
(4)script
(5)params ,lang ,source           

複制

`

在bulk裡面的使用update方法和java api裡面類似,前面的文章也介紹過詳細的使用,現在我們看下在bulk的使用方式:

POST _bulk
{ "update" : {"_id" : "1", "_type" : "_doc", "_index" : "index1", "retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"} }
{ "update" : { "_id" : "0", "_type" : "_doc", "_index" : "index1", "retry_on_conflict" : 3} }
{ "script" : { "source": "ctx._source.counter += params.param1", "lang" : "painless", "params" : {"param1" : 1}}, "upsert" : {"counter" : 1}}
{ "update" : {"_id" : "2", "_type" : "_doc", "_index" : "index1", "retry_on_conflict" : 3} }
{ "doc" : {"field" : "value"}, "doc_as_upsert" : true }
{ "update" : {"_id" : "3", "_type" : "_doc", "_index" : "index1", "_source" : true} }
{ "doc" : {"field" : "value"} }
{ "update" : {"_id" : "4", "_type" : "_doc", "_index" : "index1"} }
{ "doc" : {"field" : "value"}, "_source": true}           

複制

` 其實就是非格式化的内容,放在一行然後送出就行了,不同之處在于前面的文章介紹的是單次請求,而使用bulk之後就可以一次請求批量發送多個操作了。

總結:

本篇文章介紹了在es裡面bulk操作的用法,使用bulk操作我們可以批量的插入資料來提升寫入性能,但針對不同的action的它的資料格式體是不一樣的,這一點需要注意,同時在每行資料結束時必須加一個換行符,不然es是不能正确識别其格式的。