Как настроить адаптеры Spring Integration Collaborating Channel с аннотациями Java?

Я видел другие сообщения, связанные с моим вопросом, но ни один из ответов не помог мне решить мою проблему.

Я пытался следовать примеру здесь: https://github.com/garyrussell/spring-integration-samples/tree/master/intermediate/tcp-client-server-multiplex

У меня есть Spring Rest API, который принимает запросы, эти запросы меняются на XML, а затем я отправляю их другому приложению, которое принимает запрос TCP.

Использование TcpOutboundGateway и TcpInboundGateway работало нормально, но было медленным, поэтому я хотел ускорить его с помощью совместных адаптеров каналов и мультиплексирования.

Идея (насколько я понимаю) состоит в том, чтобы отправить запрос через шлюз, этот запрос передается агрегатору, этот запрос также отправляется на tcp-сервер другого приложения с помощью TcpSendingMessageHandler. Затем TcpReceivingChannelAdapter прослушивает ответы, эти ответы отправляются агрегатору, где они соотносятся с их запросом (bc заголовка CORRELATION_ID), а затем отправляются в канал преобразования, который превращает байты в строку.

Очевидно, мое понимание неверно, потому что я не вижу, как ответ возвращается на шлюз, и он не работает.

Я вижу, что Socket открывается, но он закрывается сразу после отправки сообщения, поэтому десериализатор возвращает ошибку EOF: null.

  1. Я неправильно настроил TcpReceivingChannelAdapter?

  2. Как ответ возвращается на шлюз?

  3. Следует ли использовать Future в качестве ответа шлюза?

Конфигурация TCP:

@EnableIntegration
@IntegrationComponentScan
@Configuration
public class TcpMultiPlexConfig implements ApplicationListener<TcpConnectionEvent> {

    protected final static Logger LOGGER = LoggerFactory.getLogger(TcpMultiPlexConfig.class);

    @Value("${engine.port}")
    private int port;// = 55001;
    @Value("${engine.address}")
    private String ipAddress;// = "192.168.1.1";
    @Value("${engine.timeout}")
    private int timeout;

    @Override
    public void onApplicationEvent(TcpConnectionEvent tcpEvent) {
        TcpConnection source = (TcpConnection) tcpEvent.getSource();
        if (tcpEvent instanceof TcpConnectionOpenEvent) {
            LOGGER.info("********* Socket Opened " + source.getConnectionId());
        } else if (tcpEvent instanceof TcpConnectionCloseEvent) {
            LOGGER.info("*********** Socket Closed " + source.getConnectionId());
        }
    }

    @MessagingGateway(defaultRequestChannel = "input")
    public interface MultiPlexGateway {

        String send(@Payload String in, @Header("CORRELATION_ID") String transactionId);

    }
    // TODO the request and response are being put together
    @Bean
    @ServiceActivator(inputChannel = "input")
    public BridgeHandler bridge() {
        BridgeHandler bridge = new BridgeHandler();
        bridge.setOutputChannelName("toAggregatorClient");
        bridge.setOrder(1);
        return bridge;
    }

    @Bean
    public PublishSubscribeChannel input() {
        return new PublishSubscribeChannel();
    }

    @Bean
    public DirectChannel toAggregatorClient() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel noResponseChannel() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel toTransformerClient() {
        return new DirectChannel();
    }

    @Bean
    public TcpReceivingChannelAdapter inAdapterClient() {
        TcpReceivingChannelAdapter receivingAdapter = new TcpReceivingChannelAdapter();
        receivingAdapter.setConnectionFactory(clientConnectionFactory());
        receivingAdapter.setOutputChannel(toAggregatorClient());
        receivingAdapter.setClientMode(true);
        return receivingAdapter;
    }


    @Bean
    @ServiceActivator(inputChannel = "input")
    public TcpSendingMessageHandler outAdapterClient() {
        TcpSendingMessageHandler outAdapter = new TcpSendingMessageHandler();
        outAdapter.setOrder(2);
        outAdapter.setConnectionFactory(clientConnectionFactory());
        outAdapter.setClientMode(true);
        return outAdapter;
    }

    @Bean(name  = "clientCFMP")
    public AbstractClientConnectionFactory clientConnectionFactory() {
        TcpNetClientConnectionFactory tcp = new TcpNetClientConnectionFactory(this.ipAddress , this.port);
        tcp.setSerializer(new DefaultSerializer()); // out
//      byte delimeter = "\n".getBytes()[0];
//      ElasticByteArrayRawSingleTerminatorSerializer deserializer = new ElasticByteArrayRawSingleTerminatorSerializer(delimeter);
//      DefaultDeserializer deserializer = new DefaultDeserializer();
        MyDefaultDeserializer deserializer = new MyDefaultDeserializer();
        tcp.setDeserializer(deserializer);

        tcp.setSoTimeout(timeout);
        tcp.setSingleUse(false);
        MapMessageConverter mc = new MapMessageConverter();
        mc.setHeaderNames("CORRELATION_ID");
        tcp.setMapper(new MessageConvertingTcpMessageMapper(mc));

        return tcp;
    }


    @MessageEndpoint
    public static class MyConverters {

        @Transformer(inputChannel = "toTransformerClient", outputChannel = "resultToString")
        public byte[] getResponse(MessageGroup payload) {
//          byte[] result = null;
            List<Message<?>>list = new ArrayList<>(payload.getMessages());
            byte[] result = (byte[]) list.get(1).getPayload();
//          LOGGER.info(result);
            return result;
        }

        @Transformer(inputChannel = "resultToString")
        public String convertResult(byte[] bytes) {
            String result = new String(bytes);
            LOGGER.info("*********** RESULT => " + result);
            return result;
        }

        @ServiceActivator(inputChannel = "noResponseChannel")
        public MessageTimeoutException  noResponse(String input) {
            throw new MessageTimeoutException("****** No response received for => " + input);
        }

    }



    @Bean
    @ServiceActivator(inputChannel = "toAggregatorClient", outputChannel = "toTransformerClient")
    public FactoryBean<MessageHandler>  aggregatorFactoryBean() {
        AggregatorFactoryBean  afb = new AggregatorFactoryBean ();
        afb.setExpireGroupsUponCompletion(true);
        afb.setExpireGroupsUponTimeout(true);
        afb.setGroupTimeoutExpression(new ValueExpression<>(this.timeout));
        afb.setCorrelationStrategy(new HeaderAttributeCorrelationStrategy("CORRELATION_ID"));
        afb.setReleaseStrategy(new MessageCountReleaseStrategy(2));
        afb.setProcessorBean(new DefaultAggregatingMessageGroupProcessor());
        afb.setSendPartialResultOnExpiry(false);
        afb.setMessageStore(new SimpleMessageStore());
        afb.setDiscardChannel(noResponseChannel());
        return afb;
    }

Сервис, вызывающий шлюз:

@Service
public class MultiPlexGatewayTransmission <T extends EngineData> extends AbstractMultiPlexEngineTransmission {

    public MultiPlexGatewayTransmission(MultiPlexGateway gateway) {
        super(gateway);
    }

    @Override
    public T request(EngineData request, Class<? extends EngineData> clazz) {
        String response = gateway.send(JaxbUtils.marshall(request), request.getApi().getMessageId());
        gateway.send(JaxbUtils.marshall(request), request.getApi().getMessageId());
        if (response == null || response.isEmpty()) {
            return null;
        }

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("MPGateway response ::: " + response.trim());
        }

        @SuppressWarnings("unchecked")
        T clientResponse = (T) JaxbUtils.unmarshall(response, clazz);
        if (LOGGER.isDebugEnabled()) {
//          LOGGER.debug("*** Unmarshall response ::: " + clientResponse);
        }
        return clientResponse;
    }

Прецедент:

@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("test")
public class ITGetClientsTest extends AbstractEngineTest {

    private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());

//  @Autowired
//  private GatewayTransmission<ClientsResponse> transmission;

    @Autowired
    private MultiPlexGatewayTransmission<ClientsResponse> transmission;

    @Test
    public void testGetClients() {
        LOGGER.info("Gateway test testGetClients... ");

        Api api = new Api();
        api.setIp("192.168.1.1");
        api.setMessageId(UUID.randomUUID().toString());
        api.setVersion("1.0");      
        api.setUserToken(token);

        ClientsRequest request = new ClientsRequest();
        request.setApi(api);

        ClientsResponse response = (ClientsResponse) transmission.request(request, ClientsResponse.class);
        Assert.assertTrue(response != null);
        Assert.assertTrue(!response.getClient().isEmpty());

        LOGGER.info(Arrays.deepToString(response.getClient().toArray()));
    }



}
Пользовательский скаляр GraphQL
Пользовательский скаляр GraphQL
Листовые узлы системы типов GraphQL называются скалярами. Достигнув скалярного типа, невозможно спуститься дальше по иерархии типов. Скалярный тип...
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
Как вычислять биты и понимать побитовые операторы в Java - объяснение с примерами
В компьютерном программировании биты играют важнейшую роль в представлении и манипулировании данными на двоичном уровне. Побитовые операции...
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Поднятие тревоги для долго выполняющихся методов в Spring Boot
Приходилось ли вам сталкиваться с требованиями, в которых вас могли попросить поднять тревогу или выдать ошибку, когда метод Java занимает больше...
Полный курс Java для разработчиков веб-сайтов и приложений
Полный курс Java для разработчиков веб-сайтов и приложений
Получите сертификат Java Web и Application Developer, используя наш курс.
0
0
396
1

Ответы 1

Я не рассматривал ваш код подробно; уже поздно и выходные, но см. этот ответ для более простого метода использования идентификаторов входящих / исходящих соединений для корреляции запроса / ответа.

Другие вопросы по теме