
Disclaimer: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me). Original: http://stackoverflow.com/questions/41247345/

Date: 2020-09-14 02:39:05 · Source: igfitidea

Python read Cassandra data into pandas

Tags: python, pandas, cassandra

Asked by ragesz

What is the proper and fastest way to read Cassandra data into pandas? Currently I use the following code, but it's very slow...


import pandas as pd
import numpy as np

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import dict_factory

auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS)
cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT,
    auth_provider=auth_provider)

session = cluster.connect(CASSANDRA_DB)
session.row_factory = dict_factory

sql_query = "SELECT * FROM {}.{};".format(CASSANDRA_DB, CASSANDRA_TABLE)

df = pd.DataFrame()

# Appending one row at a time copies the whole frame on every iteration.
for row in session.execute(sql_query):
    df = df.append(pd.DataFrame(row, index=[0]))

df = df.reset_index(drop=True).fillna(np.nan)  # pd.np was removed in pandas 1.0

Reading 1000 rows takes 1 minute, and I have a "bit more"... If I run the same query, e.g. in DBeaver, I get the whole result (~40k rows) within a minute.

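Much of that minute goes into the per-row `DataFrame.append`, which copies the entire frame on every iteration. As a rough standalone sketch (the rows here are made-up dicts standing in for the Cassandra result), collecting the rows first and building the frame once avoids the repeated copies:

```python
import pandas as pd

# Made-up rows standing in for the Cassandra result set.
rows = [{"id": i, "value": i * 2} for i in range(1000)]

# One construction instead of 1000 appends: no per-row reallocation.
df = pd.DataFrame(rows)
print(df.shape)  # (1000, 2)
```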

Thank you!!!


Answered by ragesz

I got the answer at the official mailing list(it works perfectly):


Hi,

try to define your own pandas row factory:

def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)

session.row_factory = pandas_factory
# Disable paging so the whole result arrives as a single page
session.default_fetch_size = None

query = "SELECT ..."
rslt = session.execute(query, timeout=None)
# _current_rows is a private ResultSet attribute holding the current page;
# with paging disabled, that page is the entire result
df = rslt._current_rows

That's the way I do it - and it should be faster...

If you find a faster method, I'm interested :)

Michael
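The factory can be checked without a cluster: the driver hands it the column names and the raw rows of each result page, so calling it directly with made-up values (the names and rows below are illustrative, not from a real table) shows what `session.execute` will produce per page:

```python
import pandas as pd

def pandas_factory(colnames, rows):
    # The driver calls this once per page with that page's column names/rows.
    return pd.DataFrame(rows, columns=colnames)

# Made-up page contents, just to exercise the factory.
page = pandas_factory(["id", "value"], [(1, "a"), (2, "b")])
print(page.shape)  # (2, 2)
```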


Answered by George C

What I do (in Python 3) is:


query = "SELECT ..."
df = pd.DataFrame(list(session.execute(query)))
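With `dict_factory` set (as in the question), each row comes back as a dict and `pd.DataFrame` infers the column names from the keys. A minimal illustration with made-up rows standing in for the result set:

```python
import pandas as pd

# Made-up rows shaped like dict_factory output.
rows = [{"id": 1, "name": "alpha"}, {"id": 2, "name": "beta"}]
df = pd.DataFrame(rows)
print(sorted(df.columns))  # ['id', 'name']
```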

Answered by Alok Garg

I have been working on moving data from Cassandra to MSSQL, using the answers given here as a reference. I am able to move data, but my source table in Cassandra is huge and my query gets a timeout error from Cassandra. We cannot increase the timeout, so I am left with selecting rows in batches in my query. My code also converts the Cassandra collection data types to str, since I want to insert them into MSSQL and parse them there. Please let me know if anyone faces a similar issue; the code I built is given below:


import pandas as pd
import sqlalchemy
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement


def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)


engine = sqlalchemy.create_engine('sql_server_connection string')

cluster = Cluster(
    contact_points=['cassandra_host'],
    auth_provider=PlainTextAuthProvider(username='username', password='passwrd')
)

session = cluster.connect('keyspace', wait_for_all_pools=True)
session.row_factory = pandas_factory
session.default_timeout = 60000

query = "SELECT * FROM cassandratable"
statement = SimpleStatement(query, fetch_size=5000)
rows = session.execute(statement)

# Note: _current_rows holds only the current page, i.e. the first 5000 rows.
df = rows._current_rows

# Stringify the Cassandra collection columns so MSSQL can store them.
collection_columns = [
    'attributes', 'attributesgenerated', 'components',
    'distributioncenterinfo', 'images', 'itemcustomerzonezoneproductids',
    'itempodconfigids', 'keywords', 'validationmessages', 'zones',
]
for col in collection_columns:
    df[col] = df[col].astype(str)

df.to_sql(
    name='mssql_table_name',
    con=engine,
    index=False,
    if_exists='append',
    chunksize=1
)
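One way around both the timeout and the single-page `_current_rows` limitation is to iterate the `ResultSet` directly (iteration fetches pages transparently at `fetch_size`) and flush fixed-size chunks to MSSQL as they fill. A hedged sketch: `write_in_chunks` and its arguments are hypothetical names, and the self-check below uses plain dicts instead of real Cassandra rows:

```python
import pandas as pd

def write_in_chunks(row_iter, write_chunk, chunk_size=5000):
    # row_iter may be a cassandra ResultSet: iterating it pulls every page.
    # write_chunk receives one DataFrame per chunk, e.g.
    #   lambda df: df.to_sql('mssql_table_name', engine, if_exists='append')
    buf = []
    for row in row_iter:
        buf.append(row)
        if len(buf) >= chunk_size:
            write_chunk(pd.DataFrame(buf))
            buf = []
    if buf:  # flush the final partial chunk
        write_chunk(pd.DataFrame(buf))

# Self-check with made-up rows: 7 rows in chunks of 3 -> sizes 3, 3, 1.
chunks = []
write_in_chunks(({"k": i} for i in range(7)), chunks.append, chunk_size=3)
print([len(c) for c in chunks])  # [3, 3, 1]
```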

Answered by KRISHNA

Fastest way to read Cassandra data into pandas, iterating the pages automatically: create a dictionary of column lists, append each row to it while the driver iterates through all pages, then create the DataFrame from that dictionary.


import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import dict_factory

auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS)
cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT,
    auth_provider=auth_provider)

session = cluster.connect(CASSANDRA_DB)
session.row_factory = dict_factory

sql_query = "SELECT * FROM {}.{};".format(CASSANDRA_DB, CASSANDRA_TABLE)

dictionary = {"column1": [], "column2": []}

# dict_factory returns each row as a dict; iterating the ResultSet
# fetches every page automatically.
for row in session.execute(sql_query):
    dictionary["column1"].append(row["column1"])
    dictionary["column2"].append(row["column2"])

df = pd.DataFrame(dictionary)
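The same idea works without hard-coding the column names by collecting values per column while iterating. The `rows_to_df` helper below is a made-up name, and the sample rows stand in for `dict_factory` output:

```python
import pandas as pd
from collections import defaultdict

def rows_to_df(rows):
    # rows: iterable of dicts, as dict_factory yields them; iterating a
    # ResultSet pulls every page automatically.
    columns = defaultdict(list)
    for row in rows:
        for name, value in row.items():
            columns[name].append(value)
    return pd.DataFrame(dict(columns))

sample = [{"column1": 1, "column2": "x"}, {"column1": 2, "column2": "y"}]
df = rows_to_df(sample)
print(df.shape)  # (2, 2)
```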