那年那日那朵花

".......(o´ω`o)......"

用elasticsearch做博客全文搜索

2017-01-20 18:23 linux python elk docker

这几天闲的没事,准备把我这个博客的搜索功能升下级。原来我的搜索是根据文章标题字段做的查询,现在准备用elasticsearch做为后台搜索引擎来做全文搜索。

主要的步骤分为

  • docker运行elasticsearch服务
  • 博客文章的同步
  • 具体实现

首先要做后台搜索就需要部署一下elasticsearch,同时为了支持中文需要用到ik分词插件。elasticsearch运行需要有java环境,由于我这台服务器上面没有安装过java。而且后面要用到的ik分词插件需要maven打包。这么搞需要装非常多的依赖,虽然这样没什么问题但我个人还是希望这台服务器能干净点,所以准备用docker来运行elasticsearch。

这里的话elasticsearch是有官方镜像的,可以直接拿来使用。但是官方的镜像中不包含ik分词插件,那这样的话需要自己写个dockerfile进行镜像创建。在写dockerfile之前,需要先编译打包ik分词插件,到时候直接打进镜像中。

我这里用的是2.3.5elasticsearch + 1.9.5 ikik分词的github。找到对应版本,照着步骤编译打包即可。我是在一台虚拟机上操作的,这样可以保持线上服务器的纯净。编译打包后的ik分词插件和dockerfile我也已经放在github上了elasticsearch2.3.5+ik1.9.5

下面这个就是dockerfile的内容了,只要把ik分词放到elasticsearch的plugin目录中就好了。

FROM elasticsearch:2.3.5
WORKDIR /usr/share/elasticsearch
VOLUME /usr/share/elasticsearch/logs
ADD elasticsearch-analysis-ik-1.9.5.zip /tmp/
RUN unzip /tmp/elasticsearch-analysis-ik-1.9.5.zip -d /usr/share/elasticsearch/plugins/ik

目录结构如下

tree
.
├── Dockerfile
└── elasticsearch-analysis-ik-1.9.5.zip

运行** docker build -t myes . ** 制作镜像 注意有个点。

#这里指定本机127.0.0.1不然docker会把9200端口暴露在公网上,即使开了防火墙也不生效。
docker run  -p 127.0.0.1:9200:9200 -v /es/data:/usr/share/elasticsearch/data  -v /es/logs:/usr/share/elasticsearch/logs  -d myes

用以上命令启动容器即可。可以通过执行docker logs containerID 查看启动日志

然后可以照着ik分词github上的方式做下验证,具体如下

curl -XPUT http://localhost:9200/index

#这里content的type要改为string。ik分词github上的应该是新版es才能用的text,而这里我用的2.3.5版不支持
curl -XPOST http://localhost:9200/index/fulltext/_mapping -d'
{
    "fulltext": {
             "_all": {
            "analyzer": "ik_max_word",
            "search_analyzer": "ik_max_word",
            "term_vector": "no",
            "store": "false"
        },
        "properties": {
            "content": {
                "type": "string",
                "analyzer": "ik_max_word",
                "search_analyzer": "ik_max_word",
                "include_in_all": "true",
                "boost": 8
            }
        }
    }
}'

curl -XPOST http://localhost:9200/index/fulltext/1 -d'
{"content":"美国留给伊拉克的是个烂摊子吗"}
'
curl -XPOST http://localhost:9200/index/fulltext/2 -d'
{"content":"公安部:各地校车将享最高路权"}
'
curl -XPOST http://localhost:9200/index/fulltext/3 -d'
{"content":"中韩渔警冲突调查:韩警平均每天扣1艘中国渔船"}
'
curl -XPOST http://localhost:9200/index/fulltext/4 -d'
{"content":"中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"}
'

curl -XPOST http://localhost:9200/index/fulltext/_search  -d'
{
    "query" : { "match" : { "content" : "中国" }},
    "highlight" : {
        "pre_tags" : ["<tag1>", "<tag2>"],
        "post_tags" : ["</tag1>", "</tag2>"],
        "fields" : {
            "content" : {}
        }
    }
}
'

下面的是返回结果。已经部署ok了

{"took":125,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":2,"max_score":1.5,"hits":[{"_index":"index","_type":"fulltext","_id":"4","_score":1.5,"_source":
{"content":"中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"}
,"highlight":{"content":["<tag1>中国</tag1>驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首"]}},{"_index":"index","_type":"fulltext","_id":"3","_score":0.53699243,"_source":
{"content":"中韩渔警冲突调查:韩警平均每天扣1艘中国渔船"}
,"highlight":{"content":["中韩渔警冲突调查:韩警平均每天扣1艘<tag1>中国</tag1>渔船"]}}]}}

第二点是博客文章的同步功能,这里有两种方案,我的选择是第二种。

  • 用elasticsearch-jdbc定时增量的将blog文章导入elasticsearch
  • 自己动手丰衣足食

由于我后台数据库用的是mysql,网上也有现成的工具elasticsearch-jdbc可以将mysql中的数据同步到elasticsearch中,具体同步哪些信息通过自己写sql来实现,elasticsearch-jdbc会定时批量的将sql查询出的结果导入到elasticsearch中。但是elasticsearch-jdbc还是会有一个时效性的问题,毕竟不是实时,而且还要运行java。。。故我决定自己写这部分的逻辑。

其实自己实现还是很简单的,由于只是个人博客,数据量无视,只要在每次保存文章的时候将信息同步到elasticsearch中就可以了,新增或修改文章的时候将他的主键作为elasticsearch文档中的_id即可。因为使用了django的model模块,表都是自己定义的一个个class,这个class继承models.Model。那么就只要改写他的save方法就可以了。每当保存的时候先调用父类的save方法,也就是原来的save方法,然后再添加进我们要的逻辑。具体的实现如下。

class Article(models.Model):
    title = models.CharField(max_length = 60,verbose_name = u'标题')
    content = models.TextField(verbose_name = u'内容')
    timestamp = models.DateTimeField(auto_now_add = True,verbose_name = u'时间戳')
    last_modified = models.DateTimeField(auto_now = True,verbose_name = u'最后修改时间')
    tag = models.ManyToManyField('TagInfo',blank = True,verbose_name = u'分类标签')
    pic_height=models.PositiveIntegerField(default = 530)
    pic_width=models.PositiveIntegerField(default = 530)
    pic = models.ImageField(upload_to = 'pic/%Y/%m/%d',blank = True,height_field='pic_height', width_field='pic_width')
    status = models.CharField(max_length=1, choices=STATUS_CHOICES,default='d')
    def __unicode__(self):
        return self.title

    #overwrite model save method,在保存后同步指定字段信息到elasticsearch里面。
    def save(self, *args, **kwargs):
        super(self.__class__,self).save(*args, **kwargs)
        try:
            esinsert = {}
            esinsert['title'] = self.title
            esinsert['content'] = self.content
            esinsert['status'] = self.status
            esinsert['createtime'] = trans_localdate_format(self.timestamp)
            #print esinsert
            sync_es(esinsert,self.id)
        except Exception,e:
            print e
            print "sync elasticsearch error"

这里trans_localdate_format方法是将时间格式转换为本地的,sync_es方法是将信息导入到elasticsearch的。导入elasticsearch的话,主要用python的客户端elasticsearch.py来实现。

后面就是具体的实现了。这个又分为数据导入和查询。

导入的话,先创建索引,然后要定义一个mapping映射,因为默认的es分词解析器是不支持的中文的。例如"上海"这个词,用elasticsearch默认的解析器是分为一个"上"和一个"海",而不是"上海"这个词语。所以才要装ik分词插件,这里直接用ik_max_word最大颗粒度的ik分词解析器。

def sync_es(inputdict,idnum):
    es = Elasticsearch(["http://127.0.0.1:9200"])
    articlemapping = {
      "mappings" : {
        "article" : {
            "_all": {
                    "analyzer": "ik_max_word",
                    "search_analyzer": "ik_max_word",
                    "term_vector": "no",
                    "store": "false"
                    },
            "properties" : {
                    "title" : { 
                                "type" : "string", 
                                "analyzer": "ik_max_word",
                                "search_analyzer": "ik_max_word",
                                "include_in_all": "true",
                                "boost": 8
                                },
                    "content" : { 
                                "type" : "string", 
                                "analyzer": "ik_max_word",
                                "search_analyzer": "ik_max_word",
                                "include_in_all": "true",
                                "boost": 8
                                },
                            }
             }
        }
    }
    indexName = "blog"
    if not es.indices.exists(indexName):
        es.indices.create(index = indexName, body = articlemapping,ignore = 400)
    return es.index(index=indexName, doc_type="article", body=inputdict, id=idnum)

然后查询的话通过前台页面提交表单,将查询语句拼接成字典即可查询,然后就可以把查询到的对象做进一步处理了。

def search_result(queryStatements):
    es = Elasticsearch(["http://127.0.0.1:9200"])
    indexName = "blog"
    queryBody = {
        "query" : { 
            "query_string" : {
                "analyze_wildcard" : "true",
                "query" : queryStatements
            }
        }
    }
    #print queryBody
    queryResult = es.search(index=indexName,body=queryBody)
    return queryResult

同时我还在django的admin模块中添加了批量导入elasticsearch功能。实现方法也比较简单,主要是利用了admin模块现有的封装。

#add manual sync to es 
def sync_to_elasticsearch(self, request, queryset):
    for i in queryset:
        try:
            esinsert = {}
            esinsert['title'] = i.title
            esinsert['content'] = i.content
            esinsert['status'] = i.status
            esinsert['createtime'] = trans_localdate_format(i.timestamp)
            #print esinsert
            sync_es(esinsert,i.id)
            self.message_user(request, "sync to elasticsearch successfully.")
        except:
            self.message_user(request, "sync to elasticsearch happen wrong.")

然后定义一个类继承admin.ModelAdmin的类,给他添加属性actions = [sync_to_elasticsearch],然后在admin.site.register注册表的时候传进去就可以了。

总的来说这次博客搜索功能的升级,主要就这三块,剩余的就是细节部分了。

Cloudhu 个人随笔|built by django|

沪ICP备16019452号-1