
Kafka with PHP (Symfony)

I was looking for simple documentation on using Kafka with PHP. I found a few guides, but none were simple enough for me: they bundle everything (producer and consumer) in one code base, and they assume Zookeeper is installed on the same machine as Kafka, which is not my case.

So I decided to write my own.

In this example, I have a producer and a consumer, each in its own code base, with Zookeeper and Kafka running in separate Docker containers.

Docker network

First, let's create a Docker network for our containers to communicate with each other.

docker network create --subnet=172.18.0.0/29 --gateway=172.18.0.1 messaging-network

Zookeeper container

Kafka needs Zookeeper, so let's create a Zookeeper container first.

FROM archlinux:latest
# Steps adapted from https://www.tutorialspoint.com/apache_kafka/apache_kafka_installation_steps.htm
# Install base tools
RUN pacman -Syu --noconfirm
RUN pacman -S wget curl --noconfirm

# Download OpenJDK 11
RUN wget https://download.java.net/java/GA/jdk11/9/GPL/openjdk-11.0.2_linux-x64_bin.tar.gz
# Extract OpenJDK 11
RUN tar -xzf openjdk-11.0.2_linux-x64_bin.tar.gz
# Set JAVA_HOME
ENV JAVA_HOME=/jdk-11.0.2
# Set PATH
ENV PATH=$PATH:/jdk-11.0.2/bin

RUN wget https://downloads.apache.org/zookeeper/zookeeper-3.8.1/apache-zookeeper-3.8.1-bin.tar.gz
RUN tar -xzf apache-zookeeper-3.8.1-bin.tar.gz

# Configure Zookeeper
RUN echo "tickTime=2000" > apache-zookeeper-3.8.1-bin/conf/zoo.cfg
RUN echo "dataDir=/tmp/zookeeper" >> apache-zookeeper-3.8.1-bin/conf/zoo.cfg
RUN echo "clientPort=2181" >> apache-zookeeper-3.8.1-bin/conf/zoo.cfg
RUN echo "clientPortAddress=0.0.0.0" >> apache-zookeeper-3.8.1-bin/conf/zoo.cfg
RUN echo "initLimit=5" >> apache-zookeeper-3.8.1-bin/conf/zoo.cfg
RUN echo "syncLimit=2" >> apache-zookeeper-3.8.1-bin/conf/zoo.cfg

# Update PATH with zookeeper
ENV PATH=$PATH:/apache-zookeeper-3.8.1-bin/bin

# Set entrypoint to start zookeeper
ENTRYPOINT ["zkServer.sh", "start-foreground"]

After building it, we are going to run it with:

docker run --name zookeeper --network messaging-network --publish 172.18.0.1:2181:2181 rakotomandimby/zookeeper:3.8.1
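Before starting Kafka, it is worth checking that Zookeeper is actually up. A quick sanity check (assuming the container name from the run command above):

```shell
# Ask the Zookeeper server for its status from inside the container;
# "Mode: standalone" in the output means it is ready to accept clients
docker exec zookeeper zkServer.sh status
```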

Kafka container

Then let's create a Kafka container.

FROM archlinux:latest

# Steps adapted from https://www.tutorialspoint.com/apache_kafka/apache_kafka_installation_steps.htm
# Install base tools
RUN pacman -Syu --noconfirm
RUN pacman -S wget curl --noconfirm

# Download OpenJDK 11
RUN wget https://download.java.net/java/GA/jdk11/9/GPL/openjdk-11.0.2_linux-x64_bin.tar.gz
# Extract OpenJDK 11
RUN tar -xzf openjdk-11.0.2_linux-x64_bin.tar.gz
# Set JAVA_HOME
ENV JAVA_HOME=/jdk-11.0.2
# Set PATH
ENV PATH=$PATH:/jdk-11.0.2/bin

# Download Kafka 
RUN wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
# Extract Kafka
RUN tar -xzf kafka_2.13-3.4.0.tgz
# Set KAFKA_HOME
ENV KAFKA_HOME=/kafka_2.13-3.4.0
# Set PATH
ENV PATH=$PATH:/kafka_2.13-3.4.0/bin

# Copy the server.properties file
COPY ./server.properties /kafka_2.13-3.4.0/config/server.properties

# Optional debug checks, kept from my troubleshooting
RUN find /tmp
RUN cat /kafka_2.13-3.4.0/config/server.properties

COPY ./entrypoint.sh /entrypoint.sh
RUN chmod 755 /entrypoint.sh

# Set the Entry Point to start Kafka Server
ENTRYPOINT ["/entrypoint.sh"]

The entrypoint.sh file is as follows:

#!/bin/bash
cd /
/kafka_2.13-3.4.0/bin/kafka-server-start.sh /kafka_2.13-3.4.0/config/server.properties --override zookeeper.connect=zookeeper:2181

I really had to add --override zookeeper.connect=zookeeper:2181; without it, the broker would not connect to Zookeeper.

The server.properties file is as follows:

broker.id=7
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=zookeeper:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

After building it, we are going to run it with:

docker run --name kafka     --network messaging-network --publish 172.18.0.1:9092:9092 rakotomandimby/kafka:3.4.0
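To verify the broker is reachable, we can use the Kafka CLI tools shipped in the image (a sanity check, assuming both containers are running):

```shell
# List topics on the broker; no error (and an empty list at this point)
# means Kafka is up and talking to Zookeeper
docker exec kafka kafka-topics.sh --bootstrap-server localhost:9092 --list
```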

Symfony producer

Note that the producer requires some PHP extensions on the host machine, so we need to install them (I use Arch Linux):

sudo pacman -S librdkafka
yay  -S php-rdkafka

On Ubuntu or Fedora systems, you might install them differently.
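For example, on Ubuntu a sketch like the following should work (package names and the PHP version path are assumptions; adjust them to your release):

```shell
# Install librdkafka headers and PHP build tools, then build the extension with PECL
sudo apt install -y librdkafka-dev php-dev php-pear
sudo pecl install rdkafka
# Enable the extension (the 8.2 path is an assumption; match your PHP version)
echo "extension=rdkafka.so" | sudo tee /etc/php/8.2/mods-available/rdkafka.ini
sudo phpenmod rdkafka
```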

We now create a Symfony web application that takes the message from POSTed data and produces it to Kafka. We start by creating the application, then install the needed packages.

symfony new symfony-producer
cd symfony-producer
composer require enqueue/enqueue enqueue/rdkafka

We then create a controller that handles the POST request and produces the message to Kafka (yes, it's dirty code, but it's just for testing purposes).

<?php
namespace App\Controller;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Routing\Annotation\Route;

class DefaultController extends AbstractController{
    /**
     * @Route("/produce/{hostName}/{queueName}", name="default")
     */
    public function default(Request $request, string $queueName, string $hostName): Response
    {
        $connectionFactory = new RdKafkaConnectionFactory([
            'global' => [
                'metadata.broker.list' => $hostName.':9092'
            ],
            'topic' => [],
        ]);
        $context = $connectionFactory->createContext();
        $content = $request->getContent();
        $message = $context->createMessage($content);
        $fooQueue = $context->createQueue($queueName);
        $context->createProducer()->send($fooQueue, $message);
        return $this->json(['message' => $content,'queue' => $queueName]);
    }
}

Not related to the topic, but I usually launch my Symfony applications with the following command:

symfony server:start --no-tls

With that Symfony application, if we POST some data to /produce/172.18.0.1/foo, it will produce the message to the queue named "foo" (creating it if it doesn't exist) on the Kafka broker located at the 172.18.0.1 IP address.
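For example, with the Symfony dev server on its default port 8000 (an assumption; check the output of symfony server:start), producing a message looks like this:

```shell
# POST a raw body; "foo" is the target queue/topic, 172.18.0.1 the broker IP
curl -X POST --data 'hello kafka' http://127.0.0.1:8000/produce/172.18.0.1/foo
# The controller should answer with: {"message":"hello kafka","queue":"foo"}
```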

Consumer 1

A consumer doesn't have to be a Symfony application (or any HTTP application at all); it can be a plain PHP script.
That script connects to Kafka, blocks there, and consumes messages from the queue, so it cannot handle anything else while it runs.

Each consumer must have a unique group name: consumers in different groups each receive every message, while consumers sharing a group split the messages between them.

The consumer needs a couple of packages first:

composer require enqueue/rdkafka enqueue/simple-client

The consumer will be as follows:

<?php
require __DIR__.'/vendor/autoload.php';
use Enqueue\RdKafka\RdKafkaConnectionFactory;
use Interop\Queue\Message;
use Interop\Queue\Processor;
use Enqueue\Consumption\QueueConsumer;

$groupId = $argv[1];
$kafkaIp = $argv[2];
$queueName = $argv[3];

$connectionFactory = new RdKafkaConnectionFactory([
    'global' => [
        'group.id' => $groupId,
        'metadata.broker.list' => $kafkaIp.':9092',
        'enable.auto.commit' => 'false',
    ],
    'topic' => [
        'auto.offset.reset' => 'beginning',
    ],
]);
$context = $connectionFactory->createContext();
$queueConsumer = new QueueConsumer($context);
$queueConsumer->bindCallback($queueName, function(Message $message) {
    echo $message->getBody();
    return Processor::ACK;
});
$queueConsumer->consume();

We can run it with:

php ./consumer.php bbbb 172.18.0.1 foo

And as I said, it will block there and will not give the prompt back.

Consumer 2

To have another consumer, we just need to run a copy of the script above with another group name:

php ./consumer.php aaaa 172.18.0.1 foo

Again, it will block there and will not give the prompt back.

Play

If we POST some data to /produce/172.18.0.1/foo, we will see the message in both consumers' terminals.
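A quick session sketch (ports and message bodies are illustrative). Because the two consumers use different group IDs, each one receives its own copy of every message:

```shell
# Terminal 1 and 2: the two consumers started earlier
# php ./consumer.php aaaa 172.18.0.1 foo
# php ./consumer.php bbbb 172.18.0.1 foo

# Terminal 3: produce two messages
curl -X POST --data 'first' http://127.0.0.1:8000/produce/172.18.0.1/foo
curl -X POST --data 'second' http://127.0.0.1:8000/produce/172.18.0.1/foo
# Each consumer terminal prints the raw bodies back to back ("firstsecond"),
# since the callback echoes the body without adding a newline
```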
