Help: embedded ActiveMQ broker throws an exception when a client disconnects. How should I handle it?

import java.net.URI;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;

public final class EmbeddedBroker {

    public static void main(String[] args) {
        final BrokerService broker = new BrokerService();
        try {
            // Configure the broker
            broker.setBrokerName("what21.com");
            // Add a connector via an explicit TransportConnector
            TransportConnector connector = new TransportConnector();
            connector.setUri(new URI("tcp://localhost:61616"));
            broker.addConnector(connector);
            // Add a second connector directly from a URI string
            broker.addConnector("tcp://localhost:61617");
            // Enable JMX
            broker.setUseJmx(true);
            broker.setDataDirectory("d:/data/activemq-data");
            broker.setUseShutdownHook(true);
            // broker.setPlugins(new BrokerPlugin[]{new JaasAuthenticationPlugin()});
            broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
            broker.start();
            System.out.println("JMS broker started ...");
        } catch (Exception e) {
            // Covers URISyntaxException, IOException, and anything start() throws
            e.printStackTrace();
        }
        // Stop the broker on JVM shutdown
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    broker.stop();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                // No System.exit() here: calling it from a shutdown hook blocks indefinitely
            }
        });
    }
}

Exception output

org.apache.activemq.transport.InactivityIOException: Cannot send, channel has already failed: tcp://127.0.0.1:55266
	at org.apache.activemq.transport.mqtt.MQTTInactivityMonitor.doOnewaySend(MQTTInactivityMonitor.java:183)
	at org.apache.activemq.transport.mqtt.MQTTInactivityMonitor.oneway(MQTTInactivityMonitor.java:174)
	at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
	at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1486)
	at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:971)
	at org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:927)
	at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:196)
	at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
	at org.apache.activemq.transport.mqtt.MQTTInactivityMonitor.onCommand(MQTTInactivityMonitor.java:162)
	at org.apache.activemq.transport.mqtt.MQTTTransportFilter.sendToActiveMQ(MQTTTransportFilter.java:106)
	at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.sendToActiveMQ(MQTTProtocolConverter.java:181)
	at org.apache.activemq.transport.mqtt.MQTTProtocolConverter.onTransportError(MQTTProtocolConverter.java:676)
	at org.apache.activemq.transport.mqtt.MQTTInactivityMonitor.onException(MQTTInactivityMonitor.java:194)
	at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:114)
	at org.apache.activemq.transport.mqtt.MQTTTransportFilter.onException(MQTTTransportFilter.java:207)
	at org.apache.activemq.transport.TransportSupport.onException(TransportSupport.java:96)
	at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:219)
	at java.lang.Thread.run(Thread.java:748)
15:26:39.521 [ActiveMQ Transport: tcp:///127.0.0.1:55266@1883] DEBUG org.apache.activemq.broker.TransportConnection.Transport - Transport Connection to: tcp://127.0.0.1:55266 failed: java.net.SocketException: Connection reset
java.net.SocketException: Connection reset
	at java.net.SocketInputStream.read(SocketInputStream.java:210)
	at java.net.SocketInputStream.read(SocketInputStream.java:141)
	at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
	at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:634)
	at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:59)
	at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:619)
	at java.io.DataInputStream.readByte(DataInputStream.java:265)
	at org.apache.activemq.transport.mqtt.MQTTWireFormat.unmarshal(MQTTWireFormat.java:86)
	at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
	at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
	at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
	at java.lang.Thread.run(Thread.java:748)
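
A note on reading the DEBUG line above: the thread name `tcp:///127.0.0.1:55266@1883` appears to follow the pattern remote-address@local-port, which would mean the failed connection was accepted on local port 1883 through the MQTT transport (the `org.apache.activemq.transport.mqtt` frames), not on the 61616/61617 connectors in the posted code. The sketch below pulls those values out with a regex. The class and method names are hypothetical, and the assumed layout is inferred from this one log line only.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class TransportThreadName {
    // Assumed layout: tcp://<remote host>:<remote port>@<local port>
    // "/+" tolerates the extra slash seen in "tcp:///127.0.0.1:..."
    private static final Pattern NAME =
            Pattern.compile("tcp:/+([^:/]+):(\\d+)@(\\d+)");

    /** Returns the local (broker-side) port, or -1 if the line does not match. */
    static int localPort(String logLine) {
        Matcher m = NAME.matcher(logLine);
        return m.find() ? Integer.parseInt(m.group(3)) : -1;
    }

    /** Returns "host:port" of the remote client, or null if the line does not match. */
    static String remote(String logLine) {
        Matcher m = NAME.matcher(logLine);
        return m.find() ? m.group(1) + ":" + m.group(2) : null;
    }

    public static void main(String[] args) {
        String line = "15:26:39.521 [ActiveMQ Transport: tcp:///127.0.0.1:55266@1883] DEBUG "
                + "org.apache.activemq.broker.TransportConnection.Transport";
        System.out.println("remote=" + remote(line) + " localPort=" + localPort(line));
    }
}
```

If that reading is right, any reproduction attempt should go through an MQTT client on port 1883 rather than a JMS client on 61616.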

3 comments
tomoya92 posted 6 months ago

Please post the error message as well.

wwhai posted 6 months ago

@tomoya92

(Same stack trace as in the question above.)

tomoya92 posted 6 months ago

I tried writing a hello world, and both sending and receiving work fine.

Sender

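import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {

  public static void main(String[] args) throws JMSException, InterruptedException {
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
        ActiveMQConnectionFactory.DEFAULT_USER,
        ActiveMQConnectionFactory.DEFAULT_PASSWORD,
        "tcp://localhost:61616"
    );

    Connection connection = connectionFactory.createConnection();
    try {
      connection.start();

      // First argument: false for a non-transacted session, true for a transacted one
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      Destination destination = session.createQueue("first");

      // No default destination; it is passed on each send() instead
      MessageProducer producer = session.createProducer(null);

      for (int i = 0; i < 100; i++) {
        TextMessage msg = session.createTextMessage("This is message " + i);
        producer.send(destination, msg);
        TimeUnit.SECONDS.sleep(1);
      }
    } finally {
      // Close cleanly even if sending fails or the thread is interrupted
      connection.close();
    }
  }
}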

Receiver

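import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {

  public static void main(String[] args) throws JMSException {
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
        ActiveMQConnectionFactory.DEFAULT_USER,
        ActiveMQConnectionFactory.DEFAULT_PASSWORD,
        "tcp://localhost:61616"
    );

    Connection connection = connectionFactory.createConnection();
    connection.start();

    // Non-transacted session with automatic acknowledgement
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    Destination destination = session.createQueue("first");
    MessageConsumer consumer = session.createConsumer(destination);

    // Runs until the process is killed; receive() blocks until a message arrives
    while (true) {
      TextMessage msg = (TextMessage) consumer.receive();
      System.out.println("Consumed: " + msg.getText());
    }
  }
}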
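
The Sender above ends with `connection.close()`, a clean shutdown, whereas the trace in the question ends in `java.net.SocketException: Connection reset`. As a rough illustration of that difference at the plain socket level (no ActiveMQ involved), the sketch below closes a loopback client either normally or with SO_LINGER set to 0, which is one way to make `close()` send a TCP reset. The class and method names are made up for this example, and a killed client process or a dropped network link may look similar to the server without being identical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;

class DisconnectDemo {

    /**
     * Opens a loopback connection, closes the client side, and reports what the
     * server side sees on its next read: "EOF" or the exception it got.
     */
    static String serverSideRead(boolean abrupt) {
        try (ServerSocket server = new ServerSocket(0, 1, InetAddress.getLoopbackAddress());
             Socket client = new Socket(server.getInetAddress(), server.getLocalPort());
             Socket accepted = server.accept()) {
            if (abrupt) {
                // Linger timeout 0: close() discards unsent data and resets the connection
                client.setSoLinger(true, 0);
            }
            client.close();
            try {
                int b = accepted.getInputStream().read();
                return b == -1 ? "EOF" : "data";
            } catch (SocketException e) {
                return "SocketException: " + e.getMessage();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("clean close  -> " + serverSideRead(false));
        System.out.println("abrupt close -> " + serverSideRead(true));
    }
}
```

A clean close shows up as end-of-stream on the reading side, while a reset surfaces as an exception, which is the kind of event a broker's transport thread would then log.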