How to connect a Kafka topic to a web endpoint using the Faust Python package?
Reading the Faust web documentation, it does not seem to handle SSE (server-sent events).
@app.agent is called back when a Kafka message is consumed, and @app.page when an HTTP request is processed. Combining them directly is probably not possible.
An alternative approach using faust.web is to poll from JavaScript.
For instance:
import faust

app = faust.App("myapp", broker="kafka://kafka:9092", value_serializer="raw")
test_topic = app.topic("test")
# Latest message seen on the topic, shared between the agent and the web views
app.lastmsg = ""


@app.agent(test_topic)
async def test_topic_agent(stream):
    async for value in stream:
        app.lastmsg = str(value)
        yield value


@app.page("/msg")
async def msg(self, request):
    # Return the latest Kafka message as plain text
    return self.text(app.lastmsg)


@app.page("/")
async def index(self, request):
    body = """
    <html>
    <body>
    <script>
    // Poll /msg once per second and display the result
    setInterval(()=>{
        let xhr = new XMLHttpRequest();
        xhr.open('GET', '/msg');
        xhr.onload = function() {
            if (xhr.status == 200) {
                document.getElementById('response').innerText = xhr.response
            }
        }
        xhr.send();
    },1000);
    </script>
    <h1>Response from server:</h1>
    <div id="response"></div>
    </body>
    </html>
    """
    return self.html(body)
This naive implementation stores the latest Kafka message in app.lastmsg, which the page then fetches from the /msg endpoint.
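Because app.lastmsg holds only the most recent value, any message that arrives between two polls is overwritten before the browser sees it. If that matters, one option is to keep a small bounded buffer with sequence numbers and let the client ask for everything after the last number it saw. The following is a minimal sketch in plain Python; RecentMessages, add and since are hypothetical names, not part of Faust.

```python
from collections import deque


class RecentMessages:
    """Hypothetical helper: keeps the last `maxlen` messages with a running sequence number."""

    def __init__(self, maxlen=100):
        self._items = deque(maxlen=maxlen)  # oldest entries drop off automatically
        self._next_seq = 0

    def add(self, value):
        self._items.append((self._next_seq, value))
        self._next_seq += 1

    def since(self, seq):
        """Return the (seq, value) pairs with sequence number >= seq that are still buffered."""
        return [(s, v) for s, v in self._items if s >= seq]


buffer = RecentMessages(maxlen=3)
for text in ["a", "b", "c", "d"]:
    buffer.add(text)

# A client that last saw sequence number 1 asks for everything from 2 onwards
missed = buffer.since(2)
```

In the worker, the agent would call buffer.add(str(value)) instead of assigning app.lastmsg, and /msg would return buffer.since(n) for a sequence number supplied by the client, for example as a query parameter.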
To use SSE, you can use asyncio for the web part and Faust for the Kafka consumer.
The Faust worker also exposes a web server on every instance, which by default runs on port 6066.
The server uses the aiohttp HTTP server library, so you can take advantage of it to create a server-sent events (SSE) stream, as in your example code.
You can create an agent that reads from the Kafka topic test and updates a variable, last_message_from_topic, with the last message from the topic. This variable is also visible from your web pages.
In the index page (@app.page('/')), the EventSource interface is used to receive server-sent events. It connects to the server over HTTP and receives events in text/event-stream format from the page /hello without closing the connection.
Every second, the page /hello sends a text message containing the last message from the Kafka topic test and the current server time.
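To make the text/event-stream format concrete, here is a minimal sketch of how a single event is framed on the wire. The function name format_sse is hypothetical; an SSE library such as aiohttp_sse does this framing for you when you call its send method.

```python
def format_sse(data, event=None):
    """Build one text/event-stream frame; the trailing blank line terminates the event."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # Each line of the payload needs its own "data:" field
    lines.extend(f"data: {line}" for line in data.split("\n"))
    return "\n".join(lines) + "\n\n"


frame = format_sse("last message from topic_test: hi")
```

An event without an event field, like the one above, is delivered to the browser's onmessage handler. A named event would need evtSource.addEventListener with that name instead.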
Here is the code of my file my_worker.py:
import asyncio
from datetime import datetime

import faust
from aiohttp.web import Response
from aiohttp_sse import sse_response

app = faust.App(
    "app1",
    broker='kafka://localhost:9092',
    value_serializer='json',
)
test_topic = app.topic("test")

# One-element list so the agent and the web views share the same mutable object
last_message_from_topic = ['No messages yet']


@app.agent(test_topic)
async def greet(greetings):
    async for greeting in greetings:
        last_message_from_topic[0] = greeting


@app.page('/hello')
async def hello(self, request):
    # Keep the HTTP connection open and push one event per second
    async with sse_response(request) as resp:
        while True:
            data = f'last message from topic_test: {last_message_from_topic[0]} | '
            data += f'Server Time : {datetime.now()}'
            print(data)
            await resp.send(data)
            # The loop argument was removed from asyncio.sleep in Python 3.10
            await asyncio.sleep(1)
    return resp


@app.page('/')
async def index(self, request):
    d = """
    <html>
    <body>
    <script>
    var evtSource = new EventSource("/hello");
    evtSource.onmessage = function(e) {
        document.getElementById('response').innerText = e.data
    }
    </script>
    <h1>Response from server:</h1>
    <div id="response"></div>
    </body>
    </html>
    """
    return Response(text=d, content_type='text/html')
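The /hello handler above pushes the latest value once per second, whether or not anything new arrived. If you would rather push each Kafka message to every connected browser as it arrives, one possible pattern is a queue per client. The sketch below uses plain asyncio only; subscribers, publish and subscribe are hypothetical names, and subscribe stands in for one SSE handler.

```python
import asyncio

subscribers = set()  # hypothetical registry: one asyncio.Queue per connected client


def publish(message):
    """Called by the consumer for each new message; copies it to every client queue."""
    for queue in subscribers:
        queue.put_nowait(message)


async def subscribe(limit):
    """Stand-in for one SSE handler: collect `limit` messages, then disconnect."""
    queue = asyncio.Queue()
    subscribers.add(queue)
    received = []
    try:
        while len(received) < limit:
            received.append(await queue.get())
    finally:
        # Always unregister, even if the client goes away mid-stream
        subscribers.discard(queue)
    return received


async def demo():
    clients = [asyncio.create_task(subscribe(3)) for _ in range(2)]
    await asyncio.sleep(0)  # yield once so both clients can register
    for i in range(3):
        publish(f"message {i}")
    return await asyncio.gather(*clients)


result = asyncio.run(demo())
```

In the worker, the agent would call publish(greeting) for each message, and the SSE handler would await queue.get() and pass the result to resp.send instead of sleeping.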
Now start the Faust worker with the following command:
faust -A my_worker worker -l info
Then open http://localhost:6066/ in your web browser.
Here is the code to send messages to the Kafka topic test (from another Python file):
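import time
import json

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    # Encode each value as UTF-8 JSON to match value_serializer='json' in the worker
    value_serializer=lambda x: json.dumps(x).encode('utf-8'),
)

for i in range(220):
    time.sleep(1)
    producer.send('test', value=f'Some message from kafka id {i}')

# send() is asynchronous; flush so the last messages are delivered before exit
producer.flush()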
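The producer's value_serializer and the worker's value_serializer='json' need to agree, otherwise the agent cannot decode what arrives. The round trip can be sketched with the standard json module alone; consumer_deserialize is a hypothetical stand-in that assumes the worker decodes values with an ordinary JSON loads.

```python
import json


def producer_serialize(value):
    """Same transformation as the lambda passed to KafkaProducer."""
    return json.dumps(value).encode("utf-8")


def consumer_deserialize(raw):
    """Assumed consumer-side decoding: plain JSON over UTF-8 bytes."""
    return json.loads(raw.decode("utf-8"))


payload = producer_serialize("Some message from kafka id 0")
decoded = consumer_deserialize(payload)
```

Note that the bytes on the topic include the JSON quotes around the string, so a worker configured with value_serializer="raw" would see them as part of the value.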