diff --git a/push/src/main/java/cn/org/bjca/trust/push/message/client/ImClient.kt b/push/src/main/java/cn/org/bjca/trust/push/message/client/ImClient.kt index 3561660..51c79e3 100644 --- a/push/src/main/java/cn/org/bjca/trust/push/message/client/ImClient.kt +++ b/push/src/main/java/cn/org/bjca/trust/push/message/client/ImClient.kt @@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit class ImClient : ImClientInterface { private lateinit var connectOptions: ImConnectOptions - private lateinit var mqttSendClient: MqttClient private lateinit var mqttArrivedClient: MqttClient private lateinit var options: MqttConnectOptions private var isConnecting = false @@ -40,12 +39,10 @@ class ImClient : ImClientInterface { isConnecting = true Thread { try { - if (!mqttSendClient.isConnected) mqttSendClient.connect(options) - } catch (e: Exception) { - connectOptions.callback?.connectError(ConnAckReturnCode.CONNECTED) - } - try { - if (!mqttArrivedClient.isConnected) mqttArrivedClient.connect(options) + if (!mqttArrivedClient.isConnected) { + LogHelper.d("------------开始重连::mqttArrivedClient--------------") + mqttArrivedClient.connect(options) + } } catch (e: Exception) { connectOptions.callback?.connectError(ConnAckReturnCode.CONNECTED) } @@ -54,32 +51,6 @@ class ImClient : ImClientInterface { } private fun init() { - if (!::mqttSendClient.isInitialized) { - mqttSendClient = MqttClient( - "tcp://${connectOptions.host}:${connectOptions.port}", - "${connectOptions.clientId}_send", - MemoryPersistence() - ) - options = MqttConnectOptions() - options.isCleanSession = true - options.userName = connectOptions.clientId - options.password = "xuqinmin1022".toCharArray() - options.connectionTimeout = connectOptions.connectTimeOutSecond - options.keepAliveInterval = connectOptions.pingInterval - options.isAutomaticReconnect = false - mqttSendClient.setCallback(object : MqttCallback { - override fun connectionLost(cause: Throwable?) { - connectOptions.callback?.connectionLost() - reConnect() - } - - override fun messageArrived(topic: String?, message: MqttMessage) { - } - - override fun deliveryComplete(token: IMqttDeliveryToken?) { - } - }) - } if (!::mqttArrivedClient.isInitialized) { mqttArrivedClient = MqttClient( "tcp://${connectOptions.host}:${connectOptions.port}", @@ -109,6 +80,8 @@ class ImClient : ImClientInterface { } override fun connectComplete(reconnect: Boolean, serverURI: String) { + mqttArrivedClient.subscribe(connectOptions.clientId, 2) + sendConnectMessage() startReconnect() } }) @@ -119,14 +92,13 @@ class ImClient : ImClientInterface { val msg = GsonImplHelp.get().toObject(String(payload, Charsets.UTF_8), BaseMessage::class.java) - LogHelper.e(GsonImplHelp.get().toJson(msg)) - when (msg.getPacketType()) { PacketType.CONNACK -> { connectOptions.callback?.connected() } PacketType.SEND -> { - val sendMessage = GsonImplHelp.get().toObject(String(payload, Charsets.UTF_8), SendMessage::class.java) + val sendMessage = GsonImplHelp.get() + .toObject(String(payload, Charsets.UTF_8), SendMessage::class.java) // 存表 DbHelper.getDataBase().messageDao().insertAll( SzyxMessage( @@ -149,7 +121,8 @@ class ImClient : ImClientInterface { } } PacketType.SENDACK -> { - val ackSendMessage = GsonImplHelp.get().toObject(String(payload, Charsets.UTF_8), AckSendMessage::class.java) + val ackSendMessage = GsonImplHelp.get() + .toObject(String(payload, Charsets.UTF_8), AckSendMessage::class.java) ackSendMessage.messageId?.let { MessageHelper.changeStatus(it, true) ?.let { it1 -> connectOptions.callback?.sendComplete(it1) } @@ -176,7 +149,6 @@ class ImClient : ImClientInterface { override fun disConnect() { startConnect = false if (::mqttArrivedClient.isInitialized && mqttArrivedClient.isConnected) mqttArrivedClient.disconnect() - if (::mqttSendClient.isInitialized && mqttSendClient.isConnected) mqttSendClient.disconnect() isConnecting = false } @@ -184,11 +156,9 @@ class ImClient : ImClientInterface { private lateinit var scheduler: ScheduledExecutorService private lateinit var pingScheduler: ScheduledExecutorService + // ping和断线重连 fun startReconnect() { if (!::scheduler.isInitialized) { - mqttArrivedClient.subscribe(connectOptions.clientId, 2) - LogHelper.e("'----------------------------------------------'") - sendConnectMessage() scheduler = Executors.newSingleThreadScheduledExecutor() scheduler.scheduleAtFixedRate({ reConnect() @@ -205,11 +175,7 @@ class ImClient : ImClientInterface { } override fun isConnect(): Boolean { - return if (::mqttSendClient.isInitialized && ::mqttArrivedClient.isInitialized) { - mqttSendClient.isConnected && mqttArrivedClient.isConnected - } else { - false - } + return ::mqttArrivedClient.isInitialized && mqttArrivedClient.isConnected } override fun isConnecting(): Boolean { @@ -228,27 +194,28 @@ class ImClient : ImClientInterface { } override fun sendMessage(msg: SendMessage) { + DbHelper.getDataBase().messageDao().insertAll( + SzyxMessage( + msg.messageId, + msg.timestamp, + msg.fromClientId, + msg.toClientId, + msg.target, + msg.messageType, + msg.message, + msg.tenantNo, + 0 + ) + ) if (!isConnect()) { + msg.messageId?.let { MessageHelper.changeStatus(it,false) } MessageHelper.parseMessage(msg)?.let { connectOptions.callback?.sendError("1002", it) } return } try { - mqttSendClient.publish( + mqttArrivedClient.publish( "server", GsonImplHelp.get().toJson(msg).toByteArray(Charsets.UTF_8), 2, true ) - DbHelper.getDataBase().messageDao().insertAll( - SzyxMessage( - msg.messageId, - msg.timestamp, - msg.fromClientId, - msg.toClientId, - msg.target, - msg.messageType, - msg.message, - msg.tenantNo, - 0 - ) - ) handler.sendMessageDelayed(android.os.Message().apply { what = 10086 obj = msg.messageId @@ -257,6 +224,7 @@ class ImClient : ImClientInterface { LogHelper.e("发送消息失败", e) e.printStackTrace() MessageHelper.parseMessage(msg)?.let { connectOptions.callback?.sendError("1003", it) } + msg.messageId?.let { MessageHelper.changeStatus(it,false) } } } @@ -266,7 +234,7 @@ class ImClient : ImClientInterface { return } try { - mqttSendClient.publish( + mqttArrivedClient.publish( "server", GsonImplHelp.get().toJson(DisconnectMessage()).toByteArray(Charsets.UTF_8), 2, @@ -287,7 +255,7 @@ class ImClient : ImClientInterface { return } try { - mqttSendClient.publish( + mqttArrivedClient.publish( "server", GsonImplHelp.get().toJson(ConnectMessage().apply { osType = OsType.ANDROID osVer = DeviceHelper.getDevice().model @@ -310,7 +278,7 @@ class ImClient : ImClientInterface { return } try { - mqttSendClient.publish( + mqttArrivedClient.publish( "server", GsonImplHelp.get().toJson(PingMessage()).toByteArray(Charsets.UTF_8), 2, @@ -330,7 +298,7 @@ class ImClient : ImClientInterface { return } try { - mqttSendClient.publish( + mqttArrivedClient.publish( "server", GsonImplHelp.get().toJson(AckSendMessage().apply { messageId = msgId }).toByteArray(Charsets.UTF_8), 2, false