Как завершить поток, ожидающий возврата

Мне нужно завершить работающий поток таким образом, чтобы не вызвать ошибку или задержку. Проблема заключается в функции «lMsg := lMsgQueue.Get(FQueueGetTimeout);», где она будет ждать определенное время (обычно 5000 мс). Таким образом, если мне нужно вызвать внешнее завершение, мое приложение застрянет в ожидании завершения.

Как лучше всего прекратить его в середине процесса?

{ TConsumerThread }

constructor TConsumerThread.Create;
begin
  FreeOnTerminate := True;
  InitializeVars;
  inherited Create(True);
end;

procedure TConsumerThread.Execute;
var
  lMsgQueue: TAMQPMessageQueue;
  lMsg: TAMQPMessage;
  lStartTime: TDateTime;
begin
  lMsgQueue := TAMQPMessageQueue.Create;
  FChannelAMQPThread := FConnectionAMQP.OpenChannel(FQueuePrefetchSize, FQueuePrefetchCount);
  try
    try
      FChannelAMQPThread.BasicConsume(lMsgQueue, FQueue, 'Consumer');
      lStartTime := Now;
      repeat
        try

          try
            if not(FConnectionAMQP.IsOpen) then
            BEGIN
              FConnectionAMQP.Connect;
              FChannelAMQPThread := FConnectionAMQP.OpenChannel(FQueuePrefetchSize, FQueuePrefetchCount);
              FChannelAMQPThread.BasicConsume(lMsgQueue, FQueue, 'Consumer';
            END;
          except
            on E: Exception do
              Break;
          end;

          lMsg := lMsgQueue.Get(FQueueGetTimeout);
          if (lMsg = nil) and not(Terminated) then
          begin
            if Assigned(FChannelAMQPThread) then
            begin
              FConnectionAMQP.CloseChannel(FChannelAMQPThread);
              FChannelAMQPThread := nil;
            end;

            FChannelAMQPThread := FConnectionAMQP.OpenChannel(FQueuePrefetchSize, FQueuePrefetchCount);
            FChannelAMQPThread.BasicConsume(lMsgQueue, FQueue, 'Consumer');
          end;

          if not(Terminated) then
          begin
            try
              if not(FConnectionAMQP.IsOpen) then
                FConnectionAMQP.Connect;
            except
              on E: Exception do
                Break;
            end;
          end;

          if not(Terminated) then
          begin
            if ValidateFilter(lMsg) then
            begin
              FCorrelationID := lMsg.Header.PropertyList.CorrelationID.Value;

              FReceivedMessage := lMsg.Body.asString[TEncoding.ASCII];
              lMsg.Ack;
              lMsg.Free;
              Terminate;
            end
            else
            begin
              lMsg.Reject;
              lMsg.Free;

              if not(FTimeout = INFINITE) then
              begin
                if (MilliSecondsBetween(Now, lStartTime) >= (Int64(FTimeout))) then
                begin
                  FReceivedMessage := '';
                  Terminate;
                end;
              end;
            end;
          end
          else
          begin
            Terminate;
          end;
        except
          on E: Exception do
          begin
            if Assigned(lMsg) then
            begin
              lMsg.Free;
              lMsg := nil;
            end;
          end;
        end;
      until (Terminated);
    except
      on E: Exception do
      begin
        FReceivedMessage := '';

        if not(Terminated) then
          Terminate;
      end;
    end;
  finally
    lMsgQueue.Free;
  end;
end;

procedure TConsumerThread.TerminatedSet;
begin
  inherited;
  if Assigned(FChannelAMQPThread) then
  begin
    try
      if FConnectionAMQP.IsOpen then
        FConnectionAMQP.CloseChannel(FChannelAMQPThread);
    except
      on E: Exception do
    end;

    FChannelAMQPThread := nil;
  end;
end;

function TConsumerThread.ValidateFilter(pMsg: TAMQPMessage): Boolean;
begin
  Result := False;

  case FMsgFilter of
    fmsgNone:
      Result := True;
    fmsgMessageID:
      Result := (pMsg.Header.PropertyList.MessageID.Value = FFilterValue);
    fmsgCorrelationID:
      Result := (pMsg.Header.PropertyList.CorrelationID.Value = FFilterValue);
  end;
end;

procedure TConsumerThread.InitializeVars;
begin
  FConnectionAMQP := nil;
  FChannelAMQPThread := nil;
  FQueue := '';
  FTimeout := INFINITE;
  FQueueGetTimeout := 5000;
  FQueuePrefetchSize := 0;
  FQueuePrefetchCount := 10;
  FMsgFilter := fmsgNone;
  FFilterValue := '';
  FReceivedMessage := '';
end;

Чтобы проверить возвращенное сообщение или исключение, я использую функцию OnTerminate.

Кроме того, лучшая альтернатива в этом случае — сделать «FreeOnTerminate»?

Запускаю его приостановлено, так как перед запуском установил свойства (инициализированные в InitializeVars).

Это код из функции «Получить», я его не писал, но при необходимости могу отредактировать.

{$I AMQP.Options.inc}
unit AMQP.Classes;

interface

Uses
  SysUtils, Classes, SyncObjs, Generics.Collections,
  AMQP.Frame, AMQP.Message, AMQP.Method, AMQP.Types
  {$IfDef fpc}
  , AMQP.SyncObjs
  {$EndIf}
  ;

Type
  AMQPException = Class(Exception);
  AMQPTimeout  = class(AMQPException);

  TAMQPServerProperties = Class
  Strict Private
    FCapabilities : TStringList;
    FMechanisms   : TStringList;
    FLocales      : TStringList;
    FClusterName  : String;
    FCopyright    : String;
    FInformation  : String;
    FPlatform     : String;
    FProduct      : String;
    FVersion      : String;
    FKnownHosts   : String;
    FVersionMajor : Integer;
    FVersionMinor : Integer;
    FChannelMax   : Integer;
    FFrameMax     : Integer;
    FHeartbeat    : Integer;
  Public
    Property Capabilities         : TStringList read FCapabilities;
    Property Mechanisms           : TStringList read FMechanisms;
    Property Locales              : TStringList read FLocales;
    Property ClusterName          : String      read FClusterName;
    Property Copyright            : String      read FCopyright;
    Property Information          : String      read FInformation;
    Property &Platform            : String      read FPlatform;
    Property Product              : String      read FProduct;
    Property Version              : String      read FVersion;
    Property KnownHosts           : String      read FKnownHosts;
    Property ProtocolVersionMajor : Integer     read FVersionMajor;
    Property ProtocolVersionMinor : Integer     read FVersionMinor;
    Property ChannelMax           : Integer     read FChannelMax;
    Property FrameMax             : Integer     read FFrameMax;
    Property Heartbeat            : Integer     read FHeartbeat;

    Procedure ReadConnectionStart( AConnectionStart: TAMQPMethod );
    Procedure ReadConnectionTune( AConnectionTune: TAMQPMethod );
    Procedure ReadConnectionOpenOK( AConnectionOpenOK: TAMQPMethod );

    Constructor Create;
    Destructor Destroy; Override;
  End;

  TBlockingQueue<T> = Class
  Strict Protected
    FGuard     : {$IFDEF FPC}TRTLCriticalSection{$ELSE}TCriticalSection{$ENDIF};
    FCondition : TConditionVariableCS;
    FQueue     : TQueue<T>;
  Public
    Function Count: Integer; Virtual;
    Function Get(ATimeOut: LongWord): T; Virtual;
    Procedure Put( Item: T ); Virtual;

    Constructor Create; Virtual;
    Destructor Destroy; Override;
  End;

  TAMQPQueue = TBlockingQueue<TAMQPFrame>;

  TAMQPMessageQueue = TBlockingQueue<TAMQPMessage>;

implementation

{ TAMQPServerProperties }


constructor TAMQPServerProperties.Create;
begin
  FCapabilities := TStringList.Create;
  FMechanisms   := TStringList.Create;
  FLocales      := TStringList.Create;
  FMechanisms.StrictDelimiter := True;
  FMechanisms.Delimiter       := ' ';
  FLocales.StrictDelimiter    := True;
  FLocales.Delimiter          := ' ';
  FClusterName  := '';
  FCopyright    := '';
  FInformation  := '';
  FPlatform     := '';
  FProduct      := '';
  FVersion      := '';
  FKnownHosts   := '';
  FVersionMajor := 0;
  FVersionMinor := 0;
  FChannelMax   := 0;
  FFrameMax     := 0;
  FHeartbeat    := 0;
end;

Procedure TAMQPServerProperties.ReadConnectionStart( AConnectionStart: TAMQPMethod );
var
  ServerProperties: TFieldTable;
  ServerCapabilities: TFieldTable;
  Pair: TFieldValuePair;
begin
  FMechanisms.DelimitedText := AConnectionStart.Field['mechanisms'].AsLongString.Value;
  FLocales.DelimitedText    := AConnectionStart.Field['locales'].AsLongString.Value;
  ServerProperties          := AConnectionStart.Field['server-properties'].AsFieldTable;
  FVersionMajor             := AConnectionStart.Field['version-major'].AsShortShortUInt.Value;
  FVersionMinor             := AConnectionStart.Field['version-minor'].AsShortShortUInt.Value;
  FClusterName              := ServerProperties.Field['cluster_name'].AsShortString.Value;
  FCopyright                := ServerProperties.Field['copyright'].AsShortString.Value;
  FInformation              := ServerProperties.Field['information'].AsShortString.Value;
  FPlatform                 := ServerProperties.Field['platform'].AsShortString.Value;
  FProduct                  := ServerProperties.Field['product'].AsShortString.Value;
  FVersion                  := ServerProperties.Field['version'].AsShortString.Value;
  ServerCapabilities        := ServerProperties.Field['capabilities'].AsFieldTable;
  for Pair in ServerCapabilities do
    FCapabilities.Values[ Pair.Name.Value ] := Pair.Value.AsString('');
end;

Procedure TAMQPServerProperties.ReadConnectionTune( AConnectionTune: TAMQPMethod );
begin
  FChannelMax               := AConnectionTune.Field['channel-max'].AsShortUInt.Value;
  FFrameMax                 := AConnectionTune.Field['frame-max'].AsLongUInt.Value;
  FHeartbeat                := AConnectionTune.Field['heartbeat'].AsShortUInt.Value;
end;

Procedure TAMQPServerProperties.ReadConnectionOpenOK( AConnectionOpenOK: TAMQPMethod );
begin
  FKnownHosts               := AConnectionOpenOK.Field['known-hosts'].AsShortString.Value;
end;

destructor TAMQPServerProperties.Destroy;
begin
  FCapabilities.Free;
  FMechanisms.Free;
  FLocales.Free;
  inherited;
end;

{ TBlockingQueue<T> }

function TBlockingQueue<T>.Count: Integer;
begin
  {$IFDEF FPC}
  EnterCriticalSection(FGuard);
  {$ELSE}
  FGuard.Acquire;
  {$ENDIF}
  try
    Result := FQueue.Count;
  finally
    {$IFDEF FPC}
     LeaveCriticalSection(FGuard);
    {$ELSE}
    FGuard.Release;
    {$ENDIF}
  end;
end;

constructor TBlockingQueue<T>.Create;
begin
  inherited;
  {$IFDEF FPC}
  InitCriticalSection(FGuard);
  {$ELSE}
  FGuard     := TCriticalSection.Create;
  {$ENDIF}
  FCondition := TConditionVariableCS.Create;
  FQueue     := TQueue<T>.Create;
end;

destructor TBlockingQueue<T>.Destroy;
begin
  FQueue.Free;
  FQueue := nil;
  FCondition.Free;
  FCondition := nil;
  {$IFDEF FPC}
  DoneCriticalSection(FGuard);
  {$ELSE}
  FGuard.Free;
  FGuard := nil;
  {$ENDIF}
  inherited;
end;

function TBlockingQueue<T>.Get(ATimeOut: LongWord): T;
begin
  {$IFDEF FPC}
  EnterCriticalSection(FGuard);
  {$ELSE}
  FGuard.Acquire;
  {$ENDIF}
  try
    while FQueue.Count = 0 do
    begin
     {$IFDEF FPC}
      if FCondition.WaitForRTL(FGuard, ATimeOut) = wrTimeout then
     {$Else}
      if FCondition.WaitFor(FGuard, ATimeOut) = wrTimeout then
     {$EndIf}
       raise AMQPTimeout.Create('Timeout!');
    end;
    Result := FQueue.Dequeue
  finally
  {$IFDEF FPC}
   LeaveCriticalSection(FGuard);
  {$ELSE}
  FGuard.Release;
  {$ENDIF}
  end;
end;

procedure TBlockingQueue<T>.Put(Item: T);
begin
  {$IFDEF FPC}
  EnterCriticalSection(FGuard);
  {$ELSE}
  FGuard.Acquire;
  {$ENDIF}
  try
    FQueue.Enqueue( Item );
    FCondition.ReleaseAll;
  finally
    {$IFDEF FPC}
     LeaveCriticalSection(FGuard);
    {$ELSE}
    FGuard.Release;
    {$ENDIF}
  end;
end;

end.
Стоит ли изучать PHP в 2023-2024 годах?
Стоит ли изучать PHP в 2023-2024 годах?
Привет всем, сегодня я хочу высказать свои соображения по поводу вопроса, который я уже много раз получал в своем сообществе: "Стоит ли изучать PHP в...
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
Поведение ключевого слова "this" в стрелочной функции в сравнении с нормальной функцией
В JavaScript одним из самых запутанных понятий является поведение ключевого слова "this" в стрелочной и обычной функциях.
Приемы CSS-макетирования - floats и Flexbox
Приемы CSS-макетирования - floats и Flexbox
Здравствуйте, друзья-студенты! Готовы совершенствовать свои навыки веб-дизайна? Сегодня в нашем путешествии мы рассмотрим приемы CSS-верстки - в...
Тестирование функциональных ngrx-эффектов в Angular 16 с помощью Jest
В системе управления состояниями ngrx, совместимой с Angular 16, появились функциональные эффекты. Это здорово и делает код определенно легче для...
Концепция локализации и ее применение в приложениях React ⚡️
Концепция локализации и ее применение в приложениях React ⚡️
Локализация - это процесс адаптации приложения к различным языкам и культурным требованиям. Это позволяет пользователям получить опыт, соответствующий...
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
1
0
115
1
Перейти к ответу Данный вопрос помечен как решенный

Ответы 1

Ответ принят как подходящий

TBlockingQueue<T>.Get() ожидает сигнала TConditionVariableCS до указанного времени ожидания. Чтобы ускорить этот выход, вам придется сигнализировать ConditionVariable, даже если в очередь ничего не было добавлено. Если бы вы это сделали, вам пришлось бы обновить Get(), чтобы убедиться, что очередь не пуста, прежде чем удалять ее из очереди, и иметь где-то флаг, указывающий, что отмена является намеренной. Затем вы просто установите этот флаг и сигнализируете ConditionVariable, когда хотите отменить ожидание.

Но в вашем случае похоже, что ваш поток ставит в очередь указатели объектов и уже обрабатывает указатели nil, поэтому обновление Get() не требуется. При завершении потока просто поставьте в очередь указатель объекта nil в качестве флага, а затем поток сможет проверить свое свойство Terminated, когда получит указатель nil из очереди.

Что-то вроде этого:

// move the MsgQueue to be a member of the thread class
// instead of being a local variable of Execute...
private
  FMsgQueue: TAMQPMessageQueue;
...

constructor TConsumerThread.Create;
begin
  FMsgQueue := TAMQPMessageQueue.Create;
  ...
end;

destructor TConsumerThread.Destroy;
begin
  FMsgQueue.Free;
  ...
end;

procedure TConsumerThread.Execute;
var
  ...
begin
  ...
  lMsg := FMsgQueue.Get(FQueueGetTimeout);
  if lMsg = nil then
  begin
    if Terminated then Exit;
    ...
  end;
  ...
end;
    
procedure TConsumerThread.TerminatedSet;
begin
  inherited;
  FMsgQueue.Put(nil);
  ...
end;

Я создал процедуру, используя FMsgQueue.Put(nil), где я также изменил управляющую переменную FStopSignal. После lMsg := MsgQueue.Get(FQueueGetTimeout) я проверяю, нет ли (завершено) и (FStopSignal), затем завершаю;

missingNO 26.07.2024 13:09

Почему? Terminated уже сам по себе служит этой цели. FStopSignal является излишним. Если только вы не хотите остановить очередь (и, возможно, перезапустить ее позже), не завершая весь поток. Но это не тот вопрос, который вы задали.

Remy Lebeau 26.07.2024 16:46

Другие вопросы по теме