इसके docstring, elasticsearch.helpers.async_bulk
खुद का वर्णन के रूप में एक
सहायक के लिए :मेथ:
~elasticsearch.AsyncElasticsearch.bulk
एपीआई प्रदान करता है कि एक और अधिक मानव के अनुकूल इंटरफ़ेस - यह खपत एक iterator के कार्यों और के लिए उन्हें भेजता है elasticsearch में हिस्सा । स्रोत
संदर्भ
मैं का उपयोग किया गया है AsyncElasticsearch.bulk()
सफलतापूर्वक भेजने के लिए पंडों dataframes करने के लिए कुछ ES उदाहरण
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
yield ('{ "index" : { "_index" : "%s" }}' % (self.index))
yield (json.dumps(record, default=int))
async def send_to_elasticsearch(self, df: DataFrame):
logger.info(f"{self.stage_name} sending batch to elastic")
await self.elastic_client.bulk(self._rec_to_actions(df))
मुद्दा
हालांकि, जब यह आता है करने के लिए async_bulk
, मैं हो रही हूँ index is missing
त्रुटियों.
async def send_to_elasticsearch(self, df: DataFrame):
await async_bulk(self.elastic_client, self._rec_to_actions(df))
की कोशिश की धुन करने के लिए _rec_to_actions()
कई मायनों में बिना ज्यादा प्रभाव पड़ता है ।
def _rec_to_actions(self, df):
for record in df.to_dict(orient="records"):
record["index"] = self.index
yield (json.dumps(record, default=int))
मुझे लगता है कि मुख्य समस्या यह है कि मैं नहीं कर रहा हूँ बहुत यकीन है कि पता करने के लिए क्या है एक कार्रवाईके संदर्भ में, elasticsearch. इस धारणा में हर जगह है प्रलेखन लेकिन नहीं है एक स्पष्ट डेटा संरचना समकक्ष में इस पुस्तकालय के स्रोत कोड (कोई नहीं है कि मैं मिल सकता है, वैसे भी)
क्या वास्तव में एक कार्रवाई और कैसे चाहिए मैं धुन मेरी जनरेटर भेजने के लिए df के डेटा के लिए self.index
?
पर्यावरण
- अजगर = "3.9.5"
- elasticsearch = "7.14.1"