

Kafka with PHP (Symfony)

I was looking for simple documentation on using Kafka with PHP. I found a few guides, but they were not simple enough for me because they bundle everything (producer and consumer) into one code base. They also assume Zookeeper is installed on the same machine as Kafka, which is not my case.

So I decided to write my own.

In this example, I have a producer and a consumer, each in its own code base, with Zookeeper and Kafka running in separate Docker containers.

Docker network

First, let's create a Docker network for our containers to communicate with each other.

docker network create --subnet=172.18.0.0/29 --gateway=172.18.0.1 messaging-network
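
The /29 prefix makes this a deliberately small network. As a quick sanity check, here is a sketch in plain shell arithmetic (nothing Docker-specific, and the variable names are only illustrative) that computes how many addresses such a subnet holds:

```shell
#!/bin/bash
# Prefix length from the `--subnet=172.18.0.0/29` option above.
prefix=29

# An IPv4 prefix of length N leaves (32 - N) host bits.
total_addresses=$(( 1 << (32 - prefix) ))

# The network and broadcast addresses cannot be assigned to hosts.
usable_addresses=$(( total_addresses - 2 ))

echo "total=${total_addresses} usable=${usable_addresses}"
```

One of the usable addresses (172.18.0.1) is taken by the gateway, which leaves five for containers. That is enough for Zookeeper and Kafka, but a larger setup would need a shorter prefix.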

Zookeeper container

Kafka 3.4.0 is run here in its Zookeeper-based mode, so let's create a Zookeeper container first.

FROM archlinux:latest
# Steps from https://www.tutorialspoint.com/apache_kafka/apache_kafka_installation_steps.htm
# Update the system and install the download tools
RUN pacman -Syu --noconfirm
RUN pacman -S wget curl --noconfirm

# Download OpenJDK 11
RUN wget https://download.java.net/java/GA/jdk11/9/GPL/openjdk-11.0.2_linux-x64_bin.tar.gz
# Extract OpenJDK 11
RUN tar -xzf openjdk-11.0.2_linux-x64_bin.tar.gz
# Set JAVA_HOME
ENV JAVA_HOME=/jdk-11.0.2
# Set PATH
ENV PATH=$PATH:/jdk-11.0.2/bin

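# Download Zookeeper (older releases eventually move from downloads.apache.org to archive.apache.org/dist/zookeeper/)
RUN wget https://downloads.apache.org/zookeeper/zookeeper-3.8.1/apache-zookeeper-3.8.1-bin.tar.gz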
RUN tar -xzf apache-zookeeper-3.8.1-bin.tar.gz

# Configure Zookeeper
RUN echo "tickTime=2000" > apache-zookeeper-3.8.1-bin/conf/zoo.cfg
RUN echo "dataDir=/tmp/zookeeper" >> apache-zookeeper-3.8.1-bin/conf/zoo.cfg
RUN echo "clientPort=2181" >> apache-zookeeper-3.8.1-bin/conf/zoo.cfg
RUN echo "clientPortAddress=0.0.0.0" >> apache-zookeeper-3.8.1-bin/conf/zoo.cfg
RUN echo "initLimit=5" >> apache-zookeeper-3.8.1-bin/conf/zoo.cfg
RUN echo "syncLimit=2" >> apache-zookeeper-3.8.1-bin/conf/zoo.cfg

# Update PATH with zookeeper
ENV PATH=$PATH:/apache-zookeeper-3.8.1-bin/bin

# Set entrypoint to start zookeeper
ENTRYPOINT ["zkServer.sh", "start-foreground"]
  

After building it, we are going to run it with:

docker run --name zookeeper --network messaging-network --publish 172.18.0.1:2181:2181 rakotomandimby/zookeeper:3.8.1
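
The build step itself is not shown above. A minimal sketch of it follows, assuming the Dockerfile sits in its own directory. The helper names (image_tag, build_image) and the directory argument are hypothetical; the image name and version come from the docker run command.

```shell
#!/bin/bash
# Compose "name:version", the tag format used by the `docker run` command.
image_tag() {
    local name="$1" version="$2"
    echo "${name}:${version}"
}

# Build an image from the directory that holds its Dockerfile.
# The directory is an assumption; it defaults to the current one.
build_image() {
    local name="$1" version="$2" context_dir="${3:-.}"
    docker build --tag "$(image_tag "$name" "$version")" "$context_dir"
}
```

Calling build_image rakotomandimby/zookeeper 3.8.1 from the Dockerfile's directory should produce the image that the run command expects. It has to be run before docker run.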

Kafka container

Then let's create a Kafka container.

FROM archlinux:latest

# Steps from https://www.tutorialspoint.com/apache_kafka/apache_kafka_installation_steps.htm
# Update the system and install the download tools
RUN pacman -Syu --noconfirm
RUN pacman -S wget curl --noconfirm

# Download OpenJDK 11
RUN wget https://download.java.net/java/GA/jdk11/9/GPL/openjdk-11.0.2_linux-x64_bin.tar.gz
# Extract OpenJDK 11
RUN tar -xzf openjdk-11.0.2_linux-x64_bin.tar.gz
# Set JAVA_HOME
ENV JAVA_HOME=/jdk-11.0.2
# Set PATH
ENV PATH=$PATH:/jdk-11.0.2/bin

# Download Kafka (older releases eventually move from downloads.apache.org to archive.apache.org/dist/kafka/)
RUN wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
# Extract Kafka
RUN tar -xzf kafka_2.13-3.4.0.tgz
# Set KAFKA_HOME
ENV KAFKA_HOME=/kafka_2.13-3.4.0
# Set PATH
ENV PATH=$PATH:/kafka_2.13-3.4.0/bin

# Copy the server.properties file
COPY ./server.properties /kafka_2.13-3.4.0/config/server.properties
# Debug output at build time: list /tmp and print the copied configuration
RUN find /tmp
RUN cat /kafka_2.13-3.4.0/config/server.properties

COPY ./entrypoint.sh /entrypoint.sh
RUN chmod 755 /entrypoint.sh

# Set the Entry Point to start Kafka Server
ENTRYPOINT ["/entrypoint.sh"]

The entrypoint.sh file is as follows:

#!/bin/bash
cd /
# exec replaces the shell so that Kafka receives the signals sent by "docker stop"
exec /kafka_2.13-3.4.0/bin/kafka-server-start.sh /kafka_2.13-3.4.0/config/server.properties --override zookeeper.connect=zookeeper:2181
  

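I really had to pass --override zookeeper.connect=zookeeper:2181; without it, the broker would not connect to Zookeeper. The host name zookeeper resolves because both containers are attached to the same user-defined Docker network, where containers can reach each other by name.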

The server.properties file is as follows:

broker.id=7
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=zookeeper:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

After building it, we are going to run it with:

docker run --name kafka --network messaging-network --publish 172.18.0.1:9092:9092 rakotomandimby/kafka:3.4.0
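
Before moving on to the PHP side, it can help to confirm that both published ports accept connections from the host. This is a rough sketch, assuming bash (for its /dev/tcp feature) and the coreutils timeout command are available. port_open is a hypothetical helper name.

```shell
#!/bin/bash
# Succeeds if a TCP connection to host:port can be opened within 2 seconds.
# Relies on bash's /dev/tcp pseudo-device, so it is run through `bash -c`.
port_open() {
    local host="$1" port="$2"
    timeout 2 bash -c 'exec 3<>"/dev/tcp/$0/$1"' "$host" "$port" 2>/dev/null
}

# Addresses and ports published by the two `docker run` commands above.
for endpoint in "172.18.0.1 2181" "172.18.0.1 9092"; do
    set -- $endpoint
    if port_open "$1" "$2"; then
        echo "$1:$2 is reachable"
    else
        echo "$1:$2 is NOT reachable"
    fi
done
```

An open port only shows that something is listening, not that the broker is healthy, so the container logs remain the place to look when a connection fails later on.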

Symfony producer

Note that the producer requires the rdkafka PHP extension, and the librdkafka library it wraps, on the host machine, so we need to install them first (I use Arch Linux):

sudo pacman -S librdkafka
yay  -S php-rdkafka

On Ubuntu or Fedora systems, you might install them differently.

We now create a Symfony web application that takes a message from POSTed data and produces it to Kafka. We start by creating the application, then install the packages we need.

symfony new symfony-producer
cd symfony-producer
composer require enqueue/enqueue enqueue/rdkafka

We need a controller that handles the POST request and produces its body as a message to Kafka (yes, it's dirty code, but it's just for testing purposes).

<?php

namespace App\Controller;

use Enqueue\RdKafka\RdKafkaConnectionFactory;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Routing\Annotation\Route;

class DefaultController extends AbstractController
{
    /**
     * Produce the raw POST body to the queue (Kafka topic) {queueName} on the broker {hostName}.
     *
     * @Route("/produce/{hostName}/{queueName}", name="default")
     */
    public function default(Request $request, string $queueName, string $hostName): Response
    {
        $connectionFactory = new RdKafkaConnectionFactory([
            'global' => [
                'metadata.broker.list' => $hostName.':9092'
            ],
            'topic' => [],
        ]);
        $context = $connectionFactory->createContext();
        $content = $request->getContent();
        $message = $context->createMessage($content);
        $fooQueue = $context->createQueue($queueName);
        $context->createProducer()->send($fooQueue, $message);
        return $this->json(['message' => $content,'queue' => $queueName]);
    }
}

Unrelated to Kafka, but I usually launch my Symfony applications with the following command:

symfony server:start --no-tls

With that Symfony application, if we POST some data to /produce/172.18.0.1/foo, it produces the message to the queue named "foo" (a Kafka topic, created if it doesn't exist) on the Kafka broker at IP address 172.18.0.1.
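
Such a request can be sent with curl. This is a sketch, assuming the Symfony local server listens on http://127.0.0.1:8000 (its usual default; check the output of symfony server:start). The function names and the base URL variable are illustrative.

```shell
#!/bin/bash
# Base URL of the Symfony producer. Assumption: the local server's default port.
PRODUCER_BASE_URL="http://127.0.0.1:8000"

# Build the route URL: /produce/{hostName}/{queueName}
produce_url() {
    local kafka_host="$1" queue_name="$2"
    echo "${PRODUCER_BASE_URL}/produce/${kafka_host}/${queue_name}"
}

# POST a raw message body to the producer, which forwards it to Kafka.
produce() {
    local kafka_host="$1" queue_name="$2" message="$3"
    curl --silent --show-error --data-raw "$message" \
        "$(produce_url "$kafka_host" "$queue_name")"
}
```

For example, produce 172.18.0.1 foo 'hello' should return a JSON document echoing the message and the queue name, since that is what the controller sends back.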

Consumer 1

A consumer doesn't have to be a Symfony application (or any HTTP application at all); it can be a simple PHP script.
That script connects to Kafka and blocks there, consuming messages from the queue as they arrive.
Because it blocks, the same process cannot do anything else in the meantime, such as handling HTTP requests.

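Each consumer here must have its own group name: Kafka delivers every message to each consumer group, but consumers that share a group split the topic's partitions between them, and this broker is configured with a single partition per topic (num.partitions=1).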

The consumer needs some packages first:

composer require enqueue/rdkafka enqueue/simple-client

The consumer will be as follows:

<?php

require __DIR__.'/vendor/autoload.php';

use Enqueue\RdKafka\RdKafkaConnectionFactory;
use Interop\Queue\Message;
use Interop\Queue\Processor;
use Enqueue\Consumption\QueueConsumer;

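// Expect three arguments: consumer group id, Kafka host, queue (topic) name
if ($argc < 4) {
    fwrite(STDERR, "Usage: php consumer.php <groupId> <kafkaIp> <queueName>\n");
    exit(1);
}
$groupId = $argv[1];
$kafkaIp = $argv[2];
$queueName = $argv[3];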

$connectionFactory = new RdKafkaConnectionFactory([
    'global' => [
        'group.id' => $groupId,
        'metadata.broker.list' => $kafkaIp.':9092',
        'enable.auto.commit' => 'false',
    ],
    'topic' => [
        'auto.offset.reset' => 'beginning',
    ],
]);
$context = $connectionFactory->createContext();
$queueConsumer = new QueueConsumer($context);
$queueConsumer->bindCallback($queueName, function(Message $message) {
    // Print each message on its own line
    echo $message->getBody().PHP_EOL;
    return Processor::ACK;
});
$queueConsumer->consume();

We can run it with:

php ./consumer.php bbbb 172.18.0.1 foo

As I said, it blocks there and does not give the prompt back.

Consumer 2

To add a second consumer, we just run the same script in another terminal with a different group name:

php ./consumer.php aaaa 172.18.0.1 foo

Like the first consumer, it blocks and does not give the prompt back.

Play

If we POST some data to /produce/172.18.0.1/foo, we will see every message appear in both consumers' terminals.
