2026/4/17 14:40:29
网站建设
项目流程
西安免费网站搭建制作,做风能的网站,巴州建设局网站,数码印花图案设计网站FastDDS 源码解析#xff08;十五#xff09;接收PDP消息#xff08;下#xff09; 文章目录FastDDS 源码解析#xff08;十五#xff09;接收PDP消息#xff08;下#xff09;1.StatelessReader对于消息的处理1.1类图1.2时序图2.0一个功能彩蛋0xEE 个人信息转载好友文…FastDDS 源码解析十五接收PDP消息下文章目录FastDDS 源码解析十五接收PDP消息下1.StatelessReader对于消息的处理1.1类图1.2时序图2.0一个功能彩蛋0xEE 个人信息转载好友文章作者zhuhit链接https://juejin.cn/column/7388347353408094245上一篇我们介绍了一条pdp消息的大概内容和接收到pdp消息之后到分发给statelessreader处理的大概过程这一篇我们介绍一下statelessreader如何接收这条消息1.StatelessReader对于消息的处理1.1类图1.MessageReceiver 有一个RTPSReader的队列StatelessReader是RTPSReader的子类MessageReceiver收到消息后交给RTPSReader处理2.StatelessReader中有2个对象ReaderHistory 和 ReaderListenerStatelessReader收到消息后存入ReaderHistory然后通过ReaderListener通知其他类处理3.PDPListener是ReaderListener的子类1.2时序图1.StatelessReader::processDataMsg 主要干了这几件事情check一下消息能不能被接收能被接收的话看一下消息之前有没有被处理过根据sequenceNumber更新sequenceNumberCacheChange_t 拷贝一份交给change_received()处理2.change_received 的主要干了2件事情a.将CacheChange_t 交给 ReaderHistory的函数change_received()处理 见3b.调用PDPListener::onNewCacheChangeAdded函数 见43.ReaderHistory的函数change_received() 调用add_change将CacheChange_t 加入到ReaderHistory 中去4.PDPListener::onNewCacheChangeAdded 主要干了这几件事情a.参数校验将CacheChange_t 的数据读入ParticipantProxyDatab.ExternalLocatorsProcessor 的filter_remote_locators 函数c.PDPSimple 的assignRemoteEndpoints 参数是ParticipantProxyData5.ExternalLocatorsProcessor 的filter_remote_locators 函数 过滤 remote_locators, 保留一个新的remote_locator6.PDPSimple 的assignRemoteEndpointsrust 体验AI代码助手 代码解读复制代码boolStatelessReader::processDataMsg(CacheChange_t*change){assert(change);std::unique_lockRecursiveTimedMutexlock(mp_mutex);//check 一下消息是不是能够被接收if(acceptMsgFrom(change-writerGUID,change-kind)){// Always assert liveliness on scope exit// wlp相关内容auto assert_liveliness_lambda[lock,this,change](void*){lock.unlock();// Avoid deadlock with LivelinessManager.assert_writer_liveliness(change-writerGUID);};std::unique_ptrvoid,decltype(assert_liveliness_lambda)p{this,assert_liveliness_lambda};------// Check rejection by history// check一下这个message之前是否被接收过if(!thereIsUpperRecordOf(change-writerGUID,change-sequenceNumber)){boolwill_never_be_acceptedfalse;//check一下mp_history 能否装下changeif(!mp_history-can_change_be_added_nts(change-writerGUID,change-serializedPayload.length,0,will_never_be_accepted)){if(will_never_be_accepted){//更新最后收到的消息number这是为了后续统计消息的接收和缺失情况update_last_notified(change-writerGUID,change-sequenceNumber);}returnfalse;}//接收到不是自己reader的消息更新最后收到的消息number返回true//接收到特定reader的消息就往下走if(data_filter_!data_filter_-is_relevant(*change,m_guid)){update_last_notified(change-writerGUID,change-sequenceNumber);returntrue;}// Ask the pool for a cache changeCacheChange_t*change_to_addnullptr;// 分配一个// change_pool 和payloadpool不是一个poolif(!change_pool_-reserve_cache(change_to_add)){EPROSIMA_LOG_WARNING(RTPS_MSG_IN,IDSTRINGReached the maximum number of samples allowed by this readers QoS. Rejecting change for reader: m_guid);returnfalse;}// Copy metadata to reserved changechange_to_add-copy_not_memcpy(change);// Ask payload pool to copy the payloadIPayloadPool*payload_ownerchange-payload_owner();// 看一下是否是可以通过是能通过跨进程访问到boolis_datasharingstd::any_of(matched_writers_.begin(),matched_writers_.end(),[change](constRemoteWriterInfo_twriter){return(writer.guidchange-writerGUID)(writer.is_datasharing);});if(is_datasharing){//We may receive the change from the listener (with owner a ReaderPool) or intraprocess (with owner a WriterPool)ReaderPool*datasharing_pooldynamic_castReaderPool*(payload_owner);if(!datasharing_pool){datasharing_pooldatasharing_listener_-get_pool_for_writer(change-writerGUID).get();}if(!datasharing_pool){EPROSIMA_LOG_WARNING(RTPS_MSG_IN,IDSTRINGProblem copying DataSharing CacheChange from writer change-writerGUID);change_pool_-release_cache(change_to_add);returnfalse;}datasharing_pool-get_payload(change-serializedPayload,payload_owner,*change_to_add);}// 给change_to_add 分配payloadelseif(payload_pool_-get_payload(change-serializedPayload,payload_owner,*change_to_add)){change-payload_owner(payload_owner);}else{EPROSIMA_LOG_WARNING(RTPS_MSG_IN,IDSTRINGProblem copying CacheChange, received data is: change-serializedPayload.length bytes and max size in reader m_guid is (fixed_payload_size_0?fixed_payload_size_:std::numeric_limitsuint32_t::max()));change_pool_-release_cache(change_to_add);returnfalse;}// Perform reception of cache changeif(!change_received(change_to_add)){EPROSIMA_LOG_INFO(RTPS_MSG_IN,IDSTRINGMessageReceiver not add change change_to_add-sequenceNumber);change_to_add-payload_owner()-release_payload(*change_to_add);change_pool_-release_cache(change_to_add);returnfalse;}}}returntrue;}//这个函数主要是对data消息的处理1.check一下这个消息是不是能够被接收2.check一下这个消息是不是被接收过如果被接收过就不处理3.如果没有被接收过将change加入到history中去如果能够跨进程共享则change的存储空间从共享内存中分配如果不能进程间共享则从其他地方分配4.调用change_received 见步骤2步骤2:scss 体验AI代码助手 代码解读复制代码 bool StatelessReader::change_received( CacheChange_t* change) { // Only make the change visible if there is not another with a bigger sequence number. // TODO Revisar si no hay que incluirlo. // 是不是已经收到 if (!thereIsUpperRecordOf(change-writerGUID, change-sequenceNumber)) { // Update Ownership strength. // 所有权强度 if (EXCLUSIVE_OWNERSHIP_QOS m_att.ownershipKind) { auto writer std::find_if(matched_writers_.begin(), matched_writers_.end(), [change](const RemoteWriterInfo_t item) { return item.guid change-writerGUID; }); assert(matched_writers_.end() ! writer); change-reader_info.writer_ownership_strength writer-ownership_strength; } else { change-reader_info.writer_ownership_strength std::numeric_limitsuint32_t::max(); } if (mp_history-received_change(change, 0)) { auto payload_length change-serializedPayload.length; auto guid change-writerGUID; auto seq change-sequenceNumber; Time_t::now(change-reader_info.receptionTimestamp); SequenceNumber_t previous_seq update_last_notified(change-writerGUID, change-sequenceNumber); total_unread_; //空函数 on_data_notify(guid, change-sourceTimestamp); //PDPListener auto listener getListener(); if (listener ! nullptr) { if (SequenceNumber_t{0, 0} ! previous_seq) { assert(previous_seq seq); uint64_t tmp (seq - previous_seq).to64long() - 1; int32_t lost_samples tmp static_castuint64_t(std::numeric_limitsint32_t::max()) ? std::numeric_limitsint32_t::max() : static_castint32_t(tmp); if (0 lost_samples) // There are lost samples. { //消息丢失做相关处理 //这儿是个空函数 listener-on_sample_lost(this, lost_samples); } } // WARNING! These methods could destroy the change bool notify_single false; // 当change available的时候调用 // 这里将 notify_single变为true listener-on_data_available(this, guid, seq, seq, notify_single); if (notify_single) { //NewCacheChange add listener-onNewCacheChangeAdded(this, change); } } //这个是多线程通知有线程在等待新消息通知这个线程有新消息到来读取新消息 new_notification_cv_.notify_all(); // statistics callback on_subscribe_throughput(payload_length); return true; } } return false; }statelessReader 的这个函数中很多逻辑其实没什么用41-53行statelessReader没有状态也不保存接收的数据所以不存在所谓的数据丢失问题56-59行statelessReader没有必要调用on_data_available这是个空函数67行 statelessreader 也没有需要等待的线程整体来看StatelessReader::change_received 有比较多的冗余代码主要是干了2件事1.ReaderHistory::received_change 步骤32.调用了listener-onNewCacheChangeAdded这个函数调用到了PDPListener的onNewCacheChangeAdded也就是步骤4我们看到这块其实有大量无用代码可能为以后的代码预埋逻辑上说其实没有必要。步骤3:rust 体验AI代码助手 代码解读复制代码boolReaderHistory::received_change(CacheChange_t*change,size_t){returnadd_change(change);}boolReaderHistory::add_change(CacheChange_t*a_change){------eprosima::utilities::collections::sorted_vector_insert(m_changes,a_change,fastdds::rtps::history_order_cmp);------returntrue;}将数据按照时间顺序存入ReaderHistory步骤4:scss 体验AI代码助手 代码解读复制代码 void PDPListener::onNewCacheChangeAdded( RTPSReader* reader, const CacheChange_t* const change_in) { CacheChange_t* change const_castCacheChange_t*(change_in); //远端的id GUID_t writer_guid change-writerGUID; EPROSIMA_LOG_INFO(RTPS_PDP, SPDP Message received from: change_in-writerGUID); // Make sure we have an instance handle (i.e GUID) if (change-instanceHandle c_InstanceHandle_Unknown) { //如果没有guid从mp_PDPReaderHistory 中移除 if (!this-get_key(change)) { EPROSIMA_LOG_WARNING(RTPS_PDP, Problem getting the key of the change, removing); parent_pdp_-builtin_endpoints_-remove_from_pdp_reader_history(change); return; } } // Take GUID from instance handle GUID_t guid; iHandle2GUID(guid, change-instanceHandle); //如果alive 处理change的信息 if (change-kind ALIVE) { // Ignore announcement from own RTPSParticipant // 如果 change是自己的participant发出的就不做处理将change 移除 if (guid parent_pdp_-getRTPSParticipant()-getGuid()) { EPROSIMA_LOG_INFO(RTPS_PDP, Message from own RTPSParticipant, removing); parent_pdp_-builtin_endpoints_-remove_from_pdp_reader_history(change); return; } // Release reader lock to avoid ABBA lock. PDP mutex should always be first. // Keep change information on local variables to check consistency later SequenceNumber_t seq_num change-sequenceNumber; reader-getMutex().unlock(); std::unique_lockstd::recursive_mutex lock(*parent_pdp_-getMutex()); reader-getMutex().lock(); // If change is not consistent, it will be processed on the thread that has overriten it if ((ALIVE ! change-kind) || (seq_num ! change-sequenceNumber) || (writer_guid ! change-writerGUID)) { return; } // Access to temp_participant_data_ is protected by reader lock // Load information on temp_participant_data_ CDRMessage_t msg(change-serializedPayload); temp_participant_data_.clear(); //解析msg消息放入temp_participant_data_ if (temp_participant_data_.readFromCDRMessage(msg, true, parent_pdp_-getRTPSParticipant()-network_factory(), parent_pdp_-getRTPSParticipant()-has_shm_transport())) { // After correctly reading it change-instanceHandle temp_participant_data_.m_key; guid temp_participant_data_.m_guid; if (parent_pdp_-getRTPSParticipant()-is_participant_ignored(guid.guidPrefix)) { return; } // Filter locators const auto pattr parent_pdp_-getRTPSParticipant()-getAttributes(); // 过滤 // 一个新加的功能将temp_participant_data_中的ip地址进行过滤 fastdds::rtps::ExternalLocatorsProcessor::filter_remote_locators(temp_participant_data_, pattr.builtin.metatraffic_external_unicast_locators, pattr.default_external_unicast_locators, pattr.ignore_non_matching_locators); // Check if participant already exists (updated info) ParticipantProxyData* pdata nullptr; for (ParticipantProxyData* it : parent_pdp_-participant_proxies_) { if (guid it-m_guid) { pdata it; break; } } // pdata nullptr 就是新发现的Participant否则CHANGED_QOS_PARTICIPANT 更新信息 auto status (pdata nullptr) ? ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT : ParticipantDiscoveryInfo::CHANGED_QOS_PARTICIPANT; if (pdata nullptr) { // Create a new one when not found pdata parent_pdp_-createParticipantProxyData(temp_participant_data_, writer_guid); reader-getMutex().unlock(); lock.unlock(); if (pdata ! nullptr) { ------ RTPSParticipantListener* listener parent_pdp_-getRTPSParticipant()-getListener(); if (listener ! nullptr) { bool should_be_ignored false; { std::lock_guardstd::mutex cb_lock(parent_pdp_-callback_mtx_); ParticipantDiscoveryInfo info(*pdata); info.status status; // listener-onParticipantDiscovery( parent_pdp_-getRTPSParticipant()-getUserRTPSParticipant(), std::move(info), should_be_ignored); } if (should_be_ignored) { parent_pdp_-getRTPSParticipant()-ignore_participant(guid.guidPrefix); } } // Assigning remote endpoints implies sending a DATA(p) to all matched and fixed readers, since // StatelessWriter::matched_reader_add marks the entire history as unsent if the added readers // durability is bigger or equal to TRANSIENT_LOCAL_DURABILITY_QOS (TRANSIENT_LOCAL or TRANSIENT), // which is the case of ENTITYID_BUILTIN_SDP_PARTICIPANT_READER (TRANSIENT_LOCAL). If a remote // participant is discovered before creating the first DATA(p) change (which happens at the end of // BuiltinProtocols::initBuiltinProtocols), then StatelessWriter::matched_reader_add ends up marking // no changes as unsent (since the history is empty), which is OK because this can only happen if a // participant is discovered in the middle of BuiltinProtocols::initBuiltinProtocols, which will // create the first DATA(p) upon finishing, thus triggering the sent to all fixed and matched // readers anyways. parent_pdp_-assignRemoteEndpoints(pdata); } } else { pdata-updateData(temp_participant_data_); pdata-isAlive true; reader-getMutex().unlock(); ······ if (parent_pdp_-updateInfoMatchesEDP()) { parent_pdp_-mp_EDP-assignRemoteEndpoints(*pdata); } lock.unlock(); RTPSParticipantListener* listener parent_pdp_-getRTPSParticipant()-getListener(); if (listener ! nullptr) { bool should_be_ignored false; { std::lock_guardstd::mutex cb_lock(parent_pdp_-callback_mtx_); ParticipantDiscoveryInfo info(*pdata); info.status status; listener-onParticipantDiscovery( parent_pdp_-getRTPSParticipant()-getUserRTPSParticipant(), std::move(info), should_be_ignored); } if (should_be_ignored) { parent_pdp_-getRTPSParticipant()-ignore_participant(temp_participant_data_.m_guid.guidPrefix); } } } // Take again the reader lock reader-getMutex().lock(); } } else { reader-getMutex().unlock(); if (parent_pdp_-remove_remote_participant(guid, ParticipantDiscoveryInfo::REMOVED_PARTICIPANT)) { reader-getMutex().lock(); // All changes related with this participant have been removed from history by remove_remote_participant return; } reader-getMutex().lock(); } //Remove change form history. parent_pdp_-builtin_endpoints_-remove_from_pdp_reader_history(change); }change 的类型有4种ALIVE,NOT_ALIVE_DISPOSED, //就是writer告知reader我要下线了你把这个消息处理一下NOT_ALIVE_UNREGISTERED, //就是writer告知reader我要把这个instance 注销掉NOT_ALIVE_DISPOSED_UNREGISTERED//就是writer告知reader我要下线你把消息处理一下把这个instance 注销掉如果不是ALIVE就意味着对端的Participant已经下线调用parent_pdp_-remove_remote_participantPDP消息本身包含了这个Participant的主要的信息从消息中读取信息存入ParticipantProxyData 对象读取了信息之后我们看一下这个participant 是否已经存在是否需要被忽略已经存在那么更新这个Participant的信息如果不存在新建一个ParticipantProxyData上面函数主要做了这几件事1.判断 change的类型是否是ALIVE不是ALIVE表示对端的Participant已经下线调用parent_pdp_-remove_remote_participant是ALIVE则到步骤22.从change中读取信息数据存入ParticipantProxyData3.调用fastdds::rtps::ExternalLocatorsProcessor::filter_remote_locators 这个在第三部分介绍4.判断这个participant 是否已经存在如果存在更新这个Participant的ParticipantProxyData不存在则新建ParticipantProxyData5.调用PDPSimple 的assignRemoteEndpoints 这个会在下一篇详细介绍主要功能就是Participant匹配后根据Participant的信息匹配PDP中的 StatelessWriter和StatelessReaderEDP中的众多Writer和ReaderWLP中的众多Writer和Reader。2.0一个功能彩蛋fastdds::rtps::ExternalLocatorsProcessor::filter_remote_locators这是fastdds 最近新增的一个功能需要用户在程序中进行配置metatraffic_external_unicast_locatorsdefault_external_locators对于这些配置的ip进行了分级。举个例子我们的一个房间里租了一个局域网然后整个楼层又组了一个局域网。那么房间中各个设备的ip是一个层级的楼层的ip是另一个层级的。metatraffic_external_unicast_locatorsdefault_external_locators 中配置了ip子网掩码以及这个ip所在的层级。然后我们根据这些配置对ParticipantProxyData中的ip地址进行过滤只留下能够通信的ip地址。还是以我们手机举例有多个地址可以参考车载消息中间件FastDDS 源码解析三RtpsParticipant的创建中中3.2节有wifi p2p地址局域网地址公共网地址那么可以在这儿做设置这样就可以过滤掉不能通信的地址。过滤远端的ParticipantProxyData中的ip地址scss 体验AI代码助手 代码解读复制代码 void filter_remote_locators( ParticipantProxyData data, const ExternalLocators metatraffic_external_locators, const ExternalLocators default_external_locators, bool ignore_non_matching) { // 这儿是Participant自己的metatraffic_locators.unicast filter_remote_locators(data.metatraffic_locators.unicast, metatraffic_external_locators, ignore_non_matching); // 这儿是Participant自己的default_locators.unicast filter_remote_locators(data.default_locators.unicast, default_external_locators, ignore_non_matching); }这里是具体的算法c 体验AI代码助手 代码解读复制代码staticvoidfilter_remote_locators(fastrtps::ResourceLimitedVectorLocatorlocators,constExternalLocatorsexternal_locators,bool ignore_non_matching){autocompare_locators[external_locators,ignore_non_matching](constLocatorlhs,constLocatorrhs)-bool{returnheuristic(lhs,external_locators,ignore_non_matching)heuristic(rhs,external_locators,ignore_non_matching);};/* This will sort the received locators according to the following criteria: * 1. Non-matching locators when not ignored. Heuristic value: 0 * 2. Matching locators. Heuristic value: ((255ull - externality) 16) | (cost 8) * 3. Non-matching locators when ignored. Heuristic value: max_uint64_t * * The heuristic has been chosen so non-matching locators will never give a value that will be given to a matching * locator. Matching locators will be sorted first by highest externality, then by lowest cost. */// 从小到大排列std::sort(locators.begin(),locators.end(),compare_locators);/* Remove non-matching locators if requested to. * This is done by removing all locators at the end with an heuristic value of max_uint64_t. */if(ignore_non_matching){while(!locators.empty()){uint64_thheuristic(locators.back(),external_locators,ignore_non_matching);if(std::numeric_limitsuint64_t::max()!h){break;}//如果没有找到匹配的就去除locators.pop_back();}}// Check what locators to keepautoitlocators.begin();// Keep non-matching locators with an heuristic value of 0.if(!ignore_non_matching){//没有匹配的情况下为0while(it!locators.end()(0heuristic(*it,external_locators,ignore_non_matching))){it;}}it不会是shm的情况从local开始// Traverse external_locators in heuristic order, checking if certain heuristic value should be ignored// external_locators index从高到低遍历for(constautoexternality:external_locators){for(constautocost:externality.second){// Check if the locators on this heuristic value should be ignored//externality.first 是indexcost.first是costuint64_tentry_heuristicheuristic_value(externality.first,cost.first);autoend_itit;size_tnum_exactly_matched0;while(end_it!locators.end()(entry_heuristicheuristic(*end_it,external_locators,ignore_non_matching))){for(constLocatorWithMasklocal_locator:cost.second){if(std::equal(end_it-address,end_it-address16,local_locator.address)){// 地址相等num_exactly_matched;break;}}end_it;}if(end_it!it){// There was at least one locator with this heuristic valueif(externality.first0num_exactly_matchedcost.second.size()end_it!locators.end()static_castsize_t(std::distance(it,end_it))num_exactly_matched){// All locators on this heuristic were the local locators, ignore this heuristic// 值一样itlocators.erase(it,end_it);}else{// We should keep this locators, remove the rest and return// 保持一个locator 这个locator 的cost 或者level 有变化itlocators.erase(end_it,locators.end());return;}}}}}//从大到小排列ExternalLocatorsstd::mapuint8_t,// externality_indexstd::mapuint8_t,// coststd::vectorLocatorWithMask// locators with their mask,std::greateruint8_t// Ordered by greater externality_index;具体算法在这儿主要是过滤了一些无用的ip这个过滤需要在初始化的时候进行设置才能起效。具体如何设置我这边没有设置过后续做了实验之后再把相关结果贴出来。这一篇我们介绍一下statelessreader如何处理这条消息下一篇我们介绍一下收到pdp消息之后PDP如何匹配。0xEE 个人信息★★★★★★关于生活和技术的思考★★★★★★微信公众账号罗西的思考如果您想及时得到个人撰写文章的消息推送或者想看看个人推荐的技术资料敬请关注。