How to connect kafka topic with web endpoint using Faust Python package?

Reading faust web documentation it doesnot seems to handle SSE.

@app.agent is callbacked when a kafka message is consumed and @app.page when an http request is processed. Combining them is probably not possible.

A alternative approach using faust.web is to poll from the javascript.
For instance using :

import faust
from faust.web import Response

app = faust.App("myapp", broker="kafka://kafka:9092", value_serializer="raw")
test_topic = app.topic("test")
app.lastmsg = ""

@app.agent(test_topic)
async def test_topic_agent(stream):
    async for value in stream:
        app.lastmsg = str(value)
        yield value

@app.page("/msg")
async def msg(self, request):
    return self.text(app.lastmsg)

@app.page("/")
async def index(self, request):
    body = """
        <html>
        <body>
            <script>
                setInterval(()=>{
                    let xhr = new XMLHttpRequest();
                    xhr.open('GET', '/msg');
                    xhr.send();
                    xhr.onload = function() {
                        if (xhr.status == 200) {
                            document.getElementById('response').innerText = xhr.response
                        }
                    }
                },1000);
            </script>
            <h1>Response from server:</h1>
            <div id="response"></div>
        </body>
    </html>
    """
    return self.html(body)

This naive implementation store the kafka message to app.lastmsg that could be get by the api /msg.

In order to use SSE you can use asyncio for the web part and faust for the kafka consumer.


The Faust worker will also expose a web server on every instance, that by default runs on port 6066.

The server will use the aiohttp HTTP server library and you can take advantage of this thing and create a server-side event streaming (SSE) like in your example code.

You can create an agent that will read from Kafka topic test and will update a variable last_message_from_topic with the last message from the topic, this variable will be visible also from your web pages.

In the index page (@app.page('/')) the EventSource interface is used to receive server-sent events. It connects to the server over HTTP and receives events in text/event-stream format from the page /hello without closing the connection.

The web page /hello at every second is sending a message text with the last message from the Kafka topic test and with the current time from the server.

here is my file my_worker.py code:

import asyncio
from datetime import datetime

import faust
from aiohttp.web import Response
from aiohttp_sse import sse_response

app = faust.App(
    "app1",
    broker='kafka://localhost:9092',
    value_serializer='json',
)
test_topic = app.topic("test")

last_message_from_topic = ['No messages yet']


@app.agent(test_topic)
async def greet(greetings):
    async for greeting in greetings:
        last_message_from_topic[0] = greeting


@app.page('/hello')
async def hello(self, request):
    loop = request.app.loop
    async with sse_response(request) as resp:
        while True:
            data = f'last message from topic_test: {last_message_from_topic[0]} | '
            data += f'Server Time : {datetime.now()}'

            print(data)
            await resp.send(data)
            await asyncio.sleep(1, loop=loop)
    return resp


@app.page('/')
async def index(self, request):
    d = """
        <html>
        <body>
            <script>
                var evtSource = new EventSource("/hello");
                evtSource.onmessage = function(e) {
                    document.getElementById('response').innerText = e.data
                }
            </script>
            <h1>Response from server:</h1>
            <div id="response"></div>
        </body>
    </html>
    """
    return Response(text=d, content_type='text/html')

now you have to start the Faust worker with the following command:

faust -A my_worker worker -l info

on your web browser you can access http://localhost:6066/:

enter image description here


here is the code to send messages to Kafka on the topic test (from another python file):

import time
import json

from kafka import  KafkaProducer

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],value_serializer=lambda x: json.dumps(x).encode('utf-8'))


for i in range(220):
    time.sleep(1)
    producer.send('test', value=f'Some message from kafka id {i}')