插件:MQTT 支持
测试结果:可以发送和接收长度小于 100 的字符串消息;消息较长时会崩溃。
// Build.cs excerpt: modules this project depends on. "MQTTCore" is the module that MyObject.cpp loads through FModuleManager to obtain its MQTT client.
PublicDependencyModuleNames.AddRange(new string[] { "Core", "CoreUObject", "Engine", "InputCore", "EnhancedInput", "WebSockets", "Json", "JsonUtilities", "MQTTCore", "AudioMixer"});MyObject.h
// Fill out your copyright notice in the Description page of Project Settings. #pragma once #include "CoreMinimal.h" #include "UObject/NoExportTypes.h" #include "IMQTTClient.h" #include "MyObject.generated.h" /** * */ UCLASS(BlueprintType, Blueprintable) class METAHUMANCHARACTERHEIXI_API UMyObject : public UObject { GENERATED_BODY() public: TSharedPtr<IMQTTClient, ESPMode::ThreadSafe> MQTTClient; UFUNCTION(BlueprintCallable, Category = "Demo") void HelloWorld(); void SaveWav(const FString& FilePath, const TArray<uint8>& AudioBytes, int32 SampleRate, int32 NumChannels); TMap<int32, FString> AudioChunks; // key = 片索引 int32 TotalChunks = 0; int32 FileIndex = 0; };MyObject.cpp
// Fill out your copyright notice in the Description page of Project Settings. #include "MyObject.h" #include "MQTTSubsystem.h" #include "IMQTTCoreModule.h" #include "MQTTClientSettings.h" #include "MQTTShared.h" #include "Engine/Engine.h" #include "Misc/FileHelper.h" #include "Misc/Paths.h" #include "Async/Async.h" void UMyObject::SaveWav(const FString& FilePath, const TArray<uint8>& AudioBytes, int32 SampleRate, int32 NumChannels) { if (AudioBytes.Num() == 0) return; const int32 BitsPerSample = 16; const int32 BlockAlign = NumChannels * BitsPerSample / 8; const int32 ByteRate = SampleRate * BlockAlign; const int32 DataSize = AudioBytes.Num(); const int32 ChunkSize = 36 + DataSize; TArray<uint8> Wav; Wav.Reserve(44 + DataSize); // 头44字节 + PCM数据 auto AppendInt32 = [&Wav](int32 V) { Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int32)); }; auto AppendInt16 = [&Wav](int16 V) { Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int16)); }; // --- WAV Header --- Wav.Append(reinterpret_cast<const uint8*>("RIFF"), 4); AppendInt32(ChunkSize); Wav.Append(reinterpret_cast<const uint8*>("WAVE"), 4); Wav.Append(reinterpret_cast<const uint8*>("fmt "), 4); AppendInt32(16); // fmt chunk size AppendInt16(1); // PCM format AppendInt16(NumChannels); AppendInt32(SampleRate); AppendInt32(ByteRate); AppendInt16(BlockAlign); AppendInt16(BitsPerSample); Wav.Append(reinterpret_cast<const uint8*>("data"), 4); AppendInt32(DataSize); // --- PCM Data --- Wav.Append(AudioBytes); FFileHelper::SaveArrayToFile(Wav, *FilePath); } inline void SaveWavToFile(const FString& FilePath, const TArray<uint8>& AudioBytes, int32 SampleRate, int32 NumChannels) { if (AudioBytes.Num() == 0) return; const int32 BitsPerSample = 16; const int32 BlockAlign = NumChannels * BitsPerSample / 8; const int32 ByteRate = SampleRate * BlockAlign; const int32 DataSize = AudioBytes.Num(); const int32 ChunkSize = 36 + DataSize; TArray<uint8> Wav; Wav.Reserve(44 + DataSize); // 头44字节 + PCM数据 auto AppendInt32 = [&Wav](int32 V) { 
Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int32)); }; auto AppendInt16 = [&Wav](int16 V) { Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int16)); }; // --- WAV Header --- Wav.Append(reinterpret_cast<const uint8*>("RIFF"), 4); AppendInt32(ChunkSize); Wav.Append(reinterpret_cast<const uint8*>("WAVE"), 4); Wav.Append(reinterpret_cast<const uint8*>("fmt "), 4); AppendInt32(16); // fmt chunk size AppendInt16(1); // PCM format AppendInt16(NumChannels); AppendInt32(SampleRate); AppendInt32(ByteRate); AppendInt16(BlockAlign); AppendInt16(BitsPerSample); Wav.Append(reinterpret_cast<const uint8*>("data"), 4); AppendInt32(DataSize); // --- PCM Data --- Wav.Append(AudioBytes); FFileHelper::SaveArrayToFile(Wav, *FilePath); } void UMyObject::HelloWorld() { UE_LOG(LogTemp, Warning, TEXT("Hello World from UMyObject!")); if (GEngine) { GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Green, TEXT("Hello World from UMyObject!")); } IMQTTCoreModule& MQTTModule = FModuleManager::LoadModuleChecked<IMQTTCoreModule>("MQTTCore"); FMQTTURL URL; URL.Host = TEXT("127.0.0.1"); URL.Port = 1883; MQTTClient = MQTTModule.GetOrCreateClient(URL); if (!MQTTClient.IsValid()) { UE_LOG(LogTemp, Error, TEXT("MQTTClient 无效!")); return; } //TMap<int32, FString> AudioChunks; // key = 片索引 //int32 TotalChunks = 0; MQTTClient->OnMessage().AddLambda([this](const FMQTTClientMessage& Msg) { FString Topic = Msg.Topic; if (Topic == TEXT("ue/audio")) { FString Payload = Msg.GetPayloadAsString(); // 格式解析: "index/total:chunk" FString IndexStr, TotalStr, ChunkData; if (Payload.Split(TEXT(":"), &IndexStr, &ChunkData) && IndexStr.Split(TEXT("/"), &IndexStr, &TotalStr)) { int32 Index = FCString::Atoi(*IndexStr); int32 Total = FCString::Atoi(*TotalStr); TotalChunks = Total; AudioChunks.Add(Index, ChunkData); // 检查是否收到所有分片 if (AudioChunks.Num() == TotalChunks) { // 拼接 FString FullB64; for (int32 i = 0; i < TotalChunks; ++i) { FullB64 += AudioChunks[i]; } // Base64 decode TArray<uint8> AudioBytes; if 
(FBase64::Decode(FullB64, AudioBytes)) { //UE_LOG(LogTemp, Log, TEXT("音频接收完成,字节数=%d"), AudioBytes.Num()); FString FilePath = FString::Printf(TEXT("D:/tmp/received_%03d.wav"), FileIndex++); SaveWavToFile(FilePath, AudioBytes, 16000, 1); } AudioChunks.Empty(); // 清理,准备下一条音频 } } } }); //MQTTClient->OnMessage().AddLambda([](const FMQTTClientMessage& Msg_a) // { // FMQTTClientMessage Msg = Msg_a; // const FString Topic = Msg_a.Topic; // //const FString PayloadStr = Msg_a.GetPayloadAsString(); // //TArray<uint8> PayloadBytes = Msg_a.Payload; // AsyncTask(ENamedThreads::GameThread, [Topic, Msg]() // { // if (Topic == TEXT("ue/command")) // { // // 只在“明确是文本 topic”时才解析字符串 // FString TextPayload = Msg.GetPayloadAsString(); // UE_LOG(LogTemp, Log, TEXT("CMD: %s"), *TextPayload); // } // else if (Topic == TEXT("ue/audio")) // { // // Base64 一定用 string // FString Payload = Msg.GetPayloadAsString(); // // 格式解析: "index/total:chunk" // FString IndexStr, TotalStr, ChunkData; // if (Payload.Split(TEXT(":"), &IndexStr, &ChunkData) && // IndexStr.Split(TEXT("/"), &IndexStr, &TotalStr)) // { // int32 Index = FCString::Atoi(*IndexStr); // int32 Total = FCString::Atoi(*TotalStr); // TotalChunks = Total; // AudioChunks.Add(Index, ChunkData); // // 检查是否收到所有分片 // if (AudioChunks.Num() == TotalChunks) // { // // 拼接 // FString FullB64; // for (int32 i = 0; i < TotalChunks; ++i) // { // FullB64 += AudioChunks[i]; // } // // Base64 decode // TArray<uint8> AudioBytes; // if (FBase64::Decode(FullB64, AudioBytes)) // { // UE_LOG(LogTemp, Log, TEXT("音频接收完成,字节数=%d"), AudioBytes.Num()); // FString FilePath = TEXT("D:/tmp/received.wav"); // SaveWavToFile(FilePath, AudioBytes, 44100, 1); // } // AudioChunks.Empty(); // 清理,准备下一条音频 // } // ////// 再 decode // //TArray<uint8> AudioBytes; // //if (FBase64::Decode(B64, AudioBytes)) // //{ // // UE_LOG(LogTemp, Error, TEXT("Audio bytes: %s"), *B64); // // //UE_LOG(LogTemp, Error, TEXT("Audio bytes")); // // // 这里可以调用 SaveWav 函数保存 // // FString FilePath = 
TEXT("D:/tmp/received.wav"); // // //SaveWavToFile(FilePath, AudioBytes, 44100, 1); // //} // //else // //{ // // UE_LOG(LogTemp, Warning, TEXT("Base64 decode failed!")); // //} // } // else // { // UE_LOG(LogTemp, Warning, TEXT("Unknown topic: %s"), *Topic); // } // } // }); // }); //MQTTClient->OnMessage().AddLambda([](const FMQTTClientMessage& Msg) // { // const FString TopicCopy = Msg.Topic; // const bool bIsCommand = (TopicCopy == TEXT("ue/command")); // FString TextPayload; // TArray<uint8> BinaryPayload; // if (bIsCommand) // { // TextPayload = Msg.GetPayloadAsString(); // 文本 OK // } // else // { // BinaryPayload.SetNumUninitialized(Msg.Payload.Num()); // FMemory::Memcpy( // BinaryPayload.GetData(), // Msg.Payload.GetData(), // Msg.Payload.Num() // ); // // } // AsyncTask(ENamedThreads::GameThread, // [TopicCopy, bIsCommand, TextPayload, BinaryPayload]() // { // if (bIsCommand) // { // UE_LOG(LogTemp, Log, TEXT("CMD: %s"), *TextPayload); // /*if (GEngine) // { // GEngine->AddOnScreenDebugMessage( // -1, 5.f, FColor::Yellow, TextPayload); // }*/ // } // else // { // UE_LOG(LogTemp, Log, // TEXT("Audio received, bytes=%d"), // BinaryPayload.Num()); // // 这里再存 wav / 播放 / 推 Audio2Face // } // }); // //AsyncTask(ENamedThreads::GameThread, [Msg](){ // // const FString& TopicCopy = Msg.Topic; // // //FString Topic = Msg.Topic; // // if (TopicCopy == "ue/command") { // // FString Payload = Msg.GetPayloadAsString(); // // UE_LOG(LogTemp, Error, TEXT("收到 MQTT 消息: Topic=%s, Payload=%s"), *TopicCopy, *Payload); // // if (GEngine) // // { // // GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Yellow, // // FString::Printf(TEXT("Topic=%s, Payload=%s"), *TopicCopy, *Payload)); // // } // // } // // else { // // // 拷贝音频数据,避免后台线程释放导致崩溃 // // //TArray<uint8> PayloadCopy = Msg.Payload; // // //FString FilePath = FPaths::ProjectSavedDir() / TEXT("received.wav"); // // //FString FilePath = FPaths::ProjectSavedDir() + TEXT("received.wav"); // // FString FilePath = 
TEXT("C:/Users/ChanJing-01/Desktop/tmp/0121/received.wav"); // // //if (PayloadCopy.Num() > 0) // // //{ // // // UE_LOG(LogTemp, Error, TEXT("[MQTT] start Saved audio ")); // // // const int32 SampleRate = 44100; // // // const int32 NumChannels = 1; // // // const int32 BitsPerSample = 16; // // // const int32 BlockAlign = NumChannels * BitsPerSample / 8; // // // const int32 ByteRate = SampleRate * BlockAlign; // // // const int32 DataSize = PayloadCopy.Num(); // // // const int32 ChunkSize = 36 + DataSize; // // // //TArray<uint8> Wav; // // // //Wav.Reserve(44 + DataSize); // // // //auto AppendInt32 = [&Wav](int32 V) { Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int32)); }; // // // //auto AppendInt16 = [&Wav](int16 V) { Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int16)); }; // // // //// WAV header // // // //Wav.Append(reinterpret_cast<const uint8*>("RIFF"), 4); // // // //AppendInt32(ChunkSize); // // // //Wav.Append(reinterpret_cast<const uint8*>("WAVE"), 4); // // // //Wav.Append(reinterpret_cast<const uint8*>("fmt "), 4); // // // //AppendInt32(16); // // // //AppendInt16(1); // // // //AppendInt16(NumChannels); // // // //AppendInt32(SampleRate); // // // //AppendInt32(ByteRate); // // // //AppendInt16(BlockAlign); // // // //AppendInt16(BitsPerSample); // // // //Wav.Append(reinterpret_cast<const uint8*>("data"), 4); // // // //AppendInt32(DataSize); // // // //// PCM 数据 // // // //Wav.Append(PayloadCopy); // // // //FFileHelper::SaveArrayToFile(Wav, *FilePath); // // // UE_LOG(LogTemp, Error, TEXT("[MQTT] Saved audio, size=%d bytes"), DataSize); // // // // 调用你的 SaveWav 函数保存 // // // SaveWavToFile(FilePath, PayloadCopy, 44100, 1); // // // if (GEngine) // // // { // // // GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Green, // // // FString::Printf(TEXT("音频已保存: %s"), *FilePath)); // // // } // // //} // // /* const TArray<uint8>& AudioBytes = Msg.Payload; // // UE_LOG(LogTemp, Error, TEXT("收到 MQTT 消息: Topic=%s, audio"), *Topic);*/ // // 
/*if (GEngine) // // { // // GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Yellow, // // FString::Printf(TEXT("Topic=%s,audio"), *Topic)); // // }*/ // // } // // }); // }); MQTTClient->OnConnect().AddLambda([this](EMQTTConnectReturnCode ReturnCode) { if (ReturnCode == EMQTTConnectReturnCode::Accepted) { UE_LOG(LogTemp, Error, TEXT("IMQTTCoreModule 127.0.0.1--已连接---")); UE_LOG(LogTemp, Log, TEXT("MQTT 已连接!")); TArray<TPair<FString, EMQTTQualityOfService>> TopicsToSubscribe; TopicsToSubscribe.Add(MakeTuple(FString("ue/command"), EMQTTQualityOfService::Once)); TopicsToSubscribe.Add(MakeTuple(FString("ue/audio"), EMQTTQualityOfService::Once)); this->MQTTClient->Subscribe(TopicsToSubscribe); } else { UE_LOG(LogTemp, Warning, TEXT("MQTT 连接失败,ReturnCode=%d"), static_cast<int32>(ReturnCode)); } }); MQTTClient->Connect(); if (GEngine) { GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Green, TEXT("IMQTTCoreModule 127.0.0.1----------!")); } }