Elasticsearch简介与实践

Elasticsearch是一个基于Lucene的开源搜索引擎,使用Elasticsearch可以搭建分布式、可扩展、高可用的搜索集群,并提供RESTful API。Elasticsearch包含的数据结构及其与关系数据库的类比如下所示:

Elasticsearch 关系数据库
index database
mapping table
document row
field field

工作中,我们使用Elasticsearch搭建搜索集群,对文章构建索引并提供文章搜索接口。使用了Elasticsearch中文发行版elasticsearch-rtf,该版本针对中文集成了相关插件。从Github上下载压缩包https://github.com/medcl/elasticsearch-rtf/archive/1.0.0.tar.gz 并解压。

部署集群

集群配置

修改config/elasticsearch.yml,设置集群名称:

cluster.name: mp_production_es

设置节点名称:

node.name: “production1”

对于同一集群中的各个节点,要保证各节点的集群名称相同,节点名称不同,这样在启动集群中的各节点时,能通过广播发现同一网段中具有相同集群名称的其他节点组成集群。

Analyzer配置

搜索引擎构建索引时需要先对文档进行分析,从文档中提取出token(词元),实现此操作的是tokenizer,提取出的token会被进一步处理(如转成小写等),实现此操作的是filter, 被处理后的结果被称为term(词),搜索引擎使用这些term构建倒排索引。tokenizer+filter被称为analyzer(分析器)。Elasticsearch内置了很多analyzer, 还有很多第三方的analyzer插件, 比如用于中文分词的analyzer。在elasticsearch.yml中可以配置所支持的tokenizer、filter和analyzer。
elasticsearch-rtf已集成了很多第三方的analyzer插件,并在elasticsearch.yml中已配置,其中“string2int ”用于将字符串转化为整数,从而减小索引文件大小,节约内存,这个插件使用Redis存储字符串和整数的映射关系,所以如果需要使用这个插件,需要搭建Redis并配置Redis访问地址,如果不使用这个插件,可以直接删除该插件配置。“ansj”是基于ansj的中文分词插件,这个插件可选择使用Redis的pub/sub方式更新词典,如果不使用这个插件,也可以直接删除该插件配置。
实际使用中,针对analyzer,我们默认使用“keyword”,即不分词,内容整体作为一个Token,并配置了“ik”,用于对标题和正文进行中文分词,配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
index:
analysis:
analyzer:
ik:
alias:
- ik_analyzer
type: org.elasticsearch.index.analysis.IkAnalyzerProvider
ik_max_word:
type: ik
use_smart: false
ik_smart:
type: ik
use_smart: true

index.analysis.analyzer.default.type: keyword

JVM配置

修改bin/service/elasticsearch.conf,设置堆大小:

set.default.ES_HEAP_SIZE=8192

运行

执行以下命令启动各节点,各节点通过广播发现同一网段中具有相同集群名称的其他节点自动组成集群。

bin/service/elasticsearch start

基于RESTful API创建索引

创建index:

curl -XPUT 127.0.0.1:9200/mp

创建mapping,设置文章各字段,其中主键是“_id”,“title”和“content”使用“ik”进行中文分词:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
curl -XPUT '127.0.0.1:9200/mp/news/_mapping' -d '{
"news":{
"_all" : {
"enabled" : false
},
"_id" : {
"index": "not_analyzed",
"store" : "yes",
"type":"integer"},
"properties" :{
"cmsId" :{
"type" : "integer",
"index": "no",
"store" : "yes"
},
"title" :{
"type" : "string",
"store": "yes",
"term_vector": "with_positions_offsets",
"indexAnalyzer": "ik",
"searchAnalyzer": "ik",
"include_in_all": "true"
},
"mobileTitle" :{
"type" : "string",
"index": "no",
"store" : "yes"
},
"brief" :{
"type" : "string",
"index": "no",
"store" : "yes"
},
"content": {
"type" : "string",
"store": "yes",
"term_vector": "with_positions_offsets",
"indexAnalyzer": "ik",
"searchAnalyzer": "ik",
"include_in_all": "true"
},
"time": {
"type" : "long",
"index": "not_analyzed",
"store" : "yes"
},
"mediaId": {
"type" : "integer",
"index": "not_analyzed",
"store" : "yes"
},
"channelId": {
"type" : "integer",
"index": "not_analyzed",
"store" : "yes"
},
"categoryId": {
"type" : "integer",
"index": "not_analyzed",
"store" : "yes"
},
"img": {
"type" : "string",
"index": "no",
"store" : "yes"
},
"url": {
"type" : "string",
"index": "no",
"store" : "yes"
},
"tags": {
"type" : "string",
"index": "no",
"store" : "yes"
},
"tagList": {
"type" : "string",
"index": "no",
"store" : "yes"
},
"json": {
"type" : "string",
"index": "no",
"store" : "yes"
}
}
}
}'

索引创建成功后,通过浏览器可查看到相关信息:
1
其中,有5个shard(分片),每个shard有一个副本。

基于Java API更新索引和搜索文章

添加依赖

1
2
3
4
5
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>1.0.0</version>
</dependency>

建立连接

创建ElasticsearchClientManager类,用于维护Client实例,建立并保持和搜索集群的连接。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.sohu.cms.mp.es.search;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ElasticsearchClientManager {

private static Logger logger = LoggerFactory.getLogger(ElasticsearchClientManager.class);

private Client client;
private String clusterName;
private String clusterIps;
private int clusterPort;

public void init() {
Settings settings = ImmutableSettings.settingsBuilder().put("cluster.name", clusterName).build();
client = new TransportClient(settings);
for (String clusterIp : clusterIps.split(";")) {
client = ((TransportClient) client).addTransportAddress(new InetSocketTransportAddress(clusterIp, clusterPort));
}
logger.info("init cluster [" + clusterName + "|" + clusterIps + "] success");
}

public void destroy() {
logger.info("destroy success");
client.close();
}

// 省略get/set方法

}

1
2
3
4
5
<bean id="elasticsearchClientManager" class="com.sohu.cms.mp.es.search.ElasticsearchClientManager" init-method="init" destroy-method="destroy">
<property name="clusterName" value="mp_production_es"/>
<property name="clusterIps" value="xxx.xxx.xxx.xxx;xxx.xxx.xxx.xxx"/>
<property name="clusterPort" value="9300"/>
</bean>

更新索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public boolean index(News news) {
try {
Client client = elasticsearchClientManager.getClient();
XContentBuilder contentBuilder = XContentFactory.jsonBuilder()
.startObject()
.field("cmsId", news.getCmsId())
.field("title", news.getTitle())
.field("mobileTitle", news.getMobileTitle())
.field("brief", news.getBrief())
.field("content", news.getContent())
.field("time", news.getTime())
.field("mediaId", news.getMediaId())
.field("channelId", news.getChannelId())
.field("categoryId", news.getCategoryId())
.field("img", news.getImg())
.field("url", news.getUrl())
.field("tags", news.getTags())
.field("tagList", news.getTagList())
.field("json", news.getJson())
.endObject();
BulkRequestBuilder requestBuilder = client.prepareBulk();
requestBuilder.add(client.prepareIndex("mp", "news", String.valueOf(news.getId())).setSource(contentBuilder));
BulkResponse bulkResponse = requestBuilder.execute().actionGet();
if(bulkResponse.hasFailures()) {
return false;
} else {
return true;
}
} catch (Exception e) {
logger.error("updateIndex error", e);
return false;
}
}

搜索文章

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
/**
* 在指定频道、指定类别中,根据搜索词查询,为了支持分页,需要设置查询结果的起始和数目
* @param wd
* @param channels
* @param categories
* @param from
* @param size
* @return
*/
public SearchHits search(String wd, List<Integer> channels, List<Integer> categories, int from, int size) {
Client client = elasticsearchClientManager.getClient();
QueryBuilder query = QueryBuilders.boolQuery().must(QueryBuilders.multiMatchQuery(wd, "title", "content"))
.must(QueryBuilders.termsQuery("channelId", channels))
.must(QueryBuilders.termsQuery("categoryId", categories));
SearchResponse searchResponse= client.prepareSearch("mp")
.setQuery(query)
.addSort("_score", SortOrder.DESC)
.addSort("time", SortOrder.DESC)
.setTypes("news")
.setFrom(from).setSize(size).setExplain(true)
.addHighlightedField("content")
.addHighlightedField("title")
.setHighlighterPreTags("<span style='color:red'>")
.setHighlighterPostTags("</span>")
.execute()
.actionGet();
return searchResponse.getHits();
}