之前使用logstash和filebeat收集日志到elasticsearch确实很方便,但是感觉不够灵活,而且要根据情况更改不同的配置,调试后又发现种种原因达不到要的效果。俗话说自己动手丰衣足食,所以干脆就尝试着自己写导入elasticsearch的脚本程序,现在就记录下遇到的一些问题和解决方法。
第一个问题是怎么将信息传到elasticsearch里面,elasticsearch是提供了RestfulAPI的,所以可以通过http PUT请求的方式将信息传入elasticsearch。但是呢,那样做会有点麻烦。所以我就使用了elasticsearch的python客户端模块。可以在python官网的PyPI上搜索下载。
安装好后,可以先在官方网站上看下文档熟悉下。
先贴下我的代码。
def senddata(self,body): es = Elasticsearch(["http://192.168.122.122:9200"]) indexTimeStamp = datetime.datetime.now().strftime('%Y.%m.%d.%H') indexName = "hello-" + indexTimeStamp if not es.indices.exists(indexName): es.indices.create(index = indexName,body = self.mapping,ignore = 400) es.index(index=indexName, doc_type="test", body=body)
这里定义了一个发送数据的方法,就是在发送数据前,先判断一下索引是否存在,不存在就用es.indices.create方法创建。然后通过es.index(index=indexName, doc_type="test", body=body)发送。其中body是一个字典。
@calTime def solvedata(self): log("开始处理文件" + self.filename) with open(self.filename) as f: for line in f: #去掉每行结尾换行符 line = line.strip('\n') body = self.splitdata(line) self.senddata(body)
我这里的数据源是文本文件。内容是以竖线"|"分割的各个字段。self.splitdata是拼出字典的方法,代码不贴了,简要说明一下。先定义一个列表作为字典的key,然后通过读取这个文件将内容分割成列表作为字典的velue。然后合并成一个字典,其中的某些字段需要做类型转换。读取每行后循环发送。这就是基本的功能。
好的,到这里基本功能已经可以了,但是想想就会发现其他问题已经出现了,首先是这样读取每行循环发送的速度可想而知,但是这个问题先放到后面。先来讲讲另一个问题。
这里我的数据源里有一个字段我要转成时间类型作为文档的时间戳,但是直接用datatime转换的话是不带UTC的,而我们在kibana里看到的是用UTC时间转换的。这样就会发现我们存在es里面的文档的时间戳差了八个小时。这个其实是kibana的展示问题,为了解决这个问题,我们可以在处理数据时加八小时,但是这个治标不治本。我们需要转时区。
这里我用到了python的pytz模块。
tempdata = datetime.datetime.strptime(dataLst[1],"%Y-%m-%d %H:%M:%S") tz = pytz.timezone("Asia/Shanghai") dataLst[1] = tz.localize(tempdata)
这里我为什么还要用localize方法,因为pytz不提供"Asia/Beijing"时区。。。。。。。。如果直接用"Asia/Shanghai"时区会发现时间差不是8小时,而是8小时6分钟。坑爹啊 = = 。所以这个要注意了。
好,解决时区问题后我们看看性能效率问题。elasticsearch提供了一个bulk功能。就是说将多条数据一次性发送给elasticsearch,这样就可以提高效率了。
from elasticsearch import Elasticsearch from elasticsearch import helpers
要用到bulk功能需要另外导入from elasticsearch import helpers
actionLst = [] with open(self.filename) as f: for line in f: #去掉每行结尾换行符 line = line.strip('\n') body = self.splitdata(line) action = { "_index": indexName, "_type": "test", "_source": body } actionLst.append(action) if len(actionLst) == 500: helpers.bulk(es,actionLst) actionLst = [] if len(actionLst) > 0: helpers.bulk(es,actionLst) actionLst = []
用bulk功能的话调用helpers.bulk方法就可以了这里的 es就是上面的es实例,actionLst是一个列表。每次到包含500条数据的时候,bulk发送过去。这里的数据也是一个字典但是不同于之前的,这里需要定义"_index" 索引名称,"_type"索引类型 ",而"_source"就是之前的具体数据了。通过测试导入数据快了7到8倍。
Cloudhu 个人随笔|built by django|
沪ICP备16019452号-1