@@ -37,6 +37,10 @@ | |||
<artifactId>gson</artifactId> | |||
<version>2.9.0</version> | |||
</dependency> | |||
<dependency> | |||
<groupId>org.springframework.boot</groupId> | |||
<artifactId>spring-boot-starter-webflux</artifactId> | |||
</dependency> | |||
</dependencies> | |||
@@ -1,17 +1,49 @@ | |||
package cn.org.bjca.trust.pushdemo.controller; | |||
import org.springframework.web.bind.annotation.*; | |||
import org.apache.tomcat.util.codec.binary.Base64; | |||
import org.springframework.http.HttpHeaders; | |||
import org.springframework.util.MultiValueMap; | |||
import org.springframework.web.bind.annotation.GetMapping; | |||
import org.springframework.web.bind.annotation.PathVariable; | |||
import org.springframework.web.bind.annotation.RequestMapping; | |||
import org.springframework.web.bind.annotation.RestController; | |||
import org.springframework.web.reactive.function.BodyInserters; | |||
import org.springframework.web.reactive.function.client.WebClient; | |||
import java.nio.charset.StandardCharsets; | |||
@RestController | |||
@RequestMapping("hello") | |||
public class HelloController { | |||
@GetMapping("/{id}") | |||
public String getById(@PathVariable Integer id) { | |||
public String getById(@PathVariable String id) { | |||
System.out.println("id ==> " + id); | |||
return "{\n" + | |||
" \"msg\": \"qdxorigin\",\n" + | |||
" \"status\": \"200\",\n" + | |||
" \"data\": \"Sbfuiaefhaikufhcsauik\"\n" + | |||
"}"; | |||
WebClient webClient = WebClient.builder().baseUrl("http://127.0.0.1:15672").defaultHeaders(httpHeaders -> httpHeaders.addAll(getHeaders())).build(); | |||
webClient.put().uri("/api/users/{userId}", id) // 请求路径 | |||
.body(BodyInserters.fromValue("{\n" + " \"username\": \"" + id + "\",\n" + " \"password\": \"" + id + "\",\n" + " \"tags\": \"none\"\n" + "}")).retrieve(); | |||
webClient.put().uri("/api/permissions/{vhost}/{userId}", "/", id) // 请求路径 | |||
.body(BodyInserters.fromValue("{\n" + " \"username\": \"" + id + "\",\n" + " \"vhost\": \"/\",\n" + " \"configure\": \".*\",\n" + " \"write\": \".*\",\n" + " \"read\": \".*\"\n" + "}")).retrieve(); | |||
webClient.put().uri("/api/topic-permissions/{vhost}/{userId}", "/", id) // 请求路径 | |||
.body(BodyInserters.fromValue("{\n" + " \"username\": \"" + id + "\",\n" + " \"vhost\": \"/\",\n" + " \"exchange\": \"\",\n" + " \"write\": \".*\",\n" + " \"read\": \".*\"\n" + "}")).retrieve(); | |||
return "{\n" + " \"msg\": \"qdxorigin\",\n" + " \"status\": \"200\",\n" + " \"data\": \"Sbfuiaefhaikufhcsauik\"\n" + "}"; | |||
} | |||
String enc = new String(Base64.encodeBase64("admin:admin".getBytes(StandardCharsets.ISO_8859_1), false)); | |||
HttpHeaders headers = null; | |||
private MultiValueMap<String, String> getHeaders() { | |||
if (headers == null) { | |||
headers = new HttpHeaders(); | |||
headers.add(HttpHeaders.AUTHORIZATION, "Basic " + enc); | |||
headers.add(HttpHeaders.CONTENT_TYPE, "application/json"); | |||
} | |||
return headers; | |||
} | |||
} |
@@ -1,9 +1,17 @@ | |||
package cn.org.bjca.trust.pushdemo.message; | |||
import cn.org.bjca.trust.pushdemo.json.GsonImplHelp; | |||
import org.apache.tomcat.util.codec.binary.Base64; | |||
import org.eclipse.paho.client.mqttv3.*; | |||
import org.springframework.http.HttpHeaders; | |||
import org.springframework.util.MultiValueMap; | |||
import org.springframework.web.reactive.function.client.WebClient; | |||
import reactor.core.publisher.Mono; | |||
import java.nio.charset.StandardCharsets; | |||
import java.util.ArrayList; | |||
import java.util.List; | |||
import java.util.Objects; | |||
public class ImClient implements ImClientInterface { | |||
@@ -76,6 +84,18 @@ public class ImClient implements ImClientInterface { | |||
System.out.println("----------------------掉线--------------------------"); | |||
} | |||
final String enc = new String(Base64.encodeBase64("admin:admin".getBytes(StandardCharsets.ISO_8859_1), false)); | |||
HttpHeaders headers = null; | |||
private MultiValueMap<String, String> getHeaders() { | |||
if (headers == null) { | |||
headers = new HttpHeaders(); | |||
headers.add(HttpHeaders.AUTHORIZATION, "Basic " + enc); | |||
headers.add(HttpHeaders.CONTENT_TYPE, "application/json"); | |||
} | |||
return headers; | |||
} | |||
@Override | |||
public void messageArrived(String s, MqttMessage mqttMessage) { | |||
// System.out.println(new String(mqttMessage.getPayload())); | |||
@@ -94,17 +114,28 @@ public class ImClient implements ImClientInterface { | |||
} | |||
try { | |||
if (sendMessage.getToClientId().startsWith("G_")) { | |||
SendMessage message1 = new SendMessage(); | |||
message1.setMessageId(sendMessage.getMessageId()); | |||
message1.setMessage(sendMessage.getMessage()); | |||
message1.setMessageType(sendMessage.getMessageType()); | |||
message1.setFromClientId(sendMessage.getToClientId()); | |||
message1.setTarget(sendMessage.getFromClientId()); | |||
message1.setToClientId("xuqm"); | |||
message1.setTimestamp(sendMessage.getTimestamp()); | |||
message1.setTenantNo(sendMessage.getTenantNo()); | |||
client.publish(message1.getToClientId(), GsonImplHelp.get().toJson(message1).getBytes(StandardCharsets.UTF_8), 2, false); | |||
} else { | |||
WebClient webClient = WebClient.builder().baseUrl("http://127.0.0.1:15672").defaultHeaders(httpHeaders -> httpHeaders.addAll(getHeaders())).build(); | |||
Mono<ImUser[]> m = webClient.get().uri("/api/users/") // 请求路径 | |||
.retrieve().bodyToMono(ImUser[].class); | |||
for (ImUser imUser : m.block()) {SendMessage message1 = new SendMessage(); | |||
message1.setMessageId(sendMessage.getMessageId()); | |||
message1.setMessage(sendMessage.getMessage()); | |||
message1.setMessageType(sendMessage.getMessageType()); | |||
message1.setDescribe(sendMessage.getDescribe()); | |||
message1.setFromClientId(sendMessage.getToClientId()); | |||
message1.setTarget(sendMessage.getFromClientId()); | |||
message1.setToClientId(imUser.getName()); | |||
message1.setTimestamp(sendMessage.getTimestamp()); | |||
message1.setTenantNo(sendMessage.getTenantNo()); | |||
client.publish(message1.getToClientId(), GsonImplHelp.get().toJson(message1).getBytes(StandardCharsets.UTF_8), 2, false); | |||
} | |||
} else { | |||
client.publish(sendMessage.getToClientId(), GsonImplHelp.get().toJson(sendMessage).getBytes(StandardCharsets.UTF_8), 2, false); | |||
} | |||
} catch (MqttException e) { | |||
@@ -0,0 +1,13 @@ | |||
package cn.org.bjca.trust.pushdemo.message; | |||
public class ImUser { | |||
private String name; | |||
public String getName() { | |||
return name; | |||
} | |||
public void setName(String name) { | |||
this.name = name; | |||
} | |||
} |
@@ -9,6 +9,7 @@ public class SendMessage extends BaseMessage { | |||
private String target; | |||
private int messageType; | |||
private String message; | |||
private String describe; | |||
private String fromClientId; | |||
private String tenantNo; | |||
@@ -32,6 +33,14 @@ public class SendMessage extends BaseMessage { | |||
this.messageId = messageId; | |||
} | |||
public String getDescribe() { | |||
return describe; | |||
} | |||
public void setDescribe(String describe) { | |||
this.describe = describe; | |||
} | |||
public long getTimestamp() { | |||
return this.timestamp; | |||
} | |||