Elasticsearch-66-修改IK分词器源码基于mysql热更新词库

上文中,我们如果要配置一个自定义的词语,或者停用词的时候,必须要手动添加到ik分词器的配置中,然后重启es节点,这样就很坑了,而且如果es集群中有上百个节点的话,那一个个的修改要疯了

通过修改ik分词器的源码,可以使用mysql作为词库,有词语更新的话,直接添加到mysql的表中就好了,不需要再去重启.

热更新方案

第一种:修改ik分词器源码,然后手动支持从mysql中每隔一定时间,自动加载新的词库
第二种:基于ik分词器原生支持的热更新方案,部署一个web服务器,提供一个http接口,通过modified和tag两个http响应头,来提供词语的热更新

第一种方案是比较常用的, 第二种呢ik git官方社区都不建议采用

源码下载

从github上把源码拉下来

1
git clone https://github.com/medcl/elasticsearch-analysis-ik.git

我们的es是5.2.0版本的,ik分词器也切换到5.2.0版本的分支上面

1
git checkout v5.2.0

切换完成后,直接用idea打开就好了

源码修改

第一步,在pom中加入mysql的依赖

1
2
3
4
5
6
<!-- mysql -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>6.0.6</version>
</dependency>

第二步,配置mysql的连接,在config目录下创建一个.properties文件

1
2
3
4
5
6
7
8
9
jdbc.url=jdbc:mysql://localhost:3306/my_ik_word?allowMultiQueries=true&autoReconnect=true&useUnicode=true&characterEncoding=utf-8&serverTimezone=GMT
jdbc.user=root
jdbc.password=123456
# 更新词库的语句
jdbc.reload.sql=select word from hot_words
# 更新停用词的语句
jdbc.reload.stopword.sql=select stop_word as word from hot_stop_words
# 隔多少时间去更新一次
jdbc.reload.interval=1000

第三步,新建一个线程,run方法中调用Dictionary类的reLoadMainDict()方法,就是让他去重新加载词典

1
2
3
4
5
6
7
8
9
10
11
12
public class HotDicReloadThread implements Runnable {

private static final Logger logger = ESLoggerFactory.getLogger(HotDicReloadThread.class.getName());

@Override
public void run() {
while (true){
logger.info("-------reload hot dic from mysql--------");
Dictionary.getSingleton().reLoadMainDict();
}
}
}

第四步,Dictionary类中,加入mysql驱动类

1
2
3
4
5
6
7
8
9
10
// prop用来获取上面的properties配置文件
private static Properties prop = new Properties();

static {
try {
Class.forName("com.mysql.jdbc.Driver");
} catch (ClassNotFoundException e) {
logger.error("error", e);
}
}

第五步,initial()方法中,启动刚刚创建的线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* 词典初始化 由于IK Analyzer的词典采用Dictionary类的静态方法进行词典初始化
* 只有当Dictionary类被实际调用时,才会开始载入词典, 这将延长首次分词操作的时间 该方法提供了一个在应用加载阶段就初始化字典的手段
*
* @return Dictionary
*/
public static synchronized Dictionary initial(Configuration cfg) {
if (singleton == null) {
synchronized (Dictionary.class) {
if (singleton == null) {

singleton = new Dictionary(cfg);
singleton.loadMainDict();
singleton.loadSurnameDict();
singleton.loadQuantifierDict();
singleton.loadSuffixDict();
singleton.loadPrepDict();
singleton.loadStopWordDict();

// 执行更新词库的线程
new Thread(new HotDicReloadThread()).start();

if(cfg.isEnableRemoteDict()){
// 建立监控线程
for (String location : singleton.getRemoteExtDictionarys()) {
// 10 秒是初始延迟可以修改的 60是间隔时间 单位秒
pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
}
for (String location : singleton.getRemoteExtStopWordDictionarys()) {
pool.scheduleAtFixedRate(new Monitor(location), 10, 60, TimeUnit.SECONDS);
}
}

return singleton;
}
}
}
return singleton;
}

第六步,新添加一个loadMainDict()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
/**
* 从mysql中加载热更新词典
*/
private void loadMySqlExtDict(){
Connection connection = null;
Statement statement = null;
ResultSet resultSet = null;

try {
Path file = PathUtils.get(getDictRoot(),"jdbc-reload.properties");
prop.load(new FileInputStream(file.toFile()));

logger.info("-------jdbc-reload.properties-------");
for (Object key : prop.keySet()) {
logger.info("key:{}", prop.getProperty(String.valueOf(key)));
}

logger.info("------- query hot dict from mysql, sql:{}-------", prop.getProperty("jdbc.reload.sql"));

// 建立mysql连接
connection = DriverManager.getConnection(
prop.getProperty("jdbc.url"),
prop.getProperty("jdbc.user"),
prop.getProperty("jdbc.password")
);

// 执行查询
statement = connection.createStatement();
resultSet = statement.executeQuery(prop.getProperty("jdbc.reload.sql"));

// 循环输出查询啊结果,添加到Main.dict中去
while (resultSet.next()) {
String theWord = resultSet.getString("word");
logger.info("------hot word from mysql:{}------", theWord);

// 加到mainDict里面
_MainDict.fillSegment(theWord.trim().toCharArray());
}
} catch (Exception e) {
logger.error("error:{}", e);
} finally {
try {
if (resultSet != null) {
resultSet.close();
}
if (statement != null) {
statement.close();
}
if (connection != null) {
connection.close();
}
} catch (SQLException e){
logger.error("error", e);
}
}
}

第七步,在loadMainDict()方法最后,调用上面添加的这个方法

1
2
// 加载mysql词典
this.loadMySqlExtDict();

第八步,新添加loadMySqlStopwordDict()方法,用来从mysql中获取停用词

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* 从mysql中加载停用词
*/
private void loadMySqlStopwordDict(){
Connection conn = null;
Statement stmt = null;
ResultSet rs = null;

try {
Path file = PathUtils.get(getDictRoot(), "jdbc-reload.properties");
prop.load(new FileInputStream(file.toFile()));

logger.info("-------jdbc-reload.properties-------");
for(Object key : prop.keySet()) {
logger.info("-------key:{}", prop.getProperty(String.valueOf(key)));
}

logger.info("-------query hot stopword dict from mysql, sql:{}",props.getProperty("jdbc.reload.stopword.sql"));

conn = DriverManager.getConnection(
prop.getProperty("jdbc.url"),
prop.getProperty("jdbc.user"),
prop.getProperty("jdbc.password"));
stmt = conn.createStatement();
rs = stmt.executeQuery(prop.getProperty("jdbc.reload.stopword.sql"));

while(rs.next()) {
String theWord = rs.getString("word");
logger.info("------- hot stopword from mysql: {}", theWord);
_StopWords.fillSegment(theWord.trim().toCharArray());
}

Thread.sleep(Integer.valueOf(String.valueOf(prop.get("jdbc.reload.interval"))));
} catch (Exception e) {
logger.error("error", e);
} finally {
try {
if(rs != null) {
rs.close();
}
if(stmt != null) {
stmt.close();
}
if(conn != null) {
conn.close();
}
} catch (SQLException e){
logger.error("error:{}", e);
}

}
}

第九步,在loadStopWordDict()方法最后,调用上面的更新停用词的方法

1
2
// 从mysql中加载停用词
this.loadMySqlStopwordDict();

至此,源码修改完毕,数据库的两个表如下

词库表

1
2
3
4
5
CREATE TABLE `hot_words` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`word` varchar(50) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '词语',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;

停用词库表

1
2
3
4
5
CREATE TABLE `hot_stop_words` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`stop_word` varchar(50) COLLATE utf8_unicode_ci DEFAULT NULL COMMENT '停用词',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;

这些做完以后,maven打包项目

1
mvn clean package -DskipTests

打包完成后,在项目目录的target\releases 路径下面有个压缩包,解压到es\plugins\ik目录下,然后将mysql的驱动包丢进去, 之后重启es就完成了.

测试

在停用词的表中加入 “我”,然后去kibana中测试一下

1
2
3
4
5
GET /_analyze
{
"text": "我的",
"analyzer": "ik_max_word"
}

返回值:

1
2
3
{
"tokens": []
}

我, 这个停用词已经生效了,直接被干掉了.

修改后的项目

修改后的代码也传到我的github上去了,可以直接clone下来切换分支使用

1
git clone https://github.com/zhouze-java/elasticsearch-analysis-ik.git

切换分支

1
git checkout ik_zhouze

然后打包一下

1
mvn clean package -DskipTests

其他操作和上面一样.记得把mysql的驱动包丢进去