tlnbase: split out BOLT-08 (Noise) implementation - electrum - Electrum Bitcoin wallet
 (HTM) git clone https://git.parazyd.org/electrum
 (DIR) Log
 (DIR) Files
 (DIR) Refs
 (DIR) Submodules
       ---
 (DIR) commit 71afa3cc70ab22aeed8ce8b6bb19f78d29d1ec53
 (DIR) parent 0578bbd5d0ac07323bb96d6bc0fa9a5fc4e64064
 (HTM) Author: Janus <ysangkok@gmail.com>
       Date:   Sun, 14 Oct 2018 22:36:23 +0200
       
       lnbase: split out BOLT-08 (Noise) implementation
       
       Diffstat:
         M electrum/gui/qt/main_window.py      |       4 ++--
         M electrum/lnbase.py                  |     240 +++++++++++++++----------------
         M electrum/lnworker.py                |      15 +++++++++++----
       
       3 files changed, 132 insertions(+), 127 deletions(-)
       ---
 (DIR) diff --git a/electrum/gui/qt/main_window.py b/electrum/gui/qt/main_window.py
       t@@ -1854,10 +1854,10 @@ class ElectrumWindow(QMainWindow, MessageBoxMixin, Logger):
            def open_channel(self, *args, **kwargs):
                def task():
                    return self.wallet.lnworker.open_channel(*args, **kwargs)
       -        def on_success(chan):
       +        def on_success(node_id):
                    self.show_message('\n'.join([
                        _('Channel established.'),
       -                _('Remote peer ID') + ':' + bh2u(chan.node_id),
       +                _('Remote peer ID') + ':' + node_id,
                        _('This channel will be usable after 3 confirmations')
                    ]))
                WaitingDialog(self, _('Opening channel...'), task, on_success, self.on_error)
 (DIR) diff --git a/electrum/lnbase.py b/electrum/lnbase.py
       t@@ -266,53 +266,14 @@ def create_ephemeral_key() -> (bytes, bytes):
            privkey = ecc.ECPrivkey.generate_random_key()
            return privkey.get_secret_bytes(), privkey.get_public_key_bytes()
        
       -
       -class Peer(PrintError):
       -
       -    def __init__(self, lnworker, host, port, pubkey, request_initial_sync=False):
       -        self.host = host
       -        self.port = port
       -        self.pubkey = pubkey
       -        self.peer_addr = LNPeerAddr(host, port, pubkey)
       -        self.lnworker = lnworker
       -        self.privkey = lnworker.node_keypair.privkey
       -        self.network = lnworker.network
       -        self.lnwatcher = lnworker.network.lnwatcher
       -        self.channel_db = lnworker.network.channel_db
       -        self.read_buffer = b''
       -        self.ping_time = 0
       -        self.initialized = asyncio.Future()
       -        self.channel_accepted = defaultdict(asyncio.Queue)
       -        self.channel_reestablished = defaultdict(asyncio.Future)
       -        self.funding_signed = defaultdict(asyncio.Queue)
       -        self.funding_created = defaultdict(asyncio.Queue)
       -        self.revoke_and_ack = defaultdict(asyncio.Queue)
       -        self.commitment_signed = defaultdict(asyncio.Queue)
       -        self.announcement_signatures = defaultdict(asyncio.Queue)
       -        self.closing_signed = defaultdict(asyncio.Queue)
       -        self.payment_preimages = defaultdict(asyncio.Queue)
       -        self.localfeatures = LnLocalFeatures(0)
       -        if request_initial_sync:
       -            self.localfeatures |= LnLocalFeatures.INITIAL_ROUTING_SYNC
       -        self.localfeatures |= LnLocalFeatures.OPTION_DATA_LOSS_PROTECT_OPT
       -        self.invoices = lnworker.invoices
       -        self.attempted_route = {}
       -
       -    @property
       -    def channels(self):
       -        return self.lnworker.channels_for_peer(self.pubkey)
       -
       -    def diagnostic_name(self):
       -        return 'lnbase:' + str(self.host)
       -
       -    def ping_if_required(self):
       -        if time.time() - self.ping_time > 120:
       -            self.send_message(gen_msg('ping', num_pong_bytes=4, byteslen=4))
       -            self.ping_time = time.time()
       -
       -    def send_message(self, msg):
       -        message_type, payload = decode_msg(msg)
       -        self.print_error("Sending '%s'"%message_type.upper())
       +class InitiatorSession:
       +    def __init__(self, privkey, remote_pubkey, reader, writer):
       +        self.privkey = privkey
       +        self.remote_pubkey = remote_pubkey
       +        self.reader = reader
       +        self.writer = writer
       +
       +    def send_bytes(self, msg):
                l = len(msg).to_bytes(2, 'big')
                lc = aead_encrypt(self.sk, self.sn(), b'', l)
                c = aead_encrypt(self.sk, self.sn(), b'', msg)
       t@@ -320,30 +281,33 @@ class Peer(PrintError):
                assert len(c) == len(msg) + 16
                self.writer.write(lc+c)
        
       -    async def read_message(self):
       -        rn_l, rk_l = self.rn()
       -        rn_m, rk_m = self.rn()
       +    async def read_messages(self):
       +        read_buffer = b''
                while True:
       -            if len(self.read_buffer) >= 18:
       -                lc = self.read_buffer[:18]
       -                l = aead_decrypt(rk_l, rn_l, b'', lc)
       -                length = int.from_bytes(l, 'big')
       -                offset = 18 + length + 16
       -                if len(self.read_buffer) >= offset:
       -                    c = self.read_buffer[18:offset]
       -                    self.read_buffer = self.read_buffer[offset:]
       -                    msg = aead_decrypt(rk_m, rn_m, b'', c)
       -                    return msg
       -            try:
       -                s = await self.reader.read(2**10)
       -            except:
       -                s = None
       -            if not s:
       -                raise LightningPeerConnectionClosed()
       -            self.read_buffer += s
       +            rn_l, rk_l = self.rn()
       +            rn_m, rk_m = self.rn()
       +            while True:
       +                if len(read_buffer) >= 18:
       +                    lc = read_buffer[:18]
       +                    l = aead_decrypt(rk_l, rn_l, b'', lc)
       +                    length = int.from_bytes(l, 'big')
       +                    offset = 18 + length + 16
       +                    if len(read_buffer) >= offset:
       +                        c = read_buffer[18:offset]
       +                        read_buffer = read_buffer[offset:]
       +                        msg = aead_decrypt(rk_m, rn_m, b'', c)
       +                        yield msg
       +                        break
       +                try:
       +                    s = await self.reader.read(2**10)
       +                except:
       +                    s = None
       +                if not s:
       +                    raise LightningPeerConnectionClosed()
       +                read_buffer += s
        
            async def handshake(self):
       -        hs = HandshakeState(self.pubkey)
       +        hs = HandshakeState(self.remote_pubkey)
                # Get a new ephemeral key
                epriv, epub = create_ephemeral_key()
        
       t@@ -396,6 +360,57 @@ class Peer(PrintError):
                    self._sn = 0
                return o
        
       +
       +class Peer(PrintError):
       +
       +    def __init__(self, lnworker, peer_addr, request_initial_sync=False):
       +        self.initialized = asyncio.Future()
       +        self.transport = None
       +        self.peer_addr = peer_addr
       +        self.lnworker = lnworker
       +        self.privkey = lnworker.node_keypair.privkey
       +        self.network = lnworker.network
       +        self.lnwatcher = lnworker.network.lnwatcher
       +        self.channel_db = lnworker.network.channel_db
       +        self.ping_time = 0
       +        self.channel_accepted = defaultdict(asyncio.Queue)
       +        self.channel_reestablished = defaultdict(asyncio.Future)
       +        self.funding_signed = defaultdict(asyncio.Queue)
       +        self.funding_created = defaultdict(asyncio.Queue)
       +        self.revoke_and_ack = defaultdict(asyncio.Queue)
       +        self.commitment_signed = defaultdict(asyncio.Queue)
       +        self.announcement_signatures = defaultdict(asyncio.Queue)
       +        self.closing_signed = defaultdict(asyncio.Queue)
       +        self.payment_preimages = defaultdict(asyncio.Queue)
       +        self.localfeatures = LnLocalFeatures(0)
       +        if request_initial_sync:
       +            self.localfeatures |= LnLocalFeatures.INITIAL_ROUTING_SYNC
       +        self.localfeatures |= LnLocalFeatures.OPTION_DATA_LOSS_PROTECT_OPT
       +        self.invoices = lnworker.invoices
       +        self.attempted_route = {}
       +
       +    def send_message(self, message_name, **kwargs):
       +        assert type(message_name) is str
       +        self.print_error("Sending '%s'"%message_name.upper())
       +        self.transport.send_bytes(gen_msg(message_name, **kwargs))
       +
       +    async def initialize(self):
       +        await self.transport.handshake()
       +        self.send_message("init", gflen=0, lflen=1, localfeatures=self.localfeatures)
       +        self.initialized.set_result(True)
       +
       +    @property
       +    def channels(self):
       +        return self.lnworker.channels_for_peer(self.peer_addr.pubkey)
       +
       +    def diagnostic_name(self):
       +        return 'lnbase:' + str(self.peer_addr.host)
       +
       +    def ping_if_required(self):
       +        if time.time() - self.ping_time > 120:
       +            self.send_message('ping', num_pong_bytes=4, byteslen=4)
       +            self.ping_time = time.time()
       +
            def process_message(self, message):
                message_type, payload = decode_msg(message)
                try:
       t@@ -421,7 +436,7 @@ class Peer(PrintError):
        
            def on_ping(self, payload):
                l = int.from_bytes(payload['num_pong_bytes'], 'big')
       -        self.send_message(gen_msg('pong', byteslen=l))
       +        self.send_message('pong', byteslen=l)
        
            def on_pong(self, payload):
                pass
       t@@ -484,16 +499,6 @@ class Peer(PrintError):
                else:
                    self.announcement_signatures[channel_id].put_nowait(payload)
        
       -    async def initialize(self):
       -        self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
       -        await self.handshake()
       -        # send init
       -        self.send_message(gen_msg("init", gflen=0, lflen=1, localfeatures=self.localfeatures))
       -        # read init
       -        msg = await self.read_message()
       -        self.process_message(msg)
       -        self.initialized.set_result(True)
       -
            def handle_disconnect(func):
                async def wrapper_func(self, *args, **kwargs):
                    try:
       t@@ -502,7 +507,7 @@ class Peer(PrintError):
                        self.print_error("disconnecting gracefully. {}".format(e))
                    finally:
                        self.close_and_cleanup()
       -                self.lnworker.peers.pop(self.pubkey)
       +                self.lnworker.peers.pop(self.peer_addr.pubkey)
                return wrapper_func
        
            @ignore_exceptions  # do not kill main_taskgroup
       t@@ -516,10 +521,9 @@ class Peer(PrintError):
                    return
                self.channel_db.add_recent_peer(self.peer_addr)
                # loop
       -        while True:
       -            self.ping_if_required()
       -            msg = await self.read_message()
       +        async for msg in self.transport.read_messages():
                    self.process_message(msg)
       +            self.ping_if_required()
        
            def close_and_cleanup(self):
                try:
       t@@ -564,7 +568,7 @@ class Peer(PrintError):
                # for the first commitment transaction
                per_commitment_secret_first = get_per_commitment_secret_from_seed(per_commitment_secret_seed, RevocationStore.START_INDEX)
                per_commitment_point_first = secret_to_pubkey(int.from_bytes(per_commitment_secret_first, 'big'))
       -        msg = gen_msg(
       +        self.send_message(
                    "open_channel",
                    temporary_channel_id=temp_channel_id,
                    chain_hash=constants.net.rev_genesis_bytes(),
       t@@ -584,7 +588,6 @@ class Peer(PrintError):
                    channel_flags=0x00,  # not willing to announce channel
                    channel_reserve_satoshis=546
                )
       -        self.send_message(msg)
                payload = await self.channel_accepted[temp_channel_id].get()
                if payload.get('error'):
                    raise Exception(payload.get('error'))
       t@@ -625,7 +628,7 @@ class Peer(PrintError):
                # remote commitment transaction
                channel_id, funding_txid_bytes = channel_id_from_funding_tx(funding_txid, funding_index)
                chan = {
       -                "node_id": self.pubkey,
       +                "node_id": self.peer_addr.pubkey,
                        "channel_id": channel_id,
                        "short_channel_id": None,
                        "funding_outpoint": Outpoint(funding_txid, funding_index),
       t@@ -645,11 +648,11 @@ class Peer(PrintError):
                m.lnwatcher = self.lnwatcher
                m.sweep_address = self.lnworker.sweep_address
                sig_64, _ = m.sign_next_commitment()
       -        self.send_message(gen_msg("funding_created",
       +        self.send_message("funding_created",
                    temporary_channel_id=temp_channel_id,
                    funding_txid=funding_txid_bytes,
                    funding_output_index=funding_index,
       -            signature=sig_64))
       +            signature=sig_64)
                payload = await self.funding_signed[channel_id].get()
                self.print_error('received funding_signed')
                remote_sig = payload['signature']
       t@@ -679,7 +682,7 @@ class Peer(PrintError):
                per_commitment_point_first = secret_to_pubkey(int.from_bytes(per_commitment_secret_first, 'big'))
        
                min_depth = 3
       -        self.send_message(gen_msg('accept_channel',
       +        self.send_message('accept_channel',
                    temporary_channel_id=temp_chan_id,
                    dust_limit_satoshis=local_config.dust_limit_sat,
                    max_htlc_value_in_flight_msat=local_config.max_htlc_value_in_flight_msat,
       t@@ -694,7 +697,7 @@ class Peer(PrintError):
                    delayed_payment_basepoint=local_config.delayed_basepoint.pubkey,
                    htlc_basepoint=local_config.htlc_basepoint.pubkey,
                    first_per_commitment_point=per_commitment_point_first,
       -        ))
       +        )
                funding_created = await self.funding_created[temp_chan_id].get()
                funding_idx = int.from_bytes(funding_created['funding_output_index'], 'big')
                funding_txid = bh2u(funding_created['funding_txid'][::-1])
       t@@ -742,10 +745,10 @@ class Peer(PrintError):
                remote_sig = funding_created['signature']
                m.receive_new_commitment(remote_sig, [])
                sig_64, _ = m.sign_next_commitment()
       -        self.send_message(gen_msg('funding_signed',
       +        self.send_message('funding_signed',
                    channel_id=channel_id,
                    signature=sig_64,
       -        ))
       +        )
                m.set_state('OPENING')
                m.remote_commitment_to_be_revoked = m.pending_remote_commitment
                m.config[REMOTE] = m.config[REMOTE]._replace(ctn=0)
       t@@ -778,11 +781,11 @@ class Peer(PrintError):
                    return
                chan.set_state('REESTABLISHING')
                self.network.trigger_callback('channel', chan)
       -        self.send_message(gen_msg("channel_reestablish",
       +        self.send_message("channel_reestablish",
                    channel_id=chan_id,
                    next_local_commitment_number=chan.config[LOCAL].ctn+1,
                    next_remote_revocation_number=chan.config[REMOTE].ctn
       -        ))
       +        )
                await self.channel_reestablished[chan_id]
                chan.set_state('OPENING')
                if chan.config[LOCAL].funding_locked_received and chan.short_channel_id:
       t@@ -799,11 +802,11 @@ class Peer(PrintError):
        
                def try_to_get_remote_to_force_close_with_their_latest():
                    self.print_error("trying to get remote to force close", bh2u(chan_id))
       -            self.send_message(gen_msg("channel_reestablish",
       +            self.send_message("channel_reestablish",
                                              channel_id=chan_id,
                                              next_local_commitment_number=0,
                                              next_remote_revocation_number=0
       -                                      ))
       +                                      )
        
                channel_reestablish_msg = payload
                # compare remote ctns
       t@@ -854,7 +857,7 @@ class Peer(PrintError):
                per_commitment_point_second = secret_to_pubkey(int.from_bytes(
                    get_per_commitment_secret_from_seed(chan.config[LOCAL].per_commitment_secret_seed, per_commitment_secret_index), 'big'))
                # note: if funding_locked was not yet received, we might send it multiple times
       -        self.send_message(gen_msg("funding_locked", channel_id=channel_id, next_per_commitment_point=per_commitment_point_second))
       +        self.send_message("funding_locked", channel_id=channel_id, next_per_commitment_point=per_commitment_point_second)
                if chan.config[LOCAL].funding_locked_received:
                    self.mark_open(chan)
        
       t@@ -903,7 +906,7 @@ class Peer(PrintError):
        
                node_sigs = [local_node_sig, remote_node_sig]
                bitcoin_sigs = [local_bitcoin_sig, remote_bitcoin_sig]
       -        node_ids = [privkey_to_pubkey(self.privkey), self.pubkey]
       +        node_ids = [privkey_to_pubkey(self.privkey), self.peer_addr.pubkey]
                bitcoin_keys = [chan.config[LOCAL].multisig_key.pubkey, chan.config[REMOTE].multisig_key.pubkey]
        
                if node_ids[0] > node_ids[1]:
       t@@ -912,7 +915,7 @@ class Peer(PrintError):
                    node_ids.reverse()
                    bitcoin_keys.reverse()
        
       -        channel_announcement = gen_msg("channel_announcement",
       +        self.send_message("channel_announcement",
                    node_signatures_1=node_sigs[0],
                    node_signatures_2=node_sigs[1],
                    bitcoin_signature_1=bitcoin_sigs[0],
       t@@ -927,8 +930,6 @@ class Peer(PrintError):
                    bitcoin_key_2=bitcoin_keys[1]
                )
        
       -        self.send_message(channel_announcement)
       -
                print("SENT CHANNEL ANNOUNCEMENT")
        
            def mark_open(self, chan):
       t@@ -940,7 +941,7 @@ class Peer(PrintError):
                self.network.trigger_callback('channel', chan)
                # add channel to database
                pubkey_ours = self.lnworker.node_keypair.pubkey
       -        pubkey_theirs = self.pubkey
       +        pubkey_theirs = self.peer_addr.pubkey
                node_ids = [pubkey_theirs, pubkey_ours]
                bitcoin_keys = [chan.config[LOCAL].multisig_key.pubkey, chan.config[REMOTE].multisig_key.pubkey]
                sorted_node_ids = list(sorted(node_ids))
       t@@ -968,8 +969,8 @@ class Peer(PrintError):
                # peer may have sent us a channel update for the incoming direction previously
                # note: if we were offline when the 3rd conf happened, lnd will never send us this channel_update
                # see https://github.com/lightningnetwork/lnd/issues/1347
       -        #self.send_message(gen_msg("query_short_channel_ids", chain_hash=constants.net.rev_genesis_bytes(),
       -        #                          len=9, encoded_short_ids=b'\x00'+chan.short_channel_id))
       +        #self.send_message("query_short_channel_ids", chain_hash=constants.net.rev_genesis_bytes(),
       +        #                          len=9, encoded_short_ids=b'\x00'+chan.short_channel_id)
                if hasattr(chan, 'pending_channel_update_message'):
                    self.on_channel_update(chan.pending_channel_update_message)
        
       t@@ -981,7 +982,7 @@ class Peer(PrintError):
                                chan.config[REMOTE].multisig_key.pubkey]
        
                node_ids = [privkey_to_pubkey(self.privkey),
       -                    self.pubkey]
       +                    self.peer_addr.pubkey]
        
                sorted_node_ids = list(sorted(node_ids))
                if sorted_node_ids != node_ids:
       t@@ -1002,12 +1003,12 @@ class Peer(PrintError):
                h = bitcoin.Hash(to_hash)
                bitcoin_signature = ecc.ECPrivkey(chan.config[LOCAL].multisig_key.privkey).sign(h, sig_string_from_r_and_s, get_r_and_s_from_sig_string)
                node_signature = ecc.ECPrivkey(self.privkey).sign(h, sig_string_from_r_and_s, get_r_and_s_from_sig_string)
       -        self.send_message(gen_msg("announcement_signatures",
       +        self.send_message("announcement_signatures",
                    channel_id=chan.channel_id,
                    short_channel_id=chan.short_channel_id,
                    node_signature=node_signature,
                    bitcoin_signature=bitcoin_signature
       -        ))
       +        )
        
                return h, node_signature, bitcoin_signature
        
       t@@ -1068,12 +1069,12 @@ class Peer(PrintError):
        
            def send_commitment(self, chan):
                sig_64, htlc_sigs = chan.sign_next_commitment()
       -        self.send_message(gen_msg("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=len(htlc_sigs), htlc_signature=b"".join(htlc_sigs)))
       +        self.send_message("commitment_signed", channel_id=chan.channel_id, signature=sig_64, num_htlcs=len(htlc_sigs), htlc_signature=b"".join(htlc_sigs))
                return len(htlc_sigs)
        
       -    async def update_channel(self, chan, update):
       +    async def update_channel(self, chan, message_name, **kwargs):
                """ generic channel update flow """
       -        self.send_message(update)
       +        self.send_message(message_name, **kwargs)
                self.send_commitment(chan)
                await self.receive_revoke(chan)
                await self.receive_commitment(chan)
       t@@ -1114,9 +1115,8 @@ class Peer(PrintError):
                    raise PaymentFailure('not enough local balance')
                htlc_id = chan.add_htlc(htlc)
                chan.onion_keys[htlc_id] = secret_key
       -        update = gen_msg("update_add_htlc", channel_id=chan.channel_id, id=htlc_id, cltv_expiry=final_cltv_expiry_with_deltas, amount_msat=amount_msat, payment_hash=payment_hash, onion_routing_packet=onion.to_bytes())
                self.attempted_route[(chan.channel_id, htlc_id)] = route
       -        await self.update_channel(chan, update)
       +        await self.update_channel(chan, "update_add_htlc", channel_id=chan.channel_id, id=htlc_id, cltv_expiry=final_cltv_expiry_with_deltas, amount_msat=amount_msat, payment_hash=payment_hash, onion_routing_packet=onion.to_bytes())
        
            async def receive_revoke(self, m):
                revoke_and_ack_msg = await self.revoke_and_ack[m.channel_id].get()
       t@@ -1125,10 +1125,10 @@ class Peer(PrintError):
            def revoke(self, m):
                rev, _ = m.revoke_current_commitment()
                self.lnworker.save_channel(m)
       -        self.send_message(gen_msg("revoke_and_ack",
       +        self.send_message("revoke_and_ack",
                    channel_id=m.channel_id,
                    per_commitment_secret=rev.per_commitment_secret,
       -            next_per_commitment_point=rev.next_per_commitment_point))
       +            next_per_commitment_point=rev.next_per_commitment_point)
        
            async def receive_commitment(self, m, commitment_signed_msg=None):
                if commitment_signed_msg is None:
       t@@ -1158,8 +1158,7 @@ class Peer(PrintError):
                self.send_commitment(chan)
                await self.receive_revoke(chan)
                chan.settle_htlc(payment_preimage, htlc_id)
       -        fulfillment = gen_msg("update_fulfill_htlc", channel_id=channel_id, id=htlc_id, payment_preimage=payment_preimage)
       -        await self.update_channel(chan, fulfillment)
       +        await self.update_channel(chan, "update_fulfill_htlc", channel_id=channel_id, id=htlc_id, payment_preimage=payment_preimage)
                self.lnworker.save_channel(chan)
        
            def on_commitment_signed(self, payload):
       t@@ -1234,8 +1233,7 @@ class Peer(PrintError):
                else:
                    return
                chan.update_fee(feerate_per_kw)
       -        update = gen_msg("update_fee", channel_id=chan.channel_id, feerate_per_kw=feerate_per_kw)
       -        await self.update_channel(chan, update)
       +        await self.update_channel(chan, "update_fee", channel_id=chan.channel_id, feerate_per_kw=feerate_per_kw)
        
            def current_feerate_per_kw(self):
                if constants.net is constants.BitcoinRegtest:
       t@@ -1258,9 +1256,9 @@ class Peer(PrintError):
                    raise Exception('scriptpubkey length in received shutdown message invalid: ' + str(payload['len']))
                chan = self.channels[payload['channel_id']]
                scriptpubkey = bfh(bitcoin.address_to_script(chan.sweep_address))
       -        self.send_message(gen_msg('shutdown', channel_id=chan.channel_id, len=len(scriptpubkey), scriptpubkey=scriptpubkey))
       +        self.send_message('shutdown', channel_id=chan.channel_id, len=len(scriptpubkey), scriptpubkey=scriptpubkey)
                signature, fee = chan.make_closing_tx(scriptpubkey, payload['scriptpubkey'])
       -        self.send_message(gen_msg('closing_signed', channel_id=chan.channel_id, fee_satoshis=fee, signature=signature))
       +        self.send_message('closing_signed', channel_id=chan.channel_id, fee_satoshis=fee, signature=signature)
                while chan.get_state() != 'CLOSED':
                    try:
                        closing_signed = await asyncio.wait_for(self.closing_signed[chan.channel_id].get(), 1)
       t@@ -1269,5 +1267,5 @@ class Peer(PrintError):
                    else:
                        fee = int.from_bytes(closing_signed['fee_satoshis'], 'big')
                        signature, _ = chan.make_closing_tx(scriptpubkey, payload['scriptpubkey'], fee_sat=fee)
       -                self.send_message(gen_msg('closing_signed', channel_id=chan.channel_id, fee_satoshis=fee, signature=signature))
       +                self.send_message('closing_signed', channel_id=chan.channel_id, fee_satoshis=fee, signature=signature)
                self.print_error('REMOTE PEER CLOSED CHANNEL')
 (DIR) diff --git a/electrum/lnworker.py b/electrum/lnworker.py
       t@@ -16,7 +16,7 @@ from . import bitcoin
        from .keystore import BIP32_KeyStore
        from .bitcoin import sha256, COIN
        from .util import bh2u, bfh, PrintError, InvoiceError, resolve_dns_srv, is_ip_address, log_exceptions
       -from .lnbase import Peer
       +from .lnbase import Peer, InitiatorSession
        from .lnaddr import lnencode, LnAddr, lndecode
        from .ecc import der_sig_from_sig_string
        from .lnchan import Channel
       t@@ -112,8 +112,14 @@ class LNWorker(PrintError):
                    return
                self._last_tried_peer[peer_addr] = time.time()
                self.print_error("adding peer", peer_addr)
       -        peer = Peer(self, host, port, node_id, request_initial_sync=self.config.get("request_initial_sync", True))
       -        asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(peer.main_loop()), self.network.asyncio_loop)
       +        fut = asyncio.ensure_future(asyncio.open_connection(peer_addr.host, peer_addr.port))
       +        def cb(fut):
       +            reader, writer = fut.result()
       +            transport = InitiatorSession(self.node_keypair.privkey, node_id, reader, writer)
       +            peer.transport = transport
       +            asyncio.run_coroutine_threadsafe(self.network.main_taskgroup.spawn(peer.main_loop()), self.network.asyncio_loop)
       +        fut.add_done_callback(cb)
       +        peer = Peer(self, peer_addr, request_initial_sync=self.config.get("request_initial_sync", True))
                self.peers[node_id] = peer
                self.network.trigger_callback('ln_status')
                return peer
       t@@ -238,7 +244,8 @@ class LNWorker(PrintError):
                    peer = self.add_peer(host, port, node_id)
                coro = self._open_channel_coroutine(peer, local_amt_sat, push_amt_sat, password)
                f = asyncio.run_coroutine_threadsafe(coro, self.network.asyncio_loop)
       -        return f.result(timeout)
       +        chan = f.result(timeout)
       +        return bh2u(chan.node_id)
        
            def pay(self, invoice, amount_sat=None):
                addr = lndecode(invoice, expected_hrp=constants.net.SEGWIT_HRP)