मैं कोशिश कर रहा हूँ लिखने के लिए कोड का एक टुकड़ा है जो निम्नलिखित हैं:-
- पढ़ता है एक बड़ी सीएसवी फ़ाइल से दूरदराज के स्रोत की तरह s3.
- प्रक्रिया फ़ाइल रिकॉर्ड द्वारा रिकॉर्ड.
- अधिसूचना भेज करने के लिए उपयोगकर्ता
- लिखने उत्पादन करने के लिए एक दूरस्थ स्थान
नमूना रिकॉर्ड में इनपुट सीएसवी:
recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000
मेरे इनपुट के मामले में वर्ग का प्रतिनिधित्व करता है जो एक रिकॉर्ड में इनपुट सीएसवी:
case class InputRecord(recordId: String, name: String, salary: Long)
नमूना में रिकॉर्ड उत्पादन सीएसवी (की जरूरत है कि करने के लिए लिखा जा सकता है):
recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager
मेरी उत्पादन के मामले में वर्ग का प्रतिनिधित्व करता है जो एक रिकॉर्ड में इनपुट सीएसवी:
case class OutputRecord(recordId: String, name: String, designation: String)
पढ़ने के लिए एक का उपयोग कर रिकॉर्ड अक्का धारा सीएसवी (का उपयोग करता है Alpakka प्रतिक्रियाशील s3 https://doc.akka.io/docs/alpakka/current/s3.html):
def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] =
S3.download(s3Object.bucket, s3Object.path)
.runWith(Sink.head)
// This is then converted to csv
अब मैं एक समारोह के लिए प्रक्रिया को रिकॉर्ड:
def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer
समारोह में लिखने के लिए OutputRecord सीएसवी के रूप में
def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] =
S3.multipartUpload(s3Object.bucket,
s3Object.path,
metaHeaders = MetaHeaders(Map())
समारोह भेजने के लिए ईमेल अधिसूचना:
def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info
यह सब एक साथ सिलाई
readAsCSV.flatMap { recordSource =>
recordSource.map { record
val outputRecord = process(record)
outputRecord
}
.via(notify) //Error: Line 15
.to(writeOutput) //Error: Line 16
.run()
}
लाइन पर 15 और 16 मैं एक त्रुटि हो रही है, मैं कर रहा हूँ या तो जोड़ने के लिए सक्षम लाइन 15 या लाइन 16 लेकिन दोनों नहीं के बाद से दोनों notify
& writeOutput
की जरूरत है outputRecord
. एक बार को सूचित कहा जाता है मुझे और मेरी भारतीय पत्नी outputRecord
.
वहाँ एक तरीका है मैं जोड़ सकते हैं दोनों notify
और writeOutput
करने के लिए एक ही ग्राफ?
मैं नहीं देख रहा हूँ के लिए समानांतर निष्पादन के रूप में मैं चाहता हूँ करने के लिए पहली कॉल notify
और फिर केवल writeOutput
. तो, यह मददगार नहीं है: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing
उपयोग के मामले में बहुत आसान लगता है मेरे लिए, लेकिन कुछ कैसे मैं कर रहा हूँ में सक्षम नहीं खोजने के लिए एक साफ समाधान है ।