tupdate interface.py - electrum - Electrum Bitcoin wallet
 (HTM) git clone https://git.parazyd.org/electrum
 (DIR) Log
 (DIR) Files
 (DIR) Refs
 (DIR) Submodules
       ---
 (DIR) commit 28df27fba2b37237c97500533d5bd86f5bd4ee71
 (DIR) parent 035ecbc7cd41036bf96dd8a9b6b57d09b7c4eabc
 (HTM) Author: ThomasV <thomasv@gitorious>
       Date:   Mon, 28 Jul 2014 00:13:40 +0200
       
       update interface.py
       
       Diffstat:
         M lib/blockchain.py                   |       4 ++--
         M lib/interface.py                    |     422 +++++++++++--------------------
         M lib/network.py                      |      90 +++++++++++++++-----------------
         M lib/network_proxy.py                |      11 +++++++++--
         M lib/synchronizer.py                 |       4 ++--
         M lib/util.py                         |      52 +++++++++++++++++++++++++++----
       
       6 files changed, 250 insertions(+), 333 deletions(-)
       ---
 (DIR) diff --git a/lib/blockchain.py b/lib/blockchain.py
       t@@ -289,14 +289,14 @@ class Blockchain(threading.Thread):
        
            def request_header(self, i, h, queue):
                print_error("requesting header %d from %s"%(h, i.server))
       -        i.send([ ('blockchain.block.get_header',[h])], lambda i,r: queue.put((i,r)))
       +        i.send_request({'method':'blockchain.block.get_header', 'params':[h]}, queue)
        
            def retrieve_header(self, i, queue):
                while True:
                    try:
                        ir = queue.get(timeout=1)
                    except Queue.Empty:
       -                print_error('timeout')
       +                print_error('retrieve_header: timeout', i.server)
                        continue
        
                    if not ir: 
 (DIR) diff --git a/lib/interface.py b/lib/interface.py
       t@@ -33,7 +33,8 @@ DEFAULT_TIMEOUT = 5
        proxy_modes = ['socks4', 'socks5', 'http']
        
        
       -from util import parse_json    
       +import util
       +
        
        def cert_verify_hostname(s):
            # hostname verification (disabled)
       t@@ -48,15 +49,11 @@ def cert_verify_hostname(s):
        
        class Interface(threading.Thread):
        
       -
            def __init__(self, server, config = None):
       -
                threading.Thread.__init__(self)
                self.daemon = True
                self.config = config if config is not None else SimpleConfig()
                self.connect_event = threading.Event()
       -
       -        self.subscriptions = {}
                self.lock = threading.Lock()
        
                self.rtime = 0
       t@@ -65,8 +62,6 @@ class Interface(threading.Thread):
                self.poll_interval = 1
        
                self.debug = False # dump network messages. can be changed at runtime using the console
       -
       -        #json
                self.message_id = 0
                self.unanswered_requests = {}
        
       t@@ -91,36 +86,31 @@ class Interface(threading.Thread):
                    self.proxy_mode = proxy_modes.index(self.proxy["mode"]) + 1
        
        
       -
       -
       -
            def process_response(self, c):
       -
       -        # uncomment to debug
                if self.debug:
                    print_error( "<--",c )
        
                msg_id = c.get('id')
                error = c.get('error')
       -        
       +        result = c.get('result')
       +
                if error:
                    print_error("received error:", c)
       -            if msg_id is not None:
       -                with self.lock: 
       -                    method, params, callback = self.unanswered_requests.pop(msg_id)
       -                callback(self,{'method':method, 'params':params, 'error':error, 'id':msg_id})
       -
       +            #queue.put((self,{'method':method, 'params':params, 'error':error, 'id':_id}))
                    return
       -
       +        
                if msg_id is not None:
       -            with self.lock: 
       -                method, params, callback = self.unanswered_requests.pop(msg_id)
       -            result = c.get('result')
       +            with self.lock:
       +                method, params, _id, queue = self.unanswered_requests.pop(msg_id)
       +            if queue is None:
       +                queue = self.response_queue
        
                else:
       +            queue = self.response_queue
                    # notification
                    method = c.get('method')
                    params = c.get('params')
       +            _id = None
        
                    if method == 'blockchain.numblocks.subscribe':
                        result = params[0]
       t@@ -135,138 +125,16 @@ class Interface(threading.Thread):
                        result = params[1]
                        params = [addr]
        
       -            with self.lock:
       -                for k,v in self.subscriptions.items():
       -                    if (method, params) in v:
       -                        callback = k
       -                        break
       -                else:
       -                    print_error( "received unexpected notification", method, params)
       -                    print_error( self.subscriptions )
       -                    return
       -
       -
       -        callback(self, {'method':method, 'params':params, 'result':result, 'id':msg_id})
       +        queue.put((self, {'method':method, 'params':params, 'result':result, 'id':_id}))
        
        
            def on_version(self, i, result):
                self.server_version = result
       -
       -
       -    def start_http(self):
       -        self.session_id = None
       -        self.is_connected = True
       -        self.connection_msg = ('https' if self.use_ssl else 'http') + '://%s:%d'%( self.host, self.port )
       -        try:
       -            self.poll()
       -        except Exception:
       -            print_error("http init session failed")
       -            self.is_connected = False
       -            return
       -
       -        if self.session_id:
       -            print_error('http session:',self.session_id)
       -            self.is_connected = True
       -        else:
       -            self.is_connected = False
       -
       -    def run_http(self):
       -        self.is_connected = True
       -        while self.is_connected:
       -            try:
       -                if self.session_id:
       -                    self.poll()
       -                time.sleep(self.poll_interval)
       -            except socket.gaierror:
       -                break
       -            except socket.error:
       -                break
       -            except Exception:
       -                traceback.print_exc(file=sys.stdout)
       -                break
       -            
       -        self.is_connected = False
       -
                        
       -    def poll(self):
       -        self.send([], None)
       -
       -
       -    def send_http(self, messages, callback):
       -        import urllib2, json, time, cookielib
       -        print_error( "send_http", messages )
       -        
       -        if self.proxy:
       -            socks.setdefaultproxy(self.proxy_mode, self.proxy["host"], int(self.proxy["port"]) )
       -            socks.wrapmodule(urllib2)
       -
       -        cj = cookielib.CookieJar()
       -        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
       -        urllib2.install_opener(opener)
       -
       -        t1 = time.time()
       -
       -        data = []
       -        ids = []
       -        for m in messages:
       -            method, params = m
       -            if type(params) != type([]): params = [params]
       -            data.append( { 'method':method, 'id':self.message_id, 'params':params } )
       -            self.unanswered_requests[self.message_id] = method, params, callback
       -            ids.append(self.message_id)
       -            self.message_id += 1
       -
       -        if data:
       -            data_json = json.dumps(data)
       -        else:
       -            # poll with GET
       -            data_json = None 
       -
       -            
       -        headers = {'content-type': 'application/json'}
       -        if self.session_id:
       -            headers['cookie'] = 'SESSION=%s'%self.session_id
       -
       -        try:
       -            req = urllib2.Request(self.connection_msg, data_json, headers)
       -            response_stream = urllib2.urlopen(req, timeout=DEFAULT_TIMEOUT)
       -        except Exception:
       -            return
       -
       -        for index, cookie in enumerate(cj):
       -            if cookie.name=='SESSION':
       -                self.session_id = cookie.value
       -
       -        response = response_stream.read()
       -        self.bytes_received += len(response)
       -        if response: 
       -            response = json.loads( response )
       -            if type(response) is not type([]):
       -                self.process_response(response)
       -            else:
       -                for item in response:
       -                    self.process_response(item)
       -
       -        if response: 
       -            self.poll_interval = 1
       -        else:
       -            if self.poll_interval < 15: 
       -                self.poll_interval += 1
       -        #print self.poll_interval, response
       -
       -        self.rtime = time.time() - t1
       -        self.is_connected = True
       -        return ids
       -
       -
       -
        
            def start_tcp(self):
        
       -        self.connection_msg = self.host + ':%d' % self.port
       -
                if self.proxy is not None:
       -
                    socks.setdefaultproxy(self.proxy_mode, self.proxy["host"], int(self.proxy["port"]))
                    socket.socket = socks.socksocket
                    # prevent dns leaks, see http://stackoverflow.com/questions/13184205/dns-over-proxy
       t@@ -276,7 +144,6 @@ class Interface(threading.Thread):
        
                if self.use_ssl:
                    cert_path = os.path.join( self.config.path, 'certs', self.host)
       -
                    if not os.path.exists(cert_path):
                        is_new = True
                        # get server certificate.
       t@@ -387,133 +254,45 @@ class Interface(threading.Thread):
                self.s = s
                self.is_connected = True
                print_error("connected to", self.host, self.port)
       +        self.pipe = util.SocketPipe(s)
        
        
            def run_tcp(self):
       -        try:
       -            #if self.use_ssl: self.s.do_handshake()
       -            message = ''
       -            while self.is_connected:
       -                try: 
       -                    timeout = False
       -                    msg = self.s.recv(1024)
       -                except socket.timeout:
       -                    timeout = True
       -                except ssl.SSLError:
       -                    timeout = True
       -                except socket.error, err:
       -                    if err.errno == 60:
       -                        timeout = True
       -                    elif err.errno in [11, 10035]:
       -                        print_error("socket errno", err.errno)
       -                        time.sleep(0.1)
       -                        continue
       -                    else:
       -                        traceback.print_exc(file=sys.stdout)
       -                        raise
       -
       -                if timeout:
       -                    # ping the server with server.version, as a real ping does not exist yet
       -                    self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])], self.on_version)
       -                    continue
       -
       -                message += msg
       -                self.bytes_received += len(msg)
       -                if msg == '': 
       -                    self.is_connected = False
       -
       -                while True:
       -                    response, message = parse_json(message)
       -                    if response is None:
       -                        break
       -                    self.process_response(response)
       -
       -        except Exception:
       -            traceback.print_exc(file=sys.stdout)
       +        t = time.time()
       +        while self.is_connected:
       +            # ping the server with server.version
       +            if time.time() - t > 60:
       +                self.send_request({'method':'server.version', 'params':[ELECTRUM_VERSION, PROTOCOL_VERSION]})
       +                t = time.time()
       +            try:
       +                response = self.pipe.get()
       +            except util.timeout:
       +                continue
       +            if response is None:
       +                break
       +            self.process_response(response)
        
                self.is_connected = False
       -
       -
       -    def send_tcp(self, messages, callback):
       -        """return the ids of the requests that we sent"""
       -        out = ''
       -        ids = []
       -        for m in messages:
       -            method, params = m 
       -            request = json.dumps( { 'id':self.message_id, 'method':method, 'params':params } )
       -            self.unanswered_requests[self.message_id] = method, params, callback
       -            ids.append(self.message_id)
       -            if self.debug:
       -                print_error("-->", request)
       +        print_error("exit interface", self.server)
       +
       +    def send_request(self, request, queue=None):
       +        _id = request.get('id')
       +        method = request.get('method')
       +        params = request.get('params')
       +        with self.lock:
       +            self.pipe.send({'id':self.message_id, 'method':method, 'params':params})
       +            self.unanswered_requests[self.message_id] = method, params, _id, queue
                    self.message_id += 1
       -            out += request + '\n'
       -        while out:
       -            try:
       -                sent = self.s.send( out )
       -                out = out[sent:]
       -            except socket.error,e:
       -                if e[0] in (errno.EWOULDBLOCK,errno.EAGAIN):
       -                    print_error( "EAGAIN: retrying")
       -                    time.sleep(0.1)
       -                    continue
       -                else:
       -                    traceback.print_exc(file=sys.stdout)
       -                    # this happens when we get disconnected
       -                    print_error( "Not connected, cannot send" )
       -                    return None
       -        return ids
       -
       -
       -
       -
       +        if self.debug:
       +            print_error("-->", request)
        
            def start_interface(self):
       -
                if self.protocol in 'st':
                    self.start_tcp()
                elif self.protocol in 'gh':
                    self.start_http()
       -
                self.connect_event.set()
        
       -
       -
       -    def stop_subscriptions(self):
       -        for callback in self.subscriptions.keys():
       -            callback(self, None)
       -        self.subscriptions = {}
       -
       -
       -    def send(self, messages, callback):
       -
       -        sub = []
       -        for message in messages:
       -            m, v = message
       -            if m[-10:] == '.subscribe':
       -                sub.append(message)
       -
       -        if sub:
       -            with self.lock:
       -                if self.subscriptions.get(callback) is None: 
       -                    self.subscriptions[callback] = []
       -                for message in sub:
       -                    if message not in self.subscriptions[callback]:
       -                        self.subscriptions[callback].append(message)
       -
       -        if not self.is_connected: 
       -            print_error("interface: trying to send while not connected")
       -            return
       -
       -        if self.protocol in 'st':
       -            with self.lock:
       -                out = self.send_tcp(messages, callback)
       -        else:
       -            # do not use lock, http is synchronous
       -            out = self.send_http(messages, callback)
       -
       -        return out
       -
       -
            def parse_proxy_options(self, s):
                if type(s) == type({}): return s  # fixme: type should be fixed
                if type(s) != type(""): return None  
       t@@ -533,43 +312,34 @@ class Interface(threading.Thread):
                    proxy["port"] = "8080" if proxy["mode"] == "http" else "1080"
                return proxy
        
       -
       -
            def stop(self):
                if self.is_connected and self.protocol in 'st' and self.s:
                    self.s.shutdown(socket.SHUT_RDWR)
                    self.s.close()
       -
                self.is_connected = False
        
       -
            def is_up_to_date(self):
                return self.unanswered_requests == {}
        
       -
       -
       -    def start(self, queue = None, wait = False):
       +    def start(self, response_queue, wait = False):
                if not self.server:
                    return
       -        self.queue = queue if queue else Queue.Queue()
       +        self.response_queue = response_queue
                threading.Thread.start(self)
                if wait:
                    self.connect_event.wait()
        
       -
            def run(self):
                self.start_interface()
                if self.is_connected:
       -            self.send([('server.version', [ELECTRUM_VERSION, PROTOCOL_VERSION])], self.on_version)
       +            self.send_request({'method':'server.version', 'params':[ELECTRUM_VERSION, PROTOCOL_VERSION]})
                    self.change_status()
                    self.run_tcp() if self.protocol in 'st' else self.run_http()
                self.change_status()
                
       -
            def change_status(self):
       -        #print "change status", self.server, self.is_connected
       -        self.queue.put(self)
       -
       +        # print_error( "change status", self.server, self.is_connected)
       +        self.response_queue.put((self, None))
        
            def synchronous_get(self, requests, timeout=100000000):
                queue = Queue.Queue()
       t@@ -588,6 +358,112 @@ class Interface(threading.Thread):
                return out
        
        
       +class HTTP_Interface(Interface):
       +
       +    def send_request(self, request, queue=None):
       +        import urllib2, json, time, cookielib
       +        print_error( "send_http", messages )
       +        
       +        if self.proxy:
       +            socks.setdefaultproxy(self.proxy_mode, self.proxy["host"], int(self.proxy["port"]) )
       +            socks.wrapmodule(urllib2)
       +
       +        cj = cookielib.CookieJar()
       +        opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(cj))
       +        urllib2.install_opener(opener)
       +
       +        t1 = time.time()
       +
       +        data = []
       +        ids = []
       +        for m in messages:
       +            method, params = m
       +            if type(params) != type([]): params = [params]
       +            data.append( { 'method':method, 'id':self.message_id, 'params':params } )
       +            self.unanswered_requests[self.message_id] = method, params, callback
       +            ids.append(self.message_id)
       +            self.message_id += 1
       +
       +        if data:
       +            data_json = json.dumps(data)
       +        else:
       +            # poll with GET
       +            data_json = None 
       +
       +            
       +        headers = {'content-type': 'application/json'}
       +        if self.session_id:
       +            headers['cookie'] = 'SESSION=%s'%self.session_id
       +
       +        try:
       +            req = urllib2.Request(self.connection_msg, data_json, headers)
       +            response_stream = urllib2.urlopen(req, timeout=DEFAULT_TIMEOUT)
       +        except Exception:
       +            return
       +
       +        for index, cookie in enumerate(cj):
       +            if cookie.name=='SESSION':
       +                self.session_id = cookie.value
       +
       +        response = response_stream.read()
       +        self.bytes_received += len(response)
       +        if response: 
       +            response = json.loads( response )
       +            if type(response) is not type([]):
       +                self.process_response(response)
       +            else:
       +                for item in response:
       +                    self.process_response(item)
       +        if response: 
       +            self.poll_interval = 1
       +        else:
       +            if self.poll_interval < 15: 
       +                self.poll_interval += 1
       +        #print self.poll_interval, response
       +        self.rtime = time.time() - t1
       +        self.is_connected = True
       +        return ids
       +
       +    def poll(self):
       +        self.send([], None)
       +
       +    def start_http(self):
       +        self.session_id = None
       +        self.is_connected = True
       +        self.connection_msg = ('https' if self.use_ssl else 'http') + '://%s:%d'%( self.host, self.port )
       +        try:
       +            self.poll()
       +        except Exception:
       +            print_error("http init session failed")
       +            self.is_connected = False
       +            return
       +
       +        if self.session_id:
       +            print_error('http session:',self.session_id)
       +            self.is_connected = True
       +        else:
       +            self.is_connected = False
       +
       +    def run_http(self):
       +        self.is_connected = True
       +        while self.is_connected:
       +            try:
       +                if self.session_id:
       +                    self.poll()
       +                time.sleep(self.poll_interval)
       +            except socket.gaierror:
       +                break
       +            except socket.error:
       +                break
       +            except Exception:
       +                traceback.print_exc(file=sys.stdout)
       +                break
       +            
       +        self.is_connected = False
       +
       +
       +
       +
        
        
        def check_cert(host, cert):
 (DIR) diff --git a/lib/network.py b/lib/network.py
       t@@ -109,16 +109,13 @@ class Network(threading.Thread):
                if not os.path.exists(dir_path):
                    os.mkdir(dir_path)
        
       -        # default subscriptions
       -        self.subscriptions = {}
       -        self.subscriptions[self.on_banner] = [('server.banner',[])]
       -        self.subscriptions[self.on_peers] = [('server.peers.subscribe',[])]
       -        self.pending_transactions_for_notifications = []
       +        # address subscriptions and cached results
       +        self.addresses = {} 
        
                self.connection_status = 'connecting'
        
                self.requests_queue = Queue.Queue()
       -        self.unanswered_requests = {}
       +
        
            def get_server_height(self):
                return self.heights.get(self.default_server,0)
       t@@ -139,31 +136,16 @@ class Network(threading.Thread):
                return self.interface and self.interface.is_connected
        
            def is_up_to_date(self):
       +        raise
                return self.interface.is_up_to_date()
        
            def send_subscriptions(self):
       -        for cb, sub in self.subscriptions.items():
       -            self.interface.send(sub, cb)
       -
       -    def subscribe(self, messages, callback):
       -        with self.lock:
       -            if self.subscriptions.get(callback) is None: 
       -                self.subscriptions[callback] = []
       -            for message in messages:
       -                if message not in self.subscriptions[callback]:
       -                    self.subscriptions[callback].append(message)
       -
       -        if self.is_connected():
       -            self.interface.send( messages, callback )
       +        for addr in self.addresses:
       +            self.interface.send_request({'method':'blockchain.address.subscribe', 'params':[addr]})
       +        self.interface.send_request({'method':'server.banner','params':[]})
       +        self.interface.send_request({'method':'server.peers.subscribe','params':[]})
        
        
       -    def send(self, messages, callback):
       -        if self.is_connected():
       -            self.interface.send( messages, callback )
       -            return True
       -        else:
       -            return False
       -
            def get_status_value(self, key):
                if key == 'status':
                    value = self.connection_status
       t@@ -225,7 +207,7 @@ class Network(threading.Thread):
                i = interface.Interface(server, self.config)
                self.pending_servers.add(server)
                i.start(self.queue)
       -        return i 
       +        return i
        
            def start_random_interface(self):
                server = self.random_server()
       t@@ -241,9 +223,9 @@ class Network(threading.Thread):
                self.running = True
                self.response_queue = response_queue
                self.start_interfaces()
       -        threading.Thread.start(self)
       -        threading.Thread(target=self.process_thread).start()
       +        threading.Thread(target=self.process_requests_thread).start()
                self.blockchain.start()
       +        threading.Thread.start(self)
        
            def set_parameters(self, host, port, protocol, proxy, auto_connect):
                self.config.set_key('auto_cycle', auto_connect, True)
       t@@ -331,15 +313,24 @@ class Network(threading.Thread):
                self.trigger_callback('updated')
        
        
       -    def process_thread(self):
       +    def process_response(self, i, response):
       +        method = response['method']
       +        if method == 'blockchain.address.subscribe':
       +            self.on_address(i, response)
       +        elif method == 'blockchain.headers.subscribe':
       +            self.on_header(i, response)
       +        elif method == 'server.peers.subscribe':
       +            self.on_peers(i, response)
       +
       +    def process_requests_thread(self):
                while self.is_running():
                    try:
                        request = self.requests_queue.get(timeout=0.1)
                    except Queue.Empty:
                        continue
       -            self.process(request)
       +            self.process_request(request)
        
       -    def process(self, request):
       +    def process_request(self, request):
                method = request['method']
                params = request['params']
                _id = request['id']
       t@@ -359,32 +350,29 @@ class Network(threading.Thread):
                    self.response_queue.put(out)
                    return
        
       -        def cb(i,r):
       -            _id = r.get('id')
       -            if _id is not None:
       -                my_id = self.unanswered_requests.pop(_id)
       -                r['id'] = my_id
       -            self.response_queue.put(r)
       -
       -        try:
       -            new_id = self.interface.send([(method, params)], cb) [0]
       -        except Exception as e:
       -            self.response_queue.put({'id':_id, 'error':str(e)}) 
       -            print_error("network interface error", str(e))
       -            return
       +        if method == 'blockchain.address.subscribe':
       +            addr = params[0]
       +            if addr in self.addresses:
       +                self.response_queue.put({'id':_id, 'result':self.addresses[addr]}) 
       +                return
        
       -        self.unanswered_requests[new_id] = _id
       +        self.interface.send_request(request)
        
        
            def run(self):
                while self.is_running():
                    try:
       -                i = self.queue.get(timeout = 30 if self.interfaces else 3)
       +                i, response = self.queue.get(0.1) #timeout = 30 if self.interfaces else 3)
                    except Queue.Empty:
                        if len(self.interfaces) < self.num_server:
                            self.start_random_interface()
                        continue
       +            
       +            if response is not None:
       +                self.process_response(i, response)
       +                continue
        
       +            # if response is None it is a notification about the interface
                    if i.server in self.pending_servers:
                        self.pending_servers.remove(i.server)
        
       t@@ -392,7 +380,7 @@ class Network(threading.Thread):
                        #if i.server in self.interfaces: raise
                        self.interfaces[i.server] = i
                        self.add_recent_server(i)
       -                i.send([ ('blockchain.headers.subscribe',[])], self.on_header)
       +                i.send_request({'method':'blockchain.headers.subscribe','params':[]})
                        if i == self.interface:
                            print_error('sending subscriptions to', self.interface.server)
                            self.send_subscriptions()
       t@@ -439,6 +427,12 @@ class Network(threading.Thread):
                self.banner = r.get('result')
                self.trigger_callback('banner')
        
       +    def on_address(self, i, r):
       +        addr = r.get('params')[0]
       +        result = r.get('result')
       +        self.addresses[addr] = result
       +        self.response_queue.put(r)
       +
            def stop(self):
                with self.lock:
                    self.running = False
 (DIR) diff --git a/lib/network_proxy.py b/lib/network_proxy.py
       t@@ -32,6 +32,12 @@ from simple_config import SimpleConfig
        from daemon import NetworkServer, DAEMON_PORT
        
        
       +# policies
       +SPAWN_DAEMON=0
       +NEED_DAEMON=1
       +NO_DAEMON=2
       +USE_DAEMON_IF_AVAILABLE=3
       +
        
        class NetworkProxy(threading.Thread):
        
       t@@ -119,8 +125,9 @@ class NetworkProxy(threading.Thread):
                            print_error( "received unexpected notification", method, params)
                            return
        
       -        callback({'method':method, 'params':params, 'result':result, 'id':msg_id})
       -
       +        
       +        r = {'method':method, 'params':params, 'result':result, 'id':msg_id}
       +        callback(r)
        
        
            def send(self, messages, callback):
 (DIR) diff --git a/lib/synchronizer.py b/lib/synchronizer.py
       t@@ -54,7 +54,7 @@ class WalletSynchronizer(threading.Thread):
                messages = []
                for addr in addresses:
                    messages.append(('blockchain.address.subscribe', [addr]))
       -        self.network.send(messages, lambda r: self.queue.put(r))
       +        self.network.send(messages, self.queue.put)
        
            def run(self):
                with self.lock:
       t@@ -147,7 +147,7 @@ class WalletSynchronizer(threading.Thread):
                        addr = params[0]
                        if self.wallet.get_status(self.wallet.get_history(addr)) != result:
                            if requested_histories.get(addr) is None:
       -                        self.network.send([('blockchain.address.get_history', [addr])], lambda i,r:self.queue.put(r))
       +                        self.network.send([('blockchain.address.get_history', [addr])], self.queue.put)
                                requested_histories[addr] = result
        
                    elif method == 'blockchain.address.get_history':
 (DIR) diff --git a/lib/util.py b/lib/util.py
       t@@ -230,7 +230,12 @@ def parse_json(message):
        class timeout(Exception):
            pass
        
       -import socket, json
       +import socket
       +import errno
       +import json
       +import ssl
       +import traceback
       +import time
        
        class SocketPipe:
        
       t@@ -251,8 +256,21 @@ class SocketPipe:
                        data = self.socket.recv(1024)
                    except socket.timeout:
                        raise timeout
       +            except ssl.SSLError:
       +                raise timeout
       +            except socket.error, err:
       +                if err.errno == 60:
       +                    raise timeout
       +                elif err.errno in [11, 10035]:
       +                    print_error("socket errno", err.errno)
       +                    time.sleep(0.1)
       +                    continue
       +                else:
       +                    traceback.print_exc(file=sys.stdout)
       +                    data = ''
                    except:
                        data = ''
       +
                    if not data:
                        self.socket.close()
                        return None
       t@@ -260,15 +278,37 @@ class SocketPipe:
        
            def send(self, request):
                out = json.dumps(request) + '\n'
       -        while out:
       -            sent = self.socket.send( out )
       -            out = out[sent:]
       +        self._send(out)
        
            def send_all(self, requests):
                out = ''.join(map(lambda x: json.dumps(x) + '\n', requests))
       +        self._send(out)
       +
       +    def _send(self, out):
                while out:
       -            sent = self.socket.send( out )
       -            out = out[sent:]
       +            try:
       +                sent = self.socket.send( out )
       +                out = out[sent:]
       +            except ssl.SSLError as e:
       +                print_error( "SSLError: retry", e)
       +                time.sleep(0.1)
       +                continue
       +
       +            except socket.error as e:
       +                if e[0] in (errno.EWOULDBLOCK,errno.EAGAIN):
       +                    print_error( "EAGAIN: retrying")
       +                    time.sleep(0.1)
       +                    continue
       +                elif e[0] == 'The write operation timed out':
       +                    print_error( "ssl: retry")
       +                    time.sleep(0.1)
       +                    continue
       +                else:
       +                    print repr(e[0])
       +                    traceback.print_exc(file=sys.stdout)
       +                    print_error( "Not connected, cannot send" )
       +                    return False
       +
        
        
        import Queue