Concatinating दो प्रवाह में अक्का स्ट्रीम

0

सवाल

मैं कोशिश कर रहा हूँ करने के लिए concat दो बहती है और मैं नहीं कर रहा हूँ समझाने के लिए सक्षम उत्पादन के कार्यान्वयन.

val source = Source(1 to 10)
val sink = Sink.foreach(println)

val flow1 = Flow[Int].map(s => s + 1)
val flow2 = Flow[Int].map(s => s * 10)

val flowGraph = Flow.fromGraph(
    GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      val concat = builder.add(Concat[Int](2))
      val broadcast = builder.add(Broadcast[Int](2))

      broadcast ~> flow1 ~> concat.in(0)
      broadcast ~> flow2 ~> concat.in(1)

      FlowShape(broadcast.in, concat.out)
    }
  )

source.via(flowGraph).runWith(sink)

मैं उम्मीद निम्न आउटपुट से इस कोड.

2
3
4
.
.
.
11
10
20
.
.
.
100

इसके बजाय, मैं देख रहा हूँ केवल "2" छपी है । कर सकते हैं आप गलत क्या है समझाने कृपया मेरी implmentation और मैं कैसे करना चाहिए कार्यक्रम बदलने के लिए वांछित उत्पादन प्राप्त.

akka akka-stream scala
2021-10-21 17:29:00
2

सबसे अच्छा जवाब

3

से अक्का स्ट्रीम एपीआई डॉक्स:

Concat:

का उत्सर्जन करता है जब वर्तमान स्ट्रीम है एक तत्व उपलब्ध; यदि वर्तमान इनपुट से पूरा करती है, यह की कोशिश करता है, अगले एक

Broadcast:

का उत्सर्जन करता है जब सभी outputs के बंद हो जाता है backpressuring और वहाँ है एक इनपुट तत्व उपलब्ध

दो ऑपरेटरों काम नहीं करेगा संयोजन के रूप में वहाँ के रूप में एक संघर्ष में वे कैसे काम करते हैं -- Concat की कोशिश करता है खींच करने के लिए सभी तत्वों में से एक से Broadcast's आउटपुट स्विचन से पहले एक दूसरे के लिए, जबकि Broadcast नहीं होंगे फेंकना जब तक कि वहाँ की मांग है की सभी के लिए अपने outputs.

आप क्या जरूरत के लिए, तुम सकता है जुटना का उपयोग कर concat के रूप में सुझाव दिया द्वारा टिप्पणीकर्ताओं:

source.via(flow1).concat(source.via(flow2)).runWith(sink)

या यों का उपयोग करें Source.combine नीचे की तरह:

Source.combine(source.via(flow1), source.via(flow2))(Concat[Int](_)).runWith(sink)
2021-10-21 22:34:04
0

का उपयोग कर GraphDSLहै , जो एक सरलीकृत संस्करण के कार्यान्वयन का स्रोत है । गठबंधन:

val sg = Source.fromGraph(
  GraphDSL.create(){ implicit builder =>
    import GraphDSL.Implicits._

    val concat = builder.add(Concat[Int](2))

    source ~> flow1 ~> concat
    source ~> flow2 ~> concat

    SourceShape(concat.out)
  }
)

sg.runWith(sink)
2021-10-26 19:23:56

अन्य भाषाओं में

यह पृष्ठ अन्य भाषाओं में है

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................

इस श्रेणी में लोकप्रिय

लोकप्रिय सवाल इस श्रेणी में