話不多說,環境是ElasticSearch 安裝教程 可以看這個。我的環境是5.3 + 分詞 。
一、Index的建立
5.x的預設是不會在你插入資料的時候主動建立index的,是以網上其他地方的介紹代碼,都有問題。你是沒法直接用的。
建立index:
// 設定叢集名稱
Settings settings = Settings.builder().put("cluster.name", clusterName).build();
// 建立client
client = new PreBuiltTransportClient(settings);
Map<String, Integer> nodeMap = parseNodeIpInfo();
for (Map.Entry<String, Integer> entry : nodeMap.entrySet()) {
client.addTransportAddress(
new InetSocketTransportAddress(InetAddress.getByName(entry.getKey()), entry.getValue()));
}
//建立client後,擷取index的配置參數
XContentBuilder mapping = getIndexMapping();
client.admin().indices().prepareCreate(userIndexName).setSettings(mapping).get();
/**
* 解析節點IP資訊,多個節點用逗号隔開,IP和端口用冒号隔開
* @return
*/
private Map<String, Integer> parseNodeIpInfo() {
String[] nodeIpInfoArr = esNodes.split(","); //esNodes為外部注入的es的ip
Map<String, Integer> map = new HashMap<String, Integer>(nodeIpInfoArr.length);
for (String ipInfo : nodeIpInfoArr) {
String[] ipInfoArr = ipInfo.split(":");
map.put(ipInfoArr[], Integer.parseInt(ipInfoArr[]));
}
return map;
}
/**
* 建立所需的index配置
* @return
* @throws IOException
*/
private XContentBuilder getIndexMapping() throws IOException {
XContentBuilder mapping = XContentFactory.jsonBuilder();
mapping.startObject()
.startObject("index")
.startObject("analyzer")
.startObject("pinyin_analyzer")
.field("tokenizer", "my_pinyin")
.endObject()
.startObject("default")
.field("tokenizer", "ik_max_word")
.endObject()
.endObject()
.startObject("tokenizer")
.startObject("my_pinyin")
.field("type", "pinyin")
// 拼音首字母單獨開一個
.field("keep_separate_first_letter", true)
.field("keep_full_pinyin", true)
.field("keep_original", true)
.field("limit_first_letter_length", )
.field("lowercase", true)
.field("remove_duplicated_term", true)
.endObject().endObject().endObject().endObject().endObject();
return mapping;
}
// 判斷index建立是否成功
IndicesExistsResponse userResponse = client.admin().indices().exists(new IndicesExistsRequest(userIndexName)).actionGet();
return userResponse.isExists(); //true 存在
二、建立type
在ElasticSearch裡面,上面的index就相當于建立一個了一個資料庫,而type則是建立一張表。
//es裡面設定參數的話用這個比較形象的去建構json格式
XContentBuilder mapping = XContentFactory.jsonBuilder()
.startObject()
.startObject(userTypeName) //該type 名
.startObject("properties")
.startObject("userId") //相當于資料庫字段名 類型
.field("type", "text") //字段類型
.endObject()
.startObject("loginName") // 嵌套對象字段
.field("type", "keyword") //特殊設定,該字段是拼音分詞的
.startObject("fields")
.startObject("pinyin")
.field("type", "text")
.field("store", false)
.field("term_vector", "with_offsets")
.field("analyzer", "pinyin_analyzer")
.field("boost", )
.endObject()
.endObject()
.endObject() //删去了部分字段
.startObject("orgId")
.field("type", "text")
.endObject()
.startObject("orgPath")
.field("type", "text")
.endObject()
.endObject().endObject().endObject();
PutMappingRequest mappingRequest = Requests.putMappingRequest(userIndexName).type(userTypeName).source(mapping);
client.admin().indices().putMapping(mappingRequest).actionGet();
//判斷type是否存在
TypesExistsRequest type = new TypesExistsRequest(new String[] { userIndexName }, userTypeName);
return client.admin().indices().typesExists(type).actionGet().isExists();
三、增
向ElasticSearch裡面添加資料的話可以一個個插入也可以批量添加:
public void bulkSaveUserToES(List<UserESData> users) throws Exception {
BulkRequestBuilder bulk = client.prepareBulk();
//循環添加資料
for (UserESData u : users) {
//這裡是用我資料庫裡面的id作為ElasticSearch的文檔id
bulk.add(client.prepareIndex(userIndexName, userTypeName, u.getUserId()).setSource(JSON.toJSONString(u),XContentType.JSON));
}
//執行
bulk.execute().actionGet();
}
單個資料儲存
public void saveUserESData(final UserESData user) throws Exception {
client.prepareIndex(userIndexName, userTypeName, user.getUserId())
.setSource(JSON.toJSONString(user), XContentType.JSON).get();
}
四、改
在這裡ElasticSearch提供多種方式用于定位要修改的資料:
//根據id修改
XContentBuilder builder = null;
try {
builder = XContentFactory.jsonBuilder()
.startObject()
.field("userName", user.getUserName())
.field("orgId", user.getOrgId())
.field("orgPath", user.getOrgPath())
.endObject();
} catch (IOException e) {
log.error("方法updateUserESData 再構造存儲對象時出錯!", e);
}
//根據這個id
client.prepareUpdate(userIndexName, userTypeName, user.getUserId()).setDoc(builder)
.get();
//批量根據id更新
BulkRequestBuilder bulk = client.prepareBulk();
for (Entry<String, Object[]> info : userInfo.entrySet()) {
bulk.add(client.prepareUpdate(userIndexName, userTypeName, info.getKey())
.setDoc(XContentFactory.jsonBuilder()
.startObject()
.field("orgId", info.getValue()[])
.field("orgPath", info.getValue()[])
.field("teamId", info.getValue()[])
.endObject()));
}
//執行
bulk.execute().get();
若是根據某些條件更新,可以使用ElasticSearch的腳本 Painless
對于我們在ElasticSearch裡面的每一行資料(文檔)都相當于 “ctx“ 資料的屬性是可以通路的
如上邊的圖,我點選一行資料可以看到很多屬性。我使用ctx._source.userId 就可以擷取該userId
在程式裡:
//使用ElasticSearch的script來存放腳本 , 讓userName修改為newName
Script script = new Script("ctx._source.userName=" + newName);
//建立更新的條件,假如是orgPath要能滿足orgInfo[]這個條件的,就修改userName
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.should(QueryBuilders.matchPhraseQuery("orgPath", orgInfo[]));
//執行
UpdateByQueryAction.INSTANCE.newRequestBuilder(client).script(script).filter(boolQuery)
.source(orgIndexName).get();
//該段代碼相當于sql
UPDATE table SET userName = ? WHERE orgPath LIKE ?
如果條件有多個比如:UPDATE table SET userName = ? WHERE orgPath LIKE ? AND userName LIKE ?
script 不用變,
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
boolQuery.should(QueryBuilders.matchPhraseQuery("orgPath", orgInfo[]));
boolQuery.must(QueryBuilders.matchPhraseQuery("userName", "陳"));
//如果希望修改路徑(字元串)的部分值,那麼參照Painless 的白名單API 可以這樣編寫腳本
StringBuilder sb = new StringBuilder();
sb.append("if(ctx._source.orgPath.startsWith(\"").append(oldOrgPath)
.append("\")){ctx._source.orgPath=ctx._source.orgPath.replace(\"")
.append(oldOrgPath).append("\",\"").append(newOrgPath)
.append("\")}");
Script script = new Script(sb.toString());
/** 如果是以oldOrgPath開頭的路徑,則替換為newOrgPath
if(ctx._source.orgPath.startsWith(oldOrgPath)){
ctx._source.orgPath = ctx._source.orgPath.replace(oldOrgPath,newOrgPath)
}
*/
五、删除
ElasticSearch的删除
//根據id删除
client.prepareDelete(userIndexName, userTypeName, userId).execute().actionGet();
//根據條件來删除 傳回删除條數 queryBuilder 建構的查詢條件
long delete = DeleteByQueryAction.INSTANCE.newRequestBuilder(client).filter(queryBuilder).source(userIndexName).get().getDeleted();