

Kafka with PHP (Symfony)

I was looking for simple documentation on using Kafka with PHP. I found a few guides, but none were simple enough for me: they bundle everything (producer and consumer) into one code base. Plus, they assume ZooKeeper is installed on the same machine as Kafka, which is not my case.

So I decided to write my own.

In this example, the producer and the consumer each have their own code base, and ZooKeeper and Kafka run in separate Docker containers.

Docker network

First, let's create a Docker network for our containers to communicate with each other.

docker network create --subnet=172.18.0.0/29 --gateway=172.18.0.1 messaging-network
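
Why a /29? Such a block holds only 8 addresses. A quick way to see what is left for containers is to compute the range; the sketch below (subnetHosts is a hypothetical helper, not part of Docker) lists the usable host addresses of a CIDR block, excluding the network and broadcast addresses.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: list the usable host addresses of an IPv4 CIDR block,
 * excluding the network and broadcast addresses.
 */
function subnetHosts(string $cidr): array
{
    [$network, $prefix] = explode('/', $cidr);
    $prefix = (int) $prefix;
    $size = 2 ** (32 - $prefix);                          // addresses in the block (8 for a /29)
    $mask = (0xFFFFFFFF << (32 - $prefix)) & 0xFFFFFFFF;  // netmask as an integer
    $base = ip2long($network) & $mask;                    // normalise to the network address

    $hosts = [];
    for ($i = 1; $i < $size - 1; $i++) {                  // skip network (+0) and broadcast (+size-1)
        $hosts[] = long2ip($base + $i);
    }
    return $hosts;
}
```

subnetHosts('172.18.0.0/29') gives 172.18.0.1 to 172.18.0.6. The gateway takes 172.18.0.1, which leaves five addresses: plenty for the ZooKeeper and Kafka containers.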

ZooKeeper container

This setup runs Kafka in ZooKeeper mode, so let's start with a ZooKeeper container.

FROM archlinux:latest
# Steps from https://www.tutorialspoint.com/apache_kafka/apache_kafka_installation_steps.htm
# Update the system and install download tools
RUN pacman -Syu --noconfirm
RUN pacman -S  wget curl --noconfirm

# Download OpenJDK 11
RUN wget https://download.java.net/java/GA/jdk11/9/GPL/openjdk-11.0.2_linux-x64_bin.tar.gz
# Extract OpenJDK 11
RUN tar -xzf openjdk-11.0.2_linux-x64_bin.tar.gz
# Set JAVA_HOME
ENV JAVA_HOME=/jdk-11.0.2
# Set PATH
ENV PATH=$PATH:/jdk-11.0.2/bin

# Older releases move from downloads.apache.org to archive.apache.org
RUN wget https://archive.apache.org/dist/zookeeper/zookeeper-3.8.1/apache-zookeeper-3.8.1-bin.tar.gz
RUN tar -xzf apache-zookeeper-3.8.1-bin.tar.gz

# Configure Zookeeper
RUN echo "tickTime=2000" > apache-zookeeper-3.8.1-bin/conf/zoo.cfg
RUN echo "dataDir=/tmp/zookeeper" >> apache-zookeeper-3.8.1-bin/conf/zoo.cfg
RUN echo "clientPort=2181" >> apache-zookeeper-3.8.1-bin/conf/zoo.cfg
RUN echo "clientPortAddress=0.0.0.0" >> apache-zookeeper-3.8.1-bin/conf/zoo.cfg
RUN echo "initLimit=5" >> apache-zookeeper-3.8.1-bin/conf/zoo.cfg
RUN echo "syncLimit=2" >> apache-zookeeper-3.8.1-bin/conf/zoo.cfg

# Update PATH with zookeeper
ENV PATH=$PATH:/apache-zookeeper-3.8.1-bin/bin

# Set entrypoint to start zookeeper
ENTRYPOINT ["zkServer.sh", "start-foreground"]
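
In zoo.cfg, initLimit and syncLimit are counted in ticks, not milliseconds, so their real duration depends on tickTime (they only come into play when ZooKeeper runs as a replicated ensemble). ZooKeeper also bounds client session timeouts, by default, to between 2 and 20 ticks. The sketch below (zooCfgTimeouts is a hypothetical helper) parses the same lines the Dockerfile writes and converts them to milliseconds.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: parse zoo.cfg text and express the tick-based limits in milliseconds.
 */
function zooCfgTimeouts(string $zooCfg): array
{
    $cfg = [];
    foreach (preg_split('/\R/', $zooCfg) as $line) {
        $line = trim($line);
        if ($line === '' || $line[0] === '#' || !str_contains($line, '=')) {
            continue; // skip blank lines, comments and malformed lines
        }
        [$key, $value] = array_map('trim', explode('=', $line, 2));
        $cfg[$key] = $value;
    }

    if (!isset($cfg['tickTime'])) {
        throw new InvalidArgumentException('tickTime is missing');
    }
    $tick = (int) $cfg['tickTime'];

    return [
        'initLimitMs'         => $tick * (int) ($cfg['initLimit'] ?? 0),
        'syncLimitMs'         => $tick * (int) ($cfg['syncLimit'] ?? 0),
        'minSessionTimeoutMs' => 2 * $tick,   // ZooKeeper's default lower bound
        'maxSessionTimeoutMs' => 20 * $tick,  // ZooKeeper's default upper bound
    ];
}
```

With tickTime=2000, initLimit=5 and syncLimit=2, this gives 10 s, 4 s, and a 4 s to 40 s session window.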
  

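After building the image (tagged rakotomandimby/zookeeper:3.8.1), we run it on our network, publishing port 2181 on the network's gateway address 172.18.0.1, which is the host's address on that network: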

docker run --name zookeeper --network messaging-network --publish 172.18.0.1:2181:2181 rakotomandimby/zookeeper:3.8.1
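
Before moving on, one can check from the host that the published port answers. A minimal sketch using PHP's fsockopen (isTcpPortOpen is a hypothetical helper name); it only proves that something accepts TCP connections on that address, not that ZooKeeper is healthy.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: return true if a TCP connection to $host:$port
 * succeeds within $timeoutSeconds.
 */
function isTcpPortOpen(string $host, int $port, float $timeoutSeconds = 2.0): bool
{
    $errno = 0;
    $errstr = '';
    // The @ silences the warning fsockopen emits on failure; the return value reports it instead.
    $socket = @fsockopen($host, $port, $errno, $errstr, $timeoutSeconds);
    if ($socket === false) {
        return false;
    }
    fclose($socket);
    return true;
}
```

While the container runs, isTcpPortOpen('172.18.0.1', 2181) should return true; the same call with port 9092 works for the Kafka container.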

Kafka container

Then, let's create a Kafka container.

FROM archlinux:latest

# Steps from https://www.tutorialspoint.com/apache_kafka/apache_kafka_installation_steps.htm
# Update the system and install download tools
RUN pacman -Syu --noconfirm
RUN pacman -S  wget curl --noconfirm

# Download OpenJDK 11
RUN wget https://download.java.net/java/GA/jdk11/9/GPL/openjdk-11.0.2_linux-x64_bin.tar.gz
# Extract OpenJDK 11
RUN tar -xzf openjdk-11.0.2_linux-x64_bin.tar.gz
# Set JAVA_HOME
ENV JAVA_HOME=/jdk-11.0.2
# Set PATH
ENV PATH=$PATH:/jdk-11.0.2/bin

# Download Kafka 
# Older releases move from downloads.apache.org to archive.apache.org
RUN wget https://archive.apache.org/dist/kafka/3.4.0/kafka_2.13-3.4.0.tgz
# Extract Kafka
RUN tar -xzf kafka_2.13-3.4.0.tgz
# Set KAFKA_HOME
ENV KAFKA_HOME=/kafka_2.13-3.4.0
# Set PATH
ENV PATH=$PATH:/kafka_2.13-3.4.0/bin

# Copy the server.properties file
COPY ./server.properties /kafka_2.13-3.4.0/config/server.properties
# Print the config at build time to check that the right file was copied
RUN cat /kafka_2.13-3.4.0/config/server.properties

COPY ./entrypoint.sh /entrypoint.sh
RUN chmod 755 /entrypoint.sh

# Set the Entry Point to start Kafka Server
ENTRYPOINT ["/entrypoint.sh"]

The entrypoint.sh file is as follows:

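#!/bin/bash
cd /
# exec replaces the shell, so Kafka runs as PID 1 and receives the SIGTERM sent by "docker stop"
exec /kafka_2.13-3.4.0/bin/kafka-server-start.sh /kafka_2.13-3.4.0/config/server.properties --override zookeeper.connect=zookeeper:2181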
  

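I had to add --override zookeeper.connect=zookeeper:2181; without it, the broker would not connect to ZooKeeper. The name "zookeeper" resolves because both containers are attached to the user-defined messaging-network, where Docker's embedded DNS resolves container names.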
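
kafka-server-start.sh hands the --override key=value pairs to the broker, and they take precedence over the same keys read from server.properties. The sketch below mimics that precedence (effectiveBrokerConfig is a hypothetical helper, and it ignores the escaping rules of real Java .properties files).

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: parse simple key=value properties text, then apply
 * "--override key=value" arguments on top, the way the broker gives overrides precedence.
 */
function effectiveBrokerConfig(string $properties, array $args): array
{
    $config = [];
    foreach (preg_split('/\R/', $properties) as $line) {
        $line = trim($line);
        if ($line === '' || $line[0] === '#' || $line[0] === '!') {
            continue; // blank line or comment
        }
        $parts = explode('=', $line, 2);
        if (count($parts) === 2) {
            $config[trim($parts[0])] = trim($parts[1]);
        }
    }

    // Walk the CLI arguments: every "--override" is followed by one key=value pair.
    for ($i = 0; $i < count($args); $i++) {
        if ($args[$i] === '--override' && isset($args[$i + 1])) {
            [$key, $value] = array_pad(explode('=', $args[$i + 1], 2), 2, '');
            $config[$key] = $value;
            $i++; // skip the key=value argument we just consumed
        }
    }
    return $config;
}
```

Whatever zookeeper.connect says in the file, the value passed with --override is the one the broker ends up using.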

The server.properties file is as follows:

broker.id=7
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=zookeeper:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
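
The listeners line uses the form NAME://host:port. Here the host is left empty (Kafka then binds to the default interface), and the port, 9092, must match the container side of the --publish mapping used to start the container. A small sketch that splits such a value (parseListeners is a hypothetical helper):

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: split a Kafka "listeners" value such as
 * "PLAINTEXT://:9092,SSL://0.0.0.0:9093" into name, host and port.
 */
function parseListeners(string $value): array
{
    $listeners = [];
    foreach (explode(',', $value) as $entry) {
        // NAME://host:port where host may be empty; a regex keeps that case simple.
        if (!preg_match('#^([A-Za-z0-9_]+)://(.*):(\d+)$#', trim($entry), $m)) {
            throw new InvalidArgumentException("Unparseable listener: $entry");
        }
        $listeners[] = ['name' => $m[1], 'host' => $m[2], 'port' => (int) $m[3]];
    }
    return $listeners;
}
```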

After building the image (tagged rakotomandimby/kafka:3.4.0), we run it on the same network, publishing port 9092 on 172.18.0.1:

docker run --name kafka     --network messaging-network --publish 172.18.0.1:9092:9092 rakotomandimby/kafka:3.4.0
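
To confirm that the broker is reachable from the host and see what it reports about itself, one can ask for cluster metadata through the php-rdkafka extension (installed in the next section). A sketch, assuming that extension is loaded: describeCluster is a hypothetical helper name, while Conf, Producer and getMetadata() come from php-rdkafka; the 10-second timeout is an arbitrary choice.

```php
<?php
declare(strict_types=1);

/**
 * Sketch: ask the broker for cluster metadata through php-rdkafka.
 * Requires the rdkafka extension; describeCluster() is a hypothetical helper name.
 */
function describeCluster(string $bootstrap, int $timeoutMs = 10000): array
{
    $conf = new RdKafka\Conf();
    $conf->set('metadata.broker.list', $bootstrap);
    $producer = new RdKafka\Producer($conf);

    // true = fetch metadata for all topics; throws RdKafka\Exception on timeout.
    $metadata = $producer->getMetadata(true, null, $timeoutMs);

    $brokers = [];
    foreach ($metadata->getBrokers() as $broker) {
        // The host and port clients are told to use after the initial bootstrap.
        $brokers[] = sprintf('%d => %s:%d', $broker->getId(), $broker->getHost(), $broker->getPort());
    }

    $topics = [];
    foreach ($metadata->getTopics() as $topic) {
        $topics[] = $topic->getTopic();
    }
    return ['brokers' => $brokers, 'topics' => $topics];
}
```

Calling print_r(describeCluster('172.18.0.1:9092')) should list broker 7. The host shown is the name the broker advertises; if the host machine cannot resolve it, clients may fail after the initial bootstrap connection.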

Symfony producer

Note that the PHP code needs the rdkafka extension, and the librdkafka library it wraps, on the host machine. I use Arch Linux, so I install them with:

sudo pacman -S librdkafka
yay  -S php-rdkafka

On Ubuntu or Fedora, the package names differ; the extension is also available from PECL as rdkafka.
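
A quick sanity check after installing: ask PHP whether the extension is loaded. The CLI and the PHP process serving the Symfony application may read different php.ini files, so it is worth running the check with each. rdkafkaStatus is a hypothetical helper name; extension_loaded() and phpversion() are standard PHP functions.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: report whether the rdkafka extension is available
 * to the PHP binary running this code.
 */
function rdkafkaStatus(): array
{
    $loaded = extension_loaded('rdkafka');
    return [
        'loaded'  => $loaded,
        // phpversion() with an extension name returns that extension's version string.
        'version' => $loaded ? phpversion('rdkafka') : null,
        'php'     => PHP_VERSION,
    ];
}
```

Running print_r(rdkafkaStatus()) should show loaded as 1 once the packages are installed.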

Now we create a Symfony web application that takes the message from the POST body and produces it to Kafka. We start by creating the application, then install the needed packages.

symfony new symfony-producer
cd symfony-producer
composer require enqueue/enqueue enqueue/rdkafka

Next, we need a controller that handles the POST request and produces the message to Kafka (yes, it's dirty code, but it's only for testing).

<?php

namespace App\Controller;

use Enqueue\RdKafka\RdKafkaConnectionFactory;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Routing\Annotation\Route;

class DefaultController extends AbstractController
{
    // Route declared as a PHP 8 attribute (the docblock @Route form needs doctrine/annotations)
    #[Route('/produce/{hostName}/{queueName}', name: 'default', methods: ['POST'])]
    public function default(Request $request, string $queueName, string $hostName): Response
    {
        // Connect to the broker given in the URL (Kafka listens on 9092)
        $connectionFactory = new RdKafkaConnectionFactory([
            'global' => [
                'metadata.broker.list' => $hostName.':9092',
            ],
            'topic' => [],
        ]);
        $context = $connectionFactory->createContext();

        // The raw POST body becomes the Kafka message payload
        $content = $request->getContent();
        $message = $context->createMessage($content);
        $queue = $context->createQueue($queueName); // maps to a Kafka topic
        $context->createProducer()->send($queue, $message);

        // Closing the context flushes the producer in recent enqueue/rdkafka versions,
        // so buffered messages are delivered before the request ends
        $context->close();

        return $this->json(['message' => $content, 'queue' => $queueName]);
    }
}
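
Because the controller takes the topic name straight from the URL, it may be worth validating it before producing. Kafka accepts topic names of 1 to 249 characters made of ASCII letters, digits, ".", "_" and "-", and rejects "." and "..". A sketch of such a check (isValidTopicName is hypothetical, not part of Enqueue or Symfony):

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper mirroring Kafka's topic-name rules:
 * 1 to 249 characters from [a-zA-Z0-9._-], and not "." or "..".
 */
function isValidTopicName(string $name): bool
{
    if ($name === '.' || $name === '..') {
        return false;
    }
    // The D modifier stops "$" from accepting a trailing newline.
    return preg_match('/^[a-zA-Z0-9._-]{1,249}$/D', $name) === 1;
}
```

In the controller, a guard such as returning $this->json(['error' => 'invalid topic name'], 400) when the check fails would reject bad names before any connection is made.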

Unrelated to Kafka, but I usually start my Symfony applications with:

symfony server:start --no-tls

With that Symfony application running, POSTing data to /produce/172.18.0.1/foo produces the message to the topic named "foo" on the Kafka broker at 172.18.0.1. The topic is created if it doesn't exist, because the broker's auto.create.topics.enable setting defaults to true.
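
The POST can be sent with any HTTP client. A minimal PHP sketch using the curl extension; the base URL http://127.0.0.1:8000 is an assumption (the Symfony local server's usual default port), and both function names are hypothetical.

```php
<?php
declare(strict_types=1);

/** Hypothetical helper: build the producer URL for a given broker host and topic. */
function produceUrl(string $baseUrl, string $brokerHost, string $topic): string
{
    return rtrim($baseUrl, '/') . '/produce/' . rawurlencode($brokerHost) . '/' . rawurlencode($topic);
}

/** Hypothetical helper: POST a raw message body and return the decoded JSON response. */
function postMessage(string $url, string $body): array
{
    $ch = curl_init($url);
    curl_setopt_array($ch, [
        CURLOPT_POST           => true,
        CURLOPT_POSTFIELDS     => $body,                       // sent as the raw request body
        CURLOPT_HTTPHEADER     => ['Content-Type: text/plain'],
        CURLOPT_RETURNTRANSFER => true,                        // return the response as a string
    ]);
    $response = curl_exec($ch);
    if ($response === false) {
        $error = curl_error($ch);
        curl_close($ch);
        throw new RuntimeException("POST failed: $error");
    }
    curl_close($ch);
    return json_decode($response, true, 512, JSON_THROW_ON_ERROR);
}
```

For example, postMessage(produceUrl('http://127.0.0.1:8000', '172.18.0.1', 'foo'), 'hello') should return the message and queue name echoed back by the controller.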

Consumer 1

A consumer doesn't have to be a Symfony application, or even an HTTP application: it can be a plain PHP script.
That script connects to Kafka, then blocks while it consumes messages from the topic.
Because it blocks, it cannot do anything else (such as handling HTTP requests) while it runs.

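Each consumer here gets its own group name: consumers in different groups each receive every message, while consumers sharing a group name split the topic's partitions between them.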

The consumer needs a couple of packages first:

composer require enqueue/rdkafka enqueue/simple-client

The consumer will be as follows:

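<?php
// consumer.php, usage: php ./consumer.php <group-id> <kafka-ip> <topic>
use Enqueue\Consumption\QueueConsumer;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
use Interop\Queue\Message;
use Interop\Queue\Processor;

require __DIR__.'/vendor/autoload.php';

if ($argc < 4) {
    fwrite(STDERR, "Usage: php {$argv[0]} <group-id> <kafka-ip> <topic>\n");
    exit(1);
}
[, $groupId, $kafkaIp, $queueName] = $argv;

$connectionFactory = new RdKafkaConnectionFactory([
    'global' => [
        'group.id' => $groupId,                     // one group per consumer, so each gets every message
        'metadata.broker.list' => $kafkaIp.':9092',
        'enable.auto.commit' => 'false',            // offsets are committed when a message is ACKed
    ],
    'topic' => [
        'auto.offset.reset' => 'beginning',         // a group with no committed offset starts at the oldest message
    ],
]);
$context = $connectionFactory->createContext();
$queueConsumer = new QueueConsumer($context);
// The callback runs for every message received on the topic
$queueConsumer->bindCallback($queueName, function (Message $message) {
    echo $message->getBody(), PHP_EOL;
    return Processor::ACK;
});
// Blocks until the process is stopped
$queueConsumer->consume();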

We can run it with:

php ./consumer.php bbbb 172.18.0.1 foo

As mentioned, it blocks and does not give the prompt back.
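
Where does a group start reading? A committed offset always wins; auto.offset.reset only applies when the group has none, for example on the first run of a new group name. In Enqueue's RdKafka transport, returning Processor::ACK commits the message's offset, which matters here because auto-commit is disabled. The toy function below models that rule (startingOffset is hypothetical, not a Kafka or Enqueue API, and it ignores the case of a committed offset that has fallen out of range).

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical model of where a consumer group starts reading a partition.
 * A committed offset wins; otherwise the auto.offset.reset policy decides.
 */
function startingOffset(?int $committed, string $autoOffsetReset, int $logStart, int $logEnd): int
{
    if ($committed !== null) {
        return $committed;
    }
    return match ($autoOffsetReset) {
        'smallest', 'earliest', 'beginning' => $logStart,
        'largest', 'latest', 'end'          => $logEnd,
        default => throw new RuntimeException("No committed offset and reset policy '$autoOffsetReset'"),
    };
}
```

This is why a brand-new group such as bbbb first replays everything already in foo, while a restarted consumer with the same group name resumes where it stopped.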

Consumer 2

To get a second consumer, we just run the same script (in another terminal) with a different group name:

php ./consumer.php aaaa 172.18.0.1 foo

Like the first one, it blocks and does not give the prompt back.

Play

If we now POST some data to /produce/172.18.0.1/foo, every message shows up in both consumers' terminals, since each consumer belongs to its own group.
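
The reason both terminals see every message can be pictured with a toy, in-memory model of one partition: the log is shared, and each consumer group keeps its own offset into it. ToyPartition is purely illustrative, not the Kafka or Enqueue API, and it does not model partition assignment.

```php
<?php
declare(strict_types=1);

/**
 * Toy, in-memory model of one Kafka partition (not the Kafka or Enqueue API):
 * the log is shared, and each consumer group keeps its own offset.
 */
final class ToyPartition
{
    /** @var list<string> */
    private array $log = [];

    /** @var array<string, int> committed offset per group id */
    private array $offsets = [];

    public function produce(string $message): void
    {
        $this->log[] = $message;
    }

    /** Return the next message for $groupId and advance (commit) that group's offset. */
    public function poll(string $groupId): ?string
    {
        $offset = $this->offsets[$groupId] ?? 0; // a new group starts at the beginning
        if ($offset >= count($this->log)) {
            return null;                          // nothing new for this group
        }
        $this->offsets[$groupId] = $offset + 1;
        return $this->log[$offset];
    }
}
```

Groups aaaa and bbbb each read m1 then m2 independently. In real Kafka, with num.partitions=1 as in the server.properties above, a second consumer sharing a group name would instead sit idle, because a single partition is assigned to only one consumer in a group.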
