tMerge branch 'master' of gitorious.org:electrum/electrum - electrum - Electrum Bitcoin wallet
 (HTM) git clone https://git.parazyd.org/electrum
 (DIR) Log
 (DIR) Files
 (DIR) Refs
 (DIR) Submodules
       ---
 (DIR) commit b36a71cf93353aa9d390bd01396bd83bd23feee0
 (DIR) parent 9e316b7d2ba72a89e8bf4253aae02cb3ac4a77f7
 (HTM) Author: ThomasV <thomasv@gitorious>
       Date:   Tue, 13 Mar 2012 16:44:10 +0100
       
       Merge branch 'master' of gitorious.org:electrum/electrum
       
       Diffstat:
         M server/server.py                    |     115 ++++++++++++++++++++++---------
       
       1 file changed, 84 insertions(+), 31 deletions(-)
       ---
 (DIR) diff --git a/server/server.py b/server/server.py
       t@@ -75,12 +75,19 @@ bitcoind_url = 'http://%s:%s@%s:%s/' % ( config.get('bitcoind','user'), config.g
        
        stopping = False
        block_number = -1
       +old_block_number = -1
        sessions = {}
       +sessions_sub_numblocks = [] # sessions that have subscribed to the service
       +
        dblock = thread.allocate_lock()
        peer_list = {}
        
        wallets = {} # for ultra-light clients such as bccapi
        
       +from Queue import Queue
       +input_queue = Queue()
       +output_queue = Queue()
       +
        class MyStore(Datastore_class):
        
            def import_tx(self, tx, is_coinbase):
       t@@ -408,18 +415,7 @@ def poll_session(session_id):
                k = 0
                for addr in addresses:
                    if store.tx_cache.get( addr ) is not None: k += 1
       -
       -            # get addtess status, i.e. the last block for that address.
       -            tx_points = store.get_history(addr)
       -            if not tx_points:
       -                status = None
       -            else:
       -                lastpoint = tx_points[-1]
       -                status = lastpoint['blk_hash']
       -                # this is a temporary hack; move it up once old clients have disappeared
       -                if status == 'mempool' and session['version'] != "old":
       -                    status = status + ':%d'% len(tx_points)
       -
       +            status = get_address_status( addr )
                    last_status = addresses.get( addr )
                    if last_status != status:
                        addresses[addr] = status
       t@@ -433,6 +429,36 @@ def poll_session(session_id):
        
                return out
        
       +def get_address_status(addr):
       +    # get addtess status, i.e. the last block for that address.
       +    tx_points = store.get_history(addr)
       +    if not tx_points:
       +        status = None
       +    else:
       +        lastpoint = tx_points[-1]
       +        status = lastpoint['blk_hash']
       +        # this is a temporary hack; move it up once old clients have disappeared
       +        if status == 'mempool': # and session['version'] != "old":
       +            status = status + ':%d'% len(tx_points)
       +    return status
       +
       +
       +def send_numblocks(session_id):
       +    out = json.dumps( {'method':'numblocks.subscribe', 'result':block_number} )
       +    output_queue.put((session_id, out))
       +
       +def subscribe_to_numblocks(session_id):
       +    sessions_sub_numblocks.append(session_id)
       +    send_numblocks(session_id)
       +
       +def subscribe_to_address(session_id, address):
       +    #print "%s subscribing to %s"%(session_id,address)
       +    status = get_address_status(address)
       +    sessions[session_id]['type'] = 'subscribe'
       +    sessions[session_id]['addresses'][address] = status
       +    sessions[session_id]['last_time'] = time.time()
       +    out = json.dumps( { 'method':'address.subscribe', 'address':address, 'status':status } )
       +    output_queue.put((session_id, out))
        
        def new_session(version, addresses):
            session_id = random_string(10)
       t@@ -443,11 +469,6 @@ def new_session(version, addresses):
            sessions[session_id]['last_time'] = time.time()
            return out
        
       -def subscribe_to_address(session_id, address):
       -    sessions[session_id]['addresses'][address] = ''
       -    sessions[session_id]['last_time'] = time.time()
       -
       -
        def update_session(session_id,addresses):
            sessions[session_id]['addresses'] = {}
            for a in addresses:
       t@@ -611,6 +632,9 @@ def do_command(cmd, data, ipaddr):
        ####################################################################
        
        def tcp_server_thread():
       +    thread.start_new_thread(process_input_queue, ())
       +    thread.start_new_thread(process_output_queue, ())
       +
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind((config.get('server','host'), 50001))
       t@@ -624,19 +648,19 @@ def tcp_server_thread():
                    traceback.print_exc(file=sys.stdout)
        
        
       -
       +# one thread per client. put requests in a queue.
        def tcp_client_thread(ipaddr,conn):
            """ use a persistent connection. put commands in a queue."""
            print "persistent client thread", ipaddr
            global sessions
        
            session_id = random_string(10)
       -    sessions[session_id] = { 'addresses':{}, 'version':'unknown' }
       +    sessions[session_id] = { 'conn':conn, 'addresses':{}, 'version':'unknown' }
        
            ipaddr = ipaddr[0]
            msg = ''
        
       -    while True:
       +    while not stopping:
                d = conn.recv(1024)
                msg += d
                if not d: break
       t@@ -655,18 +679,41 @@ def tcp_client_thread(ipaddr,conn):
                            print "syntax error", repr(c), ipaddr
                            continue
        
       -                out = None
       -                if cmd == 'blockchain.address.subscribe':
       -                    subscribe_to_address(session_id,data)
       -                elif cmd == 'client.version':
       -                    sessions[session_id]['version'] = data
       -                elif cmd == 'server.banner':
       -                    out = json.dumps( { 'method':'server.banner', 'result':config.get('server','banner').replace('\\n','\n') } )
       -                else:
       -                    print "unknown command", cmd
       +                # add to queue
       +                input_queue.put((session_id, cmd, data))
       +
       +
       +# read commands from the input queue. perform requests, etc. this should be called from the main thread.
       +def process_input_queue():
       +    while not stopping:
       +        session_id, cmd, data = input_queue.get()
       +        out = None
       +        if cmd == 'address.subscribe':
       +            subscribe_to_address(session_id,data)
       +        elif cmd == 'numblocks.subscribe':
       +            subscribe_to_numblocks(session_id)
       +        elif cmd == 'client.version':
       +            sessions[session_id]['version'] = data
       +        elif cmd == 'server.banner':
       +            out = json.dumps( { 'method':'server.banner', 'result':config.get('server','banner').replace('\\n','\n') } )
       +        elif cmd == 'address.get_history':
       +            address = data
       +            out = json.dumps( { 'method':'address.get_history', 'address':address, 'result':store.get_history( address ) } )
       +        elif cmd == 'transaction.broadcast':
       +            out = json.dumps( { 'method':'transaction.broadcast', 'result':send_tx(data) } )
       +        else:
       +            print "unknown command", cmd
       +        if out:
       +            output_queue.put((session_id, out))
        
       -                if out:
       -                    conn.send(out+'\n')
       +# this is a separate thread
       +def process_output_queue():
       +    while not stopping:
       +        session_id, out = output_queue.get()
       +        session = sessions.get(session_id)
       +        if session: 
       +            conn = session.get('conn')
       +            conn.send(out+'\n')
        
        
        
       t@@ -835,6 +882,12 @@ if __name__ == '__main__':
                    store.catch_up()
                    memorypool_update(store)
                    block_number = store.get_block_number(1)
       +
       +            if block_number != old_block_number:
       +                old_block_number = block_number
       +                for session_id in sessions_sub_numblocks:
       +                    send_numblocks(session_id)
       +
                except IOError:
                    print "IOError: cannot reach bitcoind"
                    block_number = 0