Propagation

Learn how to quickly propagate transactions with Mevlink

To show how transaction propagation works in Mevlink's streamer-go, we'll continue from the /mevlink/stream.go code we built in the Transaction Stream step.

This is what stream.go looks like at this point:

// mevlink/stream.go

package main

import (
	"encoding/hex"
	"log"
	"time"

	mlstreamer "github.com/mevlink/streamer-go"
	"golang.org/x/crypto/sha3"
)

func main() {

    // create streamer
    var str = mlstreamer.NewStreamer("<api-key-id>", "<api-key-secret>", 1)
    
    // transaction callback
    str.OnTransaction(func(txb []byte, noticed, propagated time.Time) {
        
        //Getting the transaction hash and printing the relevant times
        var hasher = sha3.NewLegacyKeccak256()
        hasher.Write(txb)
        var tx_hash = hasher.Sum(nil)

        log.Println("Got tx '" + hex.EncodeToString(tx_hash) + "'! Was noticed on ", noticed, "and sent on", propagated)
    })

    log.Fatal(str.Stream())
}

New imports

We'll swap the hashing imports for go-ethereum's types and rlp packages so we can decode the RLP-encoded transactions, which is useful if you'd like to validate them before propagating.

// mevlink/stream.go

package main

import (
	"log"
	"time"

	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/rlp"
	mlstreamer "github.com/mevlink/streamer-go"
)

Decoding

Next we'll use the imported go-ethereum packages to decode incoming transactions in the str.OnTransaction callback.

// mevlink/stream.go

// transaction callback
str.OnTransaction(func(txb []byte, hash mlstreamer.NullableHash, noticed time.Time, propagated time.Time) {
    
	// Parsing the transaction
	tx := new(types.Transaction)
	err := rlp.DecodeBytes(txb, &tx)

	if err != nil {
		log.Println("error decoding ml tx", err)
	} else {
		log.Println("got tx '"+tx.Hash().String()+"'! noticed on ", noticed, "and sent on", propagated)
	}
	
})
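
If you'd like a cheap sanity check before the full decode, the first byte of a raw transaction already says a lot. Under EIP-2718, a legacy transaction is an RLP list (first byte 0xc0 or higher), while a typed transaction starts with a type byte in the range 0x00 to 0x7f. The sketch below is a hypothetical helper, not part of streamer-go or go-ethereum, and it assumes the payload is the raw transaction encoding; this page does not say which form Mevlink delivers.

```go
package main

import (
	"errors"
	"fmt"
)

// errEmptyTx is returned when the payload has no bytes at all.
var errEmptyTx = errors.New("empty transaction payload")

// envelopeKind is a hypothetical helper (not a streamer-go or go-ethereum API).
// It looks only at the first byte of a raw transaction:
//   - 0xc0..0xff starts an RLP list, which is how legacy transactions are encoded
//   - 0x00..0x7f is an EIP-2718 transaction type prefix
//
// Anything else is reported as an error.
func envelopeKind(txb []byte) (string, error) {
	if len(txb) == 0 {
		return "", errEmptyTx
	}
	switch b := txb[0]; {
	case b >= 0xc0:
		return "legacy", nil
	case b <= 0x7f:
		return fmt.Sprintf("typed(0x%02x)", b), nil
	default:
		return "", fmt.Errorf("unexpected first byte 0x%02x", b)
	}
}

func main() {
	// Truncated sample prefixes, used only to exercise the first-byte check.
	for _, txb := range [][]byte{{0xf8, 0x6b}, {0x02, 0xf8, 0x70}, {}} {
		kind, err := envelopeKind(txb)
		fmt.Println(kind, err)
	}
}
```

go-ethereum's Transaction.UnmarshalBinary is documented to accept the canonical encoding of both forms, so it may be worth trying if rlp.DecodeBytes rejects a typed payload.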

Propagation

To test propagation, we'll emit two transactions, log any emission errors we get, and stop the streamer once both have been emitted.

We'll first add a counter where we instantiate the streamer:

// mevlink/stream.go

// create streamer
var str = mlstreamer.NewStreamer("<api-key-id>", "<api-key-secret>", 1)

emittedTxs := 0

And then we'll add the check and emission call in the transaction callback:

// mevlink/stream.go

// transaction callback
str.OnTransaction(func(txb []byte, hash mlstreamer.NullableHash, noticed time.Time, propagated time.Time) {
	// Parsing the transaction
	tx := new(types.Transaction)
	err := rlp.DecodeBytes(txb, &tx)

	if err != nil {
		log.Println("error decoding ml tx", err)
	} else {
		log.Println("got tx '"+tx.Hash().String()+"'! noticed on ", noticed, "and sent on", propagated)
		if emittedTxs < 2 {
			// check the error returned by the emission itself, not the decode error
			if err := str.EmitTransaction(txb); err != nil {
				log.Println("error emitting tx: "+tx.Hash().String(), err)
			}
			emittedTxs++
		} else {
			str.Stop()
			log.Println("Done emitting test transactions.")
		}
	}
})
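
The counter logic above can also be pulled out of the callback so it is easy to test without a live connection. The following is a minimal sketch, not the library's own approach: the emitter interface, limitedEmitter, and fakeStream are hypothetical names, and the error return on EmitTransaction is an assumption based on the error logging in this tutorial. The mutex is only a precaution, since this page does not say whether the callback can run concurrently.

```go
package main

import (
	"fmt"
	"sync"
)

// emitter is a hypothetical interface covering only the two calls the
// tutorial uses. The error return on EmitTransaction is an assumption.
type emitter interface {
	EmitTransaction(txb []byte) error
	Stop()
}

// limitedEmitter forwards at most max transactions, then calls Stop once.
type limitedEmitter struct {
	mu      sync.Mutex
	out     emitter
	max     int
	emitted int
	stopped bool
}

// handle reports whether txb was passed to EmitTransaction, along with
// any error that call returned. Failed emissions still count toward max,
// mirroring the counter in the tutorial.
func (l *limitedEmitter) handle(txb []byte) (bool, error) {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.emitted >= l.max {
		if !l.stopped {
			l.stopped = true
			l.out.Stop()
		}
		return false, nil
	}
	l.emitted++
	return true, l.out.EmitTransaction(txb)
}

// fakeStream stands in for the real streamer so the sketch runs offline.
type fakeStream struct {
	sent  int
	stops int
}

func (f *fakeStream) EmitTransaction(txb []byte) error { f.sent++; return nil }
func (f *fakeStream) Stop()                            { f.stops++ }

func main() {
	fake := &fakeStream{}
	lim := &limitedEmitter{out: fake, max: 2}
	for i := 0; i < 5; i++ {
		lim.handle([]byte{byte(i)})
	}
	fmt.Println(fake.sent, fake.stops) // prints "2 1"
}
```

In the real callback you would construct the limiter next to the streamer and call its handle method after a successful decode, provided the streamer's methods match the assumed interface.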

Full Snippet

// mevlink/stream.go

package main

import (
	"log"
	"time"

	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/rlp"
	mlstreamer "github.com/mevlink/streamer-go"
)

func main() {

	// create streamer
	var str = mlstreamer.NewStreamer("<api-key-id>", "<api-key-secret>", 1)

	// number of test transactions emitted so far
	emittedTxs := 0

	// transaction callback
	str.OnTransaction(func(txb []byte, hash mlstreamer.NullableHash, noticed time.Time, propagated time.Time) {

		// Parsing the transaction
		tx := new(types.Transaction)
		err := rlp.DecodeBytes(txb, &tx)

		if err != nil {
			log.Println("error decoding ml tx", err)
		} else {
			log.Println("got tx '"+tx.Hash().String()+"'! noticed on ", noticed, "and sent on", propagated)
			if emittedTxs < 2 {
				// check the error returned by the emission itself, not the decode error
				if err := str.EmitTransaction(txb); err != nil {
					log.Println("error emitting tx: "+tx.Hash().String(), err)
				}
				emittedTxs++
			} else {
				str.Stop()
				log.Println("Done emitting test transactions.")
			}
		}
	})

	log.Fatal(str.Stream())
}
