基于steem的群签名方案实现3:节点间互相发送消息

技术文档网 2021-04-20

这部分主要涉及到steem的p2p_plugin这个插件和net部分,在原先消息的基础上,增加了两种消息:vss_message和vk_message,这两种消息的发送方式有些不同,vss_meassage是使用vss分享秘钥时,针对不同节点的秘钥分享,因此对其他节点发送其对应的应该获得的消息,这个消息应该通过可信信道发送,使得其他人无法得知消息的内容,但是,在本系统中并未对消息进行加密。vk_message是节点需要对其他节点进行广播的消息。

  1. p2p_plugin的工作过程

    p2p_plugin中的connect_to_p2p_network,启动多个线程,不断进行节点,发送消息,同步交易,区块的工作:

         void node_impl::connect_to_p2p_network()
         {
         VERIFY_CORRECT_THREAD();
    
         assert(_node_public_key != fc::ecc::public_key_data());
    
         assert(!_accept_loop_complete.valid() &&
                 !_p2p_network_connect_loop_done.valid() &&
                 !_fetch_sync_items_loop_done.valid() &&
                 !_fetch_item_loop_done.valid() &&
                 !_advertise_inventory_loop_done.valid() &&
                 !_terminate_inactive_connections_loop_done.valid() &&
                 !_fetch_updated_peer_lists_loop_done.valid() &&
                 !_bandwidth_monitor_loop_done.valid() &&
                 !_dump_node_status_task_done.valid() &&
                 !_broadcast_vk_message_loop_done.valid());
         if (_node_configuration.accept_incoming_connections)
             _accept_loop_complete = async_task( [=](){ accept_loop(); }, "accept_loop");//将已知的节点加入handshaking_connections,send_hello_message
         _p2p_network_connect_loop_done = async_task( [=]() { p2p_network_connect_loop(); }, "p2p_network_connect_loop" );//不断监听链接请求,将新节点加入handshaking_connections,connect_to_endpoint,connect_to_task(节点间相互发送消息,获取节点状态),read_loop,on_message,根据收到的消息进行处理,建立连接等
         _fetch_sync_items_loop_done = async_task( [=]() { fetch_sync_items_loop(); }, "fetch_sync_items_loop" );
         _fetch_item_loop_done = async_task( [=]() { fetch_items_loop(); }, "fetch_items_loop" );
         _advertise_inventory_loop_done = async_task( [=]() { advertise_inventory_loop(); }, "advertise_inventory_loop" );
         _terminate_inactive_connections_loop_done = async_task( [=]() { terminate_inactive_connections_loop(); }, "terminate_inactive_connections_loop" );
         _fetch_updated_peer_lists_loop_done = async_task([=](){ fetch_updated_peer_lists_loop(); }, "fetch_updated_peer_lists_loop");//address_request_message,更新自己的地址列表,将别的节点的已知节点地址加入自己的潜在列表,在p2p_network_connect_loop中进行链接
         _bandwidth_monitor_loop_done = async_task([=](){ bandwidth_monitor_loop(); }, "bandwidth_monitor_loop");
         _dump_node_status_task_done = async_task([=](){ dump_node_status_task(); }, "dump_node_status_task");
         _broadcast_vk_message_loop_done = async_task([=] () {broadcast_vk_message_loop();}, "broadcast vk message");// 发送vk,函数定下在下文
         }
    

    这段代码中最重要的函数是p2p_network_connect_loop 这个函数将自己已知的加点加入handshaking_connections的队列,调用connect_to_endpoint与节点建立连接,-->connect_to_task --> read_loop 不断监听该节点发送的消息-->on_message处理消息(收到),其他节点广播的交易,区块也是通过on_message处理。

  2. 增加节点的编号属性

    由于发送vss消息时,需要对应节点的编号发送相应的vss_message,所以一个节点在与另一个节点建立链接的时必须告诉节点,它的编号是多少,因此,我把节点的编号的通知放在hello_message中

        struct hello_message
        {
            static const core_message_type_enum type;
    
            std::string                user_agent;
            uint32_t                   core_protocol_version;
            fc::ip::address            inbound_address;
            uint16_t                   inbound_port;
            uint16_t                   outbound_port;
            node_id_t                  node_public_key;
            fc::ecc::compact_signature signed_shared_secret;
            fc::variant_object         user_data;
        //--------group-signature-------------------
            uint16_t                   group_number; //节点的编号,从1开始
        //--------------------------------------------
            hello_message() {}
            hello_message(const std::string& user_agent,
                        uint32_t core_protocol_version,
                        const fc::ip::address& inbound_address,
                        uint16_t inbound_port,
                        uint16_t outbound_port,
                        const node_id_t& node_public_key,
                        const fc::ecc::compact_signature& signed_shared_secret,
                        const fc::variant_object& user_data,
                        uint16_t group_number ) :
            user_agent(user_agent),
            core_protocol_version(core_protocol_version),
            inbound_address(inbound_address),
            inbound_port(inbound_port),
            outbound_port(outbound_port),
            node_public_key(node_public_key),
            signed_shared_secret(signed_shared_secret),
            user_data(user_data),
            group_number(group_number)
            {}
        };
    

    在hello_message的处理函数,on_hello_message中,增加记录节点编号的代码

        originating_peer->group_number = hello_message_received.group_number;
    
  3. vss_message, vk_message 增加,发送,接收

    1. 定义vss_message,vk_message 在libraries/net/include/graphene/net/core_messages.hpp中增加vss_message和vk_message的类型定义。

        struct vss_message
        {
            static const core_message_type_enum type;
            std::string si;
            std::string ti;
            std::vector<std::string> Ei;
            uint16_t number;
            vss_message(){}
            vss_message(std::string s, std::string t, std::vector<std::string>Ei,uint16_t number):
            si(s),ti(t),Ei(Ei),number(number)
            {}
        };
      
        struct vk_message
        {
            static const core_message_type_enum type;
            std::string vk;
            uint16_t number;
            vk_message(){}
            vk_message(std::string vk, uint16_t number):
            vk(vk), number(number)
            {}
        };
      
        FC_REFLECT(graphene::net::vss_message,(si)(ti)(Ei)(number))
        FC_REFLECT(graphene::net::vk_message,(vk)(number))
      

      libraries/net/core_messages.cpp中,增加类型定义:

        const core_message_type_enum vss_message::type    = core_message_type_enum::vss_message_type;
        const core_message_type_enum vk_message::type   = core_message_type_enum::vk_message_type;
      
    2. 定义send_vss_message

      libraries/net/node.cpp中定义,向某个节点发送vss_message的函数:

        void node_impl::send_vss_message(const peer_connection_ptr& peer)
        {
            std::list<peer_connection_ptr> original_active_peers(_active_connections.begin(),_active_connections.end());
            for( const peer_connection_ptr& active_peer: original_active_peers )
            {
                try
                {
                    uint16_t n = active_peer->group_number;
                    steem::plugins::group_signature::group_signature_plugin& pp = appbase::app().get_plugin<steem::plugins::group_signature::group_signature_plugin>();
                    vss_message vss( pp.my->si_string[n-1], pp.my->ti_string[n-1], pp.my->Ei_string,_group_number);
                    active_peer->send_message(vss);
                    ilog("send vss message to ${peer}\n\n",("peer",active_peer->get_remote_endpoint()));
                }
                catch(const std::exception& e)
                {
                    std::cerr << e.what() << '\n';
                }
            }
        }
      

      设置send_vss_message函数的调用时机,我选在在每当有一个节点加入active_connections时,向该节点发送vss_message

        void node_impl::move_peer_to_active_list(const peer_connection_ptr& peer)
        {
        VERIFY_CORRECT_THREAD();//同步与互斥管理
        _active_connections.insert(peer);
        _handshaking_connections.erase(peer);
        _closing_connections.erase(peer);
        _terminating_connections.erase(peer);
        fc_ilog(fc::logger::get("sync"), "New peer is connected (${peer}), now ${count} active peers",
                ("peer", peer->get_remote_endpoint())
                ("count", _active_connections.size()));
        ilog("New peer is connected (${peer}), now ${count} active peers",
                ("peer", peer->get_remote_endpoint())
                ("count", _active_connections.size()));
        std::cout<<"this is active list\n\n";
        send_vss_message(peer);
        //不断检查是否形成了vk
      
        ilog("send a vss message to ${peer}",("peer",peer->get_remote_endpoint()));
      
        }
      
    3. 在node.cpp中,定义vss_message的处理函数,on_vss_message,vk_message的处理函数,on_vk_message

       void node_impl::on_vss_message(peer_connection* originating_peer, const vss_message& vss_message_received)
       {
           VERIFY_CORRECT_THREAD();
           ilog("received a vss message from ${peer}",("peer",originating_peer->get_remote_endpoint()));
           uint16_t n = vss_message_received.number -1;
           for(int i = 0; i< (int)vss_message_received.Ei.size(); i++){
               std::cout<<vss_message_received.Ei[i]<<"\n";
           }
           steem::plugins::group_signature::group_signature_plugin& pp = appbase::app().get_plugin<steem::plugins::group_signature::group_signature_plugin>();
           if(pp.my->si_string_received[n] == "" || pp.my->si_string_received[n] != vss_message_received.si){
               if(pp.my->VerifyShare(vss_message_received.si, vss_message_received.ti, vss_message_received.Ei))//验证
               {
                   pp.my->si_string_received[n] = vss_message_received.si;//验证通过后,记录到group_signature_plugin
                   pp.my->ti_string_received[n] = vss_message_received.ti;
                   pp.my->mpkGen();
                   trigger_broadcast_vk_message_loop();
               }
               else{
                   std::cout<<"vss message is invalid\n\n";
               }
           }
           else{
               std::cout<<"vss mesage is already received\n\n";
           }
           return;
       }
       void node_impl::on_vk_message(peer_connection* originating_peer, const vk_message& vk_message_received)
       {
           VERIFY_CORRECT_THREAD();
           // ilog("received a vk message from ${peer}",("peer",originating_peer->get_remote_endpoint()));
           uint16_t n = vk_message_received.number -1;
           std::cout<<"received a vk message\n\n";
           steem::plugins::group_signature::group_signature_plugin& pp = appbase::app().get_plugin<steem::plugins::group_signature::group_signature_plugin>();
           if(pp.my->g_alpha_i_string_received[n] == "" || pp.my->g_alpha_i_string_received[n] != vk_message_received.vk){
               std::cout<<"ok\n\n";
               pp.my->g_alpha_i_string_received[n] = vk_message_received.vk;
               pp.my->g_alphaGen();
           }
           else{
               std::cout<<"this vk message is already received\n\n";
           }
       }
      

      这三个函数都需要在node.cpp中进行声明

       void on_vss_message( peer_connection* originating_peer,const vss_message& vss_message_received);
       void send_vss_message(const peer_connection_ptr& peer);
       void on_vk_message(peer_connection* originating_peer,const vk_message& vk_message_received);
      

      在on_message函数的switch-case语句中,增加vss_message和vk_message的处理函数:

       case core_message_type_enum::vk_message_type:
           on_vk_message(originating_peer, received_message.as<vk_message>());
           break;
       case core_message_type_enum::vss_message_type:
           on_vss_message(originating_peer, received_message.as<vss_message>());
           break;
      

      vk_messagen不是对某个特定的节点发送,是对所有的节点进行广播,因此,我们定义一个vk的广播函数,以及可以触发广播的函数

       void node_impl::broadcast_vk_message_loop()
       {
           const steem::plugins::group_signature::group_signature_plugin& pp = appbase::app().get_plugin<steem::plugins::group_signature::group_signature_plugin>();
           if(pp.my->vk_string != ""){
               //广播vk
               std::cout<<"broadcast vk\n\n";
               broadcast(vk_message(pp.my->vk_string, _group_number));
           }
           if(!_node_is_shutting_down&&!_broadcast_vk_message_loop_done.canceled())
           {
               _broadcast_vk_message_loop_done = schedule_task( [this](){broadcast_vk_message_loop();},
                                                               fc::time_point::now() + fc::minutes(GRAPHENE_NET_BROADCAST_VK_MESSAGE_INTERVAL_MINUTES),
                                                               "broadcast vk message loop");
           }
       }
       void node_impl::trigger_broadcast_vk_message_loop()
       {
           VERIFY_CORRECT_THREAD();
           if( _retrigger_broadcast_vk_message_loop_promise )
               _retrigger_broadcast_vk_message_loop_promise->set_value();
       }
       fc::promise<void>::ptr         _retrigger_broadcast_vk_message_loop_promise;
       fc::future<void>              _broadcast_vk_message_loop_done;
      

相关文章

  1. 硅谷互联网公司的开发流程

    开发流程包括这么几个阶段: OKR 的设立; 主项目及其子项目的确立; 每个子项目的生命周期; 主项目的生命周期; 收尾、维护、复盘。 第一点,OKR 的设立 所有项目的起始,都应该从 Ro

  2. RESTful-表述性状态转移风格

    REST英文全拼:Representational State Transfer 面向资源编程 资源指的就是一类数据 产品表-&gt;就是产品资源 最重要的是如何表示一个资源 地址即

  3. 稳定性思考

    产品功能线 0-1: 当系统从无到有的时候,首要考虑的是研发效率,功能快速迭代,满足快速增长的业务需求 1-10 系统已经搭建起来,此时考虑的是系统的稳定性。 可用性:1.隔离:区分出核心和非核心功能

  4. Supervisor守护队列发邮件

    安装 CentOS: yum -y install supervisor Debien/Ubuntu适用:apt-get install supervisor 配置 修改主配置文件:vim /et

  5. 安装libsodium,让服务器支持chacha20等加密方式

    用chacha20加密方式需要安装libsodium 注意:libsodium从1.0.15开始就废弃了aes-128-ctr yum install wget m2crypto git libsod

随机推荐

  1. 硅谷互联网公司的开发流程

    开发流程包括这么几个阶段: OKR 的设立; 主项目及其子项目的确立; 每个子项目的生命周期; 主项目的生命周期; 收尾、维护、复盘。 第一点,OKR 的设立 所有项目的起始,都应该从 Ro

  2. RESTful-表述性状态转移风格

    REST英文全拼:Representational State Transfer 面向资源编程 资源指的就是一类数据 产品表-&gt;就是产品资源 最重要的是如何表示一个资源 地址即

  3. 稳定性思考

    产品功能线 0-1: 当系统从无到有的时候,首要考虑的是研发效率,功能快速迭代,满足快速增长的业务需求 1-10 系统已经搭建起来,此时考虑的是系统的稳定性。 可用性:1.隔离:区分出核心和非核心功能

  4. Supervisor守护队列发邮件

    安装 CentOS: yum -y install supervisor Debien/Ubuntu适用:apt-get install supervisor 配置 修改主配置文件:vim /et

  5. 安装libsodium,让服务器支持chacha20等加密方式

    用chacha20加密方式需要安装libsodium 注意:libsodium从1.0.15开始就废弃了aes-128-ctr yum install wget m2crypto git libsod