连接到 Hive 并使用 Pandas 创建表

Note: this page is a translation of a popular StackOverflow question, provided under the CC BY-SA 4.0 license. You are free to use/share it, but you must attribute it to the original authors (not me): StackOverflow, http://stackoverflow.com/questions/47964385/

Date: 2020-09-14 04:57:24 · Source: igfitidea

Connect to Hive and create tables using pandas

Tags: python, pandas, hive

Asked by User12345

I have a pandas data frame in Python. I want to create/load this data frame into a Hive table.

I know that we can create a Spark data frame from a pandas data frame and then create a Hive table from it.

I would like to do this in a pure Python way, without using pyspark.
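Since the goal is a pure-Python path, one building block that needs no Hive connection at all is generating the `CREATE TABLE` DDL from the DataFrame's dtypes. A minimal sketch (the dtype-to-Hive-type mapping and the helper name are my own illustration, not a pandas or PyHive API):

```python
import pandas as pd

# Assumed mapping from pandas dtype names to Hive column types;
# anything unrecognized falls back to STRING.
_HIVE_TYPES = {"int64": "BIGINT", "int32": "INT", "float64": "DOUBLE",
               "bool": "BOOLEAN", "datetime64[ns]": "TIMESTAMP", "object": "STRING"}

def hive_create_table_ddl(df, table_name):
    # Build one "`col` TYPE" clause per column, in DataFrame column order.
    cols = ", ".join("`%s` %s" % (c, _HIVE_TYPES.get(str(t), "STRING"))
                     for c, t in df.dtypes.items())
    return "CREATE TABLE IF NOT EXISTS %s (%s)" % (table_name, cols)

df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
ddl = hive_create_table_ddl(df, "demo")
# ddl == "CREATE TABLE IF NOT EXISTS demo (`id` BIGINT, `name` STRING)"
```

The resulting string can then be passed to whichever cursor's `execute` ends up working.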

I have installed pyhive and pyhs2 in my local Python.

Using pyhive:

conn_1 = hive.Connection(host=hive_host, port=10000, username=username)

I receive the following error:

NotImplementedError: Wrong number of arguments for overloaded function 'Client_setAttr'.
  Possible C/C++ prototypes are:
    setAttr(saslwrapper::Client *,std::string const &,std::string const &)
    setAttr(saslwrapper::Client *,std::string const &,uint32_t)

Using pyhs2:

conn_1 = pyhs2.connect(host='host', port=10000,authMechanism="NOSASL", user='hive', database='default')

returns:

TTransportException: TSocket read 0 bytes

How can I connect to Hive and create Hive tables using pandas?
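Once a connection does work, the rows still have to get into the table. A naive sketch that renders the DataFrame as one multi-row `INSERT` statement (the helper and its quoting are illustrative assumptions only; real code should prefer parameter binding, or staging a file and using `LOAD DATA`):

```python
import pandas as pd

def hive_insert_values_sql(df, table_name):
    # Quote strings with a naive escape; leave numbers as-is.
    def lit(v):
        return "'%s'" % str(v).replace("'", "\\'") if isinstance(v, str) else str(v)
    # One "(v1, v2, ...)" tuple per DataFrame row.
    rows = ", ".join("(%s)" % ", ".join(lit(v) for v in row)
                     for row in df.itertuples(index=False, name=None))
    return "INSERT INTO %s VALUES %s" % (table_name, rows)

df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
sql = hive_insert_values_sql(df, "demo")
# sql == "INSERT INTO demo VALUES (1, 'a'), (2, 'b')"
```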

Error for pyhive:

---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
<ipython-input-7-bf6a49c722d5> in <module>()
----> 1 conn_1 = hive.Connection(host=hive_host, port=10000, username=username)

C:\Users\viru\AppData\Local\Continuum\Anaconda2\lib\site-packages\pyhive\hive.pyc in __init__(self, host, port, username, database, auth, configuration, kerberos_service_name, password, thrift_transport)
    157 
    158         try:
--> 159             self._transport.open()
    160             open_session_req = ttypes.TOpenSessionReq(
    161                 client_protocol=protocol_version,

C:\Users\viru\AppData\Local\Continuum\Anaconda2\lib\site-packages\thrift_sasl\__init__.pyc in open(self)
     72         type=TTransportException.NOT_OPEN,
     73         message="Already open!")
---> 74     self.sasl = self.sasl_client_factory()
     75 
     76     ret, chosen_mech, initial_response = self.sasl.start(self.mechanism)

C:\Users\viru\AppData\Local\Continuum\Anaconda2\lib\site-packages\pyhive\hive.pyc in sasl_factory()
    134                 def sasl_factory():
    135                     sasl_client = sasl.Client()
--> 136                     sasl_client.setAttr('host', host)
    137                     if sasl_auth == 'GSSAPI':
    138                         sasl_client.setAttr('service', kerberos_service_name)

C:\Users\viru\AppData\Local\Continuum\Anaconda2\lib\site-packages\sasl\saslwrapper.pyc in setAttr(*args)
     89     __swig_destroy__ = _saslwrapper.delete_Client
     90     __del__ = lambda self : None;
---> 91     def setAttr(*args): return _saslwrapper.Client_setAttr(*args)
     92     def init(*args): return _saslwrapper.Client_init(*args)
     93     def start(*args): return _saslwrapper.Client_start(*args)

NotImplementedError: Wrong number of arguments for overloaded function 'Client_setAttr'.
  Possible C/C++ prototypes are:
    setAttr(saslwrapper::Client *,std::string const &,std::string const &)
    setAttr(saslwrapper::Client *,std::string const &,uint32_t)

Error for pyhs2:

---------------------------------------------------------------------------
TTransportException                       Traceback (most recent call last)
<ipython-input-6-01e06bdcc707> in <module>()
----> 1 conn_1 = pyhs2.connect(host='host', port=10000,authMechanism="NOSASL", user='hive', database='default')

C:\Users\viru\AppData\Local\Continuum\Anaconda2\lib\site-packages\pyhs2\__init__.pyc in connect(*args, **kwargs)
      5     """
      6     from .connections import Connection
----> 7     return Connection(*args, **kwargs)

C:\Users\viru\AppData\Local\Continuum\Anaconda2\lib\site-packages\pyhs2\connections.pyc in __init__(self, host, port, authMechanism, user, password, database, configuration, timeout)
     45         self.client = TCLIService.Client(TBinaryProtocol(transport))
     46         transport.open()
---> 47         res = self.client.OpenSession(TOpenSessionReq(username=user, password=password, configuration=configuration))
     48         self.session = res.sessionHandle
     49         if database is not None:

C:\Users\viru\AppData\Local\Continuum\Anaconda2\lib\site-packages\pyhs2\TCLIService\TCLIService.pyc in OpenSession(self, req)
    152     """
    153     self.send_OpenSession(req)
--> 154     return self.recv_OpenSession()
    155 
    156   def send_OpenSession(self, req):

C:\Users\viru\AppData\Local\Continuum\Anaconda2\lib\site-packages\pyhs2\TCLIService\TCLIService.pyc in recv_OpenSession(self)
    163 
    164   def recv_OpenSession(self, ):
--> 165     (fname, mtype, rseqid) = self._iprot.readMessageBegin()
    166     if mtype == TMessageType.EXCEPTION:
    167       x = TApplicationException()

C:\Users\viru\AppData\Local\Continuum\Anaconda2\lib\site-packages\thrift\protocol\TBinaryProtocol.pyc in readMessageBegin(self)
    146                 raise TProtocolException(type=TProtocolException.BAD_VERSION,
    147                                          message='No protocol version header')
--> 148             name = self.trans.readAll(sz)
    149             type = self.readByte()
    150             seqid = self.readI32()

C:\Users\viru\AppData\Local\Continuum\Anaconda2\lib\site-packages\thrift\transport\TTransport.pyc in readAll(self, sz)
     58         have = 0
     59         while (have < sz):
---> 60             chunk = self.read(sz - have)
     61             have += len(chunk)
     62             buff += chunk

C:\Users\viru\AppData\Local\Continuum\Anaconda2\lib\site-packages\thrift\transport\TTransport.pyc in read(self, sz)
    159         if len(ret) != 0:
    160             return ret
--> 161         self.__rbuf = BufferIO(self.__trans.read(max(sz, self.__rbuf_size)))
    162         return self.__rbuf.read(sz)
    163 

C:\Users\viru\AppData\Local\Continuum\Anaconda2\lib\site-packages\thrift\transport\TSocket.pyc in read(self, sz)
    130         if len(buff) == 0:
    131             raise TTransportException(type=TTransportException.END_OF_FILE,
--> 132                                       message='TSocket read 0 bytes')
    133         return buff
    134 

TTransportException: TSocket read 0 bytes

Error after trying @Alvaro Joao's code:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-19-a1d7696d3c89> in <module>()
----> 1 cursor = hive.connect(dsn= hive_host+':10000', user=username,password=password).cursor()

C:\Users\viru\AppData\Local\Continuum\Anaconda2\lib\site-packages\pyhive\hive.pyc in connect(*args, **kwargs)
     62     :returns: a :py:class:`Connection` object.
     63     """
---> 64     return Connection(*args, **kwargs)
     65 
     66 

TypeError: __init__() got an unexpected keyword argument 'dsn'

Accepted answer by Alvaro Joao

The right API call:

from pyhive import hive

# PyHive's connect() takes host/port/username directly -- there is no `dsn`
# keyword, which is what raised the TypeError above. `async` is also a
# reserved word in newer Python, so it is omitted. Note that a password is
# only accepted together with auth='LDAP' or auth='CUSTOM'.
cursor = hive.connect(host=hive_host, port=10000, username=username).cursor()
cursor.execute('SELECT * FROM my_awesome_data LIMIT 10')

Reference: https://pypi.python.org/pypi/PyHive
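As a final note, PyHive also registers a `hive` dialect with SQLAlchemy, so in principle `DataFrame.to_sql` can do both the table creation and the load in one call. The URL below follows the documented `hive://` form, but `hive_host` and the commented calls are placeholders I have not run against a live server:

```python
# Build the SQLAlchemy URL for PyHive's "hive" dialect (placeholder values).
def hive_engine_url(user, host, port=10000, database="default"):
    return "hive://%s@%s:%d/%s" % (user, host, port, database)

url = hive_engine_url("hive", "hive_host")
# url == "hive://hive@hive_host:10000/default"

# With a reachable HiveServer2 (requires sqlalchemy and pyhive installed):
#   from sqlalchemy import create_engine
#   df.to_sql("my_table", create_engine(url), index=False)
```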