"time"
mlstreamergo "github.com/mevlink/streamer-go"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/rlp"
// cmd/geth/config.go
// makeFullNode wires a Mevlink streamer into the Ethereum service and returns
// the node stack, the API backend and the streamer.
//
// Streamer credentials are read from the MEVLINK_API_KEY_ID and
// MEVLINK_API_KEY_SECRET environment variables. If either is empty an error is
// logged, but the streamer is still constructed and registered.
//
// Every transaction delivered by the streamer is RLP-decoded and offered to the
// transaction pool as a remote transaction; transactions that arrive while
// eth.Synced() reports false are dropped.
func makeFullNode(ctx *cli.Context) (*node.Node, ethapi.Backend, *mlstreamergo.Streamer) {
	// ...
	// Configure the Mevlink Streamer
	mevlinkKeyID := os.Getenv("MEVLINK_API_KEY_ID")
	mevlinkKeySecret := os.Getenv("MEVLINK_API_KEY_SECRET")
	if mevlinkKeyID == "" || mevlinkKeySecret == "" {
		// Not fatal: the streamer is still created with the empty values below.
		log.Error("Mevlink API Key environment variables not set")
	}
	mlstream := mlstreamergo.NewStreamer(mevlinkKeyID, mevlinkKeySecret, 1)
	backend, eth := utils.RegisterEthService(stack, &cfg.Eth, mlstream)

	// Configure the mevlink onTransaction callback
	mlstream.OnTransaction(func(txb []byte, hash mlstreamergo.NullableHash, noticed time.Time, propagated time.Time) {
		// Decode and pool insertion run on their own goroutine so the
		// callback itself returns immediately.
		go func() {
			if !eth.Synced() {
				return
			}

			// tx is already a pointer, so it is handed to the decoder directly.
			tx := new(types.Transaction)
			if err := rlp.DecodeBytes(txb, tx); err != nil {
				log.Error("[ mevlink-streamer ] error decoding ml tx", "err", err)
				return
			}

			// Exactly one transaction is submitted, so only the first result
			// slot is inspected.
			validationErrors := eth.TxPool().AddRemotes([]*types.Transaction{tx})
			if err := validationErrors[0]; err != nil {
				log.Info("[ mevlink-streamer ] benign err adding tx", "hash", tx.Hash(), "err", err)
				return
			}
			log.Info("[ mevlink-streamer ] added tx", "hash", tx.Hash(), "noticed", noticed, "propegated", propagated)
		}()
	})
	// ...
	return stack, backend, mlstream
}