Copy "time"
mlstreamergo "github.com/mevlink/streamer-go"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
Copy // cmd/geth/config.go
func makeFullNode (ctx * cli . Context ) ( * node . Node , ethapi . Backend , * mlstreamergo . Streamer ) {
// ...
// Configure the Mevlink Streamer
mevlinkKeyId := os.Getenv( "MEVLINK_API_KEY_ID" )
mevlinkKeySecret := os.Getenv( "MEVLINK_API_KEY_SECRET" )
if mevlinkKeyId == "" || mevlinkKeySecret == "" {
log.Error( "Mevlink API Key environment variables not set" )
}
var mlstream = mlstreamergo.NewStreamer(mevlinkKeyId, mevlinkKeySecret, 1 )
backend, eth := utils.RegisterEthService(stack, & cfg.Eth, mlstream)
// Configure the mevlink onTransaction callback
mlstream.OnTransaction( func (txb [] byte , hash mlstreamergo . NullableHash , noticed time . Time , propagated time . Time ) {
go func () {
if eth.Synced() {
tx := new ( types . Transaction )
err := rlp.DecodeBytes(txb, & tx)
if err != nil {
log.Error( "[ mevlink-streamer ] error decoding ml tx" )
} else {
validationErrors := eth.TxPool().AddRemotes([] * types . Transaction {tx})
if validationErrors[ 0 ] == nil {
log.Info( "[ mevlink-streamer ] added tx" , "hash" , tx.Hash(), "noticed" , noticed, "propegated" , propagated)
} else {
fmt.Println(validationErrors)
log.Info( "[ mevlink-streamer ] benign err adding tx" , "hash" , tx.Hash(), "err" , validationErrors[ 0 ])
}
}
}
}()
})
// ...
return stack, backend, mlstream
}