अक्का स्ट्रीम लगातार खपत websocket

0

सवाल

Im थोड़े नया करने के लिए स्काला और अक्का स्ट्रीम और आईएम की कोशिश कर प्राप्त करने के लिए JSON स्ट्रिंग संदेश से एक websocket और उन्हें धक्का करने के लिए एक काफ्का विषय है ।

अभी के लिए मैं केवल हूँ पर काम कर रहा है "संदेश प्राप्त से एक ws" हिस्सा है ।

आने वाले संदेशों से websocket इस तरह दिखता है :

{  
   "bitcoin":"6389.06534240",
   "ethereum":"192.93111286",
   "monero":"108.90302506",
   "litecoin":"52.25484165"
}

मैं चाहता हूँ कि विभाजित करने के लिए इस JSON संदेश को कई संदेश :

   {"coin": "bitcoin", "price": "6389.06534240"}
   {"coin": "ethereum", "price": "192.93111286"}
   {"coin": "monero", "price": "108.90302506"}
   {"coin": "litecoin", "price": "52.25484165"}

और फिर धक्का इन संदेशों में से प्रत्येक के लिए एक काफ्का विषय है ।

यहाँ है कि मैं क्या हासिल किया है अब तक :

val message_decomposition: Flow[Message, String, NotUsed] = Flow[Message].mapConcat(
    msg => msg.toString.replaceAll("[{})(]", "").split(",")
  ).map( msg => {
    val splitted = msg.split(":")
    s"{'coin': ${splitted(0)}, 'price': ${splitted(1)}}"
  })

val sink: Sink[String, Future[Done]] = Sink.foreach[String](println)

val flow: Flow[Message, Message, Promise[Option[Message]]] =
    Flow.fromSinkAndSourceMat(
      message_decomposition.to(sink),
      Source.maybe[Message])(Keep.right)

val (upgradeResponse, promise) = Http().singleWebSocketRequest(
      WebSocketRequest("wss://ws.coincap.io/prices?assets=ALL"),
      flow)

यह काम कर रहा im प्राप्त करने की उम्मीद उत्पादन Json संदेश है, लेकिन मैं सोच रहा था कि अगर मैं लिख सकता है इस निर्माता में एक अधिक "अक्का-ish" शैली की तरह, का उपयोग कर GraphDSL. तो मैं कुछ सवाल है :

  • यह संभव है करने के लिए लगातार खपत एक WebSocket का उपयोग कर एक GraphDSL ? यदि हाँ, तुम मुझे दिखा सकते हैं एक उदाहरण के के लिए कृपया ?
  • यह एक अच्छा विचार है का उपभोग करने के लिए हमें का उपयोग कर एक GraphDSL ?
  • चाहिए मैं विघटित प्राप्त Json संदेश की तरह im करने के लिए भेजने से पहले काफ्का ? या यह बेहतर करने के लिए इसे भेजने के रूप में यह है के लिए कम विलंबता ?
  • के बाद उत्पादन के लिए संदेश काफ्का, मैं योजना बना रहा हूँ यह उपभोग करने के लिए का उपयोग अपाचे तूफान, यह एक अच्छा विचार है ? या मैं छड़ी के साथ अक्का ?

पढ़ने के लिए धन्यवाद मुझे, संबंध, Arès

akka akka-stream apache-kafka scala
2021-11-20 14:01:02
1

सबसे अच्छा जवाब

1

है कि कोड के बहुत सारे है अक्का-ish: scaladsl बस के रूप में अक्का के रूप में GraphDSL को लागू करने या एक कस्टम GraphStage. केवल कारण है, आईएमओ/ई, के लिए जाने के लिए GraphDSL है, तो वास्तविक आकार के ग्राफ नहीं है आसानी से व्यक्त करने योग्य में scaladsl.

मैं व्यक्तिगत रूप से जाने के मार्ग को परिभाषित करने के लिए एक CoinPrice वर्ग बनाने के लिए मॉडल स्पष्ट

case class CoinPrice(coin: String, price: BigDecimal)

और फिर एक Flow[Message, CoinPrice, NotUsed] जो डेसिमल 1 आवक संदेश में शून्य या अधिक CoinPriceएस. कुछ (प्ले का उपयोग कर JSON के यहाँ) की तरह:

val toCoinPrices =
  Flow[Message]
    .mapConcat { msg =>
      Json.parse(msg.toString)
        .asOpt[JsObject]
        .toList
        .flatMap { json =>
          json.underlying.flatMap { kv =>
            import scala.util.Try

            kv match {
              case (coin, JsString(priceStr)) =>
                Try(BigDecimal(priceStr)).toOption
                  .map(p => CoinPrice(coin, p))                

              case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
              case _ => None
            }
          }
        }
    }

आप हो सकता है पर निर्भर करता है, क्या आकार के JSONs संदेश में कर रहे हैं, तोड़ने के लिए चाहते हैं कि में अलग-अलग स्ट्रीम के चरणों करने के लिए अनुमति देने के लिए एक async के बीच सीमा JSON पार्स और निष्कर्षण में CoinPriceएस. उदाहरण के लिए,

Flow[Message]
  .mapConcat { msg =>
    Json.parse(msg.toString).asOpt[JsObject].toList
  }
  .async
  .mapConcat { json =>
    json.underlying.flatMap { kv =>
      import scala.util.Try

      kv match {
        case (coin, JsString(priceStr)) =>
          Try(BigDecimal(priceStr)).toOption
            .map(p => CoinPrice(coin, p))

        case (coin, JsNumber(price)) => Some(CoinPrice(coin, price))
        case _ => None
      }
    }
  }

में, ऊपर के चरणों पर दोनों ओर के async सीमा पर अमल करेंगे में अलग-अलग अभिनेताओं और इस प्रकार, संभवतः समवर्ती (यदि वहाँ पर्याप्त सीपीयू कोर उपलब्ध हैं आदि.), की कीमत पर अतिरिक्त भूमि के ऊपर के लिए अभिनेताओं का समन्वय करने के लिए और विनिमय संदेश । कि अतिरिक्त समन्वय/संचार ओवरहेड (cf. गुंठर के सार्वभौमिक Scalability कानून) है केवल करने के लिए जा रहा लायक हो सकता है अगर यह JSON वस्तुओं रहे हैं, पर्याप्त बड़ी है और में आ रहा है पर्याप्त रूप से जल्दी से (लगातार में आने से पहले पिछले एक समाप्त हो गया है प्रसंस्करण).

अगर आपका इरादा है का उपभोग करने के लिए websocket जब तक कार्यक्रम बंद हो जाता है, आपको मिल सकता है यह स्पष्ट करने के लिए बस का उपयोग करें Source.never[Message].

2021-11-21 12:42:30

धन्यवाद आप के लिए जवाब है, यह बहुत स्पष्ट है, मैं बस एक प्रश्न है, यद्यपि. कैसे कर सकते हैं मैं मेरी प्रतिक्रिया में अलग-अलग स्ट्रीम के चरणों ? आप कर सकते हैं सिर्फ एक उदाहरण है कृपया ? या ओरिएंट करने के लिए मेरे उचित हिस्सा के दस्तावेज ?
Arès

अन्य भाषाओं में

यह पृष्ठ अन्य भाषाओं में है

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................
Slovenský
..................................................................................................................

इस श्रेणी में लोकप्रिय

लोकप्रिय सवाल इस श्रेणी में