Comment lire les données de Kafka avec Python

Comment lire les données de Kafka avec Python
Kafka est un système de messagerie distribué open source pour envoyer le message en sujets partitionnés et différents. Le streaming de données en temps réel peut être mis en œuvre en utilisant Kafka pour recevoir des données entre les applications. Il a trois pièces principales. Ce sont des producteurs, des consommateurs et des sujets. Le producteur est utilisé pour envoyer un message à un sujet particulier et chaque message est joint avec une clé. Le consommateur est utilisé pour lire un message sur un sujet particulier de l'ensemble des partitions. Les données reçues du producteur et stockées sur les partitions sur la base d'un sujet particulier. De nombreuses bibliothèques existent à Python pour créer un producteur et un consommateur pour construire un système de messagerie à l'aide de Kafka. Comment les données de Kafka peuvent être lues à l'aide de Python est affichée dans ce tutoriel.

Prérequis

Vous devez installer la bibliothèque Python nécessaire pour lire les données de Kafka. Python3 est utilisé dans ce tutoriel pour écrire le script du consommateur et du producteur. Si le package PIP n'est pas installé auparavant dans votre système d'exploitation Linux, vous devez installer PIP avant d'installer la bibliothèque Kafka pour Python. python3-kafka est utilisé dans ce tutoriel pour lire les données de Kafka. Exécutez la commande suivante pour installer la bibliothèque.

$ pip install python3-kafka

Lire des données de texte simples de Kafka

Différents types de données peuvent être envoyés par le producteur sur un sujet particulier qui peut être lu par le consommateur. Comment une simple données de texte peut être envoyée et reçue de Kafka à l'aide du producteur et du consommateur est montrée dans cette partie de ce tutoriel.

Créer un fichier nommé producteur1.py avec le script Python suivant. Kafkaproducteur Le module est importé de la bibliothèque Kafka. La liste des courtiers doit définir au moment de l'initialisation des objets de producteur pour se connecter avec le serveur Kafka. Le port par défaut de Kafka est '9092'. L'argument Bootstrap_Servers est utilisé pour définir le nom d'hôte avec le port. 'First_topic'est défini comme nom de sujet par lequel le message texte sera envoyé par le producteur. Ensuite, un simple message texte, 'Bonjour de Kafka'est envoyé en utilisant envoyer() méthode de Kafkaproducteur au sujet, 'First_topic'.

producteur1.py:

# Importer Kafkaproducer de la bibliothèque Kafka
de Kafka Import Kafkaproducer
# Définir le serveur avec le port
bootstrap_servers = ['localhost: 9092']
# Définir le nom du sujet où le message publiera
topicName = 'first_topic'
# Initialiser la variable du producteur
producteur = kafkaproducer (bootstrap_servers = bootstrap_servers)
# Publier du texte dans le sujet défini
producteur.Envoyer (thèsename, b'hello de Kafka… ')
# Message d'impression
imprimer ("message envoyé")

Créer un fichier nommé Consumer1.py avec le script Python suivant. Kafkaconsumer Le module est importé de la bibliothèque Kafka pour lire les données de Kafka. système Le module est utilisé ici pour terminer le script. Le même nom d'hôte et le même numéro de port du producteur sont utilisés dans le script du consommateur pour lire les données de Kafka. Le nom de sujet du consommateur et du producteur doit être le même que 'First_topic'. Ensuite, l'objet consommateur est initialisé avec les trois arguments. Nom du sujet, ID de groupe et informations sur le serveur. pour LOOP est utilisé ici pour lire le texte Send du producteur de Kafka.

Consumer1.py:

# Importer Kafkaconsumer de la bibliothèque Kafka
de Kafka Import Kafkaconsumer
# Importer le module sys
importer sys
# Définir le serveur avec le port
bootstrap_servers = ['localhost: 9092']
# Définir le nom du sujet d'où le message recevra
topicName = 'first_topic'
# Initialiser la variable des consommateurs
Consumer = kafkaconsumer (topicName, groupe_id = 'group1', bootstrap_servers =
bootstrap_servers)
# Lire et imprimer le message du consommateur
Pour MSG chez le consommateur:
print ("Nom du sujet =% S, message =% s"% (msg.Sujet, MSG.valeur))
# Terminer le script
système.sortie()

Sortir:

Exécutez la commande suivante à partir d'un terminal pour exécuter le script producteur.

$ Python3 producteur1.py

La sortie suivante apparaîtra après avoir envoyé le message.

Exécutez la commande suivante à partir d'un autre terminal pour exécuter le script grand public.

$ python3 Consumer1.py

La sortie montre le nom du sujet et le message texte envoyé par le producteur.

Lire des données formatées JSON de Kafka

Les données formatées JSON peuvent être envoyées par le producteur de Kafka et lues par le consommateur Kafka en utilisant le json module de python. Comment les données JSON peuvent être sérialisées et désérialisées avant d'envoyer et de recevoir les données à l'aide du module Python-Kafka est indiquée dans cette partie de ce tutoriel.

Créer un script python nommé producteur2.py avec le script suivant. Un autre module nommé JSON est importé avec Kafkaproducteur module ici. valeur_serializer L'argument est utilisé avec bootstrap_servers Argument ici pour initialiser l'objet du producteur de Kafka. Cet argument indique que les données JSON seront codées en utilisant 'UTF-8'Des personnages se déroulent au moment de l'envoi. Ensuite, les données formatées JSON sont envoyées au sujet nommé Jsontopique.

producteur2.py:

# Importer Kafkaproducer de la bibliothèque Kafka
de Kafka Import Kafkaproducer
# Importer le module JSON pour sérialiser les données
Importer JSON
# Initialiser la variable du producteur et définir le paramètre pour JSON Encode
producteur = kafkaproducer (bootstrap_servers =
['LocalHost: 9092'], Value_Serializer = Lambda V: JSON.décharges (v).Encode ('UTF-8'))
# Envoyer des données au format JSON
producteur.Envoyer ('JSontopic', 'name': 'Fahmida', 'Email': 'Fahmida @ gmail.com ')
# Message d'impression
print ("Message envoyé à jSontopic")

Créer un script python nommé Consumer2.py avec le script suivant. Kafkaconsumer, système et les modules JSON sont importés dans ce script. Kafkaconsumer Le module est utilisé pour lire les données formatées JSON du Kafka. Le module JSON est utilisé pour décoder les données JSON codées envoyées par le producteur de Kafka. Système Le module est utilisé pour terminer le script. Value_deserializer L'argument est utilisé avec bootstrap_servers Pour définir comment les données JSON seront décodées. Suivant, pour LOOP est utilisé pour imprimer tous les enregistrements de consommation et les données JSON récupérées de Kafka.

Consumer2.py:

# Importer Kafkaconsumer de la bibliothèque Kafka
de Kafka Import Kafkaconsumer
# Importer le module sys
importer sys
# Importer le module JSON pour sérialiser les données
Importer JSON
# Initialiser la variable des consommateurs et définir la propriété pour JSON Decode
Consumer = kafkaconsumer ('jSontopic', bootstrap_servers = ['localhost: 9092'],
Value_deserializer = Lambda M: JSON.charges (m.decode ('utf-8')))
# Lire les données de Kafka
pour le message dans le consommateur:
imprimer ("enregistrements de consommation: \ n")
Imprimer (message)
print ("\ nreading from json data \ n")
print ("name:", message [6] ['name'])
print ("e-mail:", message [6] ['e-mail'])
# Terminer le script
système.sortie()

Sortir:

Exécutez la commande suivante à partir d'un terminal pour exécuter le script producteur.

$ Python3 producteur2.py

Le script imprimera le message suivant après avoir envoyé les données JSON.

Exécutez la commande suivante à partir d'un autre terminal pour exécuter le script grand public.

$ python3 Consumer2.py

La sortie suivante apparaîtra après l'exécution du script.

Conclusion:

Les données peuvent être envoyées et reçues dans différents formats de Kafka à l'aide de Python. Les données peuvent également être stockées dans la base de données et récupérées de la base de données à l'aide de Kafka et Python. Je suis à la maison, ce tutoriel aidera l'utilisateur Python à commencer à travailler avec Kafka.