异步处理基础概念
在接收设备数据之前,需要先来熟悉一些基本的概念,方便理解接收设备数据的基本思路
同步和异步
同步和异步的概念:
- 同步(Background Synchronous)是指任务在后台进行处理,但其他任务需要等待任务完成后才能执行
- 异步(Asynchronous)是指任务的提交和执行是相互独立的,任务的执行不会阻塞程序的继续执行
- 同步请求下,任务1完成后才能执行任务2,任务2需要等待任务1的完成。这种顺序执行的方式称为同步
- 异步请求下,任务1和任务2可以并行执行,彼此之间相互独立,不需要等待对方的完成。这种并行执行的方式称为异步
单线程/多线程:决定了有多少人”同时干活”
同步/异步:决定了是否需要”一直等着”
异步的优缺点:
消息队列的基本概念
知道了同步和异步的区别之后,再来理解一下消息队列的概念,**消息队列都是以异步的方式进行请求的**
- broker:负责接收、存储和分发消息的中间件组件,实现类发送者和接收者之间的解耦和异步通信
- 生产者:负责将消息发送到消息队列中
- 消费者:负责从消息队列中获取消息并进行处理
- 队列:负责存储消息
- topic:消息的分类
在IOT中数据流转是这样的,如下图
- 生产者:设备负责将消息发送到IOT中(队列)
- 每个产品可以绑定不同的topic来对消息进行分类,比如有手表topic、烟雾topic
- IOT本身相当于是一个队列
- 消费者可以从指定的topic中获取数据
- 如果有多个消费者都要接收同一类消息,可以设置多个消费者,称为消费者组
什么是AMQP
现在已经清楚了消息队列的基础概念,在IOT中是用AMQP来接收和处理消息的
AMQP全称Advanced Message Queuing Protocol,是一种网络协议,用于在应用程序之间传递消息。它是一种开放标准的消息传递协议,可以在不同的系统之间实现可靠、安全、高效的消息传递
AMQP协议的实现包括多种消息队列软件,例如RabbitMQ、Apache ActiveMQ、Apache Qpid等。这些软件提供了可靠、高效的消息传递服务,广泛应用于分布式系统、云计算、物联网等领域
其中的RabbitMQ在微服务课程中会详细、重点讲解

设备数据转发
在IOT官方文档中,已经提供了对应的数据转发的解决方案,如下链接:AMQP转发
官网Java SDK接入
创建数据转发规则
设备接入物联网平台后,可以通过自定义Topic或产品模型将数据上报给平台。在控制台上设置订阅后,平台会将包括设备生命周期变更、属性上报、消息上报及状态变更等信息,通过订阅推送的方式转发到您指定的服务器

- 订阅(AMQP): 用户可以通过控制台或调用API创建订阅,以获取设备的变更信息,如设备状态、数据上报等。订阅时需要指定具体的AMQP消息通道
- 推送: 一旦订阅成功,物联网平台会根据用户选择的数据类型,自动将相关变更推送到指定的AMQP消息队列中。用户使用AMQP客户端(Apache Qpid)连接到IoT平台即可接收这些更新信息
按照上述流程,开发者需要在IOT平台中创建转发规则,才能让AMQP客户端接收到数据
配置规则参考官方说明:https://support.huaweicloud.com/usermanual-iothub/iot_01_00101.html
- 找到规则->数据转发->创建规则

- 创建规则,其中数据来源为:设备属性

- 添加转发目标,转发目标选择:AMQP推送消息队列

- 上述操作完成后,可以启动规则

Java SDK接入示例
- 下载sdk代码
下载地址:https://developer.obs.myhuaweicloud.com/manage/amqp/amqp-demo.zip
官网说明参考:https://support.huaweicloud.com/usermanual-iothub/iot_01_00100_3.html
- 使用idea打开刚刚下载的amqp-demo中的amqp-client-examples代码目录,如下图

- 修改连接参数
可以直接到AmqpConstants类中进行修改,以下参数的获取都可以通过对应链接说明找到
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package com.iot.amqp;
public interface AmqpConstants {
String HOST = "38e7abedbf.st1.iotda-app.cn-east-3.myhuaweicloud.com";
int PORT = 5671;
String ACCESS_KEY = "S2DSSeTC5";
String ACCESS_CODE = "61xLTVTzppmBi62h5my2TT2LIXpU";
String DEFAULT_QUEUE = "DefaultQueue"; }
|
- 放开AbstractAmqpExample中的打印消息的代码,如下图:

SDK改造
改造思路
SDK中提供的这个工具类,只是官网提供的参考代码,需要将其改造后集成到自己的项目中,改造思路如下:
- 导入对应的依赖
- 所有的可变参数,如HOST、ACCESS_KEY、ACCESS_CODE、DEFAULT_QUEUE统一在配置文件中维护
- 在项目中根据项目需求配置线程池
- 让Spring进行管理和监听,一旦有数据变化,马上消费,可以让这个类实现ApplicationRunner接口,重新run方法
导入pom依赖
在zzyl-nursing-platform模块中导入依赖,参考官网提供的pom依赖,如下:
1 2 3 4 5 6
| <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-jms-client</artifactId> <version>0.61.0</version> </dependency>
|
配置文件添加iot配置
1 2 3 4 5 6 7 8 9 10 11 12
| # 华为云IOT配置 huaweicloud: ak: ********* sk: ********* regionId: cn-east-3 endpoint: ********* projectId: ********* #amqp相关配置 host: ********* accessKey: ********* accessCode: ********* queueName: DefaultQueue #默认无需改动
|
实现ApplicationRunner接口
让AmqpClient 实现ApplicationRunner接口,重写run方法,然后调用处理数据方法(由main方法改造而来)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241
| package com.zzyl.nursing.job;
import cn.hutool.core.text.CharSequenceUtil; import com.zzyl.framework.config.properties.HuaWeiIotConfigProperties; import lombok.extern.slf4j.Slf4j; import org.apache.qpid.jms.*; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; import org.apache.qpid.jms.transports.TransportOptions; import org.apache.qpid.jms.transports.TransportSupport; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component;
import javax.jms.*; import java.net.InetAddress; import java.net.URI; import java.net.UnknownHostException; import java.text.MessageFormat; import java.util.HashMap; import java.util.Map; import java.util.UUID; import java.util.stream.Collectors;
@Slf4j @Component public class AmqpClient implements ApplicationRunner {
@Autowired private HuaWeiIotConfigProperties huaWeiIotConfigProperties;
@Autowired private ThreadPoolTaskExecutor threadPoolTaskExecutor;
private static String clientId;
static { try { clientId = InetAddress.getLocalHost().getHostAddress(); } catch (UnknownHostException e) { e.printStackTrace(); } }
@Override public void run(ApplicationArguments args) throws Exception { start(); }
public void start() throws Exception { for (int i = 0; i < huaWeiIotConfigProperties.getConnectionCount(); i++) { Connection connection = getConnection();
((JmsConnection) connection).addConnectionListener(myJmsConnectionListener); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); connection.start();
MessageConsumer consumer = newConsumer(session, connection, huaWeiIotConfigProperties.getQueueName()); consumer.setMessageListener(messageListener); }
log.info("amqp is started successfully, and will exit after server shutdown "); }
private Connection getConnection() throws Exception { String connectionUrl = generateConnectUrl(); JmsConnectionFactory cf = new JmsConnectionFactory(connectionUrl); TransportOptions to = new TransportOptions(); to.setTrustAll(true); cf.setSslContext(TransportSupport.createJdkSslContext(to)); String userName = "accessKey=" + huaWeiIotConfigProperties.getAccessKey(); cf.setExtension(JmsConnectionExtensions.USERNAME_OVERRIDE.toString(), (connection, uri) -> { String newUserName = userName; if (connection instanceof JmsConnection) { newUserName = ((JmsConnection) connection).getUsername(); } return newUserName + "|timestamp=" + System.currentTimeMillis(); });
return cf.createConnection(userName, huaWeiIotConfigProperties.getAccessCode()); }
public String generateConnectUrl() { String uri = MessageFormat.format("{0}://{1}:{2}", (huaWeiIotConfigProperties.isUseSsl() ? "amqps" : "amqp"), huaWeiIotConfigProperties.getHost(), String.valueOf(huaWeiIotConfigProperties.getPort())); Map<String, String> uriOptions = new HashMap<>(); uriOptions.put("amqp.vhost", huaWeiIotConfigProperties.getVhost()); uriOptions.put("amqp.idleTimeout", String.valueOf(huaWeiIotConfigProperties.getIdleTimeout())); uriOptions.put("amqp.saslMechanisms", huaWeiIotConfigProperties.getSaslMechanisms());
Map<String, String> jmsOptions = new HashMap<>(); jmsOptions.put("jms.prefetchPolicy.queuePrefetch", String.valueOf(huaWeiIotConfigProperties.getQueuePrefetch())); if (CharSequenceUtil.isNotBlank(clientId)) { jmsOptions.put("jms.clientID", clientId); } else { jmsOptions.put("jms.clientID", UUID.randomUUID().toString()); } jmsOptions.put("failover.reconnectDelay", String.valueOf(huaWeiIotConfigProperties.getReconnectDelay())); jmsOptions.put("failover.maxReconnectDelay", String.valueOf(huaWeiIotConfigProperties.getMaxReconnectDelay())); if (huaWeiIotConfigProperties.getMaxReconnectAttempts() > 0) { jmsOptions.put("failover.maxReconnectAttempts", String.valueOf(huaWeiIotConfigProperties.getMaxReconnectAttempts())); } if (huaWeiIotConfigProperties.getExtendedOptions() != null) { for (Map.Entry<String, String> option : huaWeiIotConfigProperties.getExtendedOptions().entrySet()) { if (option.getKey().startsWith("amqp.") || option.getKey().startsWith("transport.")) { uriOptions.put(option.getKey(), option.getValue()); } else { jmsOptions.put(option.getKey(), option.getValue()); } } } StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append(uriOptions.entrySet().stream() .map(option -> MessageFormat.format("{0}={1}", option.getKey(), option.getValue())) .collect(Collectors.joining("&", "failover:(" + uri + "?", ")"))); stringBuilder.append(jmsOptions.entrySet().stream() .map(option -> MessageFormat.format("{0}={1}", option.getKey(), option.getValue())) .collect(Collectors.joining("&", "?", ""))); return stringBuilder.toString(); }
public MessageConsumer newConsumer(Session session, Connection connection, String queueName) throws Exception { if (connection == null || !(connection instanceof JmsConnection) || ((JmsConnection) connection).isClosed()) { throw new Exception("create consumer failed,the connection is disconnected."); }
return session.createConsumer(new JmsQueue(queueName)); }
private final MessageListener messageListener = message -> { try { threadPoolTaskExecutor.submit(() -> processMessage(message)); } catch (Exception e) { log.error("submit task occurs exception ", e); } };
private void processMessage(Message message) { String contentStr; try { contentStr = message.getBody(String.class); String topic = message.getStringProperty("topic"); String messageId = message.getStringProperty("messageId"); log.info("receive message,\n topic = {},\n messageId = {},\n content = {}", topic, messageId, contentStr); } catch (JMSException e) { throw new RuntimeException("服务器错误"); }
}
private final JmsConnectionListener myJmsConnectionListener = new JmsConnectionListener() {
@Override public void onConnectionEstablished(URI remoteURI) { log.info("onConnectionEstablished, remoteUri:{}", remoteURI); }
@Override public void onConnectionFailure(Throwable error) { log.error("onConnectionFailure, {}", error.getMessage()); }
@Override public void onConnectionInterrupted(URI remoteURI) { log.info("onConnectionInterrupted, remoteUri:{}", remoteURI); }
@Override public void onConnectionRestored(URI remoteURI) { log.info("onConnectionRestored, remoteUri:{}", remoteURI); }
@Override public void onInboundMessage(JmsInboundMessageDispatch envelope) { }
@Override public void onSessionClosed(Session session, Throwable cause) { }
@Override public void onConsumerClosed(MessageConsumer consumer, Throwable cause) { }
@Override public void onProducerClosed(MessageProducer producer, Throwable cause) { } }; }
|
这里代码写完后,在测试过程中,遇到了Message这个类找不到的问题,解决方式:执行maven clean再重启项目
线程池相关配置
在目前若依的项目中已经提供了线程池的配置ThreadPoolTaskExecutor
类的位置:com.zzyl.framework.config.ThreadPoolConfig
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| package com.zzyl.framework.config;
import com.zzyl.common.utils.Threads; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@Configuration public class ThreadPoolConfig { private int corePoolSize = 50;
private int maxPoolSize = 200;
private int queueCapacity = 1000;
private int keepAliveSeconds = 300;
@Bean(name = "threadPoolTaskExecutor") public ThreadPoolTaskExecutor threadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setMaxPoolSize(maxPoolSize); executor.setCorePoolSize(corePoolSize); executor.setQueueCapacity(queueCapacity); executor.setKeepAliveSeconds(keepAliveSeconds); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; }
@Bean(name = "scheduledExecutorService") protected ScheduledExecutorService scheduledExecutorService() { return new ScheduledThreadPoolExecutor(corePoolSize, new BasicThreadFactory.Builder().namingPattern("schedule-pool-%d").daemon(true).build(), new ThreadPoolExecutor.CallerRunsPolicy()) { @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); Threads.printException(r, t); } }; } }
|
线程池核心参数和原理
- corePoolSize 核心线程数目
- 到底多少合适?
- 对于IO密集型的项目,一般设置核心线程数为:CPU核数 * 2
- 对于计算密集型的项目,一般设置核心线程数为: CPU核数 + 1
- maximumPoolSize 最大线程数目 = (核心线程+临时线程的最大数目)
- keepAliveTime 生存时间 - 临时线程的生存时间,生存时间内没有新任务,此线程资源会释放
- unit 时间单位 - 临时线程的生存时间单位,如秒、毫秒等
- workQueue - 当没有空闲核心线程时,新来任务会加入到此队列排队,队列满会创建临时线程执行任务
- threadFactory 线程工厂 - 可以定制线程对象的创建,例如设置线程名字、是否是守护线程等
- handler 拒绝策略 - 当所有线程都在繁忙,workQueue 也放满时,会触发拒绝策略
- AbortPolicy:直接抛出异常,默认策略;
- CallerRunsPolicy:用调用者所在的线程来执行任务;
- DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
- DiscardPolicy:直接丢弃任务;
- 执行流程(原理)