From da680e40bc346a3ce51da1f7f07903b94adf46eb Mon Sep 17 00:00:00 2001 From: Cassandra Heart Date: Thu, 13 Nov 2025 04:53:05 -0600 Subject: [PATCH] v2.1.0.6 --- config/version.go | 2 +- consensus/README.md | 689 +++++++++++++++++- consensus/consensus_committee.go | 9 +- consensus/eventhandler/event_handler.go | 10 +- consensus/mocks/dynamic_committee.go | 16 +- consensus/mocks/replicas.go | 16 +- consensus/safetyrules/safety_rules.go | 5 +- consensus/validator/validator.go | 5 +- crates/vdf/build.rs | 2 +- node/consensus/app/app_consensus_engine.go | 4 +- .../app/consensus_dynamic_committee.go | 36 +- .../app/consensus_voting_provider.go | 5 +- .../global/consensus_dynamic_committee.go | 36 +- node/consensus/global/event_distributor.go | 269 +++++-- node/consensus/global/genesis.go | 5 + .../global/global_consensus_engine.go | 223 +++--- node/consensus/global/services.go | 75 +- node/consensus/time/global_time_reel.go | 3 +- .../validator/bls_global_frame_validator.go | 11 +- node/datarpc/data_worker_ipc_server.go | 6 +- .../engines/global_execution_engine.go | 1 + .../intrinsics/global/global_intrinsic.go | 75 +- .../global/global_prover_shard_update.go | 42 +- node/p2p/internal/peer_monitor.go | 1 - node/store/clock.go | 28 +- protobufs/global.go | 19 +- 26 files changed, 1268 insertions(+), 325 deletions(-) diff --git a/config/version.go b/config/version.go index 821643e..c314765 100644 --- a/config/version.go +++ b/config/version.go @@ -43,7 +43,7 @@ func FormatVersion(version []byte) string { } func GetPatchNumber() byte { - return 0x05 + return 0x06 } func GetRCNumber() byte { diff --git a/consensus/README.md b/consensus/README.md index e75035f..0f7bcac 100644 --- a/consensus/README.md +++ b/consensus/README.md @@ -1,4 +1,689 @@ # Consensus State Machine -Consensus State Machine is being swapped out with a fork of the HotStuff implementation by Flow. -This will be updated with appropriate license details when the fork work has finished. +The Consensus State Machine has been swapped with a fork of the HotStuff implementation by Flow. + +High level, there's a few key differences: + +1. Terminology +Flow uses view as the term for the monotonically incrementing consensus state changes, either by +Quorum Certificate or Timeout Certificate. We use the term rank, following our existing +terminology we maintained for the original CSM. + +Flow also refers to the incremental states as blocks. We used the generic term state, as our +incremental states are named frames, but other projects may have a different state bearing +object. + +2. Generics +Flow's core data structures leaked into the consensus package, resulting in state bearing types +and other types being hard-defined to the types flow uses in the higher level of the protocol. +We adopted generic parameters instead, StateT, VoteT, PeerIDT, and CollectedT, to refer to +state-bearing types, vote types, peer identification types, and collected (via mempool or mixer) +payloads. + + +# License + +Flow and Quilibrium are license compatible, with their protocol also utilizing the AGPL. +Regardless, we leave their license in full below for reference: + + GNU AFFERO GENERAL PUBLIC LICENSE + Version 3, 19 November 2007 + + Copyright (C) 2007 Free Software Foundation, Inc. + Everyone is permitted to copy and distribute verbatim copies + of this license document, but changing it is not allowed. + + Preamble + + The GNU Affero General Public License is a free, copyleft license for +software and other kinds of works, specifically designed to ensure +cooperation with the community in the case of network server software. + + The licenses for most software and other practical works are designed +to take away your freedom to share and change the works. By contrast, +our General Public Licenses are intended to guarantee your freedom to +share and change all versions of a program--to make sure it remains free +software for all its users. + + When we speak of free software, we are referring to freedom, not +price. Our General Public Licenses are designed to make sure that you +have the freedom to distribute copies of free software (and charge for +them if you wish), that you receive source code or can get it if you +want it, that you can change the software or use pieces of it in new +free programs, and that you know you can do these things. + + Developers that use our General Public Licenses protect your rights +with two steps: (1) assert copyright on the software, and (2) offer +you this License which gives you legal permission to copy, distribute +and/or modify the software. + + A secondary benefit of defending all users' freedom is that +improvements made in alternate versions of the program, if they +receive widespread use, become available for other developers to +incorporate. Many developers of free software are heartened and +encouraged by the resulting cooperation. However, in the case of +software used on network servers, this result may fail to come about. +The GNU General Public License permits making a modified version and +letting the public access it on a server without ever releasing its +source code to the public. + + The GNU Affero General Public License is designed specifically to +ensure that, in such cases, the modified source code becomes available +to the community. It requires the operator of a network server to +provide the source code of the modified version running there to the +users of that server. Therefore, public use of a modified version, on +a publicly accessible server, gives the public access to the source +code of the modified version. + + An older license, called the Affero General Public License and +published by Affero, was designed to accomplish similar goals. This is +a different license, not a version of the Affero GPL, but Affero has +released a new version of the Affero GPL which permits relicensing under +this license. + + The precise terms and conditions for copying, distribution and +modification follow. + + TERMS AND CONDITIONS + + 0. Definitions. + + "This License" refers to version 3 of the GNU Affero General Public License. + + "Copyright" also means copyright-like laws that apply to other kinds of +works, such as semiconductor masks. + + "The Program" refers to any copyrightable work licensed under this +License. Each licensee is addressed as "you". "Licensees" and +"recipients" may be individuals or organizations. + + To "modify" a work means to copy from or adapt all or part of the work +in a fashion requiring copyright permission, other than the making of an +exact copy. The resulting work is called a "modified version" of the +earlier work or a work "based on" the earlier work. + + A "covered work" means either the unmodified Program or a work based +on the Program. + + To "propagate" a work means to do anything with it that, without +permission, would make you directly or secondarily liable for +infringement under applicable copyright law, except executing it on a +computer or modifying a private copy. Propagation includes copying, +distribution (with or without modification), making available to the +public, and in some countries other activities as well. + + To "convey" a work means any kind of propagation that enables other +parties to make or receive copies. Mere interaction with a user through +a computer network, with no transfer of a copy, is not conveying. + + An interactive user interface displays "Appropriate Legal Notices" +to the extent that it includes a convenient and prominently visible +feature that (1) displays an appropriate copyright notice, and (2) +tells the user that there is no warranty for the work (except to the +extent that warranties are provided), that licensees may convey the +work under this License, and how to view a copy of this License. If +the interface presents a list of user commands or options, such as a +menu, a prominent item in the list meets this criterion. + + 1. Source Code. + + The "source code" for a work means the preferred form of the work +for making modifications to it. "Object code" means any non-source +form of a work. + + A "Standard Interface" means an interface that either is an official +standard defined by a recognized standards body, or, in the case of +interfaces specified for a particular programming language, one that +is widely used among developers working in that language. + + The "System Libraries" of an executable work include anything, other +than the work as a whole, that (a) is included in the normal form of +packaging a Major Component, but which is not part of that Major +Component, and (b) serves only to enable use of the work with that +Major Component, or to implement a Standard Interface for which an +implementation is available to the public in source code form. A +"Major Component", in this context, means a major essential component +(kernel, window system, and so on) of the specific operating system +(if any) on which the executable work runs, or a compiler used to +produce the work, or an object code interpreter used to run it. + + The "Corresponding Source" for a work in object code form means all +the source code needed to generate, install, and (for an executable +work) run the object code and to modify the work, including scripts to +control those activities. However, it does not include the work's +System Libraries, or general-purpose tools or generally available free +programs which are used unmodified in performing those activities but +which are not part of the work. For example, Corresponding Source +includes interface definition files associated with source files for +the work, and the source code for shared libraries and dynamically +linked subprograms that the work is specifically designed to require, +such as by intimate data communication or control flow between those +subprograms and other parts of the work. + + The Corresponding Source need not include anything that users +can regenerate automatically from other parts of the Corresponding +Source. + + The Corresponding Source for a work in source code form is that +same work. + + 2. Basic Permissions. + + All rights granted under this License are granted for the term of +copyright on the Program, and are irrevocable provided the stated +conditions are met. This License explicitly affirms your unlimited +permission to run the unmodified Program. The output from running a +covered work is covered by this License only if the output, given its +content, constitutes a covered work. This License acknowledges your +rights of fair use or other equivalent, as provided by copyright law. + + You may make, run and propagate covered works that you do not +convey, without conditions so long as your license otherwise remains +in force. You may convey covered works to others for the sole purpose +of having them make modifications exclusively for you, or provide you +with facilities for running those works, provided that you comply with +the terms of this License in conveying all material for which you do +not control copyright. Those thus making or running the covered works +for you must do so exclusively on your behalf, under your direction +and control, on terms that prohibit them from making any copies of +your copyrighted material outside their relationship with you. + + Conveying under any other circumstances is permitted solely under +the conditions stated below. Sublicensing is not allowed; section 10 +makes it unnecessary. + + 3. Protecting Users' Legal Rights From Anti-Circumvention Law. + + No covered work shall be deemed part of an effective technological +measure under any applicable law fulfilling obligations under article +11 of the WIPO copyright treaty adopted on 20 December 1996, or +similar laws prohibiting or restricting circumvention of such +measures. + + When you convey a covered work, you waive any legal power to forbid +circumvention of technological measures to the extent such circumvention +is effected by exercising rights under this License with respect to +the covered work, and you disclaim any intention to limit operation or +modification of the work as a means of enforcing, against the work's +users, your or third parties' legal rights to forbid circumvention of +technological measures. + + 4. Conveying Verbatim Copies. + + You may convey verbatim copies of the Program's source code as you +receive it, in any medium, provided that you conspicuously and +appropriately publish on each copy an appropriate copyright notice; +keep intact all notices stating that this License and any +non-permissive terms added in accord with section 7 apply to the code; +keep intact all notices of the absence of any warranty; and give all +recipients a copy of this License along with the Program. + + You may charge any price or no price for each copy that you convey, +and you may offer support or warranty protection for a fee. + + 5. Conveying Modified Source Versions. + + You may convey a work based on the Program, or the modifications to +produce it from the Program, in the form of source code under the +terms of section 4, provided that you also meet all of these conditions: + + a) The work must carry prominent notices stating that you modified + it, and giving a relevant date. + + b) The work must carry prominent notices stating that it is + released under this License and any conditions added under section + 7. This requirement modifies the requirement in section 4 to + "keep intact all notices". + + c) You must license the entire work, as a whole, under this + License to anyone who comes into possession of a copy. This + License will therefore apply, along with any applicable section 7 + additional terms, to the whole of the work, and all its parts, + regardless of how they are packaged. This License gives no + permission to license the work in any other way, but it does not + invalidate such permission if you have separately received it. + + d) If the work has interactive user interfaces, each must display + Appropriate Legal Notices; however, if the Program has interactive + interfaces that do not display Appropriate Legal Notices, your + work need not make them do so. + + A compilation of a covered work with other separate and independent +works, which are not by their nature extensions of the covered work, +and which are not combined with it such as to form a larger program, +in or on a volume of a storage or distribution medium, is called an +"aggregate" if the compilation and its resulting copyright are not +used to limit the access or legal rights of the compilation's users +beyond what the individual works permit. Inclusion of a covered work +in an aggregate does not cause this License to apply to the other +parts of the aggregate. + + 6. Conveying Non-Source Forms. + + You may convey a covered work in object code form under the terms +of sections 4 and 5, provided that you also convey the +machine-readable Corresponding Source under the terms of this License, +in one of these ways: + + a) Convey the object code in, or embodied in, a physical product + (including a physical distribution medium), accompanied by the + Corresponding Source fixed on a durable physical medium + customarily used for software interchange. + + b) Convey the object code in, or embodied in, a physical product + (including a physical distribution medium), accompanied by a + written offer, valid for at least three years and valid for as + long as you offer spare parts or customer support for that product + model, to give anyone who possesses the object code either (1) a + copy of the Corresponding Source for all the software in the + product that is covered by this License, on a durable physical + medium customarily used for software interchange, for a price no + more than your reasonable cost of physically performing this + conveying of source, or (2) access to copy the + Corresponding Source from a network server at no charge. + + c) Convey individual copies of the object code with a copy of the + written offer to provide the Corresponding Source. This + alternative is allowed only occasionally and noncommercially, and + only if you received the object code with such an offer, in accord + with subsection 6b. + + d) Convey the object code by offering access from a designated + place (gratis or for a charge), and offer equivalent access to the + Corresponding Source in the same way through the same place at no + further charge. You need not require recipients to copy the + Corresponding Source along with the object code. If the place to + copy the object code is a network server, the Corresponding Source + may be on a different server (operated by you or a third party) + that supports equivalent copying facilities, provided you maintain + clear directions next to the object code saying where to find the + Corresponding Source. Regardless of what server hosts the + Corresponding Source, you remain obligated to ensure that it is + available for as long as needed to satisfy these requirements. + + e) Convey the object code using peer-to-peer transmission, provided + you inform other peers where the object code and Corresponding + Source of the work are being offered to the general public at no + charge under subsection 6d. + + A separable portion of the object code, whose source code is excluded +from the Corresponding Source as a System Library, need not be +included in conveying the object code work. + + A "User Product" is either (1) a "consumer product", which means any +tangible personal property which is normally used for personal, family, +or household purposes, or (2) anything designed or sold for incorporation +into a dwelling. In determining whether a product is a consumer product, +doubtful cases shall be resolved in favor of coverage. For a particular +product received by a particular user, "normally used" refers to a +typical or common use of that class of product, regardless of the status +of the particular user or of the way in which the particular user +actually uses, or expects or is expected to use, the product. A product +is a consumer product regardless of whether the product has substantial +commercial, industrial or non-consumer uses, unless such uses represent +the only significant mode of use of the product. + + "Installation Information" for a User Product means any methods, +procedures, authorization keys, or other information required to install +and execute modified versions of a covered work in that User Product from +a modified version of its Corresponding Source. The information must +suffice to ensure that the continued functioning of the modified object +code is in no case prevented or interfered with solely because +modification has been made. + + If you convey an object code work under this section in, or with, or +specifically for use in, a User Product, and the conveying occurs as +part of a transaction in which the right of possession and use of the +User Product is transferred to the recipient in perpetuity or for a +fixed term (regardless of how the transaction is characterized), the +Corresponding Source conveyed under this section must be accompanied +by the Installation Information. But this requirement does not apply +if neither you nor any third party retains the ability to install +modified object code on the User Product (for example, the work has +been installed in ROM). + + The requirement to provide Installation Information does not include a +requirement to continue to provide support service, warranty, or updates +for a work that has been modified or installed by the recipient, or for +the User Product in which it has been modified or installed. Access to a +network may be denied when the modification itself materially and +adversely affects the operation of the network or violates the rules and +protocols for communication across the network. + + Corresponding Source conveyed, and Installation Information provided, +in accord with this section must be in a format that is publicly +documented (and with an implementation available to the public in +source code form), and must require no special password or key for +unpacking, reading or copying. + + 7. Additional Terms. + + "Additional permissions" are terms that supplement the terms of this +License by making exceptions from one or more of its conditions. +Additional permissions that are applicable to the entire Program shall +be treated as though they were included in this License, to the extent +that they are valid under applicable law. If additional permissions +apply only to part of the Program, that part may be used separately +under those permissions, but the entire Program remains governed by +this License without regard to the additional permissions. + + When you convey a copy of a covered work, you may at your option +remove any additional permissions from that copy, or from any part of +it. (Additional permissions may be written to require their own +removal in certain cases when you modify the work.) You may place +additional permissions on material, added by you to a covered work, +for which you have or can give appropriate copyright permission. + + Notwithstanding any other provision of this License, for material you +add to a covered work, you may (if authorized by the copyright holders of +that material) supplement the terms of this License with terms: + + a) Disclaiming warranty or limiting liability differently from the + terms of sections 15 and 16 of this License; or + + b) Requiring preservation of specified reasonable legal notices or + author attributions in that material or in the Appropriate Legal + Notices displayed by works containing it; or + + c) Prohibiting misrepresentation of the origin of that material, or + requiring that modified versions of such material be marked in + reasonable ways as different from the original version; or + + d) Limiting the use for publicity purposes of names of licensors or + authors of the material; or + + e) Declining to grant rights under trademark law for use of some + trade names, trademarks, or service marks; or + + f) Requiring indemnification of licensors and authors of that + material by anyone who conveys the material (or modified versions of + it) with contractual assumptions of liability to the recipient, for + any liability that these contractual assumptions directly impose on + those licensors and authors. + + All other non-permissive additional terms are considered "further +restrictions" within the meaning of section 10. If the Program as you +received it, or any part of it, contains a notice stating that it is +governed by this License along with a term that is a further +restriction, you may remove that term. If a license document contains +a further restriction but permits relicensing or conveying under this +License, you may add to a covered work material governed by the terms +of that license document, provided that the further restriction does +not survive such relicensing or conveying. + + If you add terms to a covered work in accord with this section, you +must place, in the relevant source files, a statement of the +additional terms that apply to those files, or a notice indicating +where to find the applicable terms. + + Additional terms, permissive or non-permissive, may be stated in the +form of a separately written license, or stated as exceptions; +the above requirements apply either way. + + 8. Termination. + + You may not propagate or modify a covered work except as expressly +provided under this License. Any attempt otherwise to propagate or +modify it is void, and will automatically terminate your rights under +this License (including any patent licenses granted under the third +paragraph of section 11). + + However, if you cease all violation of this License, then your +license from a particular copyright holder is reinstated (a) +provisionally, unless and until the copyright holder explicitly and +finally terminates your license, and (b) permanently, if the copyright +holder fails to notify you of the violation by some reasonable means +prior to 60 days after the cessation. + + Moreover, your license from a particular copyright holder is +reinstated permanently if the copyright holder notifies you of the +violation by some reasonable means, this is the first time you have +received notice of violation of this License (for any work) from that +copyright holder, and you cure the violation prior to 30 days after +your receipt of the notice. + + Termination of your rights under this section does not terminate the +licenses of parties who have received copies or rights from you under +this License. If your rights have been terminated and not permanently +reinstated, you do not qualify to receive new licenses for the same +material under section 10. + + 9. Acceptance Not Required for Having Copies. + + You are not required to accept this License in order to receive or +run a copy of the Program. Ancillary propagation of a covered work +occurring solely as a consequence of using peer-to-peer transmission +to receive a copy likewise does not require acceptance. However, +nothing other than this License grants you permission to propagate or +modify any covered work. These actions infringe copyright if you do +not accept this License. Therefore, by modifying or propagating a +covered work, you indicate your acceptance of this License to do so. + + 10. Automatic Licensing of Downstream Recipients. + + Each time you convey a covered work, the recipient automatically +receives a license from the original licensors, to run, modify and +propagate that work, subject to this License. You are not responsible +for enforcing compliance by third parties with this License. + + An "entity transaction" is a transaction transferring control of an +organization, or substantially all assets of one, or subdividing an +organization, or merging organizations. If propagation of a covered +work results from an entity transaction, each party to that +transaction who receives a copy of the work also receives whatever +licenses to the work the party's predecessor in interest had or could +give under the previous paragraph, plus a right to possession of the +Corresponding Source of the work from the predecessor in interest, if +the predecessor has it or can get it with reasonable efforts. + + You may not impose any further restrictions on the exercise of the +rights granted or affirmed under this License. For example, you may +not impose a license fee, royalty, or other charge for exercise of +rights granted under this License, and you may not initiate litigation +(including a cross-claim or counterclaim in a lawsuit) alleging that +any patent claim is infringed by making, using, selling, offering for +sale, or importing the Program or any portion of it. + + 11. Patents. + + A "contributor" is a copyright holder who authorizes use under this +License of the Program or a work on which the Program is based. The +work thus licensed is called the contributor's "contributor version". + + A contributor's "essential patent claims" are all patent claims +owned or controlled by the contributor, whether already acquired or +hereafter acquired, that would be infringed by some manner, permitted +by this License, of making, using, or selling its contributor version, +but do not include claims that would be infringed only as a +consequence of further modification of the contributor version. For +purposes of this definition, "control" includes the right to grant +patent sublicenses in a manner consistent with the requirements of +this License. + + Each contributor grants you a non-exclusive, worldwide, royalty-free +patent license under the contributor's essential patent claims, to +make, use, sell, offer for sale, import and otherwise run, modify and +propagate the contents of its contributor version. + + In the following three paragraphs, a "patent license" is any express +agreement or commitment, however denominated, not to enforce a patent +(such as an express permission to practice a patent or covenant not to +sue for patent infringement). To "grant" such a patent license to a +party means to make such an agreement or commitment not to enforce a +patent against the party. + + If you convey a covered work, knowingly relying on a patent license, +and the Corresponding Source of the work is not available for anyone +to copy, free of charge and under the terms of this License, through a +publicly available network server or other readily accessible means, +then you must either (1) cause the Corresponding Source to be so +available, or (2) arrange to deprive yourself of the benefit of the +patent license for this particular work, or (3) arrange, in a manner +consistent with the requirements of this License, to extend the patent +license to downstream recipients. "Knowingly relying" means you have +actual knowledge that, but for the patent license, your conveying the +covered work in a country, or your recipient's use of the covered work +in a country, would infringe one or more identifiable patents in that +country that you have reason to believe are valid. + + If, pursuant to or in connection with a single transaction or +arrangement, you convey, or propagate by procuring conveyance of, a +covered work, and grant a patent license to some of the parties +receiving the covered work authorizing them to use, propagate, modify +or convey a specific copy of the covered work, then the patent license +you grant is automatically extended to all recipients of the covered +work and works based on it. + + A patent license is "discriminatory" if it does not include within +the scope of its coverage, prohibits the exercise of, or is +conditioned on the non-exercise of one or more of the rights that are +specifically granted under this License. You may not convey a covered +work if you are a party to an arrangement with a third party that is +in the business of distributing software, under which you make payment +to the third party based on the extent of your activity of conveying +the work, and under which the third party grants, to any of the +parties who would receive the covered work from you, a discriminatory +patent license (a) in connection with copies of the covered work +conveyed by you (or copies made from those copies), or (b) primarily +for and in connection with specific products or compilations that +contain the covered work, unless you entered into that arrangement, +or that patent license was granted, prior to 28 March 2007. + + Nothing in this License shall be construed as excluding or limiting +any implied license or other defenses to infringement that may +otherwise be available to you under applicable patent law. + + 12. No Surrender of Others' Freedom. + + If conditions are imposed on you (whether by court order, agreement or +otherwise) that contradict the conditions of this License, they do not +excuse you from the conditions of this License. If you cannot convey a +covered work so as to satisfy simultaneously your obligations under this +License and any other pertinent obligations, then as a consequence you may +not convey it at all. For example, if you agree to terms that obligate you +to collect a royalty for further conveying from those to whom you convey +the Program, the only way you could satisfy both those terms and this +License would be to refrain entirely from conveying the Program. + + 13. Remote Network Interaction; Use with the GNU General Public License. + + Notwithstanding any other provision of this License, if you modify the +Program, your modified version must prominently offer all users +interacting with it remotely through a computer network (if your version +supports such interaction) an opportunity to receive the Corresponding +Source of your version by providing access to the Corresponding Source +from a network server at no charge, through some standard or customary +means of facilitating copying of software. This Corresponding Source +shall include the Corresponding Source for any work covered by version 3 +of the GNU General Public License that is incorporated pursuant to the +following paragraph. + + Notwithstanding any other provision of this License, you have +permission to link or combine any covered work with a work licensed +under version 3 of the GNU General Public License into a single +combined work, and to convey the resulting work. The terms of this +License will continue to apply to the part which is the covered work, +but the work with which it is combined will remain governed by version +3 of the GNU General Public License. + + 14. Revised Versions of this License. + + The Free Software Foundation may publish revised and/or new versions of +the GNU Affero General Public License from time to time. Such new versions +will be similar in spirit to the present version, but may differ in detail to +address new problems or concerns. + + Each version is given a distinguishing version number. If the +Program specifies that a certain numbered version of the GNU Affero General +Public License "or any later version" applies to it, you have the +option of following the terms and conditions either of that numbered +version or of any later version published by the Free Software +Foundation. If the Program does not specify a version number of the +GNU Affero General Public License, you may choose any version ever published +by the Free Software Foundation. + + If the Program specifies that a proxy can decide which future +versions of the GNU Affero General Public License can be used, that proxy's +public statement of acceptance of a version permanently authorizes you +to choose that version for the Program. + + Later license versions may give you additional or different +permissions. However, no additional obligations are imposed on any +author or copyright holder as a result of your choosing to follow a +later version. + + 15. Disclaimer of Warranty. + + THERE IS NO WARRANTY FOR THE PROGRAM, TO THE EXTENT PERMITTED BY +APPLICABLE LAW. EXCEPT WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT +HOLDERS AND/OR OTHER PARTIES PROVIDE THE PROGRAM "AS IS" WITHOUT WARRANTY +OF ANY KIND, EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, +THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE PROGRAM +IS WITH YOU. SHOULD THE PROGRAM PROVE DEFECTIVE, YOU ASSUME THE COST OF +ALL NECESSARY SERVICING, REPAIR OR CORRECTION. + + 16. Limitation of Liability. + + IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING +WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MODIFIES AND/OR CONVEYS +THE PROGRAM AS PERMITTED ABOVE, BE LIABLE TO YOU FOR DAMAGES, INCLUDING ANY +GENERAL, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE +USE OR INABILITY TO USE THE PROGRAM (INCLUDING BUT NOT LIMITED TO LOSS OF +DATA OR DATA BEING RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD +PARTIES OR A FAILURE OF THE PROGRAM TO OPERATE WITH ANY OTHER PROGRAMS), +EVEN IF SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF +SUCH DAMAGES. + + 17. Interpretation of Sections 15 and 16. + + If the disclaimer of warranty and limitation of liability provided +above cannot be given local legal effect according to their terms, +reviewing courts shall apply local law that most closely approximates +an absolute waiver of all civil liability in connection with the +Program, unless a warranty or assumption of liability accompanies a +copy of the Program in return for a fee. + + END OF TERMS AND CONDITIONS + + How to Apply These Terms to Your New Programs + + If you develop a new program, and you want it to be of the greatest +possible use to the public, the best way to achieve this is to make it +free software which everyone can redistribute and change under these terms. + + To do so, attach the following notices to the program. It is safest +to attach them to the start of each source file to most effectively +state the exclusion of warranty; and each file should have at least +the "copyright" line and a pointer to where the full notice is found. + + + Copyright (C) + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . + +Also add information on how to contact you by electronic and paper mail. + + If your software can interact with users remotely through a computer +network, you should also make sure that it provides a way for users to +get its source. For example, if your program is a web application, its +interface could display a "Source" link that leads users to an archive +of the code. There are many ways you could offer source, and different +solutions will be better for different programs; see section 13 for the +specific requirements. + + You should also get your employer (if you work as a programmer) or school, +if any, to sign a "copyright disclaimer" for the program, if necessary. +For more information on this, and how to apply and follow the GNU AGPL, see +. diff --git a/consensus/consensus_committee.go b/consensus/consensus_committee.go index 83befed..d90a97f 100644 --- a/consensus/consensus_committee.go +++ b/consensus/consensus_committee.go @@ -38,7 +38,10 @@ type Replicas interface { // Returns the following expected errors for invalid inputs: // - model.ErrRankUnknown if no rank containing the given rank is // known - LeaderForRank(rank uint64) (models.Identity, error) + LeaderForRank( + rank uint64, + selector models.Identity, + ) (models.Identity, error) // QuorumThresholdForRank returns the minimum total weight for a supermajority // at the given rank. This weight threshold is computed using the total weight @@ -78,7 +81,9 @@ type Replicas interface { // - model.ErrRankUnknown if no rank containing the given rank is // known // - IdentitiesByRank(rank uint64) ([]models.WeightedIdentity, error) + IdentitiesByRank( + rank uint64, + ) ([]models.WeightedIdentity, error) // IdentityByRank returns the full Identity for specified HotStuff // participant. The node must be a legitimate HotStuff participant with diff --git a/consensus/eventhandler/event_handler.go b/consensus/eventhandler/event_handler.go index 59c6d0f..4c7d4f6 100644 --- a/consensus/eventhandler/event_handler.go +++ b/consensus/eventhandler/event_handler.go @@ -496,7 +496,10 @@ func (e *EventHandler[ consensus.Uint64Param("current_rank", curRank), consensus.IdentityParam("self", e.committee.Self()), ) - currentLeader, err := e.committee.LeaderForRank(curRank) + currentLeader, err := e.committee.LeaderForRank( + curRank, + e.paceMaker.LatestQuorumCertificate().Identity(), + ) if err != nil { return fmt.Errorf( "failed to determine primary for new rank %d: %w", @@ -691,7 +694,10 @@ func (e *EventHandler[ return nil } // leader (node ID) for next rank - nextLeader, err := e.committee.LeaderForRank(curRank + 1) + nextLeader, err := e.committee.LeaderForRank( + curRank+1, + proposal.State.Identifier, + ) if errors.Is(err, models.ErrRankUnknown) { // We are attempting process a state in an unknown rank // This should never happen, because: diff --git a/consensus/mocks/dynamic_committee.go b/consensus/mocks/dynamic_committee.go index 468182c..2dec261 100644 --- a/consensus/mocks/dynamic_committee.go +++ b/consensus/mocks/dynamic_committee.go @@ -133,8 +133,8 @@ func (_m *DynamicCommittee) IdentityByState(stateID models.Identity, participant } // LeaderForRank provides a mock function with given fields: rank -func (_m *DynamicCommittee) LeaderForRank(rank uint64) (models.Identity, error) { - ret := _m.Called(rank) +func (_m *DynamicCommittee) LeaderForRank(rank uint64, selector models.Identity) (models.Identity, error) { + ret := _m.Called(rank, selector) if len(ret) == 0 { panic("no return value specified for LeaderForRank") @@ -142,17 +142,17 @@ func (_m *DynamicCommittee) LeaderForRank(rank uint64) (models.Identity, error) var r0 models.Identity var r1 error - if rf, ok := ret.Get(0).(func(uint64) (models.Identity, error)); ok { - return rf(rank) + if rf, ok := ret.Get(0).(func(uint64, models.Identity) (models.Identity, error)); ok { + return rf(rank, selector) } - if rf, ok := ret.Get(0).(func(uint64) models.Identity); ok { - r0 = rf(rank) + if rf, ok := ret.Get(0).(func(uint64, models.Identity) models.Identity); ok { + r0 = rf(rank, selector) } else { r0 = ret.Get(0).(models.Identity) } - if rf, ok := ret.Get(1).(func(uint64) error); ok { - r1 = rf(rank) + if rf, ok := ret.Get(1).(func(uint64, models.Identity) error); ok { + r1 = rf(rank, selector) } else { r1 = ret.Error(1) } diff --git a/consensus/mocks/replicas.go b/consensus/mocks/replicas.go index 1daee9d..d061ed9 100644 --- a/consensus/mocks/replicas.go +++ b/consensus/mocks/replicas.go @@ -73,8 +73,8 @@ func (_m *Replicas) IdentityByRank(rank uint64, participantID models.Identity) ( } // LeaderForRank provides a mock function with given fields: rank -func (_m *Replicas) LeaderForRank(rank uint64) (models.Identity, error) { - ret := _m.Called(rank) +func (_m *Replicas) LeaderForRank(rank uint64, selector models.Identity) (models.Identity, error) { + ret := _m.Called(rank, selector) if len(ret) == 0 { panic("no return value specified for LeaderForRank") @@ -82,17 +82,17 @@ func (_m *Replicas) LeaderForRank(rank uint64) (models.Identity, error) { var r0 models.Identity var r1 error - if rf, ok := ret.Get(0).(func(uint64) (models.Identity, error)); ok { - return rf(rank) + if rf, ok := ret.Get(0).(func(uint64, models.Identity) (models.Identity, error)); ok { + return rf(rank, selector) } - if rf, ok := ret.Get(0).(func(uint64) models.Identity); ok { - r0 = rf(rank) + if rf, ok := ret.Get(0).(func(uint64, models.Identity) models.Identity); ok { + r0 = rf(rank, selector) } else { r0 = ret.Get(0).(models.Identity) } - if rf, ok := ret.Get(1).(func(uint64) error); ok { - r1 = rf(rank) + if rf, ok := ret.Get(1).(func(uint64, models.Identity) error); ok { + r1 = rf(rank, selector) } else { r1 = ret.Error(1) } diff --git a/consensus/safetyrules/safety_rules.go b/consensus/safetyrules/safety_rules.go index 091c27d..c1002b7 100644 --- a/consensus/safetyrules/safety_rules.go +++ b/consensus/safetyrules/safety_rules.go @@ -128,7 +128,10 @@ func (r *SafetyRules[StateT, VoteT]) produceVote( ) } - currentLeader, err := r.committee.LeaderForRank(state.Rank) + currentLeader, err := r.committee.LeaderForRank( + state.Rank, + state.ParentQuorumCertificate.Identity(), + ) if err != nil { return nil, fmt.Errorf( "expect to have a valid leader for rank %d: %w", diff --git a/consensus/validator/validator.go b/consensus/validator/validator.go index fa097d1..f215a3b 100644 --- a/consensus/validator/validator.go +++ b/consensus/validator/validator.go @@ -333,7 +333,10 @@ func (v *Validator[StateT, VoteT]) ValidateProposal( } // check the proposer is the leader for the proposed state's rank - leader, err := v.committee.LeaderForRank(state.Rank) + leader, err := v.committee.LeaderForRank( + state.Rank, + state.ParentQuorumCertificate.Identity(), + ) if err != nil { return fmt.Errorf( "error determining leader for state %x: %w", diff --git a/crates/vdf/build.rs b/crates/vdf/build.rs index a8de622..00335fd 100644 --- a/crates/vdf/build.rs +++ b/crates/vdf/build.rs @@ -71,7 +71,7 @@ fn mod_exponentiation(base: usize, exponent: usize, modulus: usize) -> usize { macro_rules! const_fmt { () => { - "#[allow(warnings)]\nconst {}: [{}; {}] = {:#?};\n\n"; + "#[allow(warnings)]\nconst {}: [{}; {}] = {:#?};\n\n" }; } diff --git a/node/consensus/app/app_consensus_engine.go b/node/consensus/app/app_consensus_engine.go index 344323e..944d296 100644 --- a/node/consensus/app/app_consensus_engine.go +++ b/node/consensus/app/app_consensus_engine.go @@ -1948,9 +1948,9 @@ func (e *AppConsensusEngine) OnQuorumCertificateTriggeredRankChange( return } - nextLeader, err := e.LeaderForRank(newRank) + nextLeader, err := e.LeaderForRank(newRank, frame.Identity()) if err != nil { - e.logger.Error("could nto determine next prover", zap.Error(err)) + e.logger.Error("could not determine next prover", zap.Error(err)) return } diff --git a/node/consensus/app/consensus_dynamic_committee.go b/node/consensus/app/consensus_dynamic_committee.go index 7ca1b28..f60b270 100644 --- a/node/consensus/app/consensus_dynamic_committee.go +++ b/node/consensus/app/consensus_dynamic_committee.go @@ -8,7 +8,6 @@ import ( "github.com/iden3/go-iden3-crypto/poseidon" "github.com/pkg/errors" "source.quilibrium.com/quilibrium/monorepo/consensus/models" - "source.quilibrium.com/quilibrium/monorepo/protobufs" tconsensus "source.quilibrium.com/quilibrium/monorepo/types/consensus" ) @@ -108,27 +107,8 @@ func (e *AppConsensusEngine) IdentityByState( // LeaderForRank implements consensus.DynamicCommittee. func (e *AppConsensusEngine) LeaderForRank( rank uint64, + selector models.Identity, ) (models.Identity, error) { - lineage, err := e.appTimeReel.GetLineage() - if err != nil { - return "", errors.Wrap(err, "leader for rank") - } - - var found *protobufs.AppShardFrame - for _, l := range lineage { - if l.GetRank() == (rank - 1) { - found = l - break - } - } - - var selector models.Identity - if found == nil { - selector = models.Identity(make([]byte, 32)) - } else { - selector = found.Identity() - } - inputBI, err := poseidon.HashBytes(slices.Concat( []byte(selector), binary.BigEndian.AppendUint64(nil, rank), @@ -177,22 +157,12 @@ func (e *AppConsensusEngine) TimeoutThresholdForRank( ) (uint64, error) { proverInfo, err := e.proverRegistry.GetActiveProvers(e.appAddress) if err != nil { - return 0, errors.Wrap(err, "timeout threshold for rank") - } - - leader, err := e.LeaderForRank(rank) - if err != nil { - return 0, errors.Wrap(err, "timeout threshold for rank") + return 0, errors.Wrap(err, "quorum threshold for rank") } total := uint64(0) - // 2/3 majority doesn't quite work in this scenario, because if the timing out - // prover has a high enough seniority it could get things stuck where no - // timeout can occur for _, p := range proverInfo { - if !bytes.Equal(p.Address, []byte(leader)) { - total += p.Seniority - } + total += p.Seniority } return (total * 2) / 3, nil diff --git a/node/consensus/app/consensus_voting_provider.go b/node/consensus/app/consensus_voting_provider.go index 5babc4a..401f075 100644 --- a/node/consensus/app/consensus_voting_provider.go +++ b/node/consensus/app/consensus_voting_provider.go @@ -148,7 +148,10 @@ func (p *AppVotingProvider) SignVote( ) } - nextLeader, err := p.engine.LeaderForRank(state.Rank) + nextLeader, err := p.engine.LeaderForRank( + state.Rank, + state.ParentQuorumCertificate.Identity(), + ) if err != nil { p.engine.logger.Error("could not determine next prover", zap.Error(err)) return nil, errors.Wrap( diff --git a/node/consensus/global/consensus_dynamic_committee.go b/node/consensus/global/consensus_dynamic_committee.go index 730b1c7..a9b2247 100644 --- a/node/consensus/global/consensus_dynamic_committee.go +++ b/node/consensus/global/consensus_dynamic_committee.go @@ -8,7 +8,6 @@ import ( "github.com/iden3/go-iden3-crypto/poseidon" "github.com/pkg/errors" "source.quilibrium.com/quilibrium/monorepo/consensus/models" - "source.quilibrium.com/quilibrium/monorepo/protobufs" tconsensus "source.quilibrium.com/quilibrium/monorepo/types/consensus" ) @@ -108,27 +107,8 @@ func (e *GlobalConsensusEngine) IdentityByState( // LeaderForRank implements consensus.DynamicCommittee. func (e *GlobalConsensusEngine) LeaderForRank( rank uint64, + selector models.Identity, ) (models.Identity, error) { - lineage, err := e.globalTimeReel.GetLineage() - if err != nil { - return "", errors.Wrap(err, "leader for rank") - } - - var found *protobufs.GlobalFrame - for _, l := range lineage { - if l.GetRank() == (rank - 1) { - found = l - break - } - } - - var selector models.Identity - if found == nil { - selector = models.Identity(make([]byte, 32)) - } else { - selector = found.Identity() - } - inputBI, err := poseidon.HashBytes(slices.Concat( []byte(selector), binary.BigEndian.AppendUint64(nil, rank), @@ -174,22 +154,12 @@ func (e *GlobalConsensusEngine) TimeoutThresholdForRank( ) (uint64, error) { proverInfo, err := e.proverRegistry.GetActiveProvers(nil) if err != nil { - return 0, errors.Wrap(err, "timeout threshold for rank") - } - - leader, err := e.LeaderForRank(rank) - if err != nil { - return 0, errors.Wrap(err, "timeout threshold for rank") + return 0, errors.Wrap(err, "quorum threshold for rank") } total := uint64(0) - // 2/3 majority doesn't quite work in this scenario, because if the timing out - // prover has a high enough seniority it could get things stuck where no - // timeout can occur for _, p := range proverInfo { - if !bytes.Equal(p.Address, []byte(leader)) { - total += p.Seniority - } + total += p.Seniority } return (total * 2) / 3, nil diff --git a/node/consensus/global/event_distributor.go b/node/consensus/global/event_distributor.go index 3af314c..a364cdb 100644 --- a/node/consensus/global/event_distributor.go +++ b/node/consensus/global/event_distributor.go @@ -5,22 +5,28 @@ import ( "context" "encoding/hex" "fmt" - "math/rand" + "math/big" "slices" "time" pcrypto "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/peer" + "github.com/multiformats/go-multiaddr" + mn "github.com/multiformats/go-multiaddr/net" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "github.com/shopspring/decimal" "go.uber.org/zap" + "google.golang.org/grpc" "source.quilibrium.com/quilibrium/monorepo/config" "source.quilibrium.com/quilibrium/monorepo/lifecycle" "source.quilibrium.com/quilibrium/monorepo/node/consensus/provers" consensustime "source.quilibrium.com/quilibrium/monorepo/node/consensus/time" globalintrinsics "source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/global" "source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/global/compat" + "source.quilibrium.com/quilibrium/monorepo/node/p2p" "source.quilibrium.com/quilibrium/monorepo/protobufs" + "source.quilibrium.com/quilibrium/monorepo/types/channel" typesconsensus "source.quilibrium.com/quilibrium/monorepo/types/consensus" "source.quilibrium.com/quilibrium/monorepo/types/crypto" "source.quilibrium.com/quilibrium/monorepo/types/schema" @@ -36,6 +42,8 @@ func (e *GlobalConsensusEngine) eventDistributorLoop( } }() + e.logger.Debug("starting event distributor") + // Subscribe to events from the event distributor eventCh := e.eventDistributor.Subscribe("global") defer e.eventDistributor.Unsubscribe("global") @@ -52,6 +60,8 @@ func (e *GlobalConsensusEngine) eventDistributorLoop( return } + e.logger.Debug("received event", zap.Int("event_type", int(event.Type))) + // Handle the event based on its type switch event.Type { case typesconsensus.ControlEventGlobalNewHead: @@ -60,7 +70,7 @@ func (e *GlobalConsensusEngine) eventDistributorLoop( // New global frame has been selected as the head by the time reel if data, ok := event.Data.(*consensustime.GlobalEvent); ok && data.Frame != nil { - e.logger.Debug( + e.logger.Info( "received new global head event", zap.Uint64("frame_number", data.Frame.Header.FrameNumber), ) @@ -82,11 +92,14 @@ func (e *GlobalConsensusEngine) eventDistributorLoop( e.publishKeyRegistry() } - if e.proposer != nil { + if e.proposer != nil && !e.config.Engine.ArchiveMode { workers, err := e.workerManager.RangeWorkers() if err != nil { e.logger.Error("could not retrieve workers", zap.Error(err)) } else { + if len(workers) == 0 { + e.logger.Error("no workers detected for allocation") + } allocated := true for _, w := range workers { allocated = allocated && w.Allocated @@ -372,41 +385,132 @@ func (e *GlobalConsensusEngine) evaluateForProposals( pendingFilters := [][]byte{} proposalDescriptors := []provers.ShardDescriptor{} decideDescriptors := []provers.ShardDescriptor{} - shardKeys, err := e.hypergraph.Commit(data.Frame.Header.FrameNumber) + shards, err := e.shardsStore.RangeAppShards() if err != nil { - e.logger.Error("could not commit", zap.Error(err)) + e.logger.Error("could not obtain app shard info", zap.Error(err)) return } - for key := range shardKeys { - shards, err := e.shardsStore.GetAppShards( - slices.Concat(key.L1[:], key.L2[:]), - []uint32{}, + registry, err := e.keyStore.GetKeyRegistryByProver(data.Frame.Header.Prover) + if err != nil { + e.logger.Info( + "awaiting key registry info for prover", + zap.String( + "prover_address", + hex.EncodeToString(data.Frame.Header.Prover), + ), + ) + return + } + + if registry.IdentityKey == nil || registry.IdentityKey.KeyValue == nil { + e.logger.Info("key registry info missing identity of prover") + return + } + + pub, err := pcrypto.UnmarshalEd448PublicKey(registry.IdentityKey.KeyValue) + if err != nil { + e.logger.Warn("error unmarshaling identity key", zap.Error(err)) + return + } + + peerId, err := peer.IDFromPublicKey(pub) + if err != nil { + e.logger.Warn("error deriving peer id", zap.Error(err)) + return + } + + info := e.peerInfoManager.GetPeerInfo([]byte(peerId)) + if info == nil { + e.logger.Info( + "no peer info known yet", + zap.String("peer", peer.ID(peerId).String()), + ) + return + } + + if len(info.Reachability) == 0 { + e.logger.Info( + "no reachability info known yet", + zap.String("peer", peer.ID(peerId).String()), + ) + return + } + + var client protobufs.GlobalServiceClient = nil + for _, s := range info.Reachability[0].StreamMultiaddrs { + creds, err := p2p.NewPeerAuthenticator( + e.logger, + e.config.P2P, + nil, + nil, + nil, + nil, + [][]byte{[]byte(peerId)}, + map[string]channel.AllowedPeerPolicyType{}, + map[string]channel.AllowedPeerPolicyType{}, + ).CreateClientTLSCredentials([]byte(peerId)) + if err != nil { + return + } + + ma, err := multiaddr.StringCast(s) + if err != nil { + return + } + + mga, err := mn.ToNetAddr(ma) + if err != nil { + return + } + + cc, err := grpc.NewClient( + mga.String(), + grpc.WithTransportCredentials(creds), ) if err != nil { - e.logger.Error("failed to retrieve shards", zap.Error(err)) - continue + e.logger.Debug( + "could not establish direct channel, trying next multiaddr", + zap.String("peer", peer.ID(peerId).String()), + zap.String("multiaddr", ma.String()), + zap.Error(err), + ) + return } + defer func() { + if err := cc.Close(); err != nil { + e.logger.Error("error while closing connection", zap.Error(err)) + } + }() - ps, err := e.proverRegistry.GetActiveProvers(nil) + client = protobufs.NewGlobalServiceClient(cc) + break + } + + if client == nil { + e.logger.Debug("could not get app shards from prover") + return + } + + for _, info := range shards { + resp, err := e.getAppShardsFromProver( + client, + slices.Concat(info.L1, info.L2), + info.Path, + data.Frame.Header.Prover, + ) if err != nil { - e.logger.Error("could not find global provers", zap.Error(err)) - continue + e.logger.Debug("could not get app shards from prover", zap.Error(err)) + return } - idx := rand.Int63n(int64(len(ps))) - e.syncProvider.hyperSyncWithProver(ctx, ps[idx].Address, key) - - for _, shard := range shards { - path := []int{} - bp := []byte{} - for _, p := range shard.Path { - path = append(path, int(p)) + for _, shard := range resp.Info { + bp := slices.Clone(info.L2) + for _, p := range shard.Prefix { bp = append(bp, byte(p)) } - filter := slices.Concat(key.L2[:], bp) - info, err := e.proverRegistry.GetProvers(filter) + prs, err := e.proverRegistry.GetProvers(bp) if err != nil { e.logger.Error("failed to get provers", zap.Error(err)) continue @@ -424,11 +528,11 @@ func (e *GlobalConsensusEngine) evaluateForProposals( hex.EncodeToString(allocation.ConfirmationFilter), ), ) - if bytes.Equal(allocation.ConfirmationFilter, filter) { + if bytes.Equal(allocation.ConfirmationFilter, bp) { allocated = allocation.Status != 4 if e.config.P2P.Network != 0 || data.Frame.Header.FrameNumber > 252840 { - e.logger.Debug( + e.logger.Info( "checking pending status of allocation", zap.Int("status", int(allocation.Status)), zap.Uint64("join_frame_number", allocation.JoinFrameNumber), @@ -442,55 +546,27 @@ func (e *GlobalConsensusEngine) evaluateForProposals( } } - size := e.hypergraph.GetSize(&key, path) - resp, err := e.hypergraph.GetChildrenForPath( - ctx, - &protobufs.GetChildrenForPathRequest{ - ShardKey: slices.Concat(key.L1[:], key.L2[:]), - Path: shard.Path, - }, - ) - if err != nil { - e.logger.Error("failed to get shard info", zap.Error(err)) - continue - } - e.logger.Debug( "checking descriptor for eligibility", - zap.String("shard_key", hex.EncodeToString(filter)), + zap.String("shard_key", hex.EncodeToString(bp)), ) - if len(resp.PathSegments) == 0 { - e.logger.Debug("no path segments") - continue - } - if len( - resp.PathSegments[len(resp.PathSegments)-1].Segments, - ) != 1 { + size := new(big.Int).SetBytes(shard.Size) + if size.Cmp(big.NewInt(0)) == 0 { e.logger.Debug( - "path segment length mismatch", - zap.Int( - "length", - len(resp.PathSegments[len(resp.PathSegments)-1].Segments), - ), + "no data in shard", + zap.String("shard_key", hex.EncodeToString(bp)), ) continue } - shardCount := uint64(0) - if resp.PathSegments[len(resp.PathSegments)-1].Segments[0].GetBranch() != nil { - shardCount = resp.PathSegments[len(resp.PathSegments)-1].Segments[0].GetBranch().LeafCount - } else { - shardCount = 1 - } - e.logger.Debug( "logical shard count", - zap.Int("shard_count", int(shardCount)), + zap.Int("shard_count", int(shard.DataShards)), ) above := []*typesconsensus.ProverInfo{} - for _, i := range info { + for _, i := range prs { if i.Seniority >= effectiveSeniority { above = append(above, i) } @@ -498,33 +574,33 @@ func (e *GlobalConsensusEngine) evaluateForProposals( e.logger.Debug( "appending descriptor for allocation planning", - zap.String("shard_key", hex.EncodeToString(filter)), + zap.String("shard_key", hex.EncodeToString(bp)), zap.Uint64("size", size.Uint64()), zap.Int("ring", len(above)/8), - zap.Int("shard_count", int(shardCount)), + zap.Int("shard_count", int(shard.DataShards)), ) if allocated && pending { - pendingFilters = append(pendingFilters, filter) + pendingFilters = append(pendingFilters, bp) } if !allocated { proposalDescriptors = append( proposalDescriptors, provers.ShardDescriptor{ - Filter: filter, + Filter: bp, Size: size.Uint64(), Ring: uint8(len(above) / 8), - Shards: shardCount, + Shards: shard.DataShards, }, ) } decideDescriptors = append( decideDescriptors, provers.ShardDescriptor{ - Filter: filter, + Filter: bp, Size: size.Uint64(), Ring: uint8(len(above) / 8), - Shards: shardCount, + Shards: shard.DataShards, }, ) } @@ -538,9 +614,26 @@ func (e *GlobalConsensusEngine) evaluateForProposals( if err != nil { e.logger.Error("could not plan shard allocations", zap.Error(err)) } else { + expectedRewardSum := big.NewInt(0) + for _, p := range proposals { + expectedRewardSum.Add(expectedRewardSum, p.ExpectedReward) + } + raw := decimal.NewFromBigInt(expectedRewardSum, 0) + rewardInQuilPerInterval := raw.Div(decimal.NewFromInt(8000000000)) + rewardInQuilPerDay := rewardInQuilPerInterval.Mul( + decimal.NewFromInt(24 * 60 * 6), + ) e.logger.Info( "proposed joins", zap.Int("proposals", len(proposals)), + zap.String( + "estimated_reward_per_interval", + rewardInQuilPerInterval.String(), + ), + zap.String( + "estimated_reward_per_day", + rewardInQuilPerDay.String(), + ), ) } } @@ -727,3 +820,43 @@ func (e *GlobalConsensusEngine) publishKeyRegistry() { kr, ) } + +func (e *GlobalConsensusEngine) getAppShardsFromProver( + client protobufs.GlobalServiceClient, + shardKey []byte, + path []uint32, + prover []byte, +) ( + *protobufs.GetAppShardsResponse, + error, +) { + getCtx, cancelGet := context.WithTimeout( + context.Background(), + e.config.Engine.SyncTimeout, + ) + response, err := client.GetAppShards( + getCtx, + &protobufs.GetAppShardsRequest{ + ShardKey: shardKey, + }, + // The message size limits are swapped because the server is the one + // sending the data. + grpc.MaxCallRecvMsgSize( + e.config.Engine.SyncMessageLimits.MaxSendMsgSize, + ), + grpc.MaxCallSendMsgSize( + e.config.Engine.SyncMessageLimits.MaxRecvMsgSize, + ), + ) + cancelGet() + if err != nil { + return nil, err + } + + if response == nil { + return nil, err + } + + return response, nil + +} diff --git a/node/consensus/global/genesis.go b/node/consensus/global/genesis.go index 0dc797c..f7080d2 100644 --- a/node/consensus/global/genesis.go +++ b/node/consensus/global/genesis.go @@ -158,6 +158,11 @@ func (e *GlobalConsensusEngine) initializeGenesis() ( } } + if err := txn.Commit(); err != nil { + txn.Abort() + panic(err) + } + for i := 0; i < 3; i++ { commitments[l1[i]].Insert( keyBytes, diff --git a/node/consensus/global/global_consensus_engine.go b/node/consensus/global/global_consensus_engine.go index 86e085a..d036535 100644 --- a/node/consensus/global/global_consensus_engine.go +++ b/node/consensus/global/global_consensus_engine.go @@ -494,116 +494,128 @@ func NewGlobalConsensusEngine( // Add execution engines componentBuilder.AddWorker(engine.executionManager.Start) - componentBuilder.AddWorker(engine.eventDistributor.Start) componentBuilder.AddWorker(engine.globalTimeReel.Start) - latest, err := engine.consensusStore.GetConsensusState(nil) - var state *models.CertifiedState[*protobufs.GlobalFrame] - var pending []*models.SignedProposal[ - *protobufs.GlobalFrame, - *protobufs.ProposalVote, - ] - if err != nil { - frame, qc := engine.initializeGenesis() - state = &models.CertifiedState[*protobufs.GlobalFrame]{ - State: &models.State[*protobufs.GlobalFrame]{ - Rank: 0, - Identifier: frame.Identity(), - State: &frame, - }, - CertifyingQuorumCertificate: qc, - } - pending = []*models.SignedProposal[ + if engine.config.P2P.Network == 99 || engine.config.Engine.ArchiveMode { + latest, err := engine.consensusStore.GetConsensusState(nil) + var state *models.CertifiedState[*protobufs.GlobalFrame] + var pending []*models.SignedProposal[ *protobufs.GlobalFrame, *protobufs.ProposalVote, - ]{} - } else { - qc, err := engine.clockStore.GetQuorumCertificate(nil, latest.FinalizedRank) - if err != nil { - panic(err) - } - frame, err := engine.clockStore.GetGlobalClockFrame( - qc.GetFrameNumber(), - ) - if err != nil { - panic(err) - } - parentFrame, err := engine.clockStore.GetGlobalClockFrame( - qc.GetFrameNumber() - 1, - ) - if err != nil { - panic(err) - } - parentQC, err := engine.clockStore.GetQuorumCertificate( - nil, - parentFrame.GetRank(), - ) - if err != nil { - panic(err) - } - state = &models.CertifiedState[*protobufs.GlobalFrame]{ - State: &models.State[*protobufs.GlobalFrame]{ - Rank: frame.GetRank(), - Identifier: frame.Identity(), - ProposerID: frame.Source(), - ParentQuorumCertificate: parentQC, - Timestamp: frame.GetTimestamp(), - State: &frame, - }, - CertifyingQuorumCertificate: qc, - } - pending = engine.getPendingProposals(frame.Header.FrameNumber) - } - - liveness, err := engine.consensusStore.GetLivenessState(nil) - if err == nil { - engine.currentRank = liveness.CurrentRank - } - - engine.voteAggregator, err = voting.NewGlobalVoteAggregator[GlobalPeerID]( - tracing.NewZapTracer(logger), - engine, - voteAggregationDistributor, - engine.signatureAggregator, - engine.votingProvider, - func(qc models.QuorumCertificate) { - select { - case <-engine.haltCtx.Done(): - return - default: + ] + establishGenesis := func() { + frame, qc := engine.initializeGenesis() + state = &models.CertifiedState[*protobufs.GlobalFrame]{ + State: &models.State[*protobufs.GlobalFrame]{ + Rank: 0, + Identifier: frame.Identity(), + State: &frame, + }, + CertifyingQuorumCertificate: qc, } - engine.consensusParticipant.OnQuorumCertificateConstructedFromVotes(qc) - }, - state.Rank()+1, - ) - if err != nil { - return nil, err - } - engine.timeoutAggregator, err = voting.NewGlobalTimeoutAggregator[GlobalPeerID]( - tracing.NewZapTracer(logger), - engine, - engine, - engine.signatureAggregator, - timeoutAggregationDistributor, - engine.votingProvider, - state.Rank()+1, - ) + pending = []*models.SignedProposal[ + *protobufs.GlobalFrame, + *protobufs.ProposalVote, + ]{} + } + if err != nil { + establishGenesis() + } else { + qc, err := engine.clockStore.GetQuorumCertificate( + nil, + latest.FinalizedRank, + ) + if err != nil { + establishGenesis() + } else { + frame, err := engine.clockStore.GetGlobalClockFrame( + qc.GetFrameNumber(), + ) + if err != nil { + establishGenesis() + } else { + parentFrame, err := engine.clockStore.GetGlobalClockFrame( + qc.GetFrameNumber() - 1, + ) + if err != nil { + establishGenesis() + } else { + parentQC, err := engine.clockStore.GetQuorumCertificate( + nil, + parentFrame.GetRank(), + ) + if err != nil { + establishGenesis() + } else { + state = &models.CertifiedState[*protobufs.GlobalFrame]{ + State: &models.State[*protobufs.GlobalFrame]{ + Rank: frame.GetRank(), + Identifier: frame.Identity(), + ProposerID: frame.Source(), + ParentQuorumCertificate: parentQC, + Timestamp: frame.GetTimestamp(), + State: &frame, + }, + CertifyingQuorumCertificate: qc, + } + pending = engine.getPendingProposals(frame.Header.FrameNumber) + } + } + } + } + as, err := engine.shardsStore.RangeAppShards() + engine.logger.Info("verifying app shard information") + if err != nil || len(as) == 0 { + engine.initializeGenesis() + } + } + liveness, err := engine.consensusStore.GetLivenessState(nil) + if err == nil { + engine.currentRank = liveness.CurrentRank + } + engine.voteAggregator, err = voting.NewGlobalVoteAggregator[GlobalPeerID]( + tracing.NewZapTracer(logger), + engine, + voteAggregationDistributor, + engine.signatureAggregator, + engine.votingProvider, + func(qc models.QuorumCertificate) { + select { + case <-engine.haltCtx.Done(): + return + default: + } + engine.consensusParticipant.OnQuorumCertificateConstructedFromVotes(qc) + }, + state.Rank()+1, + ) + if err != nil { + return nil, err + } + engine.timeoutAggregator, err = voting.NewGlobalTimeoutAggregator[GlobalPeerID]( + tracing.NewZapTracer(logger), + engine, + engine, + engine.signatureAggregator, + timeoutAggregationDistributor, + engine.votingProvider, + state.Rank()+1, + ) - notifier := pubsub.NewDistributor[ - *protobufs.GlobalFrame, - *protobufs.ProposalVote, - ]() - notifier.AddConsumer(engine) - engine.notifier = notifier + notifier := pubsub.NewDistributor[ + *protobufs.GlobalFrame, + *protobufs.ProposalVote, + ]() + notifier.AddConsumer(engine) + engine.notifier = notifier - forks, err := forks.NewForks(state, engine, notifier) - if err != nil { - return nil, err - } + forks, err := forks.NewForks(state, engine, notifier) + if err != nil { + return nil, err + } - engine.forks = forks + engine.forks = forks - if engine.config.P2P.Network == 99 || engine.config.Engine.ArchiveMode { componentBuilder.AddWorker(func( ctx lifecycle.SignalerContext, ready lifecycle.ReadyFunc, @@ -617,6 +629,12 @@ func NewGlobalConsensusEngine( <-ctx.Done() <-lifecycle.AllDone(engine.voteAggregator, engine.timeoutAggregator) }) + } else { + as, err := engine.shardsStore.RangeAppShards() + engine.logger.Info("verifying app shard information") + if err != nil || len(as) == 0 { + engine.initializeGenesis() + } } componentBuilder.AddWorker(engine.peerInfoManager.Start) @@ -730,6 +748,7 @@ func NewGlobalConsensusEngine( }) // Start event distributor event loop + componentBuilder.AddWorker(engine.eventDistributor.Start) componentBuilder.AddWorker(func( ctx lifecycle.SignalerContext, ready lifecycle.ReadyFunc, @@ -1883,6 +1902,8 @@ func (e *GlobalConsensusEngine) reportPeerInfoPeriodically( zap.Int("capabilities_count", len(peerInfo.Capabilities)), ) } + + e.publishKeyRegistry() } } } diff --git a/node/consensus/global/services.go b/node/consensus/global/services.go index e8b2e1e..82adbaf 100644 --- a/node/consensus/global/services.go +++ b/node/consensus/global/services.go @@ -17,6 +17,7 @@ import ( "source.quilibrium.com/quilibrium/monorepo/node/rpc" "source.quilibrium.com/quilibrium/monorepo/protobufs" "source.quilibrium.com/quilibrium/monorepo/types/store" + "source.quilibrium.com/quilibrium/monorepo/types/tries" ) func (e *GlobalConsensusEngine) GetGlobalFrame( @@ -175,6 +176,12 @@ func (e *GlobalConsensusEngine) GetAppShards( Info: []*protobufs.AppShardInfo{}, } + fullPrefix := []uint32{} + for _, p := range tries.GetFullPath(req.ShardKey[3:]) { + fullPrefix = append(fullPrefix, uint32(p)) + } + fullPrefix = slices.Concat(fullPrefix, req.Prefix) + for _, shard := range shards { size := big.NewInt(0) commitment := [][]byte{} @@ -198,21 +205,22 @@ func (e *GlobalConsensusEngine) GetAppShards( continue } - s := c.PathSegments[len(c.PathSegments)-1] - if len(s.Segments) > 1 { - return nil, errors.Wrap(errors.New("no shard found"), "get app shards") + branch, leaf := selectPathSegmentForPrefix(c.PathSegments, fullPrefix) + if branch == nil && leaf == nil { + commitment = append(commitment, make([]byte, 64)) + continue } - switch t := s.Segments[0].Segment.(type) { - case *protobufs.TreePathSegment_Branch: - size = size.Add(size, new(big.Int).SetBytes(t.Branch.Size)) - commitment = append(commitment, t.Branch.Commitment) - dataShards += t.Branch.LeafCount - case *protobufs.TreePathSegment_Leaf: - size = size.Add(size, new(big.Int).SetBytes(t.Leaf.Size)) - commitment = append(commitment, t.Leaf.Commitment) - dataShards += 1 + if branch != nil { + size = size.Add(size, new(big.Int).SetBytes(branch.Size)) + commitment = append(commitment, branch.Commitment) + dataShards += branch.LeafCount + continue } + + size = size.Add(size, new(big.Int).SetBytes(leaf.Size)) + commitment = append(commitment, leaf.Commitment) + dataShards += 1 } shardKey := []byte{} @@ -232,6 +240,49 @@ func (e *GlobalConsensusEngine) GetAppShards( return response, nil } +// selectPathSegmentForPrefix walks the returned path segments and prioritizes +// an exact match to the provided fullPrefix. If no exact match exists, it falls +// back to the first branch whose full prefix extends the requested prefix, +// otherwise it returns the first leaf encountered. +func selectPathSegmentForPrefix( + pathSegments []*protobufs.TreePathSegments, + fullPrefix []uint32, +) (*protobufs.TreePathBranch, *protobufs.TreePathLeaf) { + var fallbackBranch *protobufs.TreePathBranch + for _, ps := range pathSegments { + for _, segment := range ps.Segments { + switch node := segment.Segment.(type) { + case *protobufs.TreePathSegment_Branch: + if slices.Equal(node.Branch.FullPrefix, fullPrefix) { + return node.Branch, nil + } + + if len(node.Branch.FullPrefix) >= len(fullPrefix) && + len(fullPrefix) > 0 && + slices.Equal( + node.Branch.FullPrefix[:len(fullPrefix)], + fullPrefix, + ) && + fallbackBranch == nil { + fallbackBranch = node.Branch + } + + if len(fullPrefix) == 0 && fallbackBranch == nil { + fallbackBranch = node.Branch + } + case *protobufs.TreePathSegment_Leaf: + return nil, node.Leaf + } + } + } + + if fallbackBranch != nil { + return fallbackBranch, nil + } + + return nil, nil +} + func (e *GlobalConsensusEngine) GetGlobalShards( ctx context.Context, req *protobufs.GetGlobalShardsRequest, diff --git a/node/consensus/time/global_time_reel.go b/node/consensus/time/global_time_reel.go index 115f566..775563b 100644 --- a/node/consensus/time/global_time_reel.go +++ b/node/consensus/time/global_time_reel.go @@ -602,7 +602,8 @@ func (g *GlobalTimeReel) findNodeBySelector(selector []byte) *GlobalFrameNode { func (g *GlobalTimeReel) evaluateForkChoice(newNode *GlobalFrameNode) { if g.head == nil || (!g.archiveMode && newNode.Frame.Header.FrameNumber > g.head.Frame.Header.FrameNumber && - newNode.Frame.Header.FrameNumber-g.head.Frame.Header.FrameNumber > 360) { + newNode.Frame.Header.FrameNumber-g.head.Frame.Header.FrameNumber > + maxGlobalTreeDepth) { oldHead := g.head g.head = newNode g.sendHeadEvent(newNode, oldHead) diff --git a/node/consensus/validator/bls_global_frame_validator.go b/node/consensus/validator/bls_global_frame_validator.go index e387e80..134924a 100644 --- a/node/consensus/validator/bls_global_frame_validator.go +++ b/node/consensus/validator/bls_global_frame_validator.go @@ -91,12 +91,6 @@ func (b *BLSGlobalFrameValidator) Validate( return false, err } - throwaway, _, err := b.blsConstructor.New() - if err != nil { - b.logger.Error("could not generate key", zap.Error(err)) - return false, errors.Wrap(err, "validate") - } - provers, err := b.proverRegistry.GetActiveProvers(nil) if err != nil { b.logger.Error("could not get active provers", zap.Error(err)) @@ -113,7 +107,10 @@ func (b *BLSGlobalFrameValidator) Validate( return false, errors.Wrap(err, "validate") } activeProverSet = append(activeProverSet, info.PublicKey) - throwawaySet = append(throwawaySet, throwaway.Public().([]byte)) + throwawaySet = append( + throwawaySet, + frame.Header.PublicKeySignatureBls48581.Signature, + ) } } diff --git a/node/datarpc/data_worker_ipc_server.go b/node/datarpc/data_worker_ipc_server.go index a51c3c9..ab1d22f 100644 --- a/node/datarpc/data_worker_ipc_server.go +++ b/node/datarpc/data_worker_ipc_server.go @@ -95,11 +95,11 @@ func NewDataWorkerIPCServer( proverRegistry: proverRegistry, frameProver: frameProver, peerInfoManager: peerInfoManager, + quit: make(chan struct{}), }, nil } func (r *DataWorkerIPCServer) Start() error { - r.quit = make(chan struct{}) r.RespawnServer(nil) <-r.quit @@ -108,7 +108,9 @@ func (r *DataWorkerIPCServer) Start() error { func (r *DataWorkerIPCServer) Stop() error { r.logger.Info("stopping server gracefully") - r.server.GracefulStop() + if r.server != nil { + r.server.GracefulStop() + } go func() { r.quit <- struct{}{} }() diff --git a/node/execution/engines/global_execution_engine.go b/node/execution/engines/global_execution_engine.go index 96430f7..46aea06 100644 --- a/node/execution/engines/global_execution_engine.go +++ b/node/execution/engines/global_execution_engine.go @@ -525,6 +525,7 @@ func (e *GlobalExecutionEngine) tryGetIntrinsic(address []byte) ( if !exists { // Load the global intrinsic loaded, err := global.LoadGlobalIntrinsic( + e.logger, address, e.hypergraph, e.inclusionProver, diff --git a/node/execution/intrinsics/global/global_intrinsic.go b/node/execution/intrinsics/global/global_intrinsic.go index 8c8fe43..d45b77a 100644 --- a/node/execution/intrinsics/global/global_intrinsic.go +++ b/node/execution/intrinsics/global/global_intrinsic.go @@ -9,6 +9,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" observability "source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics" "source.quilibrium.com/quilibrium/monorepo/protobufs" "source.quilibrium.com/quilibrium/monorepo/types/consensus" @@ -22,6 +23,7 @@ import ( ) type GlobalIntrinsic struct { + logger *zap.Logger lockedWrites map[string]struct{} lockedReads map[string]int lockedWritesMx sync.RWMutex @@ -623,7 +625,6 @@ func (a *GlobalIntrinsic) Validate( return nil case protobufs.FrameHeaderType: - // Parse ProverKick directly from input pbHeader := &protobufs.FrameHeader{} if err := pbHeader.FromCanonicalBytes(input); err != nil { observability.ValidateErrors.WithLabelValues( @@ -634,6 +635,7 @@ func (a *GlobalIntrinsic) Validate( } op, err := NewProverShardUpdate( + a.logger, pbHeader, a.keyManager, a.hypergraph, @@ -1236,6 +1238,75 @@ func (a *GlobalIntrinsic) InvokeStep( observability.InvokeStepTotal.WithLabelValues("global", "prover_kick").Inc() return a.state, nil + case protobufs.FrameHeaderType: + opTimer := prometheus.NewTimer( + observability.OperationDuration.WithLabelValues( + "global", + "prover_shard_update", + ), + ) + defer opTimer.ObserveDuration() + + pbHeader := &protobufs.FrameHeader{} + if err := pbHeader.FromCanonicalBytes(input); err != nil { + observability.InvokeStepErrors.WithLabelValues( + "global", + "prover_shard_update", + ).Inc() + return nil, errors.Wrap(err, "invoke step") + } + + op, err := NewProverShardUpdate( + a.logger, + pbHeader, + a.keyManager, + a.hypergraph, + a.rdfMultiprover, + a.frameProver, + a.rewardIssuance, + a.proverRegistry, + a.blsConstructor, + ) + + valid, err := op.Verify(frameNumber) + if err != nil { + observability.InvokeStepErrors.WithLabelValues( + "global", + "prover_shard_update", + ).Inc() + return nil, errors.Wrap(err, "invoke step") + } + + if !valid { + observability.InvokeStepErrors.WithLabelValues( + "global", + "prover_shard_update", + ).Inc() + return nil, errors.Wrap( + errors.New("invalid prover shard update"), + "invoke step", + ) + } + + matTimer := prometheus.NewTimer( + observability.MaterializeDuration.WithLabelValues("global"), + ) + a.state, err = op.Materialize(frameNumber, state) + matTimer.ObserveDuration() + if err != nil { + observability.InvokeStepErrors.WithLabelValues( + "global", + "prover_shard_update", + ).Inc() + return nil, errors.Wrap(err, "invoke step") + } + + observability.InvokeStepTotal.WithLabelValues( + "global", + "prover_shard_update", + ).Inc() + return a.state, nil + default: observability.InvokeStepErrors.WithLabelValues( "global", @@ -1821,6 +1892,7 @@ func (a *GlobalIntrinsic) tryLockKick(frameNumber uint64, input []byte) ( // address. The global intrinsic is implicitly deployed and always exists at the // global address. func LoadGlobalIntrinsic( + logger *zap.Logger, address []byte, hypergraph hypergraph.Hypergraph, inclusionProver crypto.InclusionProver, @@ -1845,6 +1917,7 @@ func LoadGlobalIntrinsic( // The global intrinsic doesn't need any initialization since it's implicitly // deployed return &GlobalIntrinsic{ + logger: logger, lockedWrites: make(map[string]struct{}), lockedReads: make(map[string]int), state: nil, diff --git a/node/execution/intrinsics/global/global_prover_shard_update.go b/node/execution/intrinsics/global/global_prover_shard_update.go index bdf2efc..8c88be6 100644 --- a/node/execution/intrinsics/global/global_prover_shard_update.go +++ b/node/execution/intrinsics/global/global_prover_shard_update.go @@ -10,6 +10,7 @@ import ( "github.com/iden3/go-iden3-crypto/poseidon" "github.com/pkg/errors" + "go.uber.org/zap" "source.quilibrium.com/quilibrium/monorepo/node/execution/intrinsics/token" hgstate "source.quilibrium.com/quilibrium/monorepo/node/execution/state/hypergraph" "source.quilibrium.com/quilibrium/monorepo/protobufs" @@ -33,6 +34,7 @@ type ProverShardUpdate struct { FrameHeader *protobufs.FrameHeader // Private dependencies + logger *zap.Logger keyManager keys.KeyManager hypergraph hypergraph.Hypergraph rdfMultiprover *schema.RDFMultiprover @@ -40,9 +42,13 @@ type ProverShardUpdate struct { rewardIssuance consensus.RewardIssuance proverRegistry consensus.ProverRegistry blsConstructor crypto.BlsConstructor + + // Internal + selfProverAddress []byte } func NewProverShardUpdate( + logger *zap.Logger, frameHeader *protobufs.FrameHeader, keyManager keys.KeyManager, hypergraph hypergraph.Hypergraph, @@ -52,15 +58,28 @@ func NewProverShardUpdate( proverRegistry consensus.ProverRegistry, blsConstructor crypto.BlsConstructor, ) (*ProverShardUpdate, error) { + selfProverAddress := []byte{} + if keyManager != nil { + p, err := keyManager.GetSigningKey("q-prover-key") + if err == nil { + pub := p.Public().([]byte) + addrBI, err := poseidon.HashBytes(pub) + if err == nil { + selfProverAddress = addrBI.FillBytes(make([]byte, 32)) + } + } + } return &ProverShardUpdate{ - FrameHeader: frameHeader, - keyManager: keyManager, - hypergraph: hypergraph, - rdfMultiprover: rdfMultiprover, - frameProver: frameProver, - rewardIssuance: rewardIssuance, - proverRegistry: proverRegistry, - blsConstructor: blsConstructor, + logger: logger, + FrameHeader: frameHeader, + keyManager: keyManager, + hypergraph: hypergraph, + rdfMultiprover: rdfMultiprover, + frameProver: frameProver, + rewardIssuance: rewardIssuance, + proverRegistry: proverRegistry, + blsConstructor: blsConstructor, + selfProverAddress: selfProverAddress, }, nil } @@ -442,6 +461,13 @@ func (p *ProverShardUpdate) applyReward( balanceBytes := make([]byte, 32) currentBalance.FillBytes(balanceBytes) + if bytes.Equal(rewardAddress, p.selfProverAddress) { + p.logger.Info("reward updated", zap.String( + "raw_unit_balance", + currentBalance.String(), + )) + } + if err := p.rdfMultiprover.Set( GLOBAL_RDF_SCHEMA, intrinsics.GLOBAL_INTRINSIC_ADDRESS[:], diff --git a/node/p2p/internal/peer_monitor.go b/node/p2p/internal/peer_monitor.go index 06385eb..f741b2b 100644 --- a/node/p2p/internal/peer_monitor.go +++ b/node/p2p/internal/peer_monitor.go @@ -36,7 +36,6 @@ func (pm *peerMonitor) pingOnce( return false case res := <-pm.ps.Ping(pingCtx, peer): if res.Error != nil { - logger.Debug("ping error", zap.Error(res.Error)) return false } } diff --git a/node/store/clock.go b/node/store/clock.go index 4e81fd3..a8acb6d 100644 --- a/node/store/clock.go +++ b/node/store/clock.go @@ -919,28 +919,18 @@ func (p *PebbleClockStore) PutGlobalClockFrame( } } - frameNumberBytes := make([]byte, 8) - binary.BigEndian.PutUint64(frameNumberBytes, frameNumber) - - _, closer, err := p.db.Get(clockGlobalEarliestIndex()) - if err != nil { - if !errors.Is(err, pebble.ErrNotFound) { - return errors.Wrap(err, "put global clock frame") - } - - if err = txn.Set( - clockGlobalEarliestIndex(), - frameNumberBytes, - ); err != nil { - return errors.Wrap(err, "put global clock frame") - } - } else { - _ = closer.Close() + if err = p.updateEarliestIndex( + txn, + clockGlobalEarliestIndex(), + frameNumber, + ); err != nil { + return errors.Wrap(err, "put global clock frame") } - if err = txn.Set( + if err = p.updateLatestIndex( + txn, clockGlobalLatestIndex(), - frameNumberBytes, + frameNumber, ); err != nil { return errors.Wrap(err, "put global clock frame") } diff --git a/protobufs/global.go b/protobufs/global.go index a4979bd..c65831c 100644 --- a/protobufs/global.go +++ b/protobufs/global.go @@ -4328,7 +4328,7 @@ func (t *ProverJoin) Validate() error { return errors.Wrap(errors.New("no filters provided"), "validate") } for _, filter := range t.Filters { - if len(filter) != 32 && len(filter) != 64 { + if len(filter) < 32 || len(filter) > 64 { return errors.Wrap(errors.New("invalid filter"), "validate") } } @@ -4352,7 +4352,7 @@ func (t *ProverLeave) Validate() error { return errors.Wrap(errors.New("no filters provided"), "validate") } for _, filter := range t.Filters { - if len(filter) != 32 && len(filter) != 64 { + if len(filter) < 32 || len(filter) > 64 { return errors.Wrap(errors.New("invalid filter"), "validate") } } @@ -4370,7 +4370,7 @@ func (t *ProverPause) Validate() error { if t == nil { return errors.Wrap(errors.New("nil announce prover pause"), "validate") } - if len(t.Filter) != 32 { + if len(t.Filter) < 32 || len(t.Filter) > 64 { return errors.Wrap(errors.New("invalid filter"), "validate") } if err := t.PublicKeySignatureBls48581.Validate(); err != nil { @@ -4387,7 +4387,7 @@ func (t *ProverResume) Validate() error { if t == nil { return errors.Wrap(errors.New("nil announce prover resume"), "validate") } - if len(t.Filter) != 32 { + if len(t.Filter) < 32 || len(t.Filter) > 64 { return errors.Wrap(errors.New("invalid filter"), "validate") } if err := t.PublicKeySignatureBls48581.Validate(); err != nil { @@ -4455,7 +4455,7 @@ func (t *ProverConfirm) Validate() error { if t == nil { return errors.Wrap(errors.New("nil prover confirm"), "validate") } - if len(t.Filter) != 32 && len(t.Filter) != 64 { + if len(t.Filter) < 32 || len(t.Filter) > 64 { return errors.Wrap(errors.New("invalid filter"), "validate") } if err := t.PublicKeySignatureBls48581.Validate(); err != nil { @@ -4471,7 +4471,7 @@ func (t *ProverReject) Validate() error { if t == nil { return errors.Wrap(errors.New("nil prover reject"), "validate") } - if len(t.Filter) != 32 && len(t.Filter) != 64 { + if len(t.Filter) < 32 || len(t.Filter) > 64 { return errors.Wrap(errors.New("invalid filter"), "validate") } if err := t.PublicKeySignatureBls48581.Validate(); err != nil { @@ -4784,13 +4784,12 @@ func (p *ProverLivenessCheck) Validate() error { } // Filter should be 64 bytes or fewer - if len(p.Filter) > 64 { + if len(p.Filter) < 32 || len(p.Filter) > 64 { return errors.Wrap(errors.New("invalid filter length"), "validate") } - // Commitment hash should be 32 bytes if global, at least 32 if not - if (len(p.Filter) == 0 && len(p.CommitmentHash) != 32) || - (len(p.Filter) != 0 && len(p.CommitmentHash) < 32) { + // Commitment hash should be at least 32 bytes + if len(p.Filter) != 0 && len(p.CommitmentHash) < 32 { return errors.Wrap(errors.New("invalid commitment hash length"), "validate") }