Привет, ребята, у меня проблема, к сожалению, я пытаюсь подключиться к Azure EventHub, но всегда получаю исключение, например:
System.Net.Sockets.SocketException (10060): попытка подключения не удалось, потому что подключенная сторона не ответила должным образом после период времени, или установленное соединение не удалось, поскольку подключено хост не ответил. в Microsoft.Azure.Amqp.Transport.TransportStream.EndRead(IAsyncResult асинхронный результат) в Microsoft.Azure.Amqp.Transport.TransportStream.<>c__DisplayClass22_0.b__1(IAsyncResult а) в System.Threading.Tasks.TaskFactory
1.FromAsyncCoreLogic(IAsyncResult iar, Func
2 endFunction, Action1 endAction, Task
1 обещание, логическое значение требует синхронизации) --- Конец трассировки стека из предыдущего расположения --- в System.Net.Security.SslStream.g__InternalFillHandshakeBufferAsync|189_0[TIOAdapter](TIOAdapter adap, ValueTask1 task, Int32 minSize) at System.Net.Security.SslStream.ReceiveBlobAsync[TIOAdapter](TIOAdapter adapter) at System.Net.Security.SslStream.ForceAuthenticationAsync[TIOAdapter](TIOAdapter adapter, Boolean receiveFirst, Byte[] reAuthenticationData, Boolean isApm) at System.Threading.Tasks.TaskToApm.End(IAsyncResult asyncResult) at System.Net.Security.SslStream.EndAuthenticateAsClient(IAsyncResult asyncResult) at Microsoft.Azure.Amqp.Transport.TlsTransport.HandleOpenComplete(IAsyncResult result, Boolean syncComplete) --- End of stack trace from previous location --- at Microsoft.Azure.Amqp.ExceptionDispatcher.Throw(Exception exception) at Microsoft.Azure.Amqp.AsyncResult.End[TAsyncResult](IAsyncResult result) at Microsoft.Azure.Amqp.AmqpObject.OpenAsyncResult.End(IAsyncResult result) at Microsoft.Azure.Amqp.AmqpObject.EndOpen(IAsyncResult result) at Microsoft.Azure.Amqp.Transport.TlsTransportInitiator.HandleTransportOpened(IAsyncResult result) at Microsoft.Azure.Amqp.Transport.TlsTransportInitiator.OnTransportOpened(IAsyncResult result) --- End of stack trace from previous location --- at Microsoft.Azure.Amqp.ExceptionDispatcher.Throw(Exception exception) at Microsoft.Azure.Amqp.AsyncResult.End[TAsyncResult](IAsyncResult result) at Microsoft.Azure.Amqp.Transport.AmqpTransportInitiator.ConnectAsyncResult.End(IAsyncResult result) at Microsoft.Azure.Amqp.Transport.AmqpTransportInitiator.<>c.<ConnectAsync>b__17_1(IAsyncResult r) at System.Threading.Tasks.TaskFactory
1.FromAsyncCoreLogic(IAsyncResult iar, Func2 endFunction, Action
1 endAction, Task1 promise, Boolean requiresSynchronization) --- End of stack trace from previous location --- at Azure.Messaging.EventHubs.Amqp.AmqpConnectionScope.CreateAndOpenConnectionAsync(Version amqpVersion, Uri serviceEndpoint, Uri connectionEndpoint, EventHubsTransportType transportType, IWebProxy proxy, Int32 sendBufferSizeBytes, Int32 receiveBufferSizeBytes, RemoteCertificateValidationCallback certificateValidationCallback, String scopeIdentifier, TimeSpan timeout) at Microsoft.Azure.Amqp.FaultTolerantAmqpObject
1.OnCreateAsync(TimeSpan тайм-аут, CancellationToken, CancellationToken) в Microsoft.Azure.Amqp.Singleton1.GetOrCreateAsync(TimeSpan timeout, CancellationToken cancellationToken) at Microsoft.Azure.Amqp.Singleton
1.GetOrCreateAsync (время ожидания TimeSpan, CancellationToken (аннулированиеToken) в Azure.Messaging.EventHubs.Amqp.AmqpConnectionScope.OpenManagementLinkAsync(TimeSpan OperationTimeout, TimeSpan linkTimeout, CancellationToken CancellationToken) в Microsoft.Azure.Amqp.FaultTolerantAmqpObject1.OnCreateAsync(TimeSpan timeout, CancellationToken cancellationToken) at Microsoft.Azure.Amqp.Singleton
1.GetOrCreateAsync (время ожидания TimeSpan, CancellationToken (аннулированиеToken) в Microsoft.Azure.Amqp.Singleton`1.GetOrCreateAsync (время ожидания TimeSpan, CancellationToken (аннулированиеToken) в Azure.Messaging.EventHubs.Amqp.AmqpClient.GetPropertiesAsync(EventHubsRetryPolicy retryPolicy, CancellationToken, CancellationToken) в Azure.Messaging.EventHubs.Amqp.AmqpClient.GetPropertiesAsync(EventHubsRetryPolicy retryPolicy, CancellationToken, CancellationToken) в Azure.Messaging.EventHubs.EventHubConnection.GetPropertiesAsync(EventHubsRetryPolicy retryPolicy, CancellationToken, CancellationToken) в Azure.Messaging.EventHubs.EventHubConnection.GetPartitionIdsAsync(EventHubsRetryPolicy retryPolicy, CancellationToken, CancellationToken) в Azure.Messaging.EventHubs.Consumer.EventHubConsumerClient.GetPartitionIdsAsync (CancellationToken CancellationToken) в Program.Main() в C:\Users\f.daquila\RiderProjects\AuditLogRecevierSample\AuditLogRecevierSample\Program.cs:line 20 в Program.Main() в C:\Users\f.daquila\RiderProjects\AuditLogRecevierSample\AuditLogRecevierSample\Program.cs:line 41
Код, с которым я пытаюсь сделать вызов, таков:
class Program
{
private const string eventHubConnectionString = "<CONNECTION STRING>";
private const string consumerGroup = "<GROUP NAME>";
public static async Task Main()
{
try
{
await using var consumer = new EventHubConsumerClient(consumerGroup, new EventHubConnection(eventHubConnectionString));
var startingPosition = EventPosition.Earliest;
var partitionIds = await consumer.GetPartitionIdsAsync();
if (partitionIds is not null && partitionIds.Any())
{
var partitionId = partitionIds.First();
using var cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(45));
await foreach (var receivedEvent in consumer.ReadEventsFromPartitionAsync(partitionId, startingPosition, cancellationSource.Token))
{
var body = Encoding.UTF8.GetString(receivedEvent.Data.Body.ToArray());
Console.WriteLine(body);
}
}
}
catch (Exception e)
{
Console.WriteLine(e);
}
}
}
Я попытался посмотреть, есть ли проблемы с сетью, используя powershell, проверив соединение следующим образом:
и все вроде нормально. Что я делаю неправильно? Почему я не могу подключиться?
Огромное спасибо.
Наиболее распространенная проблема с подключением из-за постоянных тайм-аутов заключается в том, что порты, необходимые для AMQP через TCP (5671/5672), не открыты. Это часто оказывается основной причиной, даже когда Test-NetConnection
удается.
Изменение транспорта на AMQP через WebSockets обычно помогает, так как он будет использовать порт 443 и при необходимости может маршрутизироваться через прокси-сервер. Транспорт можно указать с помощью EventHubConsumerClientOptions
при создании потребителя:
var options = new EventHubConsumerClientOptions();
options.ConnectionOptions.TransportType = EventHubsTransportType.AmqpWebSockets;
await using var consumer = new EventHubConsumerClient(
"<< CONSUMER GROUP >>",
"<< CONNECTION STRING >>",
"<< EVENT HUB NAME >>",
options);
Если это не решит проблему, я бы посоветовал обратиться к руководству по устранению неполадок с концентраторами событий, чтобы узнать о дальнейших шагах.
Несколько других примечательных вещей о вашем фрагменте:
Нет причин явно создавать EventHubConnection
, если только вы не создаете несколько клиентов и не хотите заставить их использовать одно соединение.
Возврат от GetPartitionIdsAsync
никогда не будет null
и всегда будет хотя бы один участник. Невозможно создать концентратор событий без разделов.
Нет необходимости явно декодировать байты для тела события. Использование receivedEvent.Data.EventBody.ToString()
даст вам тот же результат.
Свойство EventData.Body
устарело и скрыто; он существует только для обратной совместимости. Мы рекомендуем использовать EventBody
, который предоставляет дополнительные возможности для работы с двоичными данными.
Большое спасибо за предложения. Будучи потребителем в корпоративной сети и за обратным прокси, я считаю, что вы правильно сконцентрировали проблему. Огромное спасибо