Procházet zdrojové kódy

ping\ack等

master
xuqm před 1 rokem
rodič
revize
61dedd2085
19 změnil soubory, kde provedl 274 přidání a 61 odebrání
  1. +9
    -0
      push/src/main/java/cn/org/bjca/trust/push/common/DeviceHelper.kt
  2. +34
    -1
      push/src/main/java/cn/org/bjca/trust/push/common/MessageHelper.kt
  3. +1
    -1
      push/src/main/java/cn/org/bjca/trust/push/db/PushDatabase.kt
  4. +7
    -4
      push/src/main/java/cn/org/bjca/trust/push/db/message/MessageDao.kt
  5. +3
    -0
      push/src/main/java/cn/org/bjca/trust/push/db/message/SzyxMessage.kt
  6. +4
    -4
      push/src/main/java/cn/org/bjca/trust/push/enums/PacketType.kt
  7. +1
    -0
      push/src/main/java/cn/org/bjca/trust/push/kit/ImClientInterface.kt
  8. +1
    -1
      push/src/main/java/cn/org/bjca/trust/push/manager/PushSdkManager.kt
  9. +3
    -3
      push/src/main/java/cn/org/bjca/trust/push/message/ImCallback.kt
  10. +2
    -2
      push/src/main/java/cn/org/bjca/trust/push/message/callback/CallbackListener.kt
  11. +179
    -38
      push/src/main/java/cn/org/bjca/trust/push/message/client/ImClient.kt
  12. +0
    -1
      push/src/main/java/cn/org/bjca/trust/push/message/client/ImConnectOptions.kt
  13. +7
    -0
      push/src/main/java/cn/org/bjca/trust/push/message/msg/AckSendMessage.kt
  14. +4
    -4
      push/src/main/java/cn/org/bjca/trust/push/message/msg/BaseMessage.kt
  15. +6
    -0
      push/src/main/java/cn/org/bjca/trust/push/message/msg/ConnAckMessage.kt
  16. +1
    -0
      push/src/main/java/cn/org/bjca/trust/push/message/msg/ConnectMessage.kt
  17. +0
    -2
      push/src/main/java/cn/org/bjca/trust/push/message/msg/DisconnectMessage.kt
  18. +6
    -0
      push/src/main/java/cn/org/bjca/trust/push/message/msg/PangMessage.kt
  19. +6
    -0
      push/src/main/java/cn/org/bjca/trust/push/message/msg/PingMessage.kt

+ 9
- 0
push/src/main/java/cn/org/bjca/trust/push/common/DeviceHelper.kt Zobrazit soubor

@@ -1,6 +1,7 @@
package cn.org.bjca.trust.push.common

import cn.org.bjca.trust.push.db.DbHelper
import cn.org.bjca.trust.push.db.device.Device

object DeviceHelper {
@JvmStatic
@@ -12,6 +13,14 @@ object DeviceHelper {
return device[0].device
}
@JvmStatic
fun getDevice(): Device {
val device = DbHelper.getDataBase().deviceDao().getAll()
if (device.isEmpty()) {
throw Throwable("获取设备信息失败")
}
return device[0]
}
@JvmStatic
fun getManufacturer(): String {
val device = DbHelper.getDataBase().deviceDao().getAll()
if (device.isEmpty()) {


+ 34
- 1
push/src/main/java/cn/org/bjca/trust/push/common/MessageHelper.kt Zobrazit soubor

@@ -1,5 +1,7 @@
package cn.org.bjca.trust.push.common

import cn.org.bjca.trust.push.db.DbHelper
import cn.org.bjca.trust.push.db.message.SzyxMessage
import cn.org.bjca.trust.push.enums.MsgType
import cn.org.bjca.trust.push.message.ImManager
import cn.org.bjca.trust.push.message.bean.TextMessage
@@ -11,7 +13,8 @@ import org.json.JSONException
object MessageHelper {

@JvmStatic
fun parseMessage(msgType: MsgType, sendMessage: SendMessage): Message? {
fun parseMessage(sendMessage: SendMessage): Message? {
val msgType = MsgType.getMsgType(sendMessage.messageType)

val clazz: Class<*> = when (msgType) {
MsgType.Text -> {
@@ -38,6 +41,21 @@ object MessageHelper {
}

@JvmStatic
fun parseMessage(msg: SzyxMessage): Message? {
val sendMessage = SendMessage().apply {
messageId = msg.messageId
timestamp = msg.timestamp
fromClientId = msg.fromClientId
toClientId = msg.toClientId
target = msg.target
messageType = msg.messageType
message = msg.message
tenantNo = msg.tenantNo
}
return parseMessage(sendMessage)
}

@JvmStatic
fun sendMessage(clientId: String, message: Message) {
LogHelper.d("------app端调用发送消息-----")
val sendMessage = SendMessage()
@@ -50,4 +68,19 @@ object MessageHelper {
sendMessage.message = message.createContentJsonStr()
ImManager.instance.sendMessage(sendMessage)
}

fun changeStatus(msgId: String, status: Boolean): Message? {
val m = DbHelper.getDataBase().messageDao().getMessage(msgId)
if (m.isNotEmpty()) {
val ms = m[0]
if (ms.status == 0) {
ms.status = if (status) 1 else 2
DbHelper.getDataBase().messageDao().update(ms)
parseMessage(ms)?.also { ite ->
return ite
}
}
}
return null
}
}

+ 1
- 1
push/src/main/java/cn/org/bjca/trust/push/db/PushDatabase.kt Zobrazit soubor

@@ -8,7 +8,7 @@ import cn.org.bjca.trust.push.db.message.MessageDao
import cn.org.bjca.trust.push.db.message.SzyxMessage
import cn.org.bjca.trust.push.message.msg.SendMessage

@Database(entities = [Device::class, SzyxMessage::class], version = 1)
@Database(entities = [Device::class, SzyxMessage::class], version = 1, exportSchema=false)
abstract class PushDatabase : RoomDatabase() {
abstract fun deviceDao(): DeviceDao
abstract fun messageDao(): MessageDao


+ 7
- 4
push/src/main/java/cn/org/bjca/trust/push/db/message/MessageDao.kt Zobrazit soubor

@@ -1,18 +1,21 @@
package cn.org.bjca.trust.push.db.message

import androidx.room.Dao
import androidx.room.Delete
import androidx.room.Insert
import androidx.room.Query
import androidx.room.*

@Dao
interface MessageDao {
@Query("SELECT * FROM message")
fun getAll(): List<SzyxMessage>

@Query("SELECT * FROM message WHERE message_id = :msgId")
fun getMessage(msgId: String): List<SzyxMessage>

@Insert
fun insertAll(vararg msg: SzyxMessage)

@Update
fun update(msg: SzyxMessage)

@Delete
fun delete(msg: SzyxMessage)
}

+ 3
- 0
push/src/main/java/cn/org/bjca/trust/push/db/message/SzyxMessage.kt Zobrazit soubor

@@ -22,6 +22,9 @@ data class SzyxMessage(

@ColumnInfo(name = "tenantNo") var tenantNo: String? = null,

// 0=发送中;1-发送成功;2=发送失败
@ColumnInfo(name = "status") var status: Int = 0,

@PrimaryKey(autoGenerate = true) var p_id: Int? = null,

)

+ 4
- 4
push/src/main/java/cn/org/bjca/trust/push/enums/PacketType.kt Zobrazit soubor

@@ -2,11 +2,11 @@ package cn.org.bjca.trust.push.enums

enum class PacketType(value: Int) {
CONNECT(10),
// CONNACK(11),
CONNACK(11),
SEND(20),
// SENDACK(21),
// PINGREQ(30),
// PINGRESP(31),
SENDACK(21),
PING(30),
PANG(31),
DISCONNECT(40),
UNKNOWN(-1);
}

+ 1
- 0
push/src/main/java/cn/org/bjca/trust/push/kit/ImClientInterface.kt Zobrazit soubor

@@ -8,6 +8,7 @@ interface ImClientInterface {
fun connect(mImConnectOptions: ImConnectOptions)
fun reConnect()
fun disConnect()
fun unregister()
fun isConnect(): Boolean
fun isConnecting(): Boolean
fun sendMessage(msg: SendMessage)

+ 1
- 1
push/src/main/java/cn/org/bjca/trust/push/manager/PushSdkManager.kt Zobrazit soubor

@@ -52,7 +52,7 @@ class PushSdkManager : SdkInterface {
}

override fun unregister(context: Context, userId: String) {
TODO("Not yet implemented")
ImManager.instance.unregister()
}




+ 3
- 3
push/src/main/java/cn/org/bjca/trust/push/message/ImCallback.kt Zobrazit soubor

@@ -8,12 +8,12 @@ import cn.org.bjca.trust.push.message.msg.Message
import cn.org.bjca.trust.push.message.msg.SendMessage

class ImCallback : CallbackListener {
override fun sendComplete(msg: SendMessage) {
override fun sendComplete(msg: Message) {
LogHelper.d("------发送消息完成------")
}

override fun sendError(msg: SendMessage) {
LogHelper.d("消息发送失败")
override fun sendError(code:String, msg: Message) {
LogHelper.d("消息发送失败::${code}")
}

override fun messageArrived(msg: Message) {


+ 2
- 2
push/src/main/java/cn/org/bjca/trust/push/message/callback/CallbackListener.kt Zobrazit soubor

@@ -7,10 +7,10 @@ import cn.org.bjca.trust.push.message.msg.SendMessage

interface CallbackListener {
// 发送消息完成
fun sendComplete(msg: SendMessage)
fun sendComplete(msg: Message)

// 发送消息失败
fun sendError(msg: SendMessage)
fun sendError(code:String, msg: Message)

// 消息到达
fun messageArrived(msg: Message)


+ 179
- 38
push/src/main/java/cn/org/bjca/trust/push/message/client/ImClient.kt Zobrazit soubor

@@ -1,14 +1,20 @@
package cn.org.bjca.trust.push.message.client

import android.os.Handler
import android.os.Looper
import android.os.Message
import cn.org.bjca.trust.push.BuildConfig
import cn.org.bjca.trust.push.common.DeviceHelper
import cn.org.bjca.trust.push.common.LogHelper
import cn.org.bjca.trust.push.common.MessageHelper
import cn.org.bjca.trust.push.common.json.GsonImplHelp
import cn.org.bjca.trust.push.db.DbHelper
import cn.org.bjca.trust.push.db.message.SzyxMessage
import cn.org.bjca.trust.push.enums.ConnAckReturnCode
import cn.org.bjca.trust.push.enums.MsgType
import cn.org.bjca.trust.push.enums.OsType
import cn.org.bjca.trust.push.enums.PacketType
import cn.org.bjca.trust.push.kit.ImClientInterface
import cn.org.bjca.trust.push.message.msg.SendMessage
import cn.org.bjca.trust.push.message.msg.*
import org.eclipse.paho.client.mqttv3.*
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence
import java.util.concurrent.Executors
@@ -93,26 +99,7 @@ class ImClient : ImClientInterface {
}

override fun messageArrived(topic: String?, message: MqttMessage) {
val sendMessage = GsonImplHelp.get()
.toObject(String(message.payload, Charsets.UTF_8), SendMessage::class.java)
DbHelper.getDataBase().messageDao().insertAll(
SzyxMessage(
sendMessage.messageId,
sendMessage.timestamp,
sendMessage.fromClientId,
sendMessage.toClientId,
sendMessage.target,
sendMessage.messageType,
sendMessage.message,
sendMessage.tenantNo
)
)
MessageHelper.parseMessage(
MsgType.getMsgType(sendMessage.messageType),
sendMessage
)?.also {
connectOptions.callback?.messageArrived(it)
}
arrived(message.payload)
}

override fun deliveryComplete(token: IMqttDeliveryToken?) {
@@ -123,13 +110,63 @@ class ImClient : ImClientInterface {

override fun connectComplete(reconnect: Boolean, serverURI: String) {
startReconnect()
connectOptions.callback?.connected()
mqttArrivedClient.subscribe(connectOptions.clientId, 2)
}
})
}
}

private fun arrived(payload: ByteArray) {
val msg =
GsonImplHelp.get().toObject(String(payload, Charsets.UTF_8), BaseMessage::class.java)

LogHelper.e(GsonImplHelp.get().toJson(msg))

when (msg.getPacketType()) {
PacketType.CONNACK -> {
connectOptions.callback?.connected()
}
PacketType.SEND -> {
val sendMessage = GsonImplHelp.get().toObject(String(payload, Charsets.UTF_8), SendMessage::class.java)
// 存表
DbHelper.getDataBase().messageDao().insertAll(
SzyxMessage(
sendMessage.messageId,
sendMessage.timestamp,
sendMessage.fromClientId,
sendMessage.toClientId,
sendMessage.target,
sendMessage.messageType,
sendMessage.message,
sendMessage.tenantNo,
1
)
)
// 回复服务端收到
sendMessage.messageId?.let { ackMessage(it) }
// 通知app新消息到达
MessageHelper.parseMessage(sendMessage)?.also {
connectOptions.callback?.messageArrived(it)
}
}
PacketType.SENDACK -> {
val ackSendMessage = GsonImplHelp.get().toObject(String(payload, Charsets.UTF_8), AckSendMessage::class.java)
ackSendMessage.messageId?.let {
MessageHelper.changeStatus(it, true)
?.let { it1 -> connectOptions.callback?.sendComplete(it1) }
}

}
PacketType.PANG -> {

}
else -> {

}
}


}

override fun reConnect() {
if (!startConnect) return
init()
@@ -138,19 +175,31 @@ class ImClient : ImClientInterface {

override fun disConnect() {
startConnect = false
init()
mqttArrivedClient.disconnect()
mqttSendClient.disconnect()
if (::mqttArrivedClient.isInitialized && mqttArrivedClient.isConnected) mqttArrivedClient.disconnect()
if (::mqttSendClient.isInitialized && mqttSendClient.isConnected) mqttSendClient.disconnect()
isConnecting = false
}


private lateinit var scheduler: ScheduledExecutorService
private lateinit var pingScheduler: ScheduledExecutorService

fun startReconnect() {
if (!::scheduler.isInitialized) {
mqttArrivedClient.subscribe(connectOptions.clientId, 2)
LogHelper.e("'----------------------------------------------'")
sendConnectMessage()
scheduler = Executors.newSingleThreadScheduledExecutor()
scheduler.scheduleAtFixedRate({
reConnect()
}, 2, 5, TimeUnit.SECONDS)
}, 2, 3, TimeUnit.SECONDS)
}

if (!::pingScheduler.isInitialized) {
pingScheduler = Executors.newSingleThreadScheduledExecutor()
pingScheduler.scheduleAtFixedRate({
ping()
}, 3, connectOptions.pingInterval.toLong(), TimeUnit.SECONDS)
}

}
@@ -164,20 +213,28 @@ class ImClient : ImClientInterface {
}

override fun isConnecting(): Boolean {
return false
return isConnecting
}

private val handler = object : Handler(Looper.getMainLooper()) {
override fun handleMessage(msg: Message) {
super.handleMessage(msg)
val msgId = msg.obj as String
MessageHelper.changeStatus(msgId, false)?.let {
connectOptions.callback?.sendError("1001", it)
}
}

}

override fun sendMessage(msg: SendMessage) {
if (!isConnect()) {
connectOptions.callback?.sendError(msg)
MessageHelper.parseMessage(msg)?.let { connectOptions.callback?.sendError("1002", it) }
return
}
try {
mqttSendClient.publish(
"server",
GsonImplHelp.get().toJson(msg).toByteArray(Charsets.UTF_8),
2,
true
"server", GsonImplHelp.get().toJson(msg).toByteArray(Charsets.UTF_8), 2, true
)
DbHelper.getDataBase().messageDao().insertAll(
SzyxMessage(
@@ -188,15 +245,99 @@ class ImClient : ImClientInterface {
msg.target,
msg.messageType,
msg.message,
msg.tenantNo
msg.tenantNo,
0
)
)
connectOptions.callback?.sendComplete(msg)
handler.sendMessageDelayed(android.os.Message().apply {
what = 10086
obj = msg.messageId
}, 1500)
} catch (e: MqttException) {
LogHelper.e("发送消息失败", e)

connectOptions.callback?.sendError(msg)
e.printStackTrace()
MessageHelper.parseMessage(msg)?.let { connectOptions.callback?.sendError("1003", it) }
}
}

override fun unregister() {
if (!isConnect()) {
disConnect()
return
}
try {
mqttSendClient.publish(
"server",
GsonImplHelp.get().toJson(DisconnectMessage()).toByteArray(Charsets.UTF_8),
2,
false
)
disConnect()
} catch (e: MqttException) {
LogHelper.d("发送消息失败", e)
}
}


/**
* im连接成功,发送上线消息
*/
private fun sendConnectMessage() {
if (!isConnect()) {
return
}
try {
mqttSendClient.publish(
"server", GsonImplHelp.get().toJson(ConnectMessage().apply {
osType = OsType.ANDROID
osVer = DeviceHelper.getDevice().model
sdkVer = BuildConfig.versionName
clientId = connectOptions.clientId
deviceId = connectOptions.deviceId
}).toByteArray(Charsets.UTF_8), 2, false
)
} catch (e: MqttException) {
LogHelper.d("ping失败", e)
}

}

/**
* ping
*/
private fun ping() {
if (!isConnect()) {
return
}
try {
mqttSendClient.publish(
"server",
GsonImplHelp.get().toJson(PingMessage()).toByteArray(Charsets.UTF_8),
2,
false
)
} catch (e: MqttException) {
LogHelper.d("ping失败", e)
}

}

/**
* ack
*/
private fun ackMessage(msgId: String) {
if (!isConnect()) {
return
}
try {
mqttSendClient.publish(
"server", GsonImplHelp.get().toJson(AckSendMessage().apply {
messageId = msgId
}).toByteArray(Charsets.UTF_8), 2, false
)
} catch (e: MqttException) {
LogHelper.d("ping失败", e)
}

}
}

+ 0
- 1
push/src/main/java/cn/org/bjca/trust/push/message/client/ImConnectOptions.kt Zobrazit soubor

@@ -21,5 +21,4 @@ class ImConnectOptions(
var connectTimeOutSecond = 30
var callback: CallbackListener? = null
var qrCodeType: QRCodeType? = null
var pingService: PingService? = null
}

+ 7
- 0
push/src/main/java/cn/org/bjca/trust/push/message/msg/AckSendMessage.kt Zobrazit soubor

@@ -0,0 +1,7 @@
package cn.org.bjca.trust.push.message.msg

import cn.org.bjca.trust.push.enums.PacketType

class AckSendMessage : BaseMessage(PacketType.SENDACK) {
var messageId: String? = null
}

+ 4
- 4
push/src/main/java/cn/org/bjca/trust/push/message/msg/BaseMessage.kt Zobrazit soubor

@@ -21,11 +21,11 @@ open class BaseMessage protected constructor(packetType: PacketType) {
open fun getClassz(packetType: PacketType): Class<*> {
return when (packetType) {
PacketType.CONNECT -> ConnectMessage::class.java
// PacketType.CONNACK -> ConnAckMessage::class.java
PacketType.CONNACK -> ConnAckMessage::class.java
PacketType.SEND -> SendMessage::class.java
// PacketType.SENDACK -> SendAckMessage::class.java
// PacketType.PINGREQ -> PingReqMessage::class.java
// PacketType.PINGRESP -> PingRespMessage::class.java
PacketType.SENDACK -> AckSendMessage::class.java
PacketType.PING -> PingMessage::class.java
PacketType.PANG -> PangMessage::class.java
PacketType.DISCONNECT -> DisconnectMessage::class.java
else -> throw IllegalStateException("协议类型不正确!")
}


+ 6
- 0
push/src/main/java/cn/org/bjca/trust/push/message/msg/ConnAckMessage.kt Zobrazit soubor

@@ -0,0 +1,6 @@
package cn.org.bjca.trust.push.message.msg

import cn.org.bjca.trust.push.enums.PacketType

class ConnAckMessage:BaseMessage(PacketType.CONNACK) {
}

+ 1
- 0
push/src/main/java/cn/org/bjca/trust/push/message/msg/ConnectMessage.kt Zobrazit soubor

@@ -11,6 +11,7 @@ class ConnectMessage : BaseMessage(PacketType.CONNECT) {
var sessionKey: String? = null
var osType: OsType? = null
var osVer: String? = null
var sdkVer: String? = null
var deviceType: DeviceType? = null
var token: String? = null
var clientId: String? = null


+ 0
- 2
push/src/main/java/cn/org/bjca/trust/push/message/msg/DisconnectMessage.kt Zobrazit soubor

@@ -1,8 +1,6 @@
package cn.org.bjca.trust.push.message.msg

import cn.org.bjca.trust.push.enums.PacketType
import cn.org.bjca.trust.push.message.msg.BaseMessage

class DisconnectMessage : BaseMessage(PacketType.DISCONNECT) {
var requestCode: Int? = -1
}

+ 6
- 0
push/src/main/java/cn/org/bjca/trust/push/message/msg/PangMessage.kt Zobrazit soubor

@@ -0,0 +1,6 @@
package cn.org.bjca.trust.push.message.msg

import cn.org.bjca.trust.push.enums.PacketType

class PangMessage : BaseMessage(PacketType.PANG) {
}

+ 6
- 0
push/src/main/java/cn/org/bjca/trust/push/message/msg/PingMessage.kt Zobrazit soubor

@@ -0,0 +1,6 @@
package cn.org.bjca.trust.push.message.msg

import cn.org.bjca.trust.push.enums.PacketType

class PingMessage : BaseMessage(PacketType.PING) {
}

Načítá se…
Zrušit
Uložit