Simpy: Моделирование колл-центра — тайм-аут неактивного звонка

в настоящее время я работаю над более или менее сложной симуляцией колл-центра. Я новичок в Simpy, и у меня проблема с тайм-аутом звонков, если ни один агент не может на них ответить.

В моем моделировании я генерирую вызовы в 4 разных очередях. Каждый из них должен иметь свою собственную скорость появления вызовов. Дополнительно у меня есть 4 активных агента. Каждый из них работает с несколькими очередями, но никто не работает со всеми 4 очередями. И каждый из них имеет отдельное распределение времени обработки при работе в определенной очереди.

Кроме того, доступный агент должен не только иметь возможность принять первый доступный вызов в очереди, но и позже я хочу добавить более подробную логику выбора вызова. Таким образом, я добавляю порождающие вызовы в хранилище (впоследствии хранилище фильтров) и вытаскиваю их в функции Consumer_Calls.

Поскольку колл-центр может работать в условиях нехватки персонала (слишком мало операторов), я также хочу смоделировать терпение клиентов. Поэтому я также определил MIN_PATIENCE и MAX_PATIENCE и позволил им прекратить вызов, если агент не может принять его в течение этого временного диапазона.

Мой скрипт выглядит примерно так.

Код обновлен (2023-02-02): теперь работает

from typing import Callable, Generator, List, Union

import numpy as np
import pandas as pd
import simpy

RANDOM_SEED = 42
NUM_AGENTS = 4  # Number of agents in the callcenter
NUM_QUEUES = 2

MIN_PATIENCE = 2 * 60
MAX_PATIENCE = 5 * 60
SIM_DURATION = 8 * 60 * 60

RNG = np.random.default_rng(RANDOM_SEED)

i = 0

# Parse config files for agent performance and queue arrivial times
agents_config = pd.read_csv("queue_agent_mapping.csv")
agents_config["lambda"] = agents_config["lambda"] * 60
agents_config_idx = agents_config.set_index(["agent_id", "queue_id"])

queue_config = pd.read_csv("call_freq.csv")
queue_config["lambda"] = queue_config["lambda"] * 60
queue_config_idx = queue_config.set_index("queue_id")

callcenter_logging = pd.DataFrame(
    {
        "call_id": [],
        "queue_id": [],
        "received_time": [],
        "agent_id": [],
        "start_time": [],
        "end_time": [],
        "status": [],
    }
)
callcenter_logging = callcenter_logging.set_index("call_id")

open_transactions = pd.DataFrame({"call_id": [], "queue_id": [], "received_time": []})
open_transactions = open_transactions.set_index("call_id")

# Get number of agents in simulation from config file
num_agents = agents_config.drop_duplicates(subset = "agent_id")

# Get agent-queue mapping from config file
agents_config_grouped = (
    agents_config.groupby("agent_id").agg({"queue_id": lambda x: list(x)}).reset_index()
)


class Callcenter:
    """Representation of the call center.

    Entry point of the simulation as it starts processes to generate calls and their processing.
    """

    # Variable used to generate unique ids
    call_id = 0

    def __init__(self, env: simpy.Environment):

        self.env = env

    def get_next_call_id(self):
        self.call_id += 1
        return self.call_id

    def run_simulation(self, agents: simpy.FilterStore):
        self.agents = agents
        self.queues = [Queue(env, queue_id=qq) for qq in range(4)]
        self.call_generator = [env.process(queue.generate_calls(self)) for queue in self.queues]
        self.call_accept_consumer = [env.process(queue.consume_calls()) for queue in self.queues]


class Agent:
    """Representation of agents and their global attributes."""

    def __init__(self, agent_id, queue_id):
        self.agent_id = agent_id
        self.allowed_queue_ids = queue_id


class Queue:
    """Representation of call center queues.

    Holds methods to generate and consume calls. Calls are stored in a store.
    """

    def __init__(self, env, queue_id):
        self.env = env

        # Defines the store for calls. Later, a FilterStore should be used. This will enable us to
        # draw calls by special attribute to fine tune the routing.
        self.store = simpy.FilterStore(env)

        self.queue_id = queue_id

        # Get the arrivial distribution of calls in the queue from the initial config.
        self.lam = self._get_customer_arrival_distribution()

    def generate_calls(self, callcenter_instance: Callcenter) -> Generator:
        """Generate the calls.

        Calls are then put to the queues store.
        """
        while True:
            yield self.env.timeout(RNG.poisson(self.lam))

            # Initialize and fill the call object.
            new_call_id = callcenter_instance.get_next_call_id()
            call = Call(queue_id=self.queue_id, call_id=new_call_id, env=env)
            call = call.update_history()
            call.add_open_transaction(status = "active")

            # Write call to logging table
            callcenter_logging.loc[call.call_id, ["queue_id", "received_time"]] = [
                call.queue_id,
                self.env.now,
            ]

            # Put call to the queue store.
            self.put_call(call)

    def consume_calls(self) -> Generator:
        """Draw call from queue store and let agents work on them.

        If no agent is found within MIN_PATIENCE and MAX_PATIENCE, drop the call.
        """
        while True:

            # Wait for available agent or drop the call as customer ran out of patience.
            agent = yield agents.get(lambda ag: self.queue_id in ag.allowed_queue_ids)

            # call = yield self.get_call(lambda ca: ca.status == "active")
            call = yield self.get_call(lambda ca: ca.status == "active")
            if call.received_at + call.max_waiting <= call.env.now:
                print(
                    f"customer hung up call {call.call_id} after waiting {call.max_waiting / 60} minutes."
                )

                # The call did not receive an agent in time and ran out of patience.
                call.status = "dropped"

                callcenter_logging.loc[call.call_id, ["end_time", "status"]] = [
                    call.env.now,
                    call.status,
                ]
                print(
                    f"no agent for {call.call_id} after waiting {(call.env.now - call.received_at) / 60} minutes"
                )

            else:
                # Do some logging that an agent took the call.
                print(
                    f"Agent {agent.agent_id} takes {call.call_id} in queue {call.queue_id} at {call.env.now}."
                )
                callcenter_logging.loc[call.call_id, ["queue_id", "agent_id", "start_time"]] = [
                    call.queue_id,
                    agent.agent_id,
                    call.env.now,
                ]

                # Get average handling time from config dataframe. Change this to an Agent class
                # attribute, later.
                ag_lambda = agents_config_idx.loc[agent.agent_id, call.queue_id]["lambda"]

                yield call.env.timeout(RNG.poisson(ag_lambda))  # Let the agent work on the call.

                call.status = "finished"

                # Do some logging
                callcenter_logging.loc[call.call_id, ["end_time", "status"]] = [
                    call.env.now,
                    call.status,
                ]
                print(
                    f"Agent {agent.agent_id} finishes {call.call_id} in queue {call.queue_id} at {call.env.now}."
                )

                # Put the agent back to the agents store. -> Why is this needed? Shouldn't the
                # context manager handle this? But it did not work without this line.
            yield agents.put(agent)

    def _get_customer_arrival_distribution(self) -> float:
        """Returns the mean call arrivial time in a given queue_id."""
        return queue_config.loc[self.queue_id, "lambda"]

    def get_call(self, filter_func: Callable):
        """Helper function to get calls by complex filters from the queue store."""
        return self.store.get(filter=filter_func)

    def put_call(self, call):
        """Helper function to put calls to the queue store."""
        self.store.put(call)
        print(f"Call {call.call_id} added to queue {call.queue_id} at {call.env.now}")


class Call(Callcenter):
    """Representation of a call with all its."""

    def __init__(self, env: simpy.Environment, call_id: int, queue_id: int):
        self.env = env
        self.call_id = call_id
        self.queue_id = queue_id
        self.agent_id = None
        self.status = "active"
        self.received_at = env.now
        self.max_waiting = RNG.integers(MIN_PATIENCE, MAX_PATIENCE)
        self.history: List[Union[int, str]] = []

    def update_history(self):
        """Helper to update the call history.

        Needed for tracking calls, which changes are transferred from one queue to another.
        Currently this is not in use.
        """
        self.history.append([self.call_id, self.queue_id, self.agent_id, self.status])
        return self

    def add_open_transaction(self, status = "active"):
        """Helper to add a call to the open transactions log.

        Needed for rebalancing logics. Currently this is not in use.
        """
        open_transactions.loc[self.call_id, ["queue_id", "received_time", "status"]] = [
            self.queue_id,
            self.env.now,
            status,
        ]


env = simpy.Environment()

agents = simpy.FilterStore(env, capacity=len(num_agents))
agents.items = [Agent(row.agent_id, row.queue_id) for row in agents_config_grouped.itertuples()]

callcenter = Callcenter(env)
callcenter.run_simulation(agents)
env.run(until=SIM_DURATION)

Это файлы конфигурации.

# call_freq
queue_name,queue_id,lambda
A,0,2
B,1,4
C,2,3.5
D,3,3
# queue_agent_mapping.csv
queue_id,agent_id,lambda
0,abc11,2
0,abc13,5
0,abc14,2
1,abc11,5
1,abc12,3
1,abc14,12
2,abc12,2
2,abc13,3
3,abc14,3

Я установил терпение клиента в пределах от 3 до 6 минут. Тем не менее, если я проверю время ожидания вызова, пока он не будет обслужен, я обнаружу, что многие из них ждут гораздо больше, чем 3-6 минут. Что еще хуже, я также обнаружил, что некоторые звонки сбрасываются по желанию.

callcenter_logging["wait_time"] = callcenter_logging["start_time"] - callcenter_logging["received_time"] 
callcenter_logging["serving_wait"] = callcenter_logging["end_time"] - callcenter_logging["start_time"]
callcenter_logging.head(20)

dropped = (
    callcenter_logging.loc[callcenter_logging["status2"] == "dropped", ["queue_id", "status2", "end_time"]]
)
dropped = (
    dropped.set_index("end_time")
    .groupby(["queue_id"])
    .expanding()["status2"]
    .agg({"cnt_dropped": 'count'})
    .reset_index()
)

fig_waiting = px.line(callcenter_logging, "received_time", "wait_time", color = "queue_id")
fig_waiting = fig_waiting.update_traces(connectgaps=True)
fig_waiting.show()

fig_dropped = px.line(dropped, x = "end_time", y = "cnt_dropped", color = "queue_id")
fig_dropped.show()

сюжет времени ожидания

Сюжет сброса звонков

Я предполагаю, что мои звонки не извлекаются из хранилища очередей в нужное время. Но я не смог понять точную проблему с кодом.

Есть кто-нибудь, у кого есть идея?

Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
1
0
82
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

Время ожидания вашего терпения не зависит от того, как долго вызов уже находится в очереди. Таким образом, если вызов находится в очереди в течение 10 минут при запуске цикла и нет доступных агентов, то общее время ожидания вызова при истечении времени ожидания будет составлять 10 минут плюс env.timeout(RNG.integers(MIN_PATIENCE, МАКСИМАЛЬНОЕ ТЕРПЕНИЕ)).

Вам нужно, чтобы время ожидания было в объекте вызова, и чтобы объект вызова удалялся из очереди или устанавливал флаг срока ожидания, когда время ожидания истекло.

Я бы изменил ваш цикл, чтобы сначала получить агента, и сделал внутренний цикл, чтобы получить вызов, игнорируя вызовы с истекшим флагом.

Привет, Майкл, это звучит как отличная идея. Я внес некоторые изменения, но он все еще не работает. Если я извлекаю вызов из очереди, чтобы запустить таймер тайм-аута, агент больше не может его извлекать и простаивает. Как бы вы порекомендовали мне сделать эти два вызова параллельными? Я обновил пример кода, чтобы он отражал мою новую реализацию.

steMy 02.02.2023 13:55

Я только что понял, что, вероятно, неправильно понял тебя, @Michael. Я думаю, вы имели в виду, что я должен переместить генератор случайных чисел в свойство вызова, определяющее количество времени, в течение которого клиент будет ждать. Затем в потребительской функции я могу получить вызов из хранилища, проверить, есть ли env.now > received_time + max_waiting, и соответствующим образом обработать вызовы. Это ты предложил? Или вы видите какие-либо потенциальные проблемы с этим подходом? Я только что обновил код, еще раз.

steMy 02.02.2023 20:27

Другие вопросы по теме