Snažím sa písať kus kódu, ktorý sa takto:-
- Číta veľké csv súboru zo vzdialeného zdroja ako s3.
- Spracovať súbor záznam záznam.
- Pošli upozornenie pre užívateľa
- Napíšte výstup na vzdialené miesto
Záznamový vo vstupnom formáte csv:
recordId,name,salary
1,Aiden,20000
2,Tom,18000
3,Jack,25000
Môj vstup prípade, trieda, ktorá predstavuje záznam vo vstupnom formáte csv:
case class InputRecord(recordId: String, name: String, salary: Long)
Záznamový v csv výstup (ktorý musí byť napísaný):
recordId,name,designation
1,Aiden,Programmer
2,Tom,Web Developer
3,Jack,Manager
Môj výstup prípade, trieda, ktorá predstavuje záznam vo vstupnom formáte csv:
case class OutputRecord(recordId: String, name: String, designation: String)
Čítanie záznam pomocou akka stream csv (používa Alpakka reaktívne s3 https://doc.akka.io/docs/alpakka/current/s3.html):
def readAsCSV: Future[Source[Map[String, ByteString], NotUsed]] =
S3.download(s3Object.bucket, s3Object.path)
.runWith(Sink.head)
// This is then converted to csv
Teraz som funkciu na spracovanie záznamov:
def process(input: InputRecord): OutputRecord =
//if salary > avg(salary) then Manager
//else Programmer
Funkcia písať OutputRecord ako csv
def writeOutput:Sink[ByteString, Future[MultipartUploadResult]] =
S3.multipartUpload(s3Object.bucket,
s3Object.path,
metaHeaders = MetaHeaders(Map())
Funkcia na odosielanie e-mailu:
def notify : Flow[OutputRecord, PushResult, NotUsed]
//if notification is sent successfully PushResult has some additional info
Šitie to všetko spolu
readAsCSV.flatMap { recordSource =>
recordSource.map { record
val outputRecord = process(record)
outputRecord
}
.via(notify) //Error: Line 15
.to(writeOutput) //Error: Line 16
.run()
}
Na Riadku 15 a 16 ja som stále chybu, som buď môcť pridať Riadok 15 alebo Riadok 16, ale nie oboje oboje notify
& writeOutput
potreby outputRecord
. Raz informovať sa nazýva I voľné môj outputRecord
.
Existuje spôsob, ako môžem pridať oboch notify
a writeOutput
rovnaký graf?
Nie som hľadal paralelné vykonanie, ako chcem na prvej výzvy notify
a potom len writeOutput
. Takže to nie je veľmi užitočné: https://doc.akka.io/docs/akka/current/stream/stream-parallelism.html#parallel-processing
Prípad použitia sa zdá byť veľmi jednoduché pre mňa, ale niektoré, ako nie som schopný nájsť čisté riešenie.