pandas 从 ElasticSearch 结果创建 DataFrame

声明:本页面是StackOverFlow热门问题的中英对照翻译,遵循CC BY-SA 4.0协议,如果您需要使用它,必须同样遵循CC BY-SA许可,注明原文地址和作者信息,同时你必须将它归于原作者(不是我):StackOverFlow 原文地址: http://stackoverflow.com/questions/25186148/
Warning: these are provided under cc-by-sa 4.0 license. You are free to use/share it, But you must attribute it to the original authors (not me): StackOverFlow

提示:将鼠标放在中文语句上可以显示对应的英文。显示中英文
时间:2020-09-13 22:20:57  来源:igfitidea点击:

Creating DataFrame from ElasticSearch Results

pythonpandaselasticsearch

提问by Justin S

I am trying to build a DataFrame in pandas, using the results of a very basic query to ElasticSearch. I am getting the Data I need, but its a matter of slicing the results in a way to build the proper data frame. I really only care about getting the timestamp, and path, of each result. I have tried a few different es.search patterns.

我正在尝试使用对 ElasticSearch 的非常基本的查询结果在 Pandas 中构建一个 DataFrame。我得到了我需要的数据,但它以某种方式对结果进行切片以构建正确的数据框。我真的只关心获取每个结果的时间戳和路径。我尝试了几种不同的 es.search 模式。

Code:

代码:

from datetime import datetime
from elasticsearch import Elasticsearch
from pandas import DataFrame, Series
import pandas as pd
import matplotlib.pyplot as plt
es = Elasticsearch(host="192.168.121.252")
res = es.search(index="_all", doc_type='logs', body={"query": {"match_all": {}}}, size=2, fields=('path','@timestamp'))

This gives 4 chunks of data. [u'hits', u'_shards', u'took', u'timed_out']. My results are inside the hits.

这给出了 4 个数据块。[u'hits', u'_shards', u'took', u'timed_out']。我的结果在点击率之内。

res['hits']['hits']
Out[47]: 
[{u'_id': u'a1XHMhdHQB2uV7oq6dUldg',
  u'_index': u'logstash-2014.08.07',
  u'_score': 1.0,
  u'_type': u'logs',
  u'fields': {u'@timestamp': u'2014-08-07T12:36:00.086Z',
   u'path': u'app2.log'}},
 {u'_id': u'TcBvro_1QMqF4ORC-XlAPQ',
  u'_index': u'logstash-2014.08.07',
  u'_score': 1.0,
  u'_type': u'logs',
  u'fields': {u'@timestamp': u'2014-08-07T12:36:00.200Z',
   u'path': u'app1.log'}}]

The only things I care about, are getting the timestamp, and path for each hit.

我唯一关心的是获取时间戳和每次点击的路径。

res['hits']['hits'][0]['fields']
Out[48]: 
{u'@timestamp': u'2014-08-07T12:36:00.086Z',
 u'path': u'app1.log'}

I can not for the life of me figure out who to get that result, into a dataframe in pandas. So for the 2 results I have returned, I would expect a dataframe like.

我一生都无法弄清楚谁可以将结果放入Pandas的数据框中。因此,对于我返回的 2 个结果,我希望像这样的数据框。

   timestamp                   path
0  2014-08-07T12:36:00.086Z    app1.log
1  2014-08-07T12:36:00.200Z    app2.log

采纳答案by CT Zhu

There is a nice toy called pd.DataFrame.from_dictthat you can use in situation like this:

有一个很好的玩具pd.DataFrame.from_dict,你可以在这样的情况下使用它:

In [34]:

Data = [{u'_id': u'a1XHMhdHQB2uV7oq6dUldg',
      u'_index': u'logstash-2014.08.07',
      u'_score': 1.0,
      u'_type': u'logs',
      u'fields': {u'@timestamp': u'2014-08-07T12:36:00.086Z',
       u'path': u'app2.log'}},
     {u'_id': u'TcBvro_1QMqF4ORC-XlAPQ',
      u'_index': u'logstash-2014.08.07',
      u'_score': 1.0,
      u'_type': u'logs',
      u'fields': {u'@timestamp': u'2014-08-07T12:36:00.200Z',
       u'path': u'app1.log'}}]
In [35]:

df = pd.concat(map(pd.DataFrame.from_dict, Data), axis=1)['fields'].T
In [36]:

print df.reset_index(drop=True)
                 @timestamp      path
0  2014-08-07T12:36:00.086Z  app2.log
1  2014-08-07T12:36:00.200Z  app1.log

Show it in four steps:

分四步展示:

1, Read each item in the list (which is a dictionary) into a DataFrame

1、将列表中的每一项(即a dictionary)读入aDataFrame

2, We can put all the items in the list into a big DataFrameby concatthem row-wise, since we will do step#1 for each item, we can use mapto do it.

2,我们可以将列表中的所有项目DataFrameconcat行排列成一个大的,因为我们将对每个项目执行步骤#1,我们可以使用map它来做。

3, Then we access the columns labeled with 'fields'

3,然后我们访问标有的列 'fields'

4, We probably want to rotate the DataFrame90 degrees (transpose) and reset_indexif we want the index to be the default intsequence.

4,我们可能想旋转DataFrame90度(转置),reset_index如果我们希望索引为默认int序列。

enter image description here

在此处输入图片说明

回答by Brown nightingale

Or you could use the json_normalize function of pandas :

或者你可以使用 pandas 的 json_normalize 函数:

from pandas.io.json import json_normalize
df = json_normalize(res['hits']['hits'])

And then filtering the result dataframe by column names

然后按列名过滤结果数据框

回答by John D

Better yet, you can use the fantastic pandasticsearchlibrary:

更好的是,您可以使用奇妙的pandasticsearch库:

from elasticsearch import Elasticsearch
es = Elasticsearch('http://localhost:9200')
result_dict = es.search(index="recruit", body={"query": {"match_all": {}}})

from pandasticsearch import Select
pandas_df = Select.from_dict(result_dict).to_pandas()

回答by Erick Storck

I tested all the answers for performance and I found that the pandasticsearchapproach is the fastest by a large margin:

我测试了所有性能的答案,我发现该pandasticsearch方法在很大程度上是最快的:

tests:

测试:

test1 (using from_dict)

test1(使用 from_dict)

%timeit -r 2 -n 5 teste1(resp)

10.5 s ± 247 ms per loop (mean ± std. dev. of 2 runs, 5 loops each)

每个循环 10.5 s ± 247 ms(平均值 ± 标准偏差,2 次运行,每次 5 次循环)

test2 (using a list)

test2(使用列表)

%timeit -r 2 -n 5 teste2(resp)

2.05 s ± 8.17 ms per loop (mean ± std. dev. of 2 runs, 5 loops each)

每个循环 2.05 s ± 8.17 ms(平均值 ± 标准偏差,2 次运行,每次 5 次循环)

test3 (using import pandasticsearch as pdes)

test3(使用 import pandasticsearch 作为 pdes)

%timeit -r 2 -n 5 teste3(resp)

39.2 ms ± 5.89 ms per loop (mean ± std. dev. of 2 runs, 5 loops each)

每个循环 39.2 ms ± 5.89 ms(2 次运行的平均值 ± 标准偏差,每次 5 次循环)

test4 (using from pandas.io.json import json_normalize)

test4(使用 from pandas.io.json import json_normalize)

%timeit -r 2 -n 5 teste4(resp)

387 ms ± 19 ms per loop (mean ± std. dev. of 2 runs, 5 loops each)

每个循环 387 ms ± 19 ms(2 次运行的平均值 ± 标准偏差,每次 5 次循环)

I hope its can be usefull for anyone

我希望它对任何人都有用

CODE:

代码:

index = 'teste_85'
    size = 10000
    fields = True
    sort = ['col1','desc']
    query = 'teste'
    range_gte = '2016-01-01'
    range_lte = 'now'
    resp = esc.search(index = index,
                        size = size,
                        scroll = '2m',
                        _source = fields,
                        doc_type = '_doc',
                        body = {
                            "sort" : { "{0}".format(sort[0]) : {"order" : "{0}".format(sort[1])}},
                            "query": {
                                    "bool": {
                                    "must": [
                                        { "query_string": { "query": "{0}".format(query) } },
                                        { "range": { "anomes": { "gte": "{0}".format(range_gte), "lte": "{0}".format(range_lte) } } },
                                    ]
                                    }
                                }
                                })

    def teste1(resp):
        df = pd.DataFrame(columns=list(resp['hits']['hits'][0]['_source'].keys()))
        for hit in resp['hits']['hits']:
            df = df.append(df.from_dict(hit['_source'], orient='index').T)
        return df

    def teste2(resp):
        col=list(resp['hits']['hits'][0]['_source'].keys())
        for hit in resp['hits']['hits']:
            df = pd.DataFrame(list(hit['_source'].values()), col).T
        return df

    def teste3(resp):
        df = pdes.Select.from_dict(resp).to_pandas()
        return df

    def teste4(resp):
        df = json_normalize(resp['hits']['hits'])
        return df

回答by John D

If your request is likely to return more than 10,000 documents from Elasticsearch, you will need to use the scrolling function of Elasticsearch. Documentation and examples for this function are rather difficult to find, so I will provide you with a full, working example:

如果您的请求很可能从 Elasticsearch 返回超过 10,000 个文档,您将需要使用 Elasticsearch 的滚动功能。这个函数的文档和例子很难找到,所以我会为你提供一个完整的、有效的例子:

import pandas as pd
from elasticsearch import Elasticsearch
import elasticsearch.helpers


es = Elasticsearch('http://localhost:9200')

body={"query": {"match_all": {}}}
results = elasticsearch.helpers.scan(es, query=body, index="my_index")
df = pd.DataFrame.from_dict([document['_source'] for document in results])

Simply edit the fields that start with "my_" to correspond to your own values

只需编辑以“my_”开头的字段以对应您自己的值

回答by thorwhalen

Here's a bit of code you might find useful for your work. It's simple, and extendible, but has been saving me a lot of time when faced with just "grabbing" some data from ElasticSearch to analyze.

下面是一些您可能会发现对您的工作有用的代码。它很简单,而且可扩展,但在面对从 ElasticSearch 中“抓取”一些数据进行分析时,它为我节省了大量时间。

If you just want to grab all the data of a given index and doc_type of your localhost you can do:

如果您只想获取本地主机的给定索引和 doc_type 的所有数据,您可以执行以下操作:

df = ElasticCom(index='index', doc_type='doc_type').search_and_export_to_df()

You can use any of the arguments you'd usually use in elasticsearch.search(), or specify a different host. You can also choose whether to include the _id or not, and specify whether the data is in '_source' or 'fields' (it tries to guess). It also tries to convert the field values by default (but you can switch that off).

您可以使用通常在 elasticsearch.search() 中使用的任何参数,或指定不同的主机。您还可以选择是否包含 _id,并指定数据是在 '_source' 或 'fields' 中(它会尝试猜测)。它还尝试默认转换字段值(但您可以将其关闭)。

Here's the code:

这是代码:

from elasticsearch import Elasticsearch
import pandas as pd


class ElasticCom(object):

    def __init__(self, index, doc_type, hosts='localhost:9200', **kwargs):
        self.index = index
        self.doc_type = doc_type
        self.es = Elasticsearch(hosts=hosts, **kwargs)

    def search_and_export_to_dict(self, *args, **kwargs):
        _id = kwargs.pop('_id', True)
        data_key = kwargs.pop('data_key', kwargs.get('fields')) or '_source'
        kwargs = dict({'index': self.index, 'doc_type': self.doc_type}, **kwargs)
        if kwargs.get('size', None) is None:
            kwargs['size'] = 1
            t = self.es.search(*args, **kwargs)
            kwargs['size'] = t['hits']['total']

        return get_search_hits(self.es.search(*args, **kwargs), _id=_id, data_key=data_key)

    def search_and_export_to_df(self, *args, **kwargs):
        convert_numeric = kwargs.pop('convert_numeric', True)
        convert_dates = kwargs.pop('convert_dates', 'coerce')
        df = pd.DataFrame(self.search_and_export_to_dict(*args, **kwargs))
        if convert_numeric:
            df = df.convert_objects(convert_numeric=convert_numeric, copy=True)
        if convert_dates:
            df = df.convert_objects(convert_dates=convert_dates, copy=True)
        return df

def get_search_hits(es_response, _id=True, data_key=None):
    response_hits = es_response['hits']['hits']
    if len(response_hits) > 0:
        if data_key is None:
            for hit in response_hits:
                if '_source' in hit.keys():
                    data_key = '_source'
                    break
                elif 'fields' in hit.keys():
                    data_key = 'fields'
                    break
            if data_key is None:
                raise ValueError("Neither _source nor fields were in response hits")

        if _id is False:
            return [x.get(data_key, None) for x in response_hits]
        else:
            return [dict(_id=x['_id'], **x.get(data_key, {})) for x in response_hits]
    else:
        return []

回答by zwep

For anyone that encounters this question as well.. @CT Zhu has a nice answer, but I think it is a bit outdated.but when you are using the elasticsearch_dsl package. The result is a bit different. Try this in that case:

对于也遇到这个问题的任何人..@CT Zhu 有一个很好的答案,但我认为它有点过时了。但是当您使用 elasticsearch_dsl 包时。结果有点不同。在这种情况下试试这个:

# Obtain the results..
res = es_dsl.Search(using=con, index='_all')
res_content = res[0:100].execute()
# convert it to a list of dicts, by using the .to_dict() function
res_filtered = [x['_source'].to_dict() for x in res_content['hits']['hits']]

# Pass this on to the 'from_dict' function
A = pd.DataFrame.from_dict(res_filtered)

回答by HerrIvan

With elasticsearch_dslyou can search documents, get them by id, etc.

随着elasticsearch_dsl您可以搜索文件,由ID等让他们

For instance

例如

from elasticsearch_dsl import Document

# retrieve document whose _id is in the list of ids
s = Document.mget(ids,using=es_connection,index=myindex)

or

或者

from elasticsearch_dsl import Search

# get (up to) 100 documents from a given index
s = Search(using=es_connection,index=myindex).extra(size=100)

then, in case you want to create a DataFrameand use the elasticsearch ids in your dataframe index, you can do as follows:

然后,如果您想DataFrame在数据帧索引中创建并使用 elasticsearch id,您可以执行以下操作:

df = pd.DataFrame([{'id':r.meta.id, **r.to_dict()} 
                            for r 
                            in s.execute()]).set_index('id',drop=True)