Why Redis pub/sub is used: code example
Example 1: Custom Redis pub/sub (subscriber)
import IORedis, { Redis } from 'ioredis'
import chalk from 'chalk'
import { Publisher } from '../utils/util.publisher'
import { ISubscriber } from '../interface/interface.subscriber'
/**
 * Reads values that a `Publisher` stored in Redis.
 *
 * The subscriber is bound to the key given in its config. `getString` and
 * `getMap` only read when that key equals the key reported by
 * `Publisher.get()` at construction time; otherwise they reject.
 *
 * One Redis connection is opened lazily on first use and reused by every
 * call on this instance. Call `disconnect()` once the subscriber is no
 * longer needed so the socket is released.
 */
export class Subscriber {
  private readonly keyTo: string
  private readonly keyFrom: string
  private readonly uniqueId: string
  /** Lazily created connection shared by all reads; `undefined` until first use. */
  private connection: Redis | undefined = undefined

  /**
   * @param config - subscriber settings; `config.key` must match the
   *   publisher's key for `getString` / `getMap` to read anything.
   */
  constructor(config: Readonly<ISubscriber>) {
    const published = Publisher.get()
    this.keyTo = config.key
    this.keyFrom = published.key
    this.uniqueId = published.unique
  }

  /**
   * Returns the shared Redis connection, creating it on first use.
   * Opening a fresh connection per call (as before) leaked one socket per read.
   */
  private redisConnect(): Redis {
    if (this.connection === undefined) {
      this.connection = new IORedis({
        host: '127.0.0.1',
        port: 6379,
        maxRetriesPerRequest: 50,
        connectTimeout: 5000,
        enableReadyCheck: true,
        enableAutoPipelining: true
      })
    }
    return this.connection as Redis
  }

  /** True when this subscriber's key equals the publisher's key. */
  private keysMatch(): boolean {
    return this.keyTo === this.keyFrom
  }

  /**
   * Builds the rejection value used when the keys differ.
   * NOTE: this is a chalk-coloured string, not an `Error`; kept as-is so
   * existing callers that inspect the rejection keep working.
   */
  private keyMismatchReason(): string {
    return chalk.red(new Error(`invalid key Subscriber: ${this.keyTo} and Publisher: ${this.keyFrom}`))
  }

  /**
   * Closes the shared Redis connection if one was opened.
   * Safe to call repeatedly; a later read transparently reconnects.
   */
  public async disconnect(): Promise<void> {
    const open = this.connection
    if (open === undefined) {
      return
    }
    this.connection = undefined
    await open.quit()
  }

  /**
   * Reads a plain string value.
   *
   * @param keyName - Redis key to read.
   * @returns the stored string, or `{}` when the key is missing or empty.
   * @throws rejects with a coloured message string when the subscriber and
   *   publisher keys differ.
   */
  public async getString(keyName: string): Promise<any> {
    if (!this.keysMatch()) {
      return Promise.reject(this.keyMismatchReason())
    }
    const value = await this.redisConnect().get(keyName)
    return value ? value : {}
  }

  /**
   * Reads a hash and returns the JSON-decoded content of its `payload` field.
   *
   * @param keyName - Redis hash key to read.
   * @returns the decoded payload, or `{}` when the hash or its `payload`
   *   field does not exist.
   * @throws rejects with a coloured message string when the subscriber and
   *   publisher keys differ.
   */
  public async getMap(keyName: string): Promise<any> {
    if (!this.keysMatch()) {
      return Promise.reject(this.keyMismatchReason())
    }
    const fields: Record<string, string | undefined> = await this.redisConnect().hgetall(keyName)
    // HGETALL yields an empty object for a missing key, so the field itself
    // must be checked before handing it to JSON.parse.
    const payload = fields ? fields.payload : undefined
    if (!payload) {
      return {}
    }
    return JSON.parse(payload)
  }

  /**
   * Follows the pointer stored at `response:speaker:<uniqueId>` to a hash and
   * returns the JSON-decoded content of that hash's `response` field.
   *
   * @returns the decoded response, or `{}` when the pointer, the hash, or the
   *   `response` field is missing (for example after expiry).
   */
  public async getResponse(): Promise<any> {
    const redis = this.redisConnect()
    const target = await redis.get(`response:speaker:${this.uniqueId}`)
    if (!target) {
      // No pointer: previously this looked up the literal key "null".
      return {}
    }
    const fields: Record<string, string | undefined> = await redis.hgetall(target)
    const encoded = fields ? fields.response : undefined
    if (!encoded) {
      return {}
    }
    return JSON.parse(encoded)
  }
}
Example 2: Custom Redis pub/sub (publisher)
import IORedis, { Redis } from 'ioredis'
import { v4 as uuid } from 'uuid'
import { IPublisher } from '../interface/interface.publisher'
/**
 * Writes short-lived values to Redis for a `Subscriber` to read.
 *
 * Constructing a publisher stores its key and a freshly generated unique id
 * in static fields, exposed through `Publisher.get()`. Because the fields are
 * static, the most recently constructed publisher wins.
 *
 * One Redis connection is opened lazily on first use and reused by every
 * call on this instance. Call `disconnect()` once the publisher is no longer
 * needed so the socket is released.
 */
export class Publisher {
  private static key: string
  private static unique: string
  /** Lifetime, in seconds, of published strings and hashes. */
  private static readonly valueTtlSeconds = 3
  /** Lifetime, in seconds, of the pointer key written by `setResponse`. */
  private static readonly pointerTtlSeconds = 1
  /** Lazily created connection shared by all writes; `undefined` until first use. */
  private connection: Redis | undefined = undefined

  /**
   * @param configs - publisher settings; `configs.key` becomes the key that
   *   subscribers are compared against.
   */
  constructor(configs: Readonly<IPublisher>) {
    Publisher.set({ key: configs.key, unique: uuid() })
  }

  /** Returns the current `{ key, unique }` pair held in the static fields. */
  public static get(): Record<string, any> {
    return {
      key: Publisher.key,
      unique: Publisher.unique
    }
  }

  /** Overwrites the static key and unique id from `config`. */
  private static set(config: Record<string, any>): void {
    Publisher.key = config.key
    Publisher.unique = config.unique
  }

  /**
   * Returns the shared Redis connection, creating it on first use.
   * Opening a fresh connection per call (as before) leaked one socket per write.
   */
  private redisConnect(): Redis {
    if (this.connection === undefined) {
      this.connection = new IORedis({
        host: '127.0.0.1',
        port: 6379,
        maxRetriesPerRequest: 50,
        connectTimeout: 5000,
        enableReadyCheck: true,
        enableAutoPipelining: true
      })
    }
    return this.connection as Redis
  }

  /**
   * Closes the shared Redis connection if one was opened.
   * Safe to call repeatedly; a later write transparently reconnects.
   */
  public async disconnect(): Promise<void> {
    const open = this.connection
    if (open === undefined) {
      return
    }
    this.connection = undefined
    await open.quit()
  }

  /**
   * Stores a string that expires after 3 seconds.
   *
   * @param keyName - Redis key to write.
   * @param data - value to store.
   */
  public async setString(keyName: string, data: string): Promise<void> {
    // SETEX writes value and TTL in one command, so a failure between a
    // separate SET and EXPIRE can no longer leave a key that never expires.
    await this.redisConnect().setex(keyName, Publisher.valueTtlSeconds, data)
  }

  /**
   * Stores `data` JSON-encoded in the `payload` field of a hash that expires
   * after 3 seconds.
   *
   * @param keyName - Redis hash key to write.
   * @param data - object to serialise into the `payload` field.
   */
  public async setMap(keyName: string, data: Record<string, any>): Promise<void> {
    const redis = this.redisConnect()
    await redis.hset(keyName, { payload: JSON.stringify(data) })
    await redis.expire(keyName, Publisher.valueTtlSeconds)
  }

  /**
   * Publishes a response: writes `data` JSON-encoded into the `response`
   * field of the hash `response:speaker:<keyName>` (3 second lifetime), then
   * points `response:speaker:<unique id>` at that hash (1 second lifetime).
   *
   * @param keyName - suffix identifying the response hash.
   * @param data - object to serialise into the `response` field.
   */
  public async setResponse(keyName: string, data: Record<string, any>): Promise<void> {
    const redis = this.redisConnect()
    const dataKey = `response:speaker:${keyName}`
    // Write the hash before the pointer so a reader that sees the pointer
    // never follows it to a hash that does not exist yet.
    await redis.hset(dataKey, { response: JSON.stringify(data) })
    await redis.expire(dataKey, Publisher.valueTtlSeconds)
    await redis.setex(`response:speaker:${Publisher.get().unique}`, Publisher.pointerTtlSeconds, dataKey)
  }
}