How to use RxJs with Socket.IO on event
Simply use fromEvent()
. Here is a full example in Node.js but works the same in browser. Note that i use first()
and takeUntil()
to prevent a memory leak: first()
only listens to one event and then completes. Now use takeUntil()
on all other socket-events you listen to so the observables complete on disconnect:
const app = require('express')();
const server = require('http').createServer(app);
const io = require('socket.io')(server);
const Rx = require('rxjs/Rx');
connection$ = Rx.Observable.fromEvent(io, 'connection');
connection$.subscribe(socket => {
console.log(`Client connected`);
// Observables
const disconnect$ = Rx.Observable.fromEvent(socket, 'disconnect').first();
const message$ = Rx.Observable.fromEvent(socket, 'message').takeUntil(disconnect$);
// Subscriptions
message$.subscribe(data => {
console.log(`Got message from client with data: ${data}`);
io.emit('message', data); // Emit to all clients
});
disconnect$.subscribe(() => {
console.log(`Client disconnected`);
})
});
server.listen(3000);
I have experienced some strange issues using the fromEvent method, so I prefer just to create my own Observable:
function RxfromIO (io, eventName) {
return Rx.Observable.create(observer => {
io.on(eventName, (data) => {
observer.onNext(data)
});
return {
dispose : io.close
}
});
I can then use like this:
let $connection = RxfromIO(io, 'connection');
You can create an Observable like so:
var senses = Rx.Observable.fromEventPattern(
function add (h) {
socket.on('sense',h);
}
);
Then use senses
like any other Observable.
You can use Rx.Observable.fromEvent
(https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/fromevent.md).
Here's how I did a similar thing using Bacon.js, which has a very similar API: https://github.com/raimohanska/bacon-minsk-2015/blob/gh-pages/server.js#L13
So in Bacon.js it would go like
io.on('connection', function(socket){
Bacon.fromEvent(socket, "sense")
.filter(function(data) { return true })
.forEach(function(data) { dealWith(data) })
})
And in RxJs you'd replace Bacon.fromEvent
with Rx.Observable.fromEvent
.