Début avec Apache Kafka et Python

Début avec Apache Kafka et Python
Dans cette leçon, nous verrons comment nous pouvons utiliser Apache Kafka avec Python et faire un exemple d'application en utilisant le client Python pour Apache Kafka.

Pour terminer cette leçon, vous devez avoir une installation active pour Kafka sur votre machine. Lisez Installer Apache Kafka sur Ubuntu pour savoir comment faire cela.

Installation du client Python pour Apache Kafka

Avant de pouvoir commencer à travailler avec Apache Kafka dans le programme Python, nous devons installer le client Python pour Apache Kafka. Cela peut être fait en utilisant pépin (Index du package Python). Voici une commande pour y parvenir:

pip3 installer kafka-python

Ce sera une installation rapide sur le terminal:

Installation du client Python Kafka en utilisant PIP

Maintenant que nous avons une installation active pour Apache Kafka et que nous avons également installé le client Python Kafka, nous sommes prêts à commencer à coder.

Faire un producteur

La première chose à publier des messages sur Kafka est une application de producteur qui peut envoyer des messages à des sujets à Kafka.

Notez que les producteurs de Kafka sont des producteurs de messages asynchrones. Cela signifie que les opérations effectuées pendant qu'un message est publié sur Kafka Topic Partition ne bloque pas. Pour garder les choses simples, nous écrivons un éditeur JSON simple pour cette leçon.

Pour commencer, faites une instance pour le producteur de Kafka:

de Kafka Import Kafkaproducer
Importer JSON
Importer un empreinte
producteur = kafkaproducer (
bootstrap_servers = 'localhost: 9092',
Value_Serializer = Lambda V: JSON.décharges (v).Encode ('UTF-8'))

L'attribut Bootstrap_Servers Informe sur l'hôte et le port du serveur Kafka. L'attribut Value_Serializer est juste à des fins de sérialisation JSON des valeurs JSON rencontrées.

Pour jouer avec le producteur de Kafka, essayons d'imprimer les mesures liées au producteur et au cluster Kafka:

métriques = producteur.métrique()
tampon.pprint (métriques)

Nous verrons ce qui suit maintenant:

Kafka Mterics

Maintenant, essayons enfin d'envoyer un message à la file d'attente de Kafka. Un objet JSON simple sera un bon exemple:

producteur.Envoyer ('LinuxHint', 'Topic': 'Kafka')

Le linuxhint La partition de sujet est-elle sur laquelle l'objet JSON sera envoyé sur. Lorsque vous exécutez le script, vous n'obtiendrez aucune sortie car le message est juste envoyé à la partition du sujet. Il est temps d'écrire un consommateur afin que nous puissions tester notre application.

Faire un consommateur

Maintenant, nous sommes prêts à établir une nouvelle connexion en tant qu'application de consommation et à obtenir les messages du sujet Kafka. Commencez par faire une nouvelle instance pour le consommateur:

de Kafka Import Kafkaconsumer
De Kafka Import TopicPartition
Imprimer («faire une connexion.')
consommateur = kafkaconsumer (bootstrap_servers = 'localhost: 9092')

Maintenant, attribuez un sujet à cette connexion et une possible valeur de décalage également.

Imprimer ('Assignation du sujet.')
consommateur.Assign ([topicPartition ('Linuxhint', 2)])

Enfin, nous sommes prêts à imprimer le MSSage:

Imprimer ('Obtenir un message.')
pour le message dans le consommateur:
print ("offset:" + str (message [0]) + "\ t msg:" + str (message)))

Grâce à cela, nous obtiendrons une liste de tous les messages publiés sur la partition du sujet du consommateur Kafka. La sortie de ce programme sera:

Consommateur kafka

Juste pour une référence rapide, voici le script complet du producteur:

de Kafka Import Kafkaproducer
Importer JSON
Importer un empreinte
producteur = kafkaproducer (
bootstrap_servers = 'localhost: 9092',
Value_Serializer = Lambda V: JSON.décharges (v).Encode ('UTF-8'))
producteur.Envoyer ('LinuxHint', 'Topic': 'Kafka')
# Metrics = producteur.métrique()
# print.pprint (métriques)

Et voici le programme complet des consommateurs que nous avons utilisé:

de Kafka Import Kafkaconsumer
De Kafka Import TopicPartition
Imprimer («faire une connexion.')
consommateur = kafkaconsumer (bootstrap_servers = 'localhost: 9092')
Imprimer ('Assignation du sujet.')
consommateur.Assign ([topicPartition ('Linuxhint', 2)])
Imprimer ('Obtenir un message.')
pour le message dans le consommateur:
print ("offset:" + str (message [0]) + "\ t msg:" + str (message)))

Conclusion

Dans cette leçon, nous avons examiné comment nous pouvons installer et commencer à utiliser Apache Kafka dans nos programmes Python. Nous avons montré à quel point il est facile d'effectuer des tâches simples liées à Kafka à Python avec le client Kafka démontré pour Python.