how to use mysql-stress-test.pl code example
Example 1: How to use tap() (RxJS, JavaScript)
import { fromEvent } from 'rxjs';
import { tap, map } from 'rxjs/operators';
// Observable of every 'click' event dispatched on the document.
const clicks = fromEvent(document, 'click');

// tap() performs a side effect (logging the raw event) and passes the value
// through unchanged; map() then projects each event to its clientX.
const positions = clicks.pipe(
  tap((event) => {
    console.log(event);
  }),
  map((event) => event.clientX),
);

// Log each projected x-coordinate as it is emitted.
positions.subscribe((clientX) => {
  console.log(clientX);
});
Example 2: How to use visualize_runtimes (Python)
import threading
import multiprocessing
import math
import numpy as np
import time
import matplotlib.pyplot as plt
import glob
from PIL import Image
import random
from random import sample
import string
from concurrent.futures import ThreadPoolExecutor
# Worker count: handed to multithreading() as ``workers`` in the __main__
# block and to generate_bar_colors() inside visualize_runtimes().
cpu_workers = 3
# NOTE(review): presumably the number of simulated tasks to run -- confirm.
nSim = 12
def generate_bar_colors(cpu_workers):
    """Return the list of bar colours used by the runtime chart.

    The palette is fixed at three entries; ``cpu_workers`` is accepted but
    does not influence the result. A new list is built on every call.
    """
    palette = ('red', 'gold', 'royalblue')
    return list(palette)
def visualize_runtimes(results, title):
    """Draw task runtimes as horizontal bars using pyplot.

    Args:
        results: Sequence of ``(start, stop)`` pairs, one per task. It is
            converted to an array and transposed into a start column and a
            stop column.
        title: Accepted but not read by the body; the chart title is the
            fixed string "Tasks".
    """
    # NOTE(review): ``title`` is never used -- confirm whether it was meant
    # to be displayed on the figure.
    plt.rcParams["font.family"] = "Times New Roman"
    plt.rcParams['axes.axisbelow'] = True

    palette = generate_bar_colors(cpu_workers)
    begins, ends = np.array(results).T
    durations = ends - begins
    rows = range(len(begins))

    # One bar per task: it begins at the task's start offset and spans its
    # duration.
    plt.barh(rows, durations, left=begins, color=palette)
    plt.grid(axis='x', color='lightgrey')
    plt.title(
        "Tasks",
        rotation='horizontal',
        fontsize=12,
        horizontalalignment="left",
        x=0,
    )
    plt.xlabel("Seconds", fontsize=12, horizontalalignment='right', x=1.0)
def multithreading(func, args, workers):
    """Run ``func(item, begin_time)`` for every item of ``args`` on a thread pool.

    ``begin_time`` is a single ``time.time()`` reading taken before any task
    is submitted, so every call receives the same reference timestamp.

    Args:
        func: Callable invoked as ``func(item, begin_time)``.
        args: Iterable of first arguments. Any iterable is accepted
            (list, range, generator); it does not need to support ``len()``.
        workers: Maximum number of threads in the pool.

    Returns:
        A list with one result per item, in the same order as ``args``.
    """
    begin_time = time.time()

    def run_one(item):
        # Bind the shared base time here instead of building a parallel
        # list of repeated timestamps, which required len(args).
        return func(item, begin_time)

    with ThreadPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(run_one, args))
    return results
def simulation(i, base):
    """Simulate one task by sleeping, and report when it started and stopped.

    Prints the current thread's name and ``i``, then sleeps for
    ``math.cos(i) + i * 0.1 + 1`` seconds.

    Args:
        i: Numeric task index; it determines the sleep duration.
        base: Reference timestamp (same clock as ``time.time()``) that is
            subtracted from both readings.

    Returns:
        Tuple ``(start, stop)`` of seconds elapsed since ``base`` before and
        after the sleep.

    Raises:
        ValueError: From ``time.sleep`` if the computed duration is negative
            (possible for some negative ``i``).
    """
    start = time.time() - base
    # Thread.getName() is deprecated since Python 3.10; the ``name``
    # attribute yields the same string without a DeprecationWarning.
    print(f"{threading.current_thread().name}: {i}")
    time.sleep(math.cos(i) + i * 0.1 + 1)
    stop = time.time() - base
    return start, stop
if __name__ == '__main__':
    # multithreading() needs an iterable of task indices; range(nSim)
    # supplies 0 .. nSim - 1 (the bare name ``i`` is not defined at module
    # level and would raise NameError).
    timings = multithreading(simulation, range(nSim), cpu_workers)
    visualize_runtimes(timings, "Multi-threading")
    plt.savefig('foo.png', bbox_inches='tight')
    plt.show()