V jeho docstring, elasticsearch.helpers.async_bulk
opisuje seba ako
Pomocník pre :meth:
~elasticsearch.AsyncElasticsearch.bulk
api, ktoré poskytuje viac ľudskej priateľské rozhranie - spotrebuje o iterator akcií a posiela ich, aby elasticsearch na kusy. zdroj
Súvislosti
Bol som pomocou AsyncElasticsearch.bulk()
úspešne poslať pandy dataframes na niektoré ES stupňa
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
yield ('{ "index" : { "_index" : "%s" }}' % (self.index))
yield (json.dumps(record, default=int))
async def send_to_elasticsearch(self, df: DataFrame):
logger.info(f"{self.stage_name} sending batch to elastic")
await self.elastic_client.bulk(self._rec_to_actions(df))
Problém
Avšak, keď príde na async_bulk
, Som stále index is missing
chyby.
async def send_to_elasticsearch(self, df: DataFrame):
await async_bulk(self.elastic_client, self._rec_to_actions(df))
Pokúsil naladiť _rec_to_actions()
v niekoľko spôsobov, ako bez veľkého účinku.
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
record["index"] = self.index
yield (json.dumps(record, default=int))
Myslím, že hlavným problémom je, že nie som celkom istý, vedieť, čo je akcia, v rámci elasticsearch. Táto predstava je všade v dokumentácii, ale nemá jasnú štruktúru dát náprotivok v tejto knižnici zdrojový kód (žiadne, ktoré by som mohol nájsť, rovnako)
Čo je presne to akčné a ako by som mal naladiť moje generátor poslať df údajov na self.index
?
životné prostredie
- python = "3.9.5"
- elasticsearch = "7.14.1"