Przeglądaj źródła

ping\ack等

master
xuqm 1 rok temu
rodzic
commit
d88eed56f1
1 zmienionych plików z 32 dodań i 64 usunięć
  1. +32
    -64
      push/src/main/java/cn/org/bjca/trust/push/message/client/ImClient.kt

+ 32
- 64
push/src/main/java/cn/org/bjca/trust/push/message/client/ImClient.kt Wyświetl plik

@@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit

class ImClient : ImClientInterface {
private lateinit var connectOptions: ImConnectOptions
private lateinit var mqttSendClient: MqttClient
private lateinit var mqttArrivedClient: MqttClient
private lateinit var options: MqttConnectOptions
private var isConnecting = false
@@ -40,12 +39,10 @@ class ImClient : ImClientInterface {
isConnecting = true
Thread {
try {
if (!mqttSendClient.isConnected) mqttSendClient.connect(options)
} catch (e: Exception) {
connectOptions.callback?.connectError(ConnAckReturnCode.CONNECTED)
}
try {
if (!mqttArrivedClient.isConnected) mqttArrivedClient.connect(options)
if (!mqttArrivedClient.isConnected) {
LogHelper.d("------------开始重连::mqttArrivedClient--------------")
mqttArrivedClient.connect(options)
}
} catch (e: Exception) {
connectOptions.callback?.connectError(ConnAckReturnCode.CONNECTED)
}
@@ -54,32 +51,6 @@ class ImClient : ImClientInterface {
}

private fun init() {
if (!::mqttSendClient.isInitialized) {
mqttSendClient = MqttClient(
"tcp://${connectOptions.host}:${connectOptions.port}",
"${connectOptions.clientId}_send",
MemoryPersistence()
)
options = MqttConnectOptions()
options.isCleanSession = true
options.userName = connectOptions.clientId
options.password = "xuqinmin1022".toCharArray()
options.connectionTimeout = connectOptions.connectTimeOutSecond
options.keepAliveInterval = connectOptions.pingInterval
options.isAutomaticReconnect = false
mqttSendClient.setCallback(object : MqttCallback {
override fun connectionLost(cause: Throwable?) {
connectOptions.callback?.connectionLost()
reConnect()
}

override fun messageArrived(topic: String?, message: MqttMessage) {
}

override fun deliveryComplete(token: IMqttDeliveryToken?) {
}
})
}
if (!::mqttArrivedClient.isInitialized) {
mqttArrivedClient = MqttClient(
"tcp://${connectOptions.host}:${connectOptions.port}",
@@ -109,6 +80,8 @@ class ImClient : ImClientInterface {
}

override fun connectComplete(reconnect: Boolean, serverURI: String) {
mqttArrivedClient.subscribe(connectOptions.clientId, 2)
sendConnectMessage()
startReconnect()
}
})
@@ -119,14 +92,13 @@ class ImClient : ImClientInterface {
val msg =
GsonImplHelp.get().toObject(String(payload, Charsets.UTF_8), BaseMessage::class.java)

LogHelper.e(GsonImplHelp.get().toJson(msg))

when (msg.getPacketType()) {
PacketType.CONNACK -> {
connectOptions.callback?.connected()
}
PacketType.SEND -> {
val sendMessage = GsonImplHelp.get().toObject(String(payload, Charsets.UTF_8), SendMessage::class.java)
val sendMessage = GsonImplHelp.get()
.toObject(String(payload, Charsets.UTF_8), SendMessage::class.java)
// 存表
DbHelper.getDataBase().messageDao().insertAll(
SzyxMessage(
@@ -149,7 +121,8 @@ class ImClient : ImClientInterface {
}
}
PacketType.SENDACK -> {
val ackSendMessage = GsonImplHelp.get().toObject(String(payload, Charsets.UTF_8), AckSendMessage::class.java)
val ackSendMessage = GsonImplHelp.get()
.toObject(String(payload, Charsets.UTF_8), AckSendMessage::class.java)
ackSendMessage.messageId?.let {
MessageHelper.changeStatus(it, true)
?.let { it1 -> connectOptions.callback?.sendComplete(it1) }
@@ -176,7 +149,6 @@ class ImClient : ImClientInterface {
override fun disConnect() {
startConnect = false
if (::mqttArrivedClient.isInitialized && mqttArrivedClient.isConnected) mqttArrivedClient.disconnect()
if (::mqttSendClient.isInitialized && mqttSendClient.isConnected) mqttSendClient.disconnect()
isConnecting = false
}

@@ -184,11 +156,9 @@ class ImClient : ImClientInterface {
private lateinit var scheduler: ScheduledExecutorService
private lateinit var pingScheduler: ScheduledExecutorService

// ping和断线重连
fun startReconnect() {
if (!::scheduler.isInitialized) {
mqttArrivedClient.subscribe(connectOptions.clientId, 2)
LogHelper.e("'----------------------------------------------'")
sendConnectMessage()
scheduler = Executors.newSingleThreadScheduledExecutor()
scheduler.scheduleAtFixedRate({
reConnect()
@@ -205,11 +175,7 @@ class ImClient : ImClientInterface {
}

override fun isConnect(): Boolean {
return if (::mqttSendClient.isInitialized && ::mqttArrivedClient.isInitialized) {
mqttSendClient.isConnected && mqttArrivedClient.isConnected
} else {
false
}
return ::mqttArrivedClient.isInitialized && mqttArrivedClient.isConnected
}

override fun isConnecting(): Boolean {
@@ -228,27 +194,28 @@ class ImClient : ImClientInterface {
}

override fun sendMessage(msg: SendMessage) {
DbHelper.getDataBase().messageDao().insertAll(
SzyxMessage(
msg.messageId,
msg.timestamp,
msg.fromClientId,
msg.toClientId,
msg.target,
msg.messageType,
msg.message,
msg.tenantNo,
0
)
)
if (!isConnect()) {
msg.messageId?.let { MessageHelper.changeStatus(it,false) }
MessageHelper.parseMessage(msg)?.let { connectOptions.callback?.sendError("1002", it) }
return
}
try {
mqttSendClient.publish(
mqttArrivedClient.publish(
"server", GsonImplHelp.get().toJson(msg).toByteArray(Charsets.UTF_8), 2, true
)
DbHelper.getDataBase().messageDao().insertAll(
SzyxMessage(
msg.messageId,
msg.timestamp,
msg.fromClientId,
msg.toClientId,
msg.target,
msg.messageType,
msg.message,
msg.tenantNo,
0
)
)
handler.sendMessageDelayed(android.os.Message().apply {
what = 10086
obj = msg.messageId
@@ -257,6 +224,7 @@ class ImClient : ImClientInterface {
LogHelper.e("发送消息失败", e)
e.printStackTrace()
MessageHelper.parseMessage(msg)?.let { connectOptions.callback?.sendError("1003", it) }
msg.messageId?.let { MessageHelper.changeStatus(it,false) }
}
}

@@ -266,7 +234,7 @@ class ImClient : ImClientInterface {
return
}
try {
mqttSendClient.publish(
mqttArrivedClient.publish(
"server",
GsonImplHelp.get().toJson(DisconnectMessage()).toByteArray(Charsets.UTF_8),
2,
@@ -287,7 +255,7 @@ class ImClient : ImClientInterface {
return
}
try {
mqttSendClient.publish(
mqttArrivedClient.publish(
"server", GsonImplHelp.get().toJson(ConnectMessage().apply {
osType = OsType.ANDROID
osVer = DeviceHelper.getDevice().model
@@ -310,7 +278,7 @@ class ImClient : ImClientInterface {
return
}
try {
mqttSendClient.publish(
mqttArrivedClient.publish(
"server",
GsonImplHelp.get().toJson(PingMessage()).toByteArray(Charsets.UTF_8),
2,
@@ -330,7 +298,7 @@ class ImClient : ImClientInterface {
return
}
try {
mqttSendClient.publish(
mqttArrivedClient.publish(
"server", GsonImplHelp.get().toJson(AckSendMessage().apply {
messageId = msgId
}).toByteArray(Charsets.UTF_8), 2, false


Ładowanie…
Anuluj
Zapisz