अक्का प्रवाह इनपुट (`में`) के रूप में उत्पादन (`बाहर`)

0

सवाल

मैं कोशिश कर रहा हूँ लिखने के लिए कोड का एक टुकड़ा है जो निम्नलिखित हैं:-

  1. पढ़ता है एक बड़ी सीएसवी फ़ाइल से दूरदराज के स्रोत की तरह s3.
  2. प्रक्रिया फ़ाइल रिकॉर्ड द्वारा रिकॉर्ड.
  3. अधिसूचना भेज करने के लिए उपयोगकर्ता
  4. लिखने उत्पादन करने के लिए एक दूरस्थ स्थान

नमूना रिकॉर्ड में इनपुट सीएसवी:

recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000

मेरे इनपुट के मामले में वर्ग का प्रतिनिधित्व करता है जो एक रिकॉर्ड में इनपुट सीएसवी:

case class InputRecord(recordId: String, name: String, salary: Long)

नमूना में रिकॉर्ड उत्पादन सीएसवी (की जरूरत है कि करने के लिए लिखा जा सकता है):

recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager

मेरी उत्पादन के मामले में वर्ग का प्रतिनिधित्व करता है जो एक रिकॉर्ड में इनपुट सीएसवी:

case class OutputRecord(recordId: String, name: String, designation: String)

पढ़ने के लिए एक का उपयोग कर रिकॉर्ड अक्का धारा सीएसवी (का उपयोग करता है Alpakka प्रतिक्रियाशील s3 https://doc.akka.io/docs/alpakka/current/s3.html):

def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] = 
S3.download(s3Object.bucket, s3Object.path)
      .runWith(Sink.head)
// This is then converted to csv

अब मैं एक समारोह के लिए प्रक्रिया को रिकॉर्ड:

def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer

समारोह में लिखने के लिए OutputRecord सीएसवी के रूप में

def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] = 
S3.multipartUpload(s3Object.bucket,
                       s3Object.path,
                       metaHeaders = MetaHeaders(Map())

समारोह भेजने के लिए ईमेल अधिसूचना:

def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info

यह सब एक साथ सिलाई

readAsCSV.flatMap { recordSource =>
  recordSource.map { record
    val outputRecord = process(record)
    outputRecord
  }
  .via(notify) //Error: Line 15
  .to(writeOutput) //Error: Line 16
  .run()
}

लाइन पर 15 और 16 मैं एक त्रुटि हो रही है, मैं कर रहा हूँ या तो जोड़ने के लिए सक्षम लाइन 15 या लाइन 16 लेकिन दोनों नहीं के बाद से दोनों notify & writeOutput की जरूरत है outputRecord. एक बार को सूचित कहा जाता है मुझे और मेरी भारतीय पत्नी outputRecord.

वहाँ एक तरीका है मैं जोड़ सकते हैं दोनों notify और writeOutput करने के लिए एक ही ग्राफ?

मैं नहीं देख रहा हूँ के लिए समानांतर निष्पादन के रूप में मैं चाहता हूँ करने के लिए पहली कॉल notify और फिर केवल writeOutput. तो, यह मददगार नहीं है: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing

उपयोग के मामले में बहुत आसान लगता है मेरे लिए, लेकिन कुछ कैसे मैं कर रहा हूँ में सक्षम नहीं खोजने के लिए एक साफ समाधान है ।

akka akka-stream alpakka amazon-s3
2021-11-23 22:36:54
1

सबसे अच्छा जवाब

1

उत्पादन की notify एक PushResultहै , लेकिन इनपुट के writeOutput है ByteString. एक बार जब आप परिवर्तन है कि यह संकलन होगा । मामले में आप की जरूरत है ByteString, एक ही से OutputRecord.

BTW, में नमूना कोड है कि आप प्रदान की है, इसी तरह की एक त्रुटि में मौजूद है readCSV और process.

2021-11-24 03:36:16

अन्य भाषाओं में

यह पृष्ठ अन्य भाषाओं में है

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................

इस श्रेणी में लोकप्रिय

लोकप्रिय सवाल इस श्रेणी में