"""Sweep a multi-server queue simulation over server counts and arrival settings.

Every parameter combination is simulated several times in worker processes;
the per-run averages are then aggregated per combination and printed.
"""

from collections import deque
from concurrent.futures import ProcessPoolExecutor, as_completed
from dataclasses import dataclass

import numpy as np
import polars as pl
from tqdm import tqdm

from simulation.lib import Process, Simulation


@dataclass
class Customer:
    """Timestamps recorded for one customer passing through the system."""

    arrived_at: float
    started_at: float | None = None
    handling_time: float | None = None
    finished_at: float | None = None


def run_simulation(n_servers=2, lam=40, seed=None):
    """Run one queue simulation and return its summary statistics.

    Args:
        n_servers: Number of ``Server`` processes pulling from the shared queue.
        lam: Mean of the Poisson draw used for the gap between arrivals; the
            drawn value is divided by 10 to obtain the gap in clock units.
        seed: Optional seed for this run's random generator. ``None`` (the
            default) seeds from fresh OS entropy.

    Returns:
        A dict with ``n_servers``, ``lam``, the mean ``queue_time`` and mean
        ``handling_time`` over all handled customers, and one
        ``utilization_server_<id>`` entry per server.
    """
    # A per-run generator instead of the global ``np.random`` state: forked
    # worker processes inherit identical global state, which would make
    # supposedly independent replications draw the same random numbers.
    rng = np.random.default_rng(seed)

    customers_unhandled: deque[Customer] = deque()
    customers_in_progress: list[Customer] = []
    customers_handled: list[Customer] = []

    # NOTE(review): ``hold``, ``suspend``, ``resume``, ``simulation.clock`` and
    # ``simulation.log`` come from ``simulation.lib`` (not visible here);
    # presumably ``hold`` advances the simulated clock by the given duration.

    class Server(Process):
        """Serves queued customers one at a time, suspending when idle."""

        def run(self):
            while True:
                if not customers_unhandled:
                    yield from self.suspend()
                    continue

                # First come, first served.
                customer = customers_unhandled.popleft()
                customers_in_progress.append(customer)
                customer.started_at = self.simulation.clock

                # Normal(5, 2.5) handling time, clamped so it is never negative.
                handling_time = float(max(0, rng.normal(5, 2.5)))
                yield from self.hold(handling_time)

                customer.handling_time = handling_time
                customer.finished_at = self.simulation.clock
                customers_in_progress.remove(customer)
                customers_handled.append(customer)
                self.simulation.log(
                    f"unhandled customers: {len(customers_unhandled)}"
                )

    class CustomerGenerator(Process):
        """Adds customers to the queue and wakes the servers on each arrival."""

        def __init__(self, lam=40, servers=None):
            self.lam = lam
            # Avoid a mutable default argument shared between instances.
            self.servers = [] if servers is None else servers
            super().__init__()

        def run(self):
            while True:
                wait_for = float(max(0, rng.poisson(lam=self.lam) / 10))
                customers_unhandled.append(
                    Customer(arrived_at=self.simulation.clock)
                )
                for server in self.servers:
                    server.resume()
                yield from self.hold(wait_for)

    servers = [Server() for _ in range(n_servers)]
    simulation = Simulation(
        processes=[CustomerGenerator(servers=servers, lam=lam), *servers],
        enable_logging=False,
    )
    simulation.start()

    df_customers = pl.DataFrame(
        [customer.__dict__ for customer in customers_handled]
    ).with_columns(queue_time=pl.col("started_at") - pl.col("arrived_at"))

    # Relies on each server instance carrying ``id`` and ``utilization``
    # attributes, which are expected to be set by ``simulation.lib``.
    df_servers = pl.DataFrame([server.__dict__ for server in servers])
    utilization = {
        f"utilization_server_{server_id}": value
        for server_id, value in zip(
            df_servers["id"].to_list(), df_servers["utilization"].to_list()
        )
    }

    return {
        "n_servers": n_servers,
        "lam": lam,
        "queue_time": df_customers["queue_time"].mean(),
        "handling_time": df_customers["handling_time"].mean(),
        **utilization,
    }


def main():
    """Run the parameter sweep in parallel and print the aggregated tables."""
    n_servers = [n + 1 for n in range(10)]
    lam = [(n + 1) * 4 for n in range(10)]
    # Ten replications of every (n_servers, lam) combination.
    parameter_values = [
        {"n_servers": n, "lam": m}
        for n in n_servers
        for m in lam
        for _ in range(10)
    ]

    results = []
    with ProcessPoolExecutor(max_workers=4) as executor:
        futures = [
            executor.submit(run_simulation, **parameters)
            for parameters in parameter_values
        ]
        for future in tqdm(as_completed(futures), total=len(parameter_values)):
            results.append(future.result())

    # Rows carry a different set of ``utilization_server_*`` keys depending on
    # ``n_servers``; scan every row when inferring the schema rather than only
    # the first 100 (the polars default).
    df = (
        pl.DataFrame(results, infer_schema_length=None)
        .fill_null(0)
        .group_by(["n_servers", "lam"])
        .agg(pl.all().mean())
        .sort(["n_servers", "lam"])
    )

    def stats(column):
        """Pivot ``column`` into an n_servers x lam table rounded to 2 digits."""
        return (
            df.pivot("lam", index="n_servers", values=column)
            .sort("n_servers")
            .with_columns(pl.all().round(2))
        )

    pl.Config.set_tbl_cols(20)
    print(df)
    print(stats("queue_time"))
    print(stats("utilization_server_1"))


# Worker processes may re-import this module (spawn / forkserver start
# methods); the guard keeps them from launching the sweep again.
if __name__ == "__main__":
    main()