那年那日那朵花

".......(o´ω`o)......"

使用elasticsearch-py

2016-09-08 15:32 python elk

之前使用logstash和filebeat收集日志到elasticsearch确实很方便,但是感觉不够灵活,而且要根据情况更改不同的配置,调试后又发现种种原因达不到要的效果。俗话说自己动手丰衣足食,所以干脆就尝试着自己写导入elasticsearch的脚本程序,现在就记录下遇到的一些问题和解决方法。

第一个问题是怎么将信息传到elasticsearch里面,elasticsearch是提供了RestfulAPI的,所以可以通过http PUT请求的方式将信息传入elasticsearch。但是呢,那样做会有点麻烦。所以我就使用了elasticsearch的python客户端模块。可以在python官网的PyPI上搜索下载。

安装好后,可以先在官方网站上看下文档熟悉下。

正常的导入

先贴下我的代码。

def senddata(self,body):
    es = Elasticsearch(["http://192.168.122.122:9200"])
    indexTimeStamp = datetime.datetime.now().strftime('%Y.%m.%d.%H')
    indexName = "hello-" + indexTimeStamp
    if not es.indices.exists(indexName):
        es.indices.create(index = indexName,body = self.mapping,ignore = 400)
    es.index(index=indexName, doc_type="test", body=body)

这里定义了一个发送数据的方法,就是在发送数据前,先判断一下索引是否存在,不存在就用es.indices.create方法创建。然后通过es.index(index=indexName, doc_type="test", body=body)发送。其中body是一个字典。

@calTime
def solvedata(self):
    log("开始处理文件" + self.filename)
    with open(self.filename) as f:
        for line in f:
            #去掉每行结尾换行符
            line = line.strip('\n')
            body = self.splitdata(line)
            self.senddata(body)

我这里的数据源是文本文件。内容是以竖线"|"分割的各个字段。self.splitdata是拼出字典的方法,代码不贴了,简要说明一下。先定义一个列表作为字典的key,然后通过读取这个文件将内容分割成列表作为字典的velue。然后合并成一个字典,其中的某些字段需要做类型转换。读取每行后循环发送。这就是基本的功能。

好的,到这里基本功能已经可以了,但是想想就会发现其他问题已经出现了,首先是这样读取每行循环发送的速度可想而知,但是这个问题先放到后面。先来讲讲另一个问题。

时区问题

这里我的数据源里有一个字段我要转成时间类型作为文档的时间戳,但是直接用datatime转换的话是不带UTC的,而我们在kibana里看到的是用UTC时间转换的。这样就会发现我们存在es里面的文档的时间戳差了八个小时。这个其实是kibana的展示问题,为了解决这个问题,我们可以在处理数据时加八小时,但是这个治标不治本。我们需要转时区。

这里我用到了python的pytz模块。

tempdata = datetime.datetime.strptime(dataLst[1],"%Y-%m-%d %H:%M:%S")
tz = pytz.timezone("Asia/Shanghai")
dataLst[1] = tz.localize(tempdata)

这里我为什么还要用localize方法,因为pytz不提供"Asia/Beijing"时区。。。。。。。。如果直接用"Asia/Shanghai"时区会发现时间差不是8小时,而是8小时6分钟。坑爹啊 = = 。所以这个要注意了。

使用Bulk处理

好,解决时区问题后我们看看性能效率问题。elasticsearch提供了一个bulk功能。就是说将多条数据一次性发送给elasticsearch,这样就可以提高效率了。

from elasticsearch import Elasticsearch
from elasticsearch import helpers

要用到bulk功能需要另外导入from elasticsearch import helpers

actionLst = []
with open(self.filename) as f:
    for line in f:
        #去掉每行结尾换行符
        line = line.strip('\n')
        body = self.splitdata(line)
        action = {
                    "_index": indexName,
                    "_type": "test",
                    "_source": body
                    }
        actionLst.append(action)
        if len(actionLst) == 500:
            helpers.bulk(es,actionLst)
            actionLst = []

if len(actionLst) > 0:
    helpers.bulk(es,actionLst)
    actionLst = []

用bulk功能的话调用helpers.bulk方法就可以了这里的 es就是上面的es实例,actionLst是一个列表。每次到包含500条数据的时候,bulk发送过去。这里的数据也是一个字典但是不同于之前的,这里需要定义"_index" 索引名称,"_type"索引类型 ",而"_source"就是之前的具体数据了。通过测试导入数据快了7到8倍。

心情

Cloudhu 个人随笔|built by django|

沪ICP备16019452号-1