Akka tok Textu ("V") ako Výstup (`Z`)

0

Otázka

Snažím sa písať kus kódu, ktorý sa takto:-

  1. Číta veľké csv súboru zo vzdialeného zdroja ako s3.
  2. Spracovať súbor záznam záznam.
  3. Pošli upozornenie pre užívateľa
  4. Napíšte výstup na vzdialené miesto

Záznamový vo vstupnom formáte csv:

recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000

Môj vstup prípade, trieda, ktorá predstavuje záznam vo vstupnom formáte csv:

case class InputRecord(recordId: String, name: String, salary: Long)

Záznamový v csv výstup (ktorý musí byť napísaný):

recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager

Môj výstup prípade, trieda, ktorá predstavuje záznam vo vstupnom formáte csv:

case class OutputRecord(recordId: String, name: String, designation: String)

Čítanie záznam pomocou akka stream csv (používa Alpakka reaktívne s3 https://doc.akka.io/docs/alpakka/current/s3.html):

def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] = 
S3.download(s3Object.bucket, s3Object.path)
      .runWith(Sink.head)
// This is then converted to csv

Teraz som funkciu na spracovanie záznamov:

def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer

Funkcia písať OutputRecord ako csv

def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] = 
S3.multipartUpload(s3Object.bucket,
                       s3Object.path,
                       metaHeaders = MetaHeaders(Map())

Funkcia na odosielanie e-mailu:

def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info

Šitie to všetko spolu

readAsCSV.flatMap { recordSource =>
  recordSource.map { record
    val outputRecord = process(record)
    outputRecord
  }
  .via(notify) //Error: Line 15
  .to(writeOutput) //Error: Line 16
  .run()
}

Na Riadku 15 a 16 ja som stále chybu, som buď môcť pridať Riadok 15 alebo Riadok 16, ale nie oboje oboje notify & writeOutput potreby outputRecord. Raz informovať sa nazýva I voľné môj outputRecord.

Existuje spôsob, ako môžem pridať oboch notify a writeOutput rovnaký graf?

Nie som hľadal paralelné vykonanie, ako chcem na prvej výzvy notify a potom len writeOutput. Takže to nie je veľmi užitočné: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing

Prípad použitia sa zdá byť veľmi jednoduché pre mňa, ale niektoré, ako nie som schopný nájsť čisté riešenie.

akka akka-stream alpakka amazon-s3
2021-11-23 22:36:54
1

Najlepšiu odpoveď

1

Výstup notify je PushResult,, ale vstup writeOutput je ByteString. Keď zmeníte, že to bude zostavovaný. V prípade, že potrebujete ByteString,, si rovnaká z OutputRecord.

BTW, v kód vzorky, ktoré ste poskytli, podobné chybové existuje v readCSV a process.

2021-11-24 03:36:16

V iných jazykoch

Táto stránka je v iných jazykoch

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................