в настоящее время я работаю над более или менее сложной симуляцией колл-центра. Я новичок в Simpy, и у меня проблема с тайм-аутом звонков, если ни один агент не может на них ответить.
В моем моделировании я генерирую вызовы в 4 разных очередях. Каждый из них должен иметь свою собственную скорость появления вызовов. Дополнительно у меня есть 4 активных агента. Каждый из них работает с несколькими очередями, но никто не работает со всеми 4 очередями. И каждый из них имеет отдельное распределение времени обработки при работе в определенной очереди.
Кроме того, доступный агент должен не только иметь возможность принять первый доступный вызов в очереди, но и позже я хочу добавить более подробную логику выбора вызова. Таким образом, я добавляю порождающие вызовы в хранилище (впоследствии хранилище фильтров) и вытаскиваю их в функции Consumer_Calls.
Поскольку колл-центр может работать в условиях нехватки персонала (слишком мало операторов), я также хочу смоделировать терпение клиентов. Поэтому я также определил MIN_PATIENCE и MAX_PATIENCE и позволил им прекратить вызов, если агент не может принять его в течение этого временного диапазона.
Мой скрипт выглядит примерно так.
from typing import Callable, Generator, List, Union
import numpy as np
import pandas as pd
import simpy
RANDOM_SEED = 42
NUM_AGENTS = 4 # Number of agents in the callcenter
NUM_QUEUES = 2
MIN_PATIENCE = 2 * 60
MAX_PATIENCE = 5 * 60
SIM_DURATION = 8 * 60 * 60
RNG = np.random.default_rng(RANDOM_SEED)
i = 0
# Parse config files for agent performance and queue arrivial times
agents_config = pd.read_csv("queue_agent_mapping.csv")
agents_config["lambda"] = agents_config["lambda"] * 60
agents_config_idx = agents_config.set_index(["agent_id", "queue_id"])
queue_config = pd.read_csv("call_freq.csv")
queue_config["lambda"] = queue_config["lambda"] * 60
queue_config_idx = queue_config.set_index("queue_id")
callcenter_logging = pd.DataFrame(
{
"call_id": [],
"queue_id": [],
"received_time": [],
"agent_id": [],
"start_time": [],
"end_time": [],
"status": [],
}
)
callcenter_logging = callcenter_logging.set_index("call_id")
open_transactions = pd.DataFrame({"call_id": [], "queue_id": [], "received_time": []})
open_transactions = open_transactions.set_index("call_id")
# Get number of agents in simulation from config file
num_agents = agents_config.drop_duplicates(subset = "agent_id")
# Get agent-queue mapping from config file
agents_config_grouped = (
agents_config.groupby("agent_id").agg({"queue_id": lambda x: list(x)}).reset_index()
)
class Callcenter:
"""Representation of the call center.
Entry point of the simulation as it starts processes to generate calls and their processing.
"""
# Variable used to generate unique ids
call_id = 0
def __init__(self, env: simpy.Environment):
self.env = env
def get_next_call_id(self):
self.call_id += 1
return self.call_id
def run_simulation(self, agents: simpy.FilterStore):
self.agents = agents
self.queues = [Queue(env, queue_id=qq) for qq in range(4)]
self.call_generator = [env.process(queue.generate_calls(self)) for queue in self.queues]
self.call_accept_consumer = [env.process(queue.consume_calls()) for queue in self.queues]
class Agent:
"""Representation of agents and their global attributes."""
def __init__(self, agent_id, queue_id):
self.agent_id = agent_id
self.allowed_queue_ids = queue_id
class Queue:
"""Representation of call center queues.
Holds methods to generate and consume calls. Calls are stored in a store.
"""
def __init__(self, env, queue_id):
self.env = env
# Defines the store for calls. Later, a FilterStore should be used. This will enable us to
# draw calls by special attribute to fine tune the routing.
self.store = simpy.FilterStore(env)
self.queue_id = queue_id
# Get the arrivial distribution of calls in the queue from the initial config.
self.lam = self._get_customer_arrival_distribution()
def generate_calls(self, callcenter_instance: Callcenter) -> Generator:
"""Generate the calls.
Calls are then put to the queues store.
"""
while True:
yield self.env.timeout(RNG.poisson(self.lam))
# Initialize and fill the call object.
new_call_id = callcenter_instance.get_next_call_id()
call = Call(queue_id=self.queue_id, call_id=new_call_id, env=env)
call = call.update_history()
call.add_open_transaction(status = "active")
# Write call to logging table
callcenter_logging.loc[call.call_id, ["queue_id", "received_time"]] = [
call.queue_id,
self.env.now,
]
# Put call to the queue store.
self.put_call(call)
def consume_calls(self) -> Generator:
"""Draw call from queue store and let agents work on them.
If no agent is found within MIN_PATIENCE and MAX_PATIENCE, drop the call.
"""
while True:
# Wait for available agent or drop the call as customer ran out of patience.
agent = yield agents.get(lambda ag: self.queue_id in ag.allowed_queue_ids)
# call = yield self.get_call(lambda ca: ca.status == "active")
call = yield self.get_call(lambda ca: ca.status == "active")
if call.received_at + call.max_waiting <= call.env.now:
print(
f"customer hung up call {call.call_id} after waiting {call.max_waiting / 60} minutes."
)
# The call did not receive an agent in time and ran out of patience.
call.status = "dropped"
callcenter_logging.loc[call.call_id, ["end_time", "status"]] = [
call.env.now,
call.status,
]
print(
f"no agent for {call.call_id} after waiting {(call.env.now - call.received_at) / 60} minutes"
)
else:
# Do some logging that an agent took the call.
print(
f"Agent {agent.agent_id} takes {call.call_id} in queue {call.queue_id} at {call.env.now}."
)
callcenter_logging.loc[call.call_id, ["queue_id", "agent_id", "start_time"]] = [
call.queue_id,
agent.agent_id,
call.env.now,
]
# Get average handling time from config dataframe. Change this to an Agent class
# attribute, later.
ag_lambda = agents_config_idx.loc[agent.agent_id, call.queue_id]["lambda"]
yield call.env.timeout(RNG.poisson(ag_lambda)) # Let the agent work on the call.
call.status = "finished"
# Do some logging
callcenter_logging.loc[call.call_id, ["end_time", "status"]] = [
call.env.now,
call.status,
]
print(
f"Agent {agent.agent_id} finishes {call.call_id} in queue {call.queue_id} at {call.env.now}."
)
# Put the agent back to the agents store. -> Why is this needed? Shouldn't the
# context manager handle this? But it did not work without this line.
yield agents.put(agent)
def _get_customer_arrival_distribution(self) -> float:
"""Returns the mean call arrivial time in a given queue_id."""
return queue_config.loc[self.queue_id, "lambda"]
def get_call(self, filter_func: Callable):
"""Helper function to get calls by complex filters from the queue store."""
return self.store.get(filter=filter_func)
def put_call(self, call):
"""Helper function to put calls to the queue store."""
self.store.put(call)
print(f"Call {call.call_id} added to queue {call.queue_id} at {call.env.now}")
class Call(Callcenter):
"""Representation of a call with all its."""
def __init__(self, env: simpy.Environment, call_id: int, queue_id: int):
self.env = env
self.call_id = call_id
self.queue_id = queue_id
self.agent_id = None
self.status = "active"
self.received_at = env.now
self.max_waiting = RNG.integers(MIN_PATIENCE, MAX_PATIENCE)
self.history: List[Union[int, str]] = []
def update_history(self):
"""Helper to update the call history.
Needed for tracking calls, which changes are transferred from one queue to another.
Currently this is not in use.
"""
self.history.append([self.call_id, self.queue_id, self.agent_id, self.status])
return self
def add_open_transaction(self, status = "active"):
"""Helper to add a call to the open transactions log.
Needed for rebalancing logics. Currently this is not in use.
"""
open_transactions.loc[self.call_id, ["queue_id", "received_time", "status"]] = [
self.queue_id,
self.env.now,
status,
]
env = simpy.Environment()
agents = simpy.FilterStore(env, capacity=len(num_agents))
agents.items = [Agent(row.agent_id, row.queue_id) for row in agents_config_grouped.itertuples()]
callcenter = Callcenter(env)
callcenter.run_simulation(agents)
env.run(until=SIM_DURATION)
Это файлы конфигурации.
# call_freq
queue_name,queue_id,lambda
A,0,2
B,1,4
C,2,3.5
D,3,3
# queue_agent_mapping.csv
queue_id,agent_id,lambda
0,abc11,2
0,abc13,5
0,abc14,2
1,abc11,5
1,abc12,3
1,abc14,12
2,abc12,2
2,abc13,3
3,abc14,3
Я установил терпение клиента в пределах от 3 до 6 минут. Тем не менее, если я проверю время ожидания вызова, пока он не будет обслужен, я обнаружу, что многие из них ждут гораздо больше, чем 3-6 минут. Что еще хуже, я также обнаружил, что некоторые звонки сбрасываются по желанию.
callcenter_logging["wait_time"] = callcenter_logging["start_time"] - callcenter_logging["received_time"]
callcenter_logging["serving_wait"] = callcenter_logging["end_time"] - callcenter_logging["start_time"]
callcenter_logging.head(20)
dropped = (
callcenter_logging.loc[callcenter_logging["status2"] == "dropped", ["queue_id", "status2", "end_time"]]
)
dropped = (
dropped.set_index("end_time")
.groupby(["queue_id"])
.expanding()["status2"]
.agg({"cnt_dropped": 'count'})
.reset_index()
)
fig_waiting = px.line(callcenter_logging, "received_time", "wait_time", color = "queue_id")
fig_waiting = fig_waiting.update_traces(connectgaps=True)
fig_waiting.show()
fig_dropped = px.line(dropped, x = "end_time", y = "cnt_dropped", color = "queue_id")
fig_dropped.show()
Я предполагаю, что мои звонки не извлекаются из хранилища очередей в нужное время. Но я не смог понять точную проблему с кодом.
Есть кто-нибудь, у кого есть идея?
Время ожидания вашего терпения не зависит от того, как долго вызов уже находится в очереди. Таким образом, если вызов находится в очереди в течение 10 минут при запуске цикла и нет доступных агентов, то общее время ожидания вызова при истечении времени ожидания будет составлять 10 минут плюс env.timeout(RNG.integers(MIN_PATIENCE, МАКСИМАЛЬНОЕ ТЕРПЕНИЕ)).
Вам нужно, чтобы время ожидания было в объекте вызова, и чтобы объект вызова удалялся из очереди или устанавливал флаг срока ожидания, когда время ожидания истекло.
Я бы изменил ваш цикл, чтобы сначала получить агента, и сделал внутренний цикл, чтобы получить вызов, игнорируя вызовы с истекшим флагом.
Я только что понял, что, вероятно, неправильно понял тебя, @Michael. Я думаю, вы имели в виду, что я должен переместить генератор случайных чисел в свойство вызова, определяющее количество времени, в течение которого клиент будет ждать. Затем в потребительской функции я могу получить вызов из хранилища, проверить, есть ли env.now > received_time + max_waiting
, и соответствующим образом обработать вызовы. Это ты предложил? Или вы видите какие-либо потенциальные проблемы с этим подходом? Я только что обновил код, еще раз.
Привет, Майкл, это звучит как отличная идея. Я внес некоторые изменения, но он все еще не работает. Если я извлекаю вызов из очереди, чтобы запустить таймер тайм-аута, агент больше не может его извлекать и простаивает. Как бы вы порекомендовали мне сделать эти два вызова параллельными? Я обновил пример кода, чтобы он отражал мою новую реализацию.