Ako premeniť Mono do skutočne asynchrónne (nie reaktívne!) metóda zavolať?

0

Otázka

Mám metóda

@Service
public class MyService {
    public Mono<Integer> processData() {
        ... // very long reactive operation
    }
}

V normálnom priebehu programu, pozývam táto metóda asynchrónne cez Kafka udalosti.

Pre účely testovania potrebujem vystaviť metódu, ako webovú službu, ale postup by mal byť vystavené asynchrónne: návrate len HTTP kódom 200 OK ("žiadosť prijatá") a ďalšieho spracovania údajov v pozadí.

Je to OK (= nemá to nejaké nežiaduce vedľajšie účinky) stačí zavolať Mono#subscribe() a vrátiť sa z regulátora metódou?

@RestController
@RequiredArgsConstructor
public class MyController {
    private final MyService service;

    @GetMapping
    public void processData() {
        service.processData()
            .subscribeOn(Schedulers.boundedElastic())
            .subscribe();
    }
}

Alebo je lepšie na to, ako je tento (tu som zmätený varovanie z IntelliJ, možno rovnaké ako https://youtrack.jetbrains.com/issue/IDEA-276018 ?):

public Mono<Void> processData() {
    service.processData()
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe(); // IntelliJ complains "Inappropriate 'subscribe' call" but I think it's a false alarm in my case(?)
    return Mono.empty();
}

Alebo nejaké iné riešenie?

2

Najlepšiu odpoveď

3

Je to OK (= nemá to nejaké nežiaduce vedľajšie účinky) stačí zavolať Mono#vyžiadať() a návrat z regulátora metódou?

Existujú vedľajšie účinky, ale môže to byť ok žijúce s nimi:

  • To naozaj je oheň a zabudnúť - čo znamená, zatiaľ čo vy budete nikdy byť informovaný o úspech (čo väčšina ľudí realizovať), budete nikdy byť informovaný o zlyhania (ktorý sa podstatne menej ľudia myslia.)
  • Ak proces zablokuje z nejakého dôvodu, že vydavateľ nebude nikdy dokončené, a budete mať žiadny spôsob, ako vedel. Pretože ste sa upísali na ohraničenej elastické threadpool, to bude tiež viazať jeden z tých, ktorí majú obmedzenú vlákna na neurčito príliš.

Prvé miesto, ktoré by mohlo byť v poriadku, alebo budete chcieť dať nejaké zapisovanie chýb do denníkov ďalej stanovuje, že reaktívne reťazca, ako vedľajší efekt nejako tak vás aspoň interné oznámenia, ak sa niečo pokazí.

Pre druhý bod - ja by som odporučil, uvedenie (veľkorysý) časový limit na svoj spôsob hovoru, takže to aspoň bude zrušený, ak ešte nie je dokončená v nastavenom čase, a je už visí okolo náročné zdrojov. Ak používate asynchrónnej úlohy, potom to nie je masívny problém, ako to bude len konzumovať trochu pamäte. Ak ste balenie blokovanie hovoru na elastickom plánovač potom to je ešte horšie však, ako ste viazanie do niť, že v threadpool na dobu neurčitú.

Ja by som tiež otázka, prečo je potrebné používať ohraničenej elastické plánovač na všetkých tu - používa sa na balenie blokovanie hovorov, ktorá sa nezdá byť základom tento prípad použitia. (Aby bolo jasné, ak váš poskytovateľ blokuje, potom by ste mali úplne obaliť to na elastickom plánovač - ale ak nie, tak nie je dôvod, aby tak urobili.)

Nakoniec, v tomto príklade:

public Mono<Void> processData() {
    service.processData()
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe();
    return Mono.empty();
}

...je to skvelý príklad toho, čo sa nesmie robiť, ako si vytvoriť akýsi "oblasť reaktívne metódy" - niekto môže veľmi rozumne objednať, ktoré sa vrátili vydavateľ myslieť si, že bude dokončená, keď podkladové vydavateľ dokončí, čo očividne nie je to, čo sa tu deje. Pomocou void návrat typ, a teda nie návrate všetko je správne, čo robiť v tejto situácii.

2021-11-23 16:54:58
1

Váš variant s nasledujúci kód je vlastne ok:

@GetMapping
public void processData() {
    service.processData()
        .subscribeOn(Schedulers.boundedElastic())
        .subscribe();
}

To je vlastne to, čo robíte v @Scheduled metóda, ktorá jednoducho vráti nič a explicitne objednania Mono alebo Flux tak, že prvky, ktoré sú emitované.

2021-11-23 08:36:44

V iných jazykoch

Táto stránka je v iných jazykoch

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................