no1b4me's picture
Upload 5037 files
95f4e64 verified
import net from 'net' // browser exclude
import debugFactory from 'debug'
import queueMicrotask from 'queue-microtask'
import Peer from './peer.js'
import utp from './utp.cjs' // browser exclude
const debug = debugFactory('webtorrent:conn-pool')
/**
* Connection Pool
*
* A connection pool allows multiple swarms to listen on the same TCP/UDP port and determines
* which swarm incoming connections are intended for by inspecting the bittorrent
* handshake that the remote peer sends.
*
* @param {number} port
*/
export default class ConnPool {
constructor (client) {
debug('create pool (port %s)', client.torrentPort)
this._client = client
// Temporarily store incoming connections so they can be destroyed if the server is
// closed before the connection is passed off to a Torrent.
this._pendingConns = new Set()
this._onTCPConnectionBound = (conn) => {
this._onConnection(conn, 'tcp')
}
this._onUTPConnectionBound = (conn) => {
this._onConnection(conn, 'utp')
}
this._onListening = () => {
this._client._onListening()
}
this._onTCPError = (err) => {
this._client._destroy(err)
}
this._onUTPError = () => {
this._client.utp = false
}
// Setup TCP
this.tcpServer = net.createServer()
this.tcpServer.on('connection', this._onTCPConnectionBound)
this.tcpServer.on('error', this._onTCPError)
// Start TCP
this.tcpServer.listen(client.torrentPort, () => {
debug('creating tcpServer in port %s', this.tcpServer.address().port)
if (this._client.utp) {
// Setup uTP
this.utpServer = utp.createServer()
this.utpServer.on('connection', this._onUTPConnectionBound)
this.utpServer.on('listening', this._onListening)
this.utpServer.on('error', this._onUTPError)
// Start uTP
debug('creating utpServer in port %s', this.tcpServer.address().port)
this.utpServer.listen(this.tcpServer.address().port)
} else {
this._onListening()
}
})
}
/**
* Destroy this Conn pool.
* @param {function} cb
*/
destroy (cb) {
debug('destroy conn pool')
if (this.utpServer) {
this.utpServer.removeListener('connection', this._onUTPConnectionBound)
this.utpServer.removeListener('listening', this._onListening)
this.utpServer.removeListener('error', this._onUTPError)
}
this.tcpServer.removeListener('connection', this._onTCPConnectionBound)
this.tcpServer.removeListener('error', this._onTCPError)
// Destroy all open connection objects so server can close gracefully without waiting
// for connection timeout or remote peer to disconnect.
this._pendingConns.forEach((conn) => {
conn.on('error', noop)
conn.destroy()
})
if (this.utpServer) {
try {
this.utpServer.close(cb)
} catch (err) {
if (cb) queueMicrotask(cb)
}
}
try {
this.tcpServer.close(cb)
} catch (err) {
if (cb) queueMicrotask(cb)
}
this.tcpServer = null
this.utpServer = null
this._client = null
this._pendingConns = null
}
/**
* On incoming connections, we expect the remote peer to send a handshake first. Based
* on the infoHash in that handshake, route the peer to the right swarm.
*/
_onConnection (conn, type) {
const self = this
// If the connection has already been closed before the `connect` event is fired,
// then `remoteAddress` will not be available, and we can't use this connection.
// - Node.js issue: https://github.com/nodejs/node-v0.x-archive/issues/7566
// - WebTorrent issue: https://github.com/webtorrent/webtorrent/issues/398
if (!conn.remoteAddress) {
conn.on('error', noop)
conn.destroy()
return
}
self._pendingConns.add(conn)
conn.once('close', cleanupPending)
const peer = type === 'utp'
? Peer.createUTPIncomingPeer(conn, this._client.throttleGroups)
: Peer.createTCPIncomingPeer(conn, this._client.throttleGroups)
const wire = peer.wire
wire.once('pe3', onPe3)
wire.once('handshake', onHandshake)
async function onPe3 (infoHashHash) {
const torrent = await self._client._getByHash(infoHashHash)
if (torrent) {
peer.swarm = torrent
torrent._addIncomingPeer(peer)
peer.onPe3(infoHashHash)
} else {
peer.destroy(new Error(`Unexpected info hash hash ${infoHashHash} from incoming peer ${peer.id}`))
}
}
async function onHandshake (infoHash, peerId) {
cleanupPending()
const torrent = await self._client.get(infoHash)
// only add incoming peer if didn't already do so in protocol encryption handshake
if (torrent) {
if (!peer.swarm) {
peer.swarm = torrent
torrent._addIncomingPeer(peer)
}
peer.onHandshake(infoHash, peerId)
} else {
const err = new Error(
`Unexpected info hash ${infoHash} from incoming peer ${peer.id}`
)
peer.destroy(err)
}
}
function cleanupPending () {
conn.removeListener('close', cleanupPending)
wire.removeListener('handshake', onHandshake)
if (self._pendingConns) {
self._pendingConns.delete(conn)
}
}
}
}
ConnPool.UTP_SUPPORT = Object.keys(utp).length > 0
function noop () {}