In either go-ethereum or your own geth fork, we're going to add a Mevlink Stream instance to eth/backend.go. First, add Mevlink's streamer-go to the project with
go get github.com/mevlink/streamer-go
and import it as mlstreamergo "github.com/mevlink/streamer-go"
at the end of your import block.
Then, modify the New() function so that it accepts one more parameter, mlstream, of type *mlstreamergo.Streamer:
Copy func New (stack * node . Node , config * ethconfig . Config , mlstream * mlstreamergo . Streamer ) ( * Ethereum , error ) {
Now that we have the mlstream parameter, change both the Ethereum struct definition and the eth struct literal inside New() so that they include an mlstream field:
Copy // eth/backend.go
type Ethereum struct {
// ...
// mevlink additions
// mlstream is the Mevlink streamer received as the extra New() parameter;
// it is kept on the service so that Stop() can shut it down.
mlstream * mlstreamergo . Streamer
}
// ...
eth := & Ethereum {
config: config,
merger: consensus. NewMerger (chainDb),
chainDb: chainDb,
eventMux: stack. EventMux (),
accountManager: stack. AccountManager (),
engine: engine,
closeBloomHandler: make ( chan struct {}),
networkID: config.NetworkId,
gasPrice: config.Miner.GasPrice,
etherbase: config.Miner.Etherbase,
bloomRequests: make ( chan chan * bloombits . Retrieval ),
bloomIndexer: core. NewBloomIndexer (chainDb, params.BloomBitsBlocks, params.BloomConfirms),
p2pServer: stack. Server (),
shutdownTracker: shutdowncheck. NewShutdownTracker (chainDb),
// mevlink addition: store the streamer passed to New() on the service.
mlstream: mlstream,
}
// ...
Stop Mevlink
Finally, in the Stop() method, add a call to s.mlstream.Stop() (the method's receiver is named s) so that the Mevlink service also stops gracefully:
Copy // Stop mevlink service
s.mlstream. Stop ()
eth/backend.go modifications
Here's what the file looks like after those changes are applied:
Copy // eth/backend.go
import (
// ...
mlstreamergo "github.com/mevlink/streamer-go"
)
// ...
// Ethereum implements the Ethereum full node service.
//
// Compared with the unmodified struct, the only change is the trailing
// mlstream field, which holds the Mevlink streamer.
type Ethereum struct {
config * ethconfig . Config
// Handlers
txPool * txpool . TxPool
blockchain * core . BlockChain
handler * handler
ethDialCandidates enode . Iterator
snapDialCandidates enode . Iterator
merger * consensus . Merger
// DB interfaces
chainDb ethdb . Database // Block chain database
eventMux * event . TypeMux
engine consensus . Engine
accountManager * accounts . Manager
bloomRequests chan chan * bloombits . Retrieval // Channel receiving bloom data retrieval requests
bloomIndexer * core . ChainIndexer // Bloom indexer operating during block imports
closeBloomHandler chan struct {}
APIBackend * EthAPIBackend
miner * miner . Miner
gasPrice * big . Int
etherbase common . Address
networkID uint64
netRPCService * ethapi . NetAPI
p2pServer * p2p . Server
lock sync . RWMutex // Protects the variadic fields (e.g. gas price and etherbase)
shutdownTracker * shutdowncheck . ShutdownTracker // Tracks if and when the node has shutdown ungracefully
// mevlink additions
// mlstream is the Mevlink streamer supplied to New(); Stop() calls its
// Stop method when the service shuts down.
mlstream * mlstreamergo . Streamer
}
// ...
// New constructs the Ethereum service (most of the body is elided in this
// listing). The mevlink change is the third parameter, mlstream, which is
// stored on the returned struct so that Stop() can shut the streamer down.
//
// NOTE(review): adding a parameter changes the signature, so every existing
// caller of New presumably has to be updated to pass a streamer — verify
// against the rest of the tree.
func New (stack * node . Node , config * ethconfig . Config , mlstream * mlstreamergo . Streamer ) ( * Ethereum , error ) {
// ...
eth := & Ethereum {
config: config,
merger: consensus. NewMerger (chainDb),
chainDb: chainDb,
eventMux: stack. EventMux (),
accountManager: stack. AccountManager (),
engine: engine,
closeBloomHandler: make ( chan struct {}),
networkID: config.NetworkId,
gasPrice: config.Miner.GasPrice,
etherbase: config.Miner.Etherbase,
bloomRequests: make ( chan chan * bloombits . Retrieval ),
bloomIndexer: core. NewBloomIndexer (chainDb, params.BloomBitsBlocks, params.BloomConfirms),
p2pServer: stack. Server (),
shutdownTracker: shutdowncheck. NewShutdownTracker (chainDb),
// mevlink addition: keep the streamer handed to New on the service.
mlstream: mlstream,
}
// ...
}
// ...
// Stop shuts the service down (the surrounding steps are elided in this
// listing). The mevlink addition is the call that stops the streamer stored
// in s.mlstream.
func (s * Ethereum ) Stop () error {
// ...
// Stop mevlink service
// NOTE(review): s.mlstream is used without a nil check. If New can be called
// with a nil streamer (e.g. from tests), whether this call is safe depends on
// how streamer-go implements Stop — confirm, or guard the call.
s.mlstream. Stop ()
// ...
}