Мне нужно завершить работающий поток таким образом, чтобы не вызвать ошибку или задержку. Проблема заключается в функции «lMsg := lMsgQueue.Get(FQueueGetTimeout);», где она будет ждать определенное время (обычно 5000 мс). Таким образом, если мне нужно вызвать внешнее завершение, мое приложение застрянет в ожидании завершения.
Как лучше всего прекратить его в середине процесса?
{ TConsumerThread }
constructor TConsumerThread.Create;
begin
FreeOnTerminate := True;
InitializeVars;
inherited Create(True);
end;
procedure TConsumerThread.Execute;
var
lMsgQueue: TAMQPMessageQueue;
lMsg: TAMQPMessage;
lStartTime: TDateTime;
begin
lMsgQueue := TAMQPMessageQueue.Create;
FChannelAMQPThread := FConnectionAMQP.OpenChannel(FQueuePrefetchSize, FQueuePrefetchCount);
try
try
FChannelAMQPThread.BasicConsume(lMsgQueue, FQueue, 'Consumer');
lStartTime := Now;
repeat
try
try
if not(FConnectionAMQP.IsOpen) then
BEGIN
FConnectionAMQP.Connect;
FChannelAMQPThread := FConnectionAMQP.OpenChannel(FQueuePrefetchSize, FQueuePrefetchCount);
FChannelAMQPThread.BasicConsume(lMsgQueue, FQueue, 'Consumer';
END;
except
on E: Exception do
Break;
end;
lMsg := lMsgQueue.Get(FQueueGetTimeout);
if (lMsg = nil) and not(Terminated) then
begin
if Assigned(FChannelAMQPThread) then
begin
FConnectionAMQP.CloseChannel(FChannelAMQPThread);
FChannelAMQPThread := nil;
end;
FChannelAMQPThread := FConnectionAMQP.OpenChannel(FQueuePrefetchSize, FQueuePrefetchCount);
FChannelAMQPThread.BasicConsume(lMsgQueue, FQueue, 'Consumer');
end;
if not(Terminated) then
begin
try
if not(FConnectionAMQP.IsOpen) then
FConnectionAMQP.Connect;
except
on E: Exception do
Break;
end;
end;
if not(Terminated) then
begin
if ValidateFilter(lMsg) then
begin
FCorrelationID := lMsg.Header.PropertyList.CorrelationID.Value;
FReceivedMessage := lMsg.Body.asString[TEncoding.ASCII];
lMsg.Ack;
lMsg.Free;
Terminate;
end
else
begin
lMsg.Reject;
lMsg.Free;
if not(FTimeout = INFINITE) then
begin
if (MilliSecondsBetween(Now, lStartTime) >= (Int64(FTimeout))) then
begin
FReceivedMessage := '';
Terminate;
end;
end;
end;
end
else
begin
Terminate;
end;
except
on E: Exception do
begin
if Assigned(lMsg) then
begin
lMsg.Free;
lMsg := nil;
end;
end;
end;
until (Terminated);
except
on E: Exception do
begin
FReceivedMessage := '';
if not(Terminated) then
Terminate;
end;
end;
finally
lMsgQueue.Free;
end;
end;
procedure TConsumerThread.TerminatedSet;
begin
inherited;
if Assigned(FChannelAMQPThread) then
begin
try
if FConnectionAMQP.IsOpen then
FConnectionAMQP.CloseChannel(FChannelAMQPThread);
except
on E: Exception do
end;
FChannelAMQPThread := nil;
end;
end;
function TConsumerThread.ValidateFilter(pMsg: TAMQPMessage): Boolean;
begin
Result := False;
case FMsgFilter of
fmsgNone:
Result := True;
fmsgMessageID:
Result := (pMsg.Header.PropertyList.MessageID.Value = FFilterValue);
fmsgCorrelationID:
Result := (pMsg.Header.PropertyList.CorrelationID.Value = FFilterValue);
end;
end;
procedure TConsumerThread.InitializeVars;
begin
FConnectionAMQP := nil;
FChannelAMQPThread := nil;
FQueue := '';
FTimeout := INFINITE;
FQueueGetTimeout := 5000;
FQueuePrefetchSize := 0;
FQueuePrefetchCount := 10;
FMsgFilter := fmsgNone;
FFilterValue := '';
FReceivedMessage := '';
end;
Чтобы проверить возвращенное сообщение или исключение, я использую функцию OnTerminate.
Кроме того, лучшая альтернатива в этом случае — сделать «FreeOnTerminate»?
Запускаю его приостановлено, так как перед запуском установил свойства (инициализированные в InitializeVars).
Это код из функции «Получить», я его не писал, но при необходимости могу отредактировать.
{$I AMQP.Options.inc}
unit AMQP.Classes;
interface
Uses
SysUtils, Classes, SyncObjs, Generics.Collections,
AMQP.Frame, AMQP.Message, AMQP.Method, AMQP.Types
{$IfDef fpc}
, AMQP.SyncObjs
{$EndIf}
;
Type
AMQPException = Class(Exception);
AMQPTimeout = class(AMQPException);
TAMQPServerProperties = Class
Strict Private
FCapabilities : TStringList;
FMechanisms : TStringList;
FLocales : TStringList;
FClusterName : String;
FCopyright : String;
FInformation : String;
FPlatform : String;
FProduct : String;
FVersion : String;
FKnownHosts : String;
FVersionMajor : Integer;
FVersionMinor : Integer;
FChannelMax : Integer;
FFrameMax : Integer;
FHeartbeat : Integer;
Public
Property Capabilities : TStringList read FCapabilities;
Property Mechanisms : TStringList read FMechanisms;
Property Locales : TStringList read FLocales;
Property ClusterName : String read FClusterName;
Property Copyright : String read FCopyright;
Property Information : String read FInformation;
Property &Platform : String read FPlatform;
Property Product : String read FProduct;
Property Version : String read FVersion;
Property KnownHosts : String read FKnownHosts;
Property ProtocolVersionMajor : Integer read FVersionMajor;
Property ProtocolVersionMinor : Integer read FVersionMinor;
Property ChannelMax : Integer read FChannelMax;
Property FrameMax : Integer read FFrameMax;
Property Heartbeat : Integer read FHeartbeat;
Procedure ReadConnectionStart( AConnectionStart: TAMQPMethod );
Procedure ReadConnectionTune( AConnectionTune: TAMQPMethod );
Procedure ReadConnectionOpenOK( AConnectionOpenOK: TAMQPMethod );
Constructor Create;
Destructor Destroy; Override;
End;
TBlockingQueue<T> = Class
Strict Protected
FGuard : {$IFDEF FPC}TRTLCriticalSection{$ELSE}TCriticalSection{$ENDIF};
FCondition : TConditionVariableCS;
FQueue : TQueue<T>;
Public
Function Count: Integer; Virtual;
Function Get(ATimeOut: LongWord): T; Virtual;
Procedure Put( Item: T ); Virtual;
Constructor Create; Virtual;
Destructor Destroy; Override;
End;
TAMQPQueue = TBlockingQueue<TAMQPFrame>;
TAMQPMessageQueue = TBlockingQueue<TAMQPMessage>;
implementation
{ TAMQPServerProperties }
constructor TAMQPServerProperties.Create;
begin
FCapabilities := TStringList.Create;
FMechanisms := TStringList.Create;
FLocales := TStringList.Create;
FMechanisms.StrictDelimiter := True;
FMechanisms.Delimiter := ' ';
FLocales.StrictDelimiter := True;
FLocales.Delimiter := ' ';
FClusterName := '';
FCopyright := '';
FInformation := '';
FPlatform := '';
FProduct := '';
FVersion := '';
FKnownHosts := '';
FVersionMajor := 0;
FVersionMinor := 0;
FChannelMax := 0;
FFrameMax := 0;
FHeartbeat := 0;
end;
Procedure TAMQPServerProperties.ReadConnectionStart( AConnectionStart: TAMQPMethod );
var
ServerProperties: TFieldTable;
ServerCapabilities: TFieldTable;
Pair: TFieldValuePair;
begin
FMechanisms.DelimitedText := AConnectionStart.Field['mechanisms'].AsLongString.Value;
FLocales.DelimitedText := AConnectionStart.Field['locales'].AsLongString.Value;
ServerProperties := AConnectionStart.Field['server-properties'].AsFieldTable;
FVersionMajor := AConnectionStart.Field['version-major'].AsShortShortUInt.Value;
FVersionMinor := AConnectionStart.Field['version-minor'].AsShortShortUInt.Value;
FClusterName := ServerProperties.Field['cluster_name'].AsShortString.Value;
FCopyright := ServerProperties.Field['copyright'].AsShortString.Value;
FInformation := ServerProperties.Field['information'].AsShortString.Value;
FPlatform := ServerProperties.Field['platform'].AsShortString.Value;
FProduct := ServerProperties.Field['product'].AsShortString.Value;
FVersion := ServerProperties.Field['version'].AsShortString.Value;
ServerCapabilities := ServerProperties.Field['capabilities'].AsFieldTable;
for Pair in ServerCapabilities do
FCapabilities.Values[ Pair.Name.Value ] := Pair.Value.AsString('');
end;
Procedure TAMQPServerProperties.ReadConnectionTune( AConnectionTune: TAMQPMethod );
begin
FChannelMax := AConnectionTune.Field['channel-max'].AsShortUInt.Value;
FFrameMax := AConnectionTune.Field['frame-max'].AsLongUInt.Value;
FHeartbeat := AConnectionTune.Field['heartbeat'].AsShortUInt.Value;
end;
Procedure TAMQPServerProperties.ReadConnectionOpenOK( AConnectionOpenOK: TAMQPMethod );
begin
FKnownHosts := AConnectionOpenOK.Field['known-hosts'].AsShortString.Value;
end;
destructor TAMQPServerProperties.Destroy;
begin
FCapabilities.Free;
FMechanisms.Free;
FLocales.Free;
inherited;
end;
{ TBlockingQueue<T> }
function TBlockingQueue<T>.Count: Integer;
begin
{$IFDEF FPC}
EnterCriticalSection(FGuard);
{$ELSE}
FGuard.Acquire;
{$ENDIF}
try
Result := FQueue.Count;
finally
{$IFDEF FPC}
LeaveCriticalSection(FGuard);
{$ELSE}
FGuard.Release;
{$ENDIF}
end;
end;
constructor TBlockingQueue<T>.Create;
begin
inherited;
{$IFDEF FPC}
InitCriticalSection(FGuard);
{$ELSE}
FGuard := TCriticalSection.Create;
{$ENDIF}
FCondition := TConditionVariableCS.Create;
FQueue := TQueue<T>.Create;
end;
destructor TBlockingQueue<T>.Destroy;
begin
FQueue.Free;
FQueue := nil;
FCondition.Free;
FCondition := nil;
{$IFDEF FPC}
DoneCriticalSection(FGuard);
{$ELSE}
FGuard.Free;
FGuard := nil;
{$ENDIF}
inherited;
end;
function TBlockingQueue<T>.Get(ATimeOut: LongWord): T;
begin
{$IFDEF FPC}
EnterCriticalSection(FGuard);
{$ELSE}
FGuard.Acquire;
{$ENDIF}
try
while FQueue.Count = 0 do
begin
{$IFDEF FPC}
if FCondition.WaitForRTL(FGuard, ATimeOut) = wrTimeout then
{$Else}
if FCondition.WaitFor(FGuard, ATimeOut) = wrTimeout then
{$EndIf}
raise AMQPTimeout.Create('Timeout!');
end;
Result := FQueue.Dequeue
finally
{$IFDEF FPC}
LeaveCriticalSection(FGuard);
{$ELSE}
FGuard.Release;
{$ENDIF}
end;
end;
procedure TBlockingQueue<T>.Put(Item: T);
begin
{$IFDEF FPC}
EnterCriticalSection(FGuard);
{$ELSE}
FGuard.Acquire;
{$ENDIF}
try
FQueue.Enqueue( Item );
FCondition.ReleaseAll;
finally
{$IFDEF FPC}
LeaveCriticalSection(FGuard);
{$ELSE}
FGuard.Release;
{$ENDIF}
end;
end;
end.
TBlockingQueue<T>.Get()
ожидает сигнала TConditionVariableCS
до указанного времени ожидания. Чтобы ускорить этот выход, вам придется сигнализировать ConditionVariable, даже если в очередь ничего не было добавлено. Если бы вы это сделали, вам пришлось бы обновить Get()
, чтобы убедиться, что очередь не пуста, прежде чем удалять ее из очереди, и иметь где-то флаг, указывающий, что отмена является намеренной. Затем вы просто установите этот флаг и сигнализируете ConditionVariable, когда хотите отменить ожидание.
Но в вашем случае похоже, что ваш поток ставит в очередь указатели объектов и уже обрабатывает указатели nil
, поэтому обновление Get()
не требуется. При завершении потока просто поставьте в очередь указатель объекта nil
в качестве флага, а затем поток сможет проверить свое свойство Terminated
, когда получит указатель nil
из очереди.
Что-то вроде этого:
// move the MsgQueue to be a member of the thread class
// instead of being a local variable of Execute...
private
FMsgQueue: TAMQPMessageQueue;
...
constructor TConsumerThread.Create;
begin
FMsgQueue := TAMQPMessageQueue.Create;
...
end;
destructor TConsumerThread.Destroy;
begin
FMsgQueue.Free;
...
end;
procedure TConsumerThread.Execute;
var
...
begin
...
lMsg := FMsgQueue.Get(FQueueGetTimeout);
if lMsg = nil then
begin
if Terminated then Exit;
...
end;
...
end;
procedure TConsumerThread.TerminatedSet;
begin
inherited;
FMsgQueue.Put(nil);
...
end;
Почему? Terminated
уже сам по себе служит этой цели. FStopSignal
является излишним. Если только вы не хотите остановить очередь (и, возможно, перезапустить ее позже), не завершая весь поток. Но это не тот вопрос, который вы задали.
Я создал процедуру, используя FMsgQueue.Put(nil), где я также изменил управляющую переменную FStopSignal. После lMsg := MsgQueue.Get(FQueueGetTimeout) я проверяю, нет ли (завершено) и (FStopSignal), затем завершаю;