Как я могу открыть и закрыть поток с помощью Python Socket Server?

У меня есть простой сервер сокетов Python, и я хочу, чтобы мой клиент мог открывать/закрывать поток в течение нескольких соединений (т. е. я не хочу, чтобы мое соединение клиент-сервер оставалось открытым между запросами). Упрощенная версия сервера выглядит так:

_bottle_thread = None # I have tried this and a class variable

class CalibrationServer(socketserver.BaseRequestHandler):

    allow_reuse_address = True
    request_params = None
    response = None

    bottle_thread = None

    def handle(self):
        self.data = self.request.recv(1024).strip()
        self.request_params = str(self.data.decode('utf-8')).split(" ")
        method = self.request_params[0]

        if method == "start": self.handle_start()
        elif method == "stop": self.handle_stop()
        else: self.response = "ERROR: Unknown request"

        self.request.sendall(self.response.encode('utf-8'))


    def handle_start(self):
        try:
            bottle = self.request_params[1]
            _bottle_thread = threading.Thread(target=start, args=(bottle,))
            _bottle_thread.start()
            self.response = "Ran successfully"
            print(_bottle_thread.is_alive(), _bottle_thread)
        except Exception as e:
            self.response = f"ERROR: Failed to unwrap: {e}"

    def handle_stop(self):
        print(_bottle_thread)
        if _bottle_thread and _bottle_thread.is_alive():
            _bottle_thread.join()  # Wait for the thread to finish
            self.response = "Thread stopped successfully"
        else:
            self.response = "No active thread to stop"



if __name__ == "__main__":
    HOST, PORT = LOCAL_IP, CAL_PORT

    with socketserver.TCPServer((HOST, PORT), CalibrationServer) as server:
        server.serve_forever()

Процесс начинается нормально, но когда мое соединение с сервером закрывается, я больше не могу получить доступ к _bottle_thread. Он просто повторно инициализируется как None. У меня есть только один главный компьютер, взаимодействующий с сервером, и мне понадобится только один экземпляр start. Я пробовал использовать переменные класса для его хранения и использовать глобальные переменные. Я также пробовал использовать TCP-серверы Threading и Forking. Как я могу получить доступ к этой теме, чтобы закрыть ее? Нужно ли мне его изменить, чтобы соединение всегда было открыто? Есть ли другой способ решить эту проблему? Я хочу использовать сервер, чтобы мне было легче контролировать этот процесс, потому что он будет работать на 8 разных компьютерах одновременно, но я полностью открыт для других идей (я пробовал Ansible, у меня это не сработало). Спасибо!

Обновлено:

Вот мой клиентский код:

HOST, PORT = LOCAL_IP, CAL_PORT
data = " ".join(sys.argv[1:])

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
    sock.connect((HOST, PORT))
    sock.sendall(bytes(data + "\n", "utf-8"))
    received = str(sock.recv(1024), "utf-8")

print("Sent:     {}".format(data))
print("Received: {}".format(received))

Он открывает, отправляет и закрывает соединение с сервером с помощью нескольких простых аргументов.

Вот мой консольный вывод от клиента:

(venv) path$ python cal_client.py start argument
Sent:     start argument
Received: Ran successfully
(venv) path$ python cal_client.py stop
Sent:     stop
Received: No active thread to stop

и мой сервер:

Received from 127.0.0.1:
['start', 'argument']
Initializing
True <Thread(Thread-1, started 8236592650235098)>
Received from 127.0.0.1:
['stop']
None # the thread is showing None

Из вашего описания очень неясно, что вы пытаетесь сделать. Вы говорите: «Когда мое соединение с сервером закрывается, я больше не могу получить доступ к _bottle_thread», что звучит так, будто я/мой в данном случае является клиентом, но _bottle_thread находится на сервере. Кроме того, вы не показываете ни функцию start (цель вашего вызова Thread), ни вывод, отображаемый, когда это происходит. И как вы к нему подключаетесь (где код клиента)? Просто невозможно угадать, что происходит с предоставленной вами информацией.

Gil Hamilton 26.04.2024 00:32

Да, «мое» соединение относится к соединению между клиентом и сервером. Когда новый клиент устанавливает новое соединение, я теряю доступ к потоку изнутри сервера. Я добавил более подробную информацию в пост. Спасибо!

aaaaaaaaaron_g 26.04.2024 17:52
Почему в Python есть оператор "pass"?
Почему в Python есть оператор "pass"?
Оператор pass в Python - это простая концепция, которую могут быстро освоить даже новички без опыта программирования.
Некоторые методы, о которых вы не знали, что они существуют в Python
Некоторые методы, о которых вы не знали, что они существуют в Python
Python - самый известный и самый простой в изучении язык в наши дни. Имея широкий спектр применения в области машинного обучения, Data Science,...
Основы Python Часть I
Основы Python Часть I
Вы когда-нибудь задумывались, почему в программах на Python вы видите приведенный ниже код?
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
LeetCode - 1579. Удаление максимального числа ребер для сохранения полной проходимости графа
Алиса и Боб имеют неориентированный граф из n узлов и трех типов ребер:
Оптимизация кода с помощью тернарного оператора Python
Оптимизация кода с помощью тернарного оператора Python
И последнее, что мы хотели бы показать вам, прежде чем двигаться дальше, это
Советы по эффективной веб-разработке с помощью Python
Советы по эффективной веб-разработке с помощью Python
Как веб-разработчик, Python может стать мощным инструментом для создания эффективных и масштабируемых веб-приложений.
0
2
63
2
Перейти к ответу Данный вопрос помечен как решенный

Ответы 2

Ответ принят как подходящий

Ах. Я сначала пропустил, что происходит...

Проблема в том, что когда вы назначаете _bottle_thread, вы не назначаете его глобальной версии (или даже его версии класса).

Будет поучительно запустить этот короткий пример:

_bottle_thread = "Global"

class C:
    _bottle_thread = "Class"

    def meth(self):
        _bottle_thread = "Method"
        print(f"Method {_bottle_thread}")    

c = C()
c.meth()

print(f"Global: {_bottle_thread}")
print(f"Class: {c._bottle_thread}")

Результат:

Method Method
Global: Global
Class: Class

В результате вы получаете три разные версии _bottle_thread из-за правил области видимости Python: глобальную, которую вы получили, назначив ее вне всех других областей, классовую, которую вы получили, назначив ее в определении класса, и локальный, который вы получили, присвоив ему внутри метода. (Обратите внимание, что невозможно напечатать локальный _bottle_thread, назначенный внутри meth - то есть снаружи метода - поскольку его счетчик ссылок становится равным 0, как только метод завершается, и затем он будет уничтожен.)

Вам нужно будет использовать ключевое слово global (https://docs.python.org/3/reference/simple_stmts.html#the-global-statement) в каждой области, в которой вы хотите, чтобы глобальный объект был виден:

def meth(self):
    global _bottle_thread
    _bottle_thread = "Method"

Лучшее решение — просто сохранить переменную _bottle_thread как переменную экземпляра:

class CalibrationServer(socketserver.BaseRequestHandler):
    ...
    def __init__(self):
        # (Not strictly necessary but considered best practice to 
        # initialize instance variables in the constructor)
        self._bottle_thread = None 
        

    def handle_start(self):
        ...
        self._bottle_thread = threading.Thread(target=start, args=(bottle,))
        ...
    def handle_stop(self):
        if self._bottle_thread and self._bottle_thread.is_alive():
             ...

Сохраняйте поток бутылки как состояние сервера и управляйте потоком с помощью флага в состоянии потока.

Также обратите внимание, что TCP не является протоколом на основе сообщений. Это просто поток байтов, поэтому вам необходимо предоставить протокол, чтобы знать, где начинаются и заканчиваются сообщения. socketserver.StreamRequestHandler предоставляет файловые оболочки rfile и wfile для потока, которые буферизуют данные и позволяют .readline() работать. Я использовал их ниже, чтобы рассматривать сообщения как завершающиеся новой строкой:

сервер.py

import threading
import socketserver
import time

class Bottle(threading.Thread):

    def __init__(self, bottle):
        super().__init__(daemon=True)  # So thread dies on main thread halt (ctrl-C)
        # Thread state variables
        self.bottle = bottle
        self.running = True

    def run(self):
        while self.running:
            print(self.bottle)
            time.sleep(.5)

class Server(socketserver.TCPServer):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Server state variable
        self.bottle_thread = None

class Handler(socketserver.StreamRequestHandler):

    def handle(self):
        line = self.rfile.readline()
        if not line: return  # client closed socket
        self.request = line.strip().decode().split(' ')
        method = self.request[0]
        if method == 'start':
            self.handle_start()
        elif method == 'stop':
            self.handle_stop()
        else:
            self.response = 'ERROR: Unknown request'

        self.wfile.write(self.response.encode() + b'\n')
        self.wfile.flush()  # makefile wrappers are buffered, flush ensures all is sent

    def handle_start(self):
        if self.server.bottle_thread is None:
            if len(self.request) > 1:
                bottle = self.request[1]
                self.server.bottle_thread = Bottle(bottle)
                self.server.bottle_thread.start()
                self.response = 'Thread running'
            else:
                self.response = 'ERROR: missing parameter'
        else:
            self.response = 'Thread already running'

    def handle_stop(self):
        if self.server.bottle_thread is not None:
            self.server.bottle_thread.running = False
            self.server.bottle_thread.join()  # Wait for the thread to finish
            self.server.bottle_thread = None
            self.response = 'Thread stopped'
        else:
            self.response = 'Thread not running'

if __name__ == '__main__':
    HOST, PORT = '', 5000
    with Server((HOST, PORT), Handler) as server:
        server.serve_forever()

client.py — здесь я вручную обернул сокет в файловые объекты:

import socket
import sys

HOST, PORT = 'localhost', 5000
data = ' '.join(sys.argv[1:])

with socket.socket() as sock:
    sock.connect((HOST, PORT))
    with (sock.makefile('r', encoding='utf8') as rfile,
          sock.makefile('w', encoding='utf8') as wfile
    ):
        wfile.write(data + '\n')
        wfile.flush()
        response = rfile.readline()

print(f'Sent:     {data}')
print(f'Received: {response}')

Выход:

C:\>test stop
Sent:     stop
Received: Thread not running


C:\>test blah
Sent:     blah
Received: ERROR: Unknown request


C:\>test start
Sent:     start
Received: ERROR: missing parameter


C:\>test start abc
Sent:     start abc
Received: Thread running


C:\>test start
Sent:     start
Received: Thread already running


C:\>test stop
Sent:     stop
Received: Thread stopped

Другие вопросы по теме