Message queue in PHP using Kafka with acks

Production-ready flow to use Apache Kafka with PHP in Docker

On a new project where I work as PHP Tech Lead, the team had to make a number of decisions, including unifying the technology stack:

  • a single type of relational database
  • a single message broker
  • a unified framework in the context of a programming language
  • etc.

This article is about testing Apache Kafka as a message broker.

Kafka is already used in the company, and it scales better than, for example, RabbitMQ, so we are looking in its direction. Before adopting Kafka in our projects, we want to test whether it covers our needs.

Requirements for a message broker:

  • PHP compatibility
  • Stable operation, fault tolerance, high SLA
  • Scalability
  • Fast sending and reading of messages
  • Guaranteed message delivery
  • Guaranteed receipt and processing of a message (ack / nack)

In fact, RabbitMQ already covers all of this and works great at small and medium volumes. But, as I wrote above, we do not want to breed a zoo of technologies.

1. Choosing a client for PHP

The official Kafka website links to several client repositories for PHP.

Only one of them is "live" at the moment and has a lot of stars on GitHub: arnaud-lb/php-rdkafka.
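php-rdkafka is a PHP extension, published on PECL as `rdkafka`. Once it is installed, a quick sanity check is to ask PHP whether the extension is loaded. Below is a minimal sketch of such a check. The function name `rdkafkaStatus()` is hypothetical, and the check relies only on the standard `extension_loaded()` and `phpversion()` functions.

```php
<?php
// Minimal sketch: confirm the php-rdkafka extension is available
// before wiring up producers and consumers. rdkafkaStatus() is a hypothetical helper.

/**
 * Returns a short human-readable status line for the rdkafka extension.
 */
function rdkafkaStatus(): string
{
    if (!extension_loaded('rdkafka')) {
        return 'rdkafka extension is NOT loaded (install it, e.g. via PECL, and enable it in php.ini)';
    }
    // phpversion() with an extension name returns that extension's version string.
    return 'rdkafka extension loaded, version ' . phpversion('rdkafka');
}

echo rdkafkaStatus(), PHP_EOL;
```

Run it with the CLI binary inside the PHP container. Keep in mind that the web server's PHP and the CLI may use different php.ini files.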

2. Environment setup

In 2021, Docker is to a developer what the whip is to Indiana Jones.

  1. I went to hub.docker.com and found the most popular official Kafka image.
  2. Next, I searched Google for a ready-made docker-compose example with PHP, ideally with Kafka too. And, surprise: the first search result was Phillaf/php-kafka-demo, which already uses the image from step 1.
    git clone https://github.com/Phillaf/php-kafka-demo
    
  3. After cloning the repository, run:
    docker-compose up -d
    
    Of course, the project did not start on the first try (at least on my macOS Big Sur). To fix the error, it was enough to remove the explicitly pinned Kafka version from the image.
  4. Everything is ready for the application, but how about a bonus? To better understand what is happening inside an unfamiliar tool, I decided to look for a visual manager right away. There were several paid tools that had to be installed locally, but that's not our way :) So I installed an open-source admin panel. To do this, add a few lines to the services section of our docker-compose.yml:
    kafka-ui:
      image: provectuslabs/kafka-ui
      container_name: kafka-ui
      ports:
        - "8080:8080"
      restart: always
      environment:
        - KAFKA_CLUSTERS_0_NAME=local
        - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka:9092
        - KAFKA_CLUSTERS_0_ZOOKEEPER=zookeeper:2181
        - KAFKA_CLUSTERS_0_READONLY=false
    
  5. Ready, set, go:
    • docker-compose up -d
    • Open http://localhost: this adds messages to our queue (topic).
    • Open http://localhost:8080: this opens our admin panel, where we will see our topic and the first messages in it.
    • Enter the container and run the consumer. I personally prefer the Docker integration in PhpStorm, but the Docker GUI or the console work just as well. I won't dwell on this here; anyone already interested in Kafka will surely manage to get into a Docker container:
      php ./public/consumer_low.php
      
      We will see our messages in the console.
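In the demo, the page at http://localhost produces the messages. For reference, here is a minimal sketch of a php-rdkafka producer; it is not the demo's actual code. The broker address `kafka:9092` comes from the kafka-ui settings above. The topic name `test` and the function `publishMessages()` are hypothetical. The Kafka objects are passed in as parameters so that the logic can also be exercised with test doubles.

```php
<?php
// Minimal producer sketch (not the demo's code). Assumptions: broker at kafka:9092,
// hypothetical topic "test", hypothetical helper publishMessages().

/**
 * Queues each payload and then waits until librdkafka has delivered them.
 *
 * @param object   $producer RdKafka\Producer (or a test double with poll()/flush())
 * @param object   $topic    RdKafka\ProducerTopic (or a test double with produce())
 * @param string[] $payloads messages to send
 */
function publishMessages(object $producer, object $topic, array $payloads, int $flushTimeoutMs = 10000): void
{
    foreach ($payloads as $payload) {
        // RD_KAFKA_PARTITION_UA lets the partitioner pick the partition; 0 = no message flags.
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload);
        // Serve delivery reports and other callbacks without blocking.
        $producer->poll(0);
    }
    // Block until all queued messages are sent or the timeout expires.
    $result = $producer->flush($flushTimeoutMs);
    if ($result !== RD_KAFKA_RESP_ERR_NO_ERROR) {
        throw new RuntimeException('Flush failed, messages might be lost (error code ' . $result . ')');
    }
}

// Usage against a real broker (requires ext-rdkafka):
// $conf = new RdKafka\Conf();
// $conf->set('metadata.broker.list', 'kafka:9092');
// $producer = new RdKafka\Producer($conf);
// publishMessages($producer, $producer->newTopic('test'), ['hello', 'world']);
```

Calling `flush()` before the script ends matters in PHP. librdkafka sends messages asynchronously, so a short-lived request that exits without flushing can lose messages that are still queued.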

Congratulations, you are now Kafka adepts.

3. Adjusting the configuration to our requirements

As I wrote above, we want full control over messages: our application should receive each message exactly once, no more and no less. We also want to be sure that the consumer may stop unexpectedly while processing a message (because of a deployment, an error, a server crash, ...). In that case, the message must be processed after the restart instead of sinking into oblivion.

  1. To do this, we need to turn off automatic committing of offsets to Kafka:
    $conf->set('enable.auto.commit', 'false');
    
  2. For the server to save the offset, we must put the consumer into a consumer group (group.id). Then, when the consumer reconnects, Kafka knows where the group left off:
    $conf->set('group.id', 'group_1');
    
  3. Start listening to the queue:

    $topic->consumeStart($partition, RD_KAFKA_OFFSET_STORED);
    

    The 1st parameter is the number of the partition to read. By default, a topic has a single partition, so the value is 0. The 2nd parameter is the offset from which to start reading messages. There are 3 preset modes:

    • RD_KAFKA_OFFSET_BEGINNING - start reading from the first message in the partition
    • RD_KAFKA_OFFSET_END - read only new messages that arrive after the consumer has connected
    • RD_KAFKA_OFFSET_STORED - continue from where you left off last time (this flag can only be used if 'group.id' is specified). Here you can find this and many more configuration options.
  4. Receive a message from the queue (the second argument is the timeout in milliseconds):

    $msg = $topic->consume($partition, 1000);
    
  5. After the message has been processed successfully (saved to the database, etc.), mark it as processed by storing its offset. librdkafka stores offset + 1, i.e. the position of the next message to read:
    $topic->offsetStore($partition, $msg->offset);
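Steps 3-5 can be combined into a consume loop roughly like the sketch below. It assumes `$topic` was obtained as in the steps above: a consumer configured with `group.id` and `enable.auto.commit` = `false`, with `consumeStart()` already called. `consumeAtLeastOnce()` and `saveToDatabase()` are hypothetical names. Strictly speaking, this pattern gives at-least-once rather than exactly-once processing. Suppose the process dies after the handler has done its work (say, written to the database) but before `offsetStore()` runs. After the restart, the message will be processed again, so handlers should be idempotent.

```php
<?php
// Sketch of the consume loop from steps 3-5 (hypothetical helper, not the demo's code).
// $topic: an RdKafka\ConsumerTopic on which consumeStart() was already called,
// or any test double exposing consume() and offsetStore().

/**
 * Polls the partition $maxPolls times and stores ("acks") an offset only after
 * $handler has processed the message. Returns the number of handled messages.
 */
function consumeAtLeastOnce(object $topic, int $partition, callable $handler, int $maxPolls, int $timeoutMs = 1000): int
{
    $handled = 0;
    for ($i = 0; $i < $maxPolls; $i++) {
        $msg = $topic->consume($partition, $timeoutMs);

        // NULL (newer librdkafka), end of partition or timeout: nothing to process yet.
        if ($msg === null
            || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF
            || $msg->err === RD_KAFKA_RESP_ERR__TIMED_OUT) {
            continue;
        }
        if ($msg->err !== RD_KAFKA_RESP_ERR_NO_ERROR) {
            throw new RuntimeException($msg->errstr(), $msg->err);
        }

        // If $handler throws (or the process dies) here, no offset is stored,
        // so with the setup above the message is read again after a restart.
        $handler($msg->payload);

        // "ack": librdkafka stores offset + 1, the next message to read.
        $topic->offsetStore($partition, $msg->offset);
        $handled++;
    }
    return $handled;
}

// Usage with a real broker (requires ext-rdkafka; topic name and saveToDatabase() are hypothetical):
// $conf = new RdKafka\Conf();
// $conf->set('metadata.broker.list', 'kafka:9092');
// $conf->set('group.id', 'group_1');
// $conf->set('enable.auto.commit', 'false');
// $consumer = new RdKafka\Consumer($conf);
// $topic = $consumer->newTopic('test');
// $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
// consumeAtLeastOnce($topic, 0, fn ($payload) => saveToDatabase($payload), 100);
```

Letting a handler exception stop the loop is deliberate. Within the same process, the low-level consumer keeps reading forward. Skipping a failed message and continuing would effectively acknowledge it as soon as a later offset is stored.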
    

Voilà: we now process queue messages reliably with Apache Kafka.

Link to the repository.

I would be glad to hear your questions or comments :)