Poslať/produkujú json správy prostredníctvom kafka

0

Otázka

Toto je môj prvý pomocou Kafka a ja som v pláne použiť kafka s .čistý

Chcel som vedieť, či môžem poslať JSON, ako sa správa, keď som sa vyrábajú udalosť

Som po tutorial: https://developer.confluent.io/get-started/dotnet/#build-producer

Tiež, existuje spôsob, ako pre ktoré hodnoty majú byť priradené k modelu tak, aby hodnota/json štruktúra je vždy viazaná na tento model

Tak napríklad: ak chcem, aby moje json hodnota sa

{
  "customerName":"anything",
  "eventType":"one-of-three-enums",
  "columnsChanged": "string value or something"
}

Väčšina príkladov, že môžem zistiť, ako je tento:

using Confluent.Kafka;
using System;
using Microsoft.Extensions.Configuration;

class Producer {
    static void Main(string[] args)
    {
        if (args.Length != 1) {
            Console.WriteLine("Please provide the configuration file path as a command line argument");
        }

        IConfiguration configuration = new ConfigurationBuilder()
            .AddIniFile(args[0])
            .Build();

        const string topic = "purchases";

        string[] users = { "eabara", "jsmith", "sgarcia", "jbernard", "htanaka", "awalther" };
        string[] items = { "book", "alarm clock", "t-shirts", "gift card", "batteries" };

        using (var producer = new ProducerBuilder<string, string>(
            configuration.AsEnumerable()).Build())
        {
            var numProduced = 0;
            const int numMessages = 10;
            for (int i = 0; i < numMessages; ++i)
            {
                Random rnd = new Random();
                var user = users[rnd.Next(users.Length)];
                var item = items[rnd.Next(items.Length)];

                producer.Produce(topic, new Message<string, string> { Key = user, Value = item },
                    (deliveryReport) =>
                    {
                        if (deliveryReport.Error.Code != ErrorCode.NoError) {
                            Console.WriteLine($"Failed to deliver message: {deliveryReport.Error.Reason}");
                        }
                        else {
                            Console.WriteLine($"Produced event to topic {topic}: key = {user,-10} value = {item}");
                            numProduced += 1;
                        }
                    });
            }

            producer.Flush(TimeSpan.FromSeconds(10));
            Console.WriteLine($"{numProduced} messages were produced to topic {topic}");
        }
    }
}

Chcela by som, aby položku triedy v json štruktúry.

.net apache-kafka asp.net-core
2021-11-23 21:53:21
1

Najlepšiu odpoveď

0

chcel vedieť, či môžem poslať JSON, ako sa správa, keď som sa vyrábajú udalosť

Áno. Kafka obchodov, bytov a prevedie bajtov pomocou Serializers. Pri budovaní Výrobcov, máte možnosť telefonovanie SetValueSerializer.

Niektoré vstavané serializers možno nájsť na - https://github.com/confluentinc/confluent-kafka-dotnet/blob/master/src/Confluent.Kafka/Serializers.cs

Budete potrebovať, ak chcete napísať svoj vlastný všeobecne nazývaných zvládnuť akúkoľvek JSON typy modelov.

Pri používaní Utf8Serializer pre reťazce, budete musieť vopred zoradenie objektov, z model triedy a potom odoslať, že ako hodnota. Vo vašom príklade by ste vymeniť var item s niektorými pokračovanie objektu.

Ako sa mám obrátiť, C# objektu do JSON reťazec .ČISTÝ?

Pri použití modelu tried, vaše údaje sa zvyčajne silno zadali kým začnete manuálne písať JSON, alebo používať Slovník typy. Ak ste chceli externé hlásenie overenie, že Splývajúce Schému databázy Registry je jeden príklad, ktorý podporuje JSONSchema a JsonSerializer z confluent-dotnet-kafka projekt podporuje.

2021-11-23 22:27:28

Len sledovať otázku. Viete, že môžem limit pre veľkosť správy a existuje spôsob, ako pre výrobcov, skontrolovať, čo je veľkosť prípade pred odoslaním a nedovoľte, aby sa chcete správu odoslať, ak veľkosť je viac ako je limit?
Learn AspNet

Kafka má predvolený limit 1MB správa šarží. Ak ste si veľkosti na pokračovanie byte pole, ktoré by malo byť v blízkosti aproximácie jednotlivých záznam veľkosť (tam je ďalšie nadzemné ako záznam hlavičky a časy, keď)
OneCricketeer

Ďakujem. Môžete mi prosím odpoveď: stackoverflow.com/questions/70097676/...
Learn AspNet

V iných jazykoch

Táto stránka je v iných jazykoch

Русский
..................................................................................................................
Italiano
..................................................................................................................
Polski
..................................................................................................................
Română
..................................................................................................................
한국어
..................................................................................................................
हिन्दी
..................................................................................................................
Français
..................................................................................................................
Türk
..................................................................................................................
Česk
..................................................................................................................
Português
..................................................................................................................
ไทย
..................................................................................................................
中文
..................................................................................................................
Español
..................................................................................................................