Kafka with PHP (Symfony)
I was looking for simple documentation on using Kafka with PHP. I found a few tutorials, but they were not simple enough for me: they bundle everything (producer and consumer) into one code base. They also assume Zookeeper is installed on the same machine as Kafka, which is not my case.
So I decided to write my own.
In this example, I have a producer and a consumer, each in its own code base, with Zookeeper and Kafka running in separate Docker containers.
Docker network
First, let's create a Docker network for our containers to communicate with each other.
docker network create --subnet=172.18.0.0/29 --gateway=172.18.0.1 messaging-network
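A /29 subnet is small, so it is worth checking how many containers it can hold. The sketch below does that arithmetic, assuming the usual IPv4 convention that the network and broadcast addresses are not assignable; the function name usable_hosts is only illustrative.

```shell
#!/bin/sh
# usable_hosts PREFIX: number of assignable IPv4 addresses in a /PREFIX subnet
# (all addresses minus the network address and the broadcast address).
usable_hosts() {
  prefix=$1
  echo $(( (1 << (32 - prefix)) - 2 ))
}

# The network above is a /29, and the gateway (172.18.0.1) takes one usable address.
total=$(usable_hosts 29)
echo "usable addresses: $total, left for containers: $(( total - 1 ))"
```

With these assumptions, a /29 leaves room for five containers, which is enough for Zookeeper and Kafka here. Pick a larger subnet (for example a /24) if you plan to add more.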
Zookeeper container
In this setup Kafka relies on Zookeeper for coordination, so let's create a Zookeeper container first.
FROM archlinux:latest
# Steps from https://www.tutorialspoint.com/apache_kafka/apache_kafka_installation_steps.htm
# Update the system and install the download tools
RUN pacman -Syu --noconfirm
RUN pacman -S wget curl --noconfirm
# Download OpenJDK 11
RUN wget https://download.java.net/java/GA/jdk11/9/GPL/openjdk-11.0.2_linux-x64_bin.tar.gz
# Extract OpenJDK 11
RUN tar -xzf openjdk-11.0.2_linux-x64_bin.tar.gz
# Set JAVA_HOME
ENV JAVA_HOME=/jdk-11.0.2
# Set PATH
ENV PATH=$PATH:/jdk-11.0.2/bin
RUN wget https://downloads.apache.org/zookeeper/zookeeper-3.8.1/apache-zookeeper-3.8.1-bin.tar.gz
RUN tar -xzf apache-zookeeper-3.8.1-bin.tar.gz
# Configure Zookeeper
RUN echo "tickTime=2000" > apache-zookeeper-3.8.1-bin/conf/zoo.cfg
RUN echo "dataDir=/tmp/zookeeper" >> apache-zookeeper-3.8.1-bin/conf/zoo.cfg
RUN echo "clientPort=2181" >> apache-zookeeper-3.8.1-bin/conf/zoo.cfg
RUN echo "clientPortAddress=0.0.0.0" >> apache-zookeeper-3.8.1-bin/conf/zoo.cfg
RUN echo "initLimit=5" >> apache-zookeeper-3.8.1-bin/conf/zoo.cfg
RUN echo "syncLimit=2" >> apache-zookeeper-3.8.1-bin/conf/zoo.cfg
# Update PATH with zookeeper
ENV PATH=$PATH:/apache-zookeeper-3.8.1-bin/bin
# Set entrypoint to start zookeeper
ENTRYPOINT ["zkServer.sh", "start-foreground"]
After building the image (tagged rakotomandimby/zookeeper:3.8.1 here), we run it with:
docker run --name zookeeper --network messaging-network --publish 172.18.0.1:2181:2181 rakotomandimby/zookeeper:3.8.1
Kafka container
Then let's create a Kafka container.
FROM archlinux:latest
# Steps from https://www.tutorialspoint.com/apache_kafka/apache_kafka_installation_steps.htm
# Update the system and install the download tools
RUN pacman -Syu --noconfirm
RUN pacman -S wget curl --noconfirm
# Download OpenJDK 11
RUN wget https://download.java.net/java/GA/jdk11/9/GPL/openjdk-11.0.2_linux-x64_bin.tar.gz
# Extract OpenJDK 11
RUN tar -xzf openjdk-11.0.2_linux-x64_bin.tar.gz
# Set JAVA_HOME
ENV JAVA_HOME=/jdk-11.0.2
# Set PATH
ENV PATH=$PATH:/jdk-11.0.2/bin
# Download Kafka
RUN wget https://downloads.apache.org/kafka/3.4.0/kafka_2.13-3.4.0.tgz
# Extract Kafka
RUN tar -xzf kafka_2.13-3.4.0.tgz
# Set KAFKA_HOME
ENV KAFKA_HOME=/kafka_2.13-3.4.0
# Set PATH
ENV PATH=$PATH:/kafka_2.13-3.4.0/bin
# Copy the server.properties file
COPY ./server.properties /kafka_2.13-3.4.0/config/server.properties
# Debug aids: list /tmp and print the copied configuration in the build log
RUN find /tmp
RUN cat /kafka_2.13-3.4.0/config/server.properties
COPY ./entrypoint.sh /entrypoint.sh
RUN chmod 755 /entrypoint.sh
# Set the Entry Point to start Kafka Server
ENTRYPOINT ["/entrypoint.sh"]
The entrypoint.sh file is as follows:
#!/bin/bash
cd /
# exec replaces the shell, so Kafka receives Docker's stop signals directly
exec /kafka_2.13-3.4.0/bin/kafka-server-start.sh /kafka_2.13-3.4.0/config/server.properties --override zookeeper.connect=zookeeper:2181
I really had to pass --override zookeeper.connect=zookeeper:2181 on the command line, even though server.properties sets the same value, because otherwise the broker would not connect to Zookeeper.
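If you would rather not hard-code the Zookeeper address in the entrypoint, one option is to read it from an environment variable with a default. This is only a sketch of that idea: ZOOKEEPER_CONNECT is a variable name made up for this example (Kafka itself does not read it), and the function names are illustrative.

```shell
#!/bin/sh
# Hypothetical variation of entrypoint.sh. ZOOKEEPER_CONNECT is a made-up
# variable name for this sketch; Kafka does not read it on its own.
KAFKA_HOME="${KAFKA_HOME:-/kafka_2.13-3.4.0}"

# zk_connect: print the Zookeeper address, defaulting to the container name used above.
zk_connect() {
  printf '%s\n' "${ZOOKEEPER_CONNECT:-zookeeper:2181}"
}

# start_kafka: replace the shell with the broker, passing the address as an override.
start_kafka() {
  exec "$KAFKA_HOME/bin/kafka-server-start.sh" \
    "$KAFKA_HOME/config/server.properties" \
    --override "zookeeper.connect=$(zk_connect)"
}

# In a real entrypoint, the last line would simply be:
#   start_kafka
```

The address could then be changed at run time by adding something like --env ZOOKEEPER_CONNECT=other-host:2181 to the docker run command, without rebuilding the image.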
The server.properties file is as follows:
broker.id=7
listeners=PLAINTEXT://:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=zookeeper:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
After building the image (tagged rakotomandimby/kafka:3.4.0 here), we run it with:
docker run --name kafka --network messaging-network --publish 172.18.0.1:9092:9092 rakotomandimby/kafka:3.4.0
Symfony producer
The producer needs the rdkafka PHP extension, and the librdkafka library it wraps, on the host machine, so we install them first (I use Arch Linux):
sudo pacman -S librdkafka
yay -S php-rdkafka
On Ubuntu or Fedora, the package names and install commands differ.
We now create a Symfony web application that takes the message from the POSTed data and produces it to Kafka. We start by creating the application, then we install the required packages.
symfony new symfony-producer
cd symfony-producer
composer require enqueue/enqueue enqueue/rdkafka
We need a controller (src/Controller/DefaultController.php) that handles the POST request and produces the message to Kafka (yes, it's dirty code, but it's just for testing purposes):
<?php
namespace App\Controller;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Routing\Annotation\Route;
class DefaultController extends AbstractController
{
    /**
     * @Route("/produce/{hostName}/{queueName}", name="default")
     */
    public function default(Request $request, string $queueName, string $hostName): Response
    {
        // Connect to the broker named in the URL, on port 9092
        $connectionFactory = new RdKafkaConnectionFactory([
            'global' => [
                'metadata.broker.list' => $hostName.':9092',
            ],
            'topic' => [],
        ]);
        $context = $connectionFactory->createContext();
        // The raw POST body becomes the message payload
        $content = $request->getContent();
        $message = $context->createMessage($content);
        $fooQueue = $context->createQueue($queueName);
        $context->createProducer()->send($fooQueue, $message);
        return $this->json(['message' => $content, 'queue' => $queueName]);
    }
}
Unrelated to Kafka, but I usually launch my Symfony applications with the following command:
symfony server:start --no-tls
With that Symfony application running, if we POST some data to /produce/172.18.0.1/foo, it produces the message to the queue (a Kafka topic) named "foo", creating it if it doesn't exist, on the Kafka broker at the IP address 172.18.0.1.
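To try the route from a terminal, curl is enough. The helpers below are a sketch: the base URL http://127.0.0.1:8000 is an assumption (use whatever address symfony server:start prints), and the names produce_url and post_message are only illustrative.

```shell
#!/bin/sh
# Base URL of the Symfony producer. Port 8000 is an assumption; use the
# address printed by `symfony server:start` on your machine.
PRODUCER_BASE_URL="${PRODUCER_BASE_URL:-http://127.0.0.1:8000}"

# produce_url BROKER_HOST QUEUE: build the URL matching the
# /produce/{hostName}/{queueName} route.
produce_url() {
  printf '%s/produce/%s/%s\n' "$PRODUCER_BASE_URL" "$1" "$2"
}

# post_message BROKER_HOST QUEUE BODY: send BODY as the raw POST body,
# which the controller reads with $request->getContent().
post_message() {
  curl --silent --show-error --request POST --data "$3" "$(produce_url "$1" "$2")"
}

# Example (needs the producer and the Kafka container to be running):
#   post_message 172.18.0.1 foo 'hello from curl'
```

If everything is wired correctly, the response should be the JSON returned by the controller, echoing the message and the queue name.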
Consumer 1
A consumer doesn't have to be a Symfony application (or any HTTP application at all); it can be a simple PHP script.
That PHP script connects to Kafka, then blocks and consumes the messages from the queue as they arrive.
Because the script blocks, it cannot do anything else in the meantime, so run it in its own terminal or process.
For each consumer to receive every message, it must use its own group name: consumers that share a group name split the messages between them.
The consumer needs some packages first:
composer require enqueue/rdkafka enqueue/simple-client
The consumer script (consumer.php) is as follows:
<?php
require __DIR__.'/vendor/autoload.php';
use Enqueue\RdKafka\RdKafkaConnectionFactory;
use Interop\Queue\Message;
use Interop\Queue\Processor;
use Enqueue\Consumption\QueueConsumer;
// Command-line arguments: consumer group, Kafka address, queue (topic) name
$groupId = $argv[1];
$kafkaIp = $argv[2];
$queueName = $argv[3];
$connectionFactory = new RdKafkaConnectionFactory([
    'global' => [
        'group.id' => $groupId,
        'metadata.broker.list' => $kafkaIp.':9092',
        'enable.auto.commit' => 'false',
    ],
    'topic' => [
        // A new group starts reading from the oldest available message
        'auto.offset.reset' => 'beginning',
    ],
]);
$context = $connectionFactory->createContext();
$queueConsumer = new QueueConsumer($context);
$queueConsumer->bindCallback($queueName, function (Message $message) {
    // Print each message on its own line, then acknowledge it
    echo $message->getBody().PHP_EOL;
    return Processor::ACK;
});
// Blocks here and processes messages as they arrive
$queueConsumer->consume();
We can run it with:
php ./consumer.php bbbb 172.18.0.1 foo
As I said, it blocks there and does not give the prompt back.
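The script reads its three arguments straight from $argv, so forgetting one only shows up as a PHP warning. A small wrapper can check them first. This is a sketch, and the function name run_consumer and its usage message are illustrative, not part of the original setup.

```shell
#!/bin/sh
# run_consumer GROUP_ID KAFKA_IP QUEUE: check the three arguments that
# consumer.php expects, then start it.
run_consumer() {
  if [ "$#" -ne 3 ]; then
    echo "usage: run_consumer GROUP_ID KAFKA_IP QUEUE" >&2
    return 64
  fi
  # Blocks until interrupted (Ctrl+C), just like running the script directly.
  php ./consumer.php "$1" "$2" "$3"
}

# Example:
#   run_consumer bbbb 172.18.0.1 foo
```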
Consumer 2
To have another consumer, we just need to run the same script again with another group name:
php ./consumer.php aaaa 172.18.0.1 foo
Like the first consumer, it blocks and does not give the prompt back.
Play
If we POST some data to /produce/172.18.0.1/foo, we will see every message appear in both consumers' terminals.