Modify makeFullNode()

Next, we'll modify makeFullNode() in cmd/geth/config.go so that it instantiates the Mevlink streamer alongside the Ethereum service and passes it to RegisterEthService.

Imports

First we'll add the following packages to the import block:

"time"
mlstreamergo "github.com/mevlink/streamer-go"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"

makeFullNode()

First we'll create the streamer with mlstreamergo.NewStreamer and pass it to RegisterEthService. Then we'll define the OnTransaction callback so that it adds incoming transactions to Geth's transaction pool. Finally, we'll have makeFullNode() return the streamer so we can start it after Geth's other services have started.

// cmd/geth/config.go

// makeFullNode builds the node stack, registers the Ethereum service together
// with a Mevlink streamer, and wires the streamer's transaction callback into
// the local transaction pool.
//
// It returns the node stack, the API backend and the configured streamer. The
// code shown here does not start the streamer; the caller is expected to do
// that once the node's other services are running.
func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend, *mlstreamergo.Streamer) {

	// ...

	// Configure the Mevlink streamer from credentials in the environment.
	mevlinkKeyID := os.Getenv("MEVLINK_API_KEY_ID")
	mevlinkKeySecret := os.Getenv("MEVLINK_API_KEY_SECRET")
	if mevlinkKeyID == "" || mevlinkKeySecret == "" {
		// Only reported: the streamer is still constructed below with
		// whatever values were read.
		log.Error("Mevlink API Key environment variables not set")
	}
	// NOTE(review): the third argument (1) is presumably the network ID
	// (Ethereum mainnet) — confirm against the streamer-go documentation.
	mlstream := mlstreamergo.NewStreamer(mevlinkKeyID, mevlinkKeySecret, 1)

	backend, eth := utils.RegisterEthService(stack, &cfg.Eth, mlstream)

	// Configure the mevlink onTransaction callback
	mlstream.OnTransaction(func(txb []byte, hash mlstreamergo.NullableHash, noticed time.Time, propagated time.Time) {
		// Process each transaction in its own goroutine so the streamer
		// callback returns without waiting on decoding or the tx pool.
		go func() {
			// Drop streamed transactions until the node reports it is synced.
			if !eth.Synced() {
				return
			}

			tx := new(types.Transaction)
			if err := rlp.DecodeBytes(txb, tx); err != nil {
				log.Error("[ mevlink-streamer ] error decoding ml tx", "err", err)
				return
			}

			// Exactly one transaction is submitted, so only the first
			// result slot is inspected.
			if err := eth.TxPool().AddRemotes([]*types.Transaction{tx})[0]; err != nil {
				log.Info("[ mevlink-streamer ] benign err adding tx", "hash", tx.Hash(), "err", err)
				return
			}
			log.Info("[ mevlink-streamer ] added tx", "hash", tx.Hash(), "noticed", noticed, "propagated", propagated)
		}()
	})

	// ...

	return stack, backend, mlstream

}
    

Last updated