how to use mysql-stress-test.pl code example

Example 1: how to use tap() (RxJS, JavaScript)

      import { fromEvent } from 'rxjs';
import { tap, map } from 'rxjs/operators';

// Stream of every click event fired on the document.
const clicks = fromEvent(document, 'click');

// Side effect only: log the raw event and pass it through unchanged.
const logEvent = tap((ev) => console.log(ev));

// Project each click event to its clientX value.
const toClientX = map((ev) => ev.clientX);

const positions = clicks.pipe(logEvent, toClientX);

// Subscribing starts the stream; each projected value is logged.
positions.subscribe((x) => console.log(x));

Example 2: how to use visualize_runtimes (Python, matplotlib)

import threading
import multiprocessing
import math
import numpy as np
import time
import matplotlib.pyplot as plt
import glob
from PIL import Image
import random
from random import sample
import string
from concurrent.futures import ThreadPoolExecutor


# Worker count passed to the thread pool and to generate_bar_colors().
cpu_workers = 3
# Number of simulation runs -- presumably the number of tasks to schedule; TODO confirm.
nSim = 12

def generate_bar_colors(cpu_workers):
    """Return the list of bar colours used for the runtime chart.

    Args:
        cpu_workers: Accepted for interface compatibility; the palette is
            fixed and does not depend on this value.

    Returns:
        A new list with the three colour names, in plotting order.
    """
    palette = ('red', 'gold', 'royalblue')
    return list(palette)

def visualize_runtimes(results, title):
    """Draw a horizontal bar chart of task runtimes on the current figure.

    Args:
        results: Sequence of ``(start, stop)`` pairs, one per task; each bar
            begins at ``start`` and has length ``stop - start``.
        title: Accepted but not used; the heading text is hard-coded
            to "Tasks".

    Returns:
        None. The chart is drawn through ``plt`` and the global
        ``plt.rcParams`` are modified.
    """
    # Global matplotlib settings: font, and grid lines behind the bars.
    plt.rcParams["font.family"] = "Times New Roman"
    plt.rcParams['axes.axisbelow'] = True

    # Split the (start, stop) pairs into two parallel arrays.
    starts, stops = np.array(results).T
    durations = stops - starts
    rows = range(len(starts))

    bar_colors = generate_bar_colors(cpu_workers)
    plt.barh(rows, durations, left=starts, color=bar_colors)
    plt.grid(axis='x', color= 'lightgrey')

    # The heading is pinned to the left edge, the x label to the right edge.
    plt.title("Tasks", rotation='horizontal', fontsize=12, horizontalalignment="left", x=0)
    plt.xlabel("Seconds", fontsize=12, horizontalalignment='right', x=1.0)

def multithreading(func, args, workers):
    """Run ``func(item, begin_time)`` for every item of ``args`` on a thread pool.

    Args:
        func: Callable invoked as ``func(item, begin_time)``.
        args: Iterable of items to process. Any iterable is accepted,
            including generators and iterators that have no ``len()``.
        workers: Maximum number of worker threads.

    Returns:
        A list with the return value of each call, in input order.
    """
    # Materialise once so that inputs without len() (generators, iterators)
    # work as well as lists and ranges.
    items = list(args)
    begin_time = time.time()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # Every call receives the same reference timestamp.
        res = executor.map(func, items, [begin_time] * len(items))
    # Leaving the ``with`` block waits for all tasks, so results are ready here.
    return list(res)

def simulation(i, base):
    """Simulate one task by sleeping, and report when it ran.

    Prints ``"<thread name>: <i>"`` and then sleeps for
    ``math.cos(i) + i * 0.1 + 1`` seconds.

    Args:
        i: Task index; it determines the sleep duration.
        base: Reference timestamp (as returned by ``time.time()``) that the
            returned offsets are measured from.

    Returns:
        Tuple ``(start, stop)`` of offsets in seconds relative to ``base``.
    """
    start = time.time() - base
    # Thread.getName() is deprecated since Python 3.10; use the attribute.
    print(f"{threading.current_thread().name}: {i}")
    time.sleep(math.cos(i) + i * 0.1 + 1)
    stop = time.time() - base
    return start, stop


if __name__ == '__main__':
    # One task per simulation index 0 .. nSim - 1. The previous code passed
    # an undefined name here, which raised NameError at startup.
    tasks = list(range(nSim))
    visualize_runtimes(multithreading(simulation, tasks, cpu_workers), "Multi-threading")
    # Save before showing the figure.
    plt.savefig('foo.png', bbox_inches='tight')

# NOTE: this call sits outside the guard, so it also runs on import.
plt.show()