Add Mevlink to Eth Backend

In either go-ethereum or your own geth fork, we're going to add a Mevlink Stream instance to eth/backend.go. First add Mevlink's streamer-go to the project with

go get github.com/mevlink/streamer-go

and import it as mlstreamergo "github.com/mevlink/streamer-go" at the end of your import block.

Then, modify the New() function so that it accepts one more parameter, mlstream, of type *mlstreamergo.Streamer:

func New(stack *node.Node, config *ethconfig.Config, mlstream *mlstreamergo.Streamer) (*Ethereum, error) {

Now that New() receives the mlstream, add an mlstream field to the Ethereum struct and set that field in the eth literal built inside New():

// eth/backend.go

// Ethereum is shown with its existing fields elided; only the new Mevlink
// field is spelled out.
type Ethereum struct {
	
	// ... (existing fields, unchanged)
	
	// mevlink additions
	// mlstream holds the Mevlink streamer that is passed to New().
	mlstream *mlstreamergo.Streamer
}

// ...

// Inside New(): the existing literal with one extra field at the end.
eth := &Ethereum{
	config:            config,
	merger:            consensus.NewMerger(chainDb),
	chainDb:           chainDb,
	eventMux:          stack.EventMux(),
	accountManager:    stack.AccountManager(),
	engine:            engine,
	closeBloomHandler: make(chan struct{}),
	networkID:         config.NetworkId,
	gasPrice:          config.Miner.GasPrice,
	etherbase:         config.Miner.Etherbase,
	bloomRequests:     make(chan chan *bloombits.Retrieval),
	bloomIndexer:      core.NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms),
	p2pServer:         stack.Server(),
	shutdownTracker:   shutdowncheck.NewShutdownTracker(chainDb),
	// mevlink addition: keep the streamer passed to New() on the service.
	mlstream:          mlstream,
}

// ...

Finally, in the Stop() definition, add a call to s.mlstream.Stop() (s is the method's *Ethereum receiver) so that the mevlink service also stops gracefully:

// Stop the mevlink service so it shuts down together with the node.
s.mlstream.Stop()

eth/backend.go modifications

Here's what the file looks like after those changes are applied:

// eth/backend.go

import (
	// ...
	mlstreamergo "github.com/mevlink/streamer-go"
)

// ...

// Ethereum implements the Ethereum full node service.
// The only change from the stock definition is the mlstream field at the end.
type Ethereum struct {
	config *ethconfig.Config

	// Handlers
	txPool             *txpool.TxPool
	blockchain         *core.BlockChain
	handler            *handler
	ethDialCandidates  enode.Iterator
	snapDialCandidates enode.Iterator
	merger             *consensus.Merger

	// DB interfaces
	chainDb ethdb.Database // Block chain database

	eventMux       *event.TypeMux
	engine         consensus.Engine
	accountManager *accounts.Manager

	bloomRequests     chan chan *bloombits.Retrieval // Channel receiving bloom data retrieval requests
	bloomIndexer      *core.ChainIndexer             // Bloom indexer operating during block imports
	closeBloomHandler chan struct{}

	APIBackend *EthAPIBackend

	miner     *miner.Miner
	gasPrice  *big.Int
	etherbase common.Address

	networkID     uint64
	netRPCService *ethapi.NetAPI

	p2pServer *p2p.Server

	lock sync.RWMutex // Protects the variadic fields (e.g. gas price and etherbase)

	shutdownTracker *shutdowncheck.ShutdownTracker // Tracks if and when the node has shutdown ungracefully

	// mevlink additions
	// mlstream is the Mevlink streamer passed to New(); Stop() calls its Stop method.
	mlstream *mlstreamergo.Streamer
}

// ...

// New is shown with its setup and tail elided. Compared with the stock
// signature it takes one extra argument, mlstream, which is stored in the
// Ethereum value built below.
func New(stack *node.Node, config *ethconfig.Config, mlstream *mlstreamergo.Streamer) (*Ethereum, error) {
	
	// ... (existing setup, unchanged)
	
	eth := &Ethereum{
		config:            config,
		merger:            consensus.NewMerger(chainDb),
		chainDb:           chainDb,
		eventMux:          stack.EventMux(),
		accountManager:    stack.AccountManager(),
		engine:            engine,
		closeBloomHandler: make(chan struct{}),
		networkID:         config.NetworkId,
		gasPrice:          config.Miner.GasPrice,
		etherbase:         config.Miner.Etherbase,
		bloomRequests:     make(chan chan *bloombits.Retrieval),
		bloomIndexer:      core.NewBloomIndexer(chainDb, params.BloomBitsBlocks, params.BloomConfirms),
		p2pServer:         stack.Server(),
		shutdownTracker:   shutdowncheck.NewShutdownTracker(chainDb),
		// mevlink addition: keep the streamer so Stop() can shut it down.
		mlstream:          mlstream,
	}
	
	// ... (rest of New, unchanged)
	
}

// ...

// Stop is shown with its existing shutdown steps elided; the only addition
// is the call that stops the Mevlink streamer.
func (s *Ethereum) Stop() error {

	// ... (existing shutdown steps, unchanged)
	
	// Stop mevlink service
	// NOTE(review): mlstream is used without a nil check — confirm that every
	// caller of New() passes a non-nil streamer.
	s.mlstream.Stop()

	// ... (remaining shutdown steps and the return, unchanged)
	
}

Last updated