引入pom.xml

        <!-- Spring Cloud Stream starter for the RabbitMQ binder; pinned here to 2.1.3.RELEASE. -->
        <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-stream-rabbit -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <version>2.1.3.RELEASE</version>
        </dependency>
  1. 配置client
/**
 * Declares the message channels of this service: one inbound and one outbound.
 *
 * @author huangdeyao
 * @date 2019/7/2 12:50
 */
public interface StreamClient {

    /**
     * Name of the inbound channel. Channel names must be unique within one service;
     * different services may reuse the same channel name.
     */
    String STREAM_INPUT = "stream-input";

    /** Name of the outbound channel. */
    String STREAM_OUTPUT = "stream-output";

    /**
     * Channel from which messages are consumed.
     *
     * @return the subscribable channel named {@link #STREAM_INPUT}
     */
    @Input(STREAM_INPUT)
    SubscribableChannel input();

    /**
     * Channel on which messages are published.
     *
     * @return the message channel named {@link #STREAM_OUTPUT}
     */
    @Output(STREAM_OUTPUT)
    MessageChannel output();
}
  1. 接收端
/**
 * Listens on the {@link StreamClient#STREAM_INPUT} channel and logs whatever arrives.
 *
 * @author huangdeyao
 * @date 2019/7/2 13:04
 */
@Log4j2
@Component
@EnableBinding(StreamClient.class)
public class StreamReceiver {

    /**
     * Logs a payload delivered on the input channel.
     *
     * @param message the received payload
     */
    @StreamListener(StreamClient.STREAM_INPUT)
    public void receive(Object message) {
        log.info("stream input receiver: {}", message);
    }
}
  1. 发送端
    /** Channel bindings used to publish messages; injected via {@code @Autowired}. */
    @Autowired
    StreamClient streamClient;

    /**
     * Handles {@code GET /send}: wraps the given text in a message and sends it
     * on the output channel.
     *
     * @param message text used as the message payload
     * @return the literal {@code "ok"} after the send call has returned
     */
    @GetMapping("/send")
    public String messageWithMQ(String message) {
        streamClient.output()
                .send(MessageBuilder.withPayload(message).build());
        return "ok";
    }
  1. 遇到的错误
org.springframework.beans.factory.BeanDefinitionStoreException: Invalid bean definition with name 'stream_input' defined in com.dy.spring.client.StreamClient: bean definition with this name already exists - Root bean: class [null]; scope=; abstract=false; lazyInit=false; autowireMode=0; dependencyCheck=0; autowireCandidate=true; primary=false; factoryBeanName=com.dy.spring.client.StreamClient; factoryMethodName=input; initMethodName=null; destroyMethodName=null
    at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinition(BindingBeanDefinitionRegistryUtils.java:68) ~[spring-cloud-stream-2.1.3.RELEASE.jar:2.1.3.RELEASE]
    at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerOutputBindingTargetBeanDefinition(BindingBeanDefinitionRegistryUtils.java:55) ~[spring-cloud-stream-2.1.3.RELEASE.jar:2.1.3.RELEASE]
    at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.lambda$registerBindingTargetBeanDefinitions$0(BindingBeanDefinitionRegistryUtils.java:92) ~[spring-cloud-stream-2.1.3.RELEASE.jar:2.1.3.RELEASE]
    at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:410) ~[spring-core-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.util.ReflectionUtils.doWithMethods(ReflectionUtils.java:389) ~[spring-core-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.cloud.stream.binding.BindingBeanDefinitionRegistryUtils.registerBindingTargetBeanDefinitions(BindingBeanDefinitionRegistryUtils.java:82) ~[spring-cloud-stream-2.1.3.RELEASE.jar:2.1.3.RELEASE]
    at org.springframework.cloud.stream.config.BindingBeansRegistrar.registerBeanDefinitions(BindingBeansRegistrar.java:44) ~[spring-cloud-stream-2.1.3.RELEASE.jar:2.1.3.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.lambda$loadBeanDefinitionsFromRegistrars$1(ConfigurationClassBeanDefinitionReader.java:364) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684) ~[na:1.8.0_191]
    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsFromRegistrars(ConfigurationClassBeanDefinitionReader.java:363) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForConfigurationClass(ConfigurationClassBeanDefinitionReader.java:145) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitions(ConfigurationClassBeanDefinitionReader.java:117) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:327) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:232) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(PostProcessorRegistrationDelegate.java:275) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(PostProcessorRegistrationDelegate.java:95) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:705) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:531) ~[spring-context-5.1.8.RELEASE.jar:5.1.8.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:311) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202) [spring-boot-2.1.6.RELEASE.jar:2.1.6.RELEASE]
    at com.dy.spring.SpringDomeApplication.main(SpringDomeApplication.java:13) [classes/:na]

Disconnected from the target VM, address: '127.0.0.1:57769', transport: 'socket'

Process finished with exit code 1
解决:StreamClient 中的输入通道和输出通道不能使用相同的通道名,必须使用不同的名字
修改
public interface StreamClient {
    /**
     * Channel names must be unique within one service; different services may reuse the same channel name.
     */
    String STREAM_INPUT = "stream-input";
    String STREAM_OUTPUT = "stream-output";
    
...... 略
}
yml 配置:将 stream-input 的 destination 指向 stream-output,使输入通道和输出通道绑定到同一个目的地 stream-output
# Binding configuration for the stream-input channel.
spring:
  cloud:
    stream:
      bindings:
        stream-input:
          # The input channel consumes from the destination named stream-output
          # (presumably the destination the stream-output channel publishes to — confirm).
          destination: stream-output
  1. 解决多个实例重复接收同一条消息的问题:在 yml 中配置 group
# RabbitMQ connection settings plus a consumer group for the stream-input binding.
spring:
  rabbitmq:
    # NOTE(review): when `addresses` is set, Spring Boot likely derives host/port from it
    # and ignores `port` — confirm; `host` may be the intended key.
    port: 5672
    addresses: 192.168.177.129
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        stream-input:
          destination: stream-output
          # Consumer group name; intended to stop several instances from each receiving the same message.
          group: my-stream
  1. 接收后回调信息
StreamClient.class
    /**
     * Name of the second inbound channel. Channel names must be unique within one
     * service; different services may reuse the same channel name.
     */
    String STREAM_INPUT_2 = "stream-input-2";

    /** Name of the second outbound channel. */
    String STREAM_OUTPUT_2 = "stream-output-2";

    /**
     * Second channel from which messages are consumed.
     *
     * @return the subscribable channel named {@link #STREAM_INPUT_2}
     */
    @Input(STREAM_INPUT_2)
    SubscribableChannel input2();

    /**
     * Second channel on which messages are published.
     *
     * @return the message channel named {@link #STREAM_OUTPUT_2}
     */
    @Output(STREAM_OUTPUT_2)
    MessageChannel output2();
StreamReceiver.class
    /**
     * Logs each {@code UserEntity} received on the first input channel and returns
     * it so that it is forwarded as a reply message.
     *
     * <p>The reply is sent to the {@code stream-output-2} output channel rather than
     * straight into the {@code stream-input-2} input channel, so it goes through the
     * binder like any other published message. This assumes the
     * {@code stream-input-2} binding consumes from destination
     * {@code stream-output-2} in the yml configuration.
     *
     * @param userEntity the received payload
     * @return the same payload, used as the reply
     */
    @StreamListener(StreamClient.STREAM_INPUT)
    @SendTo(StreamClient.STREAM_OUTPUT_2)
    public UserEntity process(UserEntity userEntity) {
        // Pass the object itself: the logger formats it and a null payload does not throw.
        log.info("stream input receiver: {}", userEntity);
        return userEntity;
    }

    /**
     * Logs each {@code UserEntity} received on the second input channel.
     *
     * @param userEntity the received payload
     */
    @StreamListener(StreamClient.STREAM_INPUT_2)
    public void process2(UserEntity userEntity) {
        log.info("STREAM_INPUT_2  input receiver: {}", userEntity);
    }
yml
# RabbitMQ connection settings and the two input bindings used by the reply example.
spring:
  rabbitmq:
    # NOTE(review): when `addresses` is set, Spring Boot likely derives host/port from it
    # and ignores `port` — confirm; `host` may be the intended key.
    port: 5672
    addresses: 192.168.177.129
    username: guest
    password: guest
  cloud:
    stream:
      bindings:
        # First input channel: consumes from destination stream-output.
        stream-input:
          destination: stream-output
          group: my-stream
          # Payloads on this binding are declared as JSON.
          content-type: application/json
        # Second input channel: consumes from destination stream-output-2.
        stream-input-2:
          destination: stream-output-2
          group: my-stream
          content-type: application/json

项目地址

reference

Spring Cloud Stream如何消费自己生产的消息?

Last modification:February 13th, 2020 at 03:53 pm
如果觉得我的文章对你有用,请随意赞赏