news 2026/3/26 15:55:26

ue 5.5 c++ mqtt 订阅/发布消息 字符串

作者头像

张小明

前端开发工程师

1.2k 24
文章封面图
ue 5.5 c++ mqtt 订阅/发布消息 字符串

插件 mqtt支持

测试可以发送,接收长度小于100的字符串消息,长消息,会崩溃。

PublicDependencyModuleNames.AddRange(new string[] { "Core", "CoreUObject", "Engine", "InputCore", "EnhancedInput", "WebSockets", "Json", "JsonUtilities", "MQTTCore", "AudioMixer"});

MyObject.h

// Fill out your copyright notice in the Description page of Project Settings. #pragma once #include "CoreMinimal.h" #include "UObject/NoExportTypes.h" #include "IMQTTClient.h" #include "MyObject.generated.h" /** * */ UCLASS(BlueprintType, Blueprintable) class METAHUMANCHARACTERHEIXI_API UMyObject : public UObject { GENERATED_BODY() public: TSharedPtr<IMQTTClient, ESPMode::ThreadSafe> MQTTClient; UFUNCTION(BlueprintCallable, Category = "Demo") void HelloWorld(); void SaveWav(const FString& FilePath, const TArray<uint8>& AudioBytes, int32 SampleRate, int32 NumChannels); TMap<int32, FString> AudioChunks; // key = 片索引 int32 TotalChunks = 0; int32 FileIndex = 0; };

MyObject.cpp

// Fill out your copyright notice in the Description page of Project Settings. #include "MyObject.h" #include "MQTTSubsystem.h" #include "IMQTTCoreModule.h" #include "MQTTClientSettings.h" #include "MQTTShared.h" #include "Engine/Engine.h" #include "Misc/FileHelper.h" #include "Misc/Paths.h" #include "Async/Async.h" void UMyObject::SaveWav(const FString& FilePath, const TArray<uint8>& AudioBytes, int32 SampleRate, int32 NumChannels) { if (AudioBytes.Num() == 0) return; const int32 BitsPerSample = 16; const int32 BlockAlign = NumChannels * BitsPerSample / 8; const int32 ByteRate = SampleRate * BlockAlign; const int32 DataSize = AudioBytes.Num(); const int32 ChunkSize = 36 + DataSize; TArray<uint8> Wav; Wav.Reserve(44 + DataSize); // 头44字节 + PCM数据 auto AppendInt32 = [&Wav](int32 V) { Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int32)); }; auto AppendInt16 = [&Wav](int16 V) { Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int16)); }; // --- WAV Header --- Wav.Append(reinterpret_cast<const uint8*>("RIFF"), 4); AppendInt32(ChunkSize); Wav.Append(reinterpret_cast<const uint8*>("WAVE"), 4); Wav.Append(reinterpret_cast<const uint8*>("fmt "), 4); AppendInt32(16); // fmt chunk size AppendInt16(1); // PCM format AppendInt16(NumChannels); AppendInt32(SampleRate); AppendInt32(ByteRate); AppendInt16(BlockAlign); AppendInt16(BitsPerSample); Wav.Append(reinterpret_cast<const uint8*>("data"), 4); AppendInt32(DataSize); // --- PCM Data --- Wav.Append(AudioBytes); FFileHelper::SaveArrayToFile(Wav, *FilePath); } inline void SaveWavToFile(const FString& FilePath, const TArray<uint8>& AudioBytes, int32 SampleRate, int32 NumChannels) { if (AudioBytes.Num() == 0) return; const int32 BitsPerSample = 16; const int32 BlockAlign = NumChannels * BitsPerSample / 8; const int32 ByteRate = SampleRate * BlockAlign; const int32 DataSize = AudioBytes.Num(); const int32 ChunkSize = 36 + DataSize; TArray<uint8> Wav; Wav.Reserve(44 + DataSize); // 头44字节 + PCM数据 auto AppendInt32 = [&Wav](int32 V) { Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int32)); }; auto AppendInt16 = [&Wav](int16 V) { Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int16)); }; // --- WAV Header --- Wav.Append(reinterpret_cast<const uint8*>("RIFF"), 4); AppendInt32(ChunkSize); Wav.Append(reinterpret_cast<const uint8*>("WAVE"), 4); Wav.Append(reinterpret_cast<const uint8*>("fmt "), 4); AppendInt32(16); // fmt chunk size AppendInt16(1); // PCM format AppendInt16(NumChannels); AppendInt32(SampleRate); AppendInt32(ByteRate); AppendInt16(BlockAlign); AppendInt16(BitsPerSample); Wav.Append(reinterpret_cast<const uint8*>("data"), 4); AppendInt32(DataSize); // --- PCM Data --- Wav.Append(AudioBytes); FFileHelper::SaveArrayToFile(Wav, *FilePath); } void UMyObject::HelloWorld() { UE_LOG(LogTemp, Warning, TEXT("Hello World from UMyObject!")); if (GEngine) { GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Green, TEXT("Hello World from UMyObject!")); } IMQTTCoreModule& MQTTModule = FModuleManager::LoadModuleChecked<IMQTTCoreModule>("MQTTCore"); FMQTTURL URL; URL.Host = TEXT("127.0.0.1"); URL.Port = 1883; MQTTClient = MQTTModule.GetOrCreateClient(URL); if (!MQTTClient.IsValid()) { UE_LOG(LogTemp, Error, TEXT("MQTTClient 无效!")); return; } //TMap<int32, FString> AudioChunks; // key = 片索引 //int32 TotalChunks = 0; MQTTClient->OnMessage().AddLambda([this](const FMQTTClientMessage& Msg) { FString Topic = Msg.Topic; if (Topic == TEXT("ue/audio")) { FString Payload = Msg.GetPayloadAsString(); // 格式解析: "index/total:chunk" FString IndexStr, TotalStr, ChunkData; if (Payload.Split(TEXT(":"), &IndexStr, &ChunkData) && IndexStr.Split(TEXT("/"), &IndexStr, &TotalStr)) { int32 Index = FCString::Atoi(*IndexStr); int32 Total = FCString::Atoi(*TotalStr); TotalChunks = Total; AudioChunks.Add(Index, ChunkData); // 检查是否收到所有分片 if (AudioChunks.Num() == TotalChunks) { // 拼接 FString FullB64; for (int32 i = 0; i < TotalChunks; ++i) { FullB64 += AudioChunks[i]; } // Base64 decode TArray<uint8> AudioBytes; if (FBase64::Decode(FullB64, AudioBytes)) { //UE_LOG(LogTemp, Log, TEXT("音频接收完成,字节数=%d"), AudioBytes.Num()); FString FilePath = FString::Printf(TEXT("D:/tmp/received_%03d.wav"), FileIndex++); SaveWavToFile(FilePath, AudioBytes, 16000, 1); } AudioChunks.Empty(); // 清理,准备下一条音频 } } } }); //MQTTClient->OnMessage().AddLambda([](const FMQTTClientMessage& Msg_a) // { // FMQTTClientMessage Msg = Msg_a; // const FString Topic = Msg_a.Topic; // //const FString PayloadStr = Msg_a.GetPayloadAsString(); // //TArray<uint8> PayloadBytes = Msg_a.Payload; // AsyncTask(ENamedThreads::GameThread, [Topic, Msg]() // { // if (Topic == TEXT("ue/command")) // { // // 只在“明确是文本 topic”时才解析字符串 // FString TextPayload = Msg.GetPayloadAsString(); // UE_LOG(LogTemp, Log, TEXT("CMD: %s"), *TextPayload); // } // else if (Topic == TEXT("ue/audio")) // { // // Base64 一定用 string // FString Payload = Msg.GetPayloadAsString(); // // 格式解析: "index/total:chunk" // FString IndexStr, TotalStr, ChunkData; // if (Payload.Split(TEXT(":"), &IndexStr, &ChunkData) && // IndexStr.Split(TEXT("/"), &IndexStr, &TotalStr)) // { // int32 Index = FCString::Atoi(*IndexStr); // int32 Total = FCString::Atoi(*TotalStr); // TotalChunks = Total; // AudioChunks.Add(Index, ChunkData); // // 检查是否收到所有分片 // if (AudioChunks.Num() == TotalChunks) // { // // 拼接 // FString FullB64; // for (int32 i = 0; i < TotalChunks; ++i) // { // FullB64 += AudioChunks[i]; // } // // Base64 decode // TArray<uint8> AudioBytes; // if (FBase64::Decode(FullB64, AudioBytes)) // { // UE_LOG(LogTemp, Log, TEXT("音频接收完成,字节数=%d"), AudioBytes.Num()); // FString FilePath = TEXT("D:/tmp/received.wav"); // SaveWavToFile(FilePath, AudioBytes, 44100, 1); // } // AudioChunks.Empty(); // 清理,准备下一条音频 // } // ////// 再 decode // //TArray<uint8> AudioBytes; // //if (FBase64::Decode(B64, AudioBytes)) // //{ // // UE_LOG(LogTemp, Error, TEXT("Audio bytes: %s"), *B64); // // //UE_LOG(LogTemp, Error, TEXT("Audio bytes")); // // // 这里可以调用 SaveWav 函数保存 // // FString FilePath = TEXT("D:/tmp/received.wav"); // // //SaveWavToFile(FilePath, AudioBytes, 44100, 1); // //} // //else // //{ // // UE_LOG(LogTemp, Warning, TEXT("Base64 decode failed!")); // //} // } // else // { // UE_LOG(LogTemp, Warning, TEXT("Unknown topic: %s"), *Topic); // } // } // }); // }); //MQTTClient->OnMessage().AddLambda([](const FMQTTClientMessage& Msg) // { // const FString TopicCopy = Msg.Topic; // const bool bIsCommand = (TopicCopy == TEXT("ue/command")); // FString TextPayload; // TArray<uint8> BinaryPayload; // if (bIsCommand) // { // TextPayload = Msg.GetPayloadAsString(); // 文本 OK // } // else // { // BinaryPayload.SetNumUninitialized(Msg.Payload.Num()); // FMemory::Memcpy( // BinaryPayload.GetData(), // Msg.Payload.GetData(), // Msg.Payload.Num() // ); // // } // AsyncTask(ENamedThreads::GameThread, // [TopicCopy, bIsCommand, TextPayload, BinaryPayload]() // { // if (bIsCommand) // { // UE_LOG(LogTemp, Log, TEXT("CMD: %s"), *TextPayload); // /*if (GEngine) // { // GEngine->AddOnScreenDebugMessage( // -1, 5.f, FColor::Yellow, TextPayload); // }*/ // } // else // { // UE_LOG(LogTemp, Log, // TEXT("Audio received, bytes=%d"), // BinaryPayload.Num()); // // 这里再存 wav / 播放 / 推 Audio2Face // } // }); // //AsyncTask(ENamedThreads::GameThread, [Msg](){ // // const FString& TopicCopy = Msg.Topic; // // //FString Topic = Msg.Topic; // // if (TopicCopy == "ue/command") { // // FString Payload = Msg.GetPayloadAsString(); // // UE_LOG(LogTemp, Error, TEXT("收到 MQTT 消息: Topic=%s, Payload=%s"), *TopicCopy, *Payload); // // if (GEngine) // // { // // GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Yellow, // // FString::Printf(TEXT("Topic=%s, Payload=%s"), *TopicCopy, *Payload)); // // } // // } // // else { // // // 拷贝音频数据,避免后台线程释放导致崩溃 // // //TArray<uint8> PayloadCopy = Msg.Payload; // // //FString FilePath = FPaths::ProjectSavedDir() / TEXT("received.wav"); // // //FString FilePath = FPaths::ProjectSavedDir() + TEXT("received.wav"); // // FString FilePath = TEXT("C:/Users/ChanJing-01/Desktop/tmp/0121/received.wav"); // // //if (PayloadCopy.Num() > 0) // // //{ // // // UE_LOG(LogTemp, Error, TEXT("[MQTT] start Saved audio ")); // // // const int32 SampleRate = 44100; // // // const int32 NumChannels = 1; // // // const int32 BitsPerSample = 16; // // // const int32 BlockAlign = NumChannels * BitsPerSample / 8; // // // const int32 ByteRate = SampleRate * BlockAlign; // // // const int32 DataSize = PayloadCopy.Num(); // // // const int32 ChunkSize = 36 + DataSize; // // // //TArray<uint8> Wav; // // // //Wav.Reserve(44 + DataSize); // // // //auto AppendInt32 = [&Wav](int32 V) { Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int32)); }; // // // //auto AppendInt16 = [&Wav](int16 V) { Wav.Append(reinterpret_cast<uint8*>(&V), sizeof(int16)); }; // // // //// WAV header // // // //Wav.Append(reinterpret_cast<const uint8*>("RIFF"), 4); // // // //AppendInt32(ChunkSize); // // // //Wav.Append(reinterpret_cast<const uint8*>("WAVE"), 4); // // // //Wav.Append(reinterpret_cast<const uint8*>("fmt "), 4); // // // //AppendInt32(16); // // // //AppendInt16(1); // // // //AppendInt16(NumChannels); // // // //AppendInt32(SampleRate); // // // //AppendInt32(ByteRate); // // // //AppendInt16(BlockAlign); // // // //AppendInt16(BitsPerSample); // // // //Wav.Append(reinterpret_cast<const uint8*>("data"), 4); // // // //AppendInt32(DataSize); // // // //// PCM 数据 // // // //Wav.Append(PayloadCopy); // // // //FFileHelper::SaveArrayToFile(Wav, *FilePath); // // // UE_LOG(LogTemp, Error, TEXT("[MQTT] Saved audio, size=%d bytes"), DataSize); // // // // 调用你的 SaveWav 函数保存 // // // SaveWavToFile(FilePath, PayloadCopy, 44100, 1); // // // if (GEngine) // // // { // // // GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Green, // // // FString::Printf(TEXT("音频已保存: %s"), *FilePath)); // // // } // // //} // // /* const TArray<uint8>& AudioBytes = Msg.Payload; // // UE_LOG(LogTemp, Error, TEXT("收到 MQTT 消息: Topic=%s, audio"), *Topic);*/ // // /*if (GEngine) // // { // // GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Yellow, // // FString::Printf(TEXT("Topic=%s,audio"), *Topic)); // // }*/ // // } // // }); // }); MQTTClient->OnConnect().AddLambda([this](EMQTTConnectReturnCode ReturnCode) { if (ReturnCode == EMQTTConnectReturnCode::Accepted) { UE_LOG(LogTemp, Error, TEXT("IMQTTCoreModule 127.0.0.1--已连接---")); UE_LOG(LogTemp, Log, TEXT("MQTT 已连接!")); TArray<TPair<FString, EMQTTQualityOfService>> TopicsToSubscribe; TopicsToSubscribe.Add(MakeTuple(FString("ue/command"), EMQTTQualityOfService::Once)); TopicsToSubscribe.Add(MakeTuple(FString("ue/audio"), EMQTTQualityOfService::Once)); this->MQTTClient->Subscribe(TopicsToSubscribe); } else { UE_LOG(LogTemp, Warning, TEXT("MQTT 连接失败,ReturnCode=%d"), static_cast<int32>(ReturnCode)); } }); MQTTClient->Connect(); if (GEngine) { GEngine->AddOnScreenDebugMessage(-1, 5.f, FColor::Green, TEXT("IMQTTCoreModule 127.0.0.1----------!")); } }
版权声明: 本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若内容造成侵权/违法违规/事实不符,请联系邮箱:809451989@qq.com进行投诉反馈,一经查实,立即删除!
网站建设 2026/3/19 16:33:34

FTDI支持jtag协议格式详细规范

FTDI JTAG 数据传输格式与 SCAN 命令完整规范 &#x1f4cb; 文档概述 驱动文件&#xff1a;src/jtag/drivers/openjtag.c适配变体&#xff1a;Standard FTDI (libftdi)核心目的&#xff1a;定义 ftdi_write_data()/ftdi_read_data() 数据格式&#xff0c;规范 SCAN 命令在 I…

作者头像 李华
网站建设 2026/3/25 11:38:23

Chatbox终极指南:从入门到精通AI桌面客户端完整教程

Chatbox终极指南&#xff1a;从入门到精通AI桌面客户端完整教程 【免费下载链接】chatbox Chatbox是一款开源的AI桌面客户端&#xff0c;它提供简单易用的界面&#xff0c;助用户高效与AI交互。可以有效提升工作效率&#xff0c;同时确保数据安全。源项目地址&#xff1a;https…

作者头像 李华
网站建设 2026/3/25 7:17:47

零基础搭建ASR系统|FunASR + speech_ngram_lm_zh-cn完整实践

零基础搭建ASR系统&#xff5c;FunASR speech_ngram_lm_zh-cn完整实践 语音识别&#xff08;ASR&#xff09;技术正在快速走进我们的日常工作与生活。无论是会议记录、视频字幕生成&#xff0c;还是智能客服、语音输入法&#xff0c;背后都离不开高效的语音转文字能力。但对很…

作者头像 李华
网站建设 2026/3/25 23:13:43

计算机毕业设计springboot宿舍报修管理系统 基于Spring Boot的高校公寓故障在线处理平台 Spring Boot驱动的学生宿舍维修服务系统

计算机毕业设计springboot宿舍报修管理系统z3tw1&#xff08;配套有源码 程序 mysql数据库 论文&#xff09; 本套源码可以在文本联xi,先看具体系统功能演示视频领取&#xff0c;可分享源码参考。 高校宿舍是学生在校生活的核心场景&#xff0c;设施故障若得不到快速响应&#…

作者头像 李华
网站建设 2026/3/23 14:14:26

运维必备:10行shell脚本解决ssl证书更换遗漏难题

SSL证书有效期正在逐年缩短&#xff0c;今年3月15日之前购买有效期是1年&#xff0c;3月15日之后购买的有效期只有200天&#xff0c;2027年缩短到100天&#xff0c;最终2029年有效期会缩短到47天。 作为运维人员&#xff0c;如果你也像我一样&#xff0c;手里有好几百个使用htt…

作者头像 李华
网站建设 2026/3/13 3:13:52

Glyph学术研究应用:论文综述生成系统部署步骤

Glyph学术研究应用&#xff1a;论文综述生成系统部署步骤 1. 引言&#xff1a;为什么需要Glyph&#xff1f; 在学术研究中&#xff0c;面对海量文献时&#xff0c;快速掌握某一领域的研究进展是一项挑战。传统的论文阅读方式效率低、耗时长&#xff0c;而现有的文本摘要工具又…

作者头像 李华