写一个分布式存储系统有多简单? (How easy to write a Distributed Storage System ?)

用python 写了个用户态分布式存储系统(且这么称吧),实现文件存储。(今天在google 上看到两个人和我干了同样工作,PyGfs 使用了pyro 库封装了一些远程调用,后者则是模拟实现了GFS 功能,两份代码都不长。)

自己的分布式文件系统总体分为Client 端和DataServer 端,没有NameServer 是因为通过文件名hash 得到对应的DS 服务器。存储的粒度为文件,提供write 和fetch,写文件和获取文件功能,读写过程:通过对路径+文件名进行hash,用机器数量为模数,得到目的主机编号和地址,数据通过socket 传过去,DS 的机器数在部署时可配置,写在配置文件。

TODO:文件分片,NS 实现(只保存目录树)

配置文件为ip , port 格式:

 1: 127.0.0.1,10001
 2: 127.0.0.1,10002
 3: 127.0.0.1,10003

 

DataServer 端:

 1: #!/usr/bin/env python
 2: #encoding=utf-8
 3: import os,sys
 4: import socket
 5:
 6: class DataServer:
 7:     port=10000
 8:     def __init__(self):
 9:         pass
 10:     def run(self):
 11:         dsoc=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
 12:         dsoc.bind(('localhost',self.port))
 13:         print "listening at",self.port
 14:         while True:
 15:             dsoc.listen(1)
 16:             conn,addr=dsoc.accept()
 17:             print "connected",addr
 18:             databuf=conn.recv(13)
 19:             print "received",databuf
 20:             if databuf.startswith('WRITFILE'):
 21:                 print "OPS == write file"
 22:                 dlist=databuf.split(',')
 23:                 fnamelen=int(dlist[1])
 24:                 conn.send('OK')
 25:                 print "filenamelen is",fnamelen
 26:                 filename=conn.recv(fnamelen)
 27:                 filename=filename[filename.rindex('\\')+1:]
 28:                 print "file is",filename
 29:                 fp=open(filename,'wb')
 30:                 while True:
 31:                     data=conn.recv(1024)
 32:                     if not data:break
 33:                     fp.write(data)
 34:                 fp.flush()
 35:                 fp.close()
 36:                 print "finished!",filename
 37:             if databuf.startswith('FETCFILE'):
 38:                 print "OPS == fetch file"
 39:                 dlist=databuf.split(',')
 40:                 fnamelen=int(dlist[1])
 41:                 conn.send('OK')
 42:                 print "filenamelen is",fnamelen
 43:                 filename=conn.recv(fnamelen)
 44:                 filename=filename[filename.rindex('\\')+1:]
 45:                 print "file is",filename
 46:                 fp=open(filename,'rb')
 47:                 while True:
 48:                     data=fp.read(4096)
 49:                     if not data:
 50:                         break
 51:                     while len(data)>0:
 52:                         sent=conn.send(data)
 53:                         data=data[sent:]
 54:                 print "Fished to send ",filename
 55:             conn.close()
 56:
 57:     def setPort(self,port):
 58:         self.port=int(port)
 59:
 60: if __name__=="__main__":
 61:     ds=DataServer()
 62:     ds.setPort(sys.argv[1])
 63:     ds.run()

 

Client 端:

 1: #!/usr/bin/env python
 2: #encoding=utf-8
 3: import os
 4: import socket
 5: import hashlib
 6:
 7: class Configuration:
 8:     count=0
 9:     clist=[]
 10:     def __init__(self):
 11:         self.readPath("machineconf.txt")
 12:     def readPath(self,pStr):
 13:         self.pathStr=pStr
 14:         fp=open(self.pathStr,'r')
 15:         while True:
 16:             line=fp.readline()
 17:             if not line:
 18:                 break
 19:             line=line.rstrip()
 20:             self.clist.append(line)
 21:             self.count=self.count+1
 22:     def getCount(self):
 23:         return self.count
 24:     def getList(self):
 25:         return self.clist
 26:
 27: class Client:
 28:     maList=[]
 29:     macNum=0
 30:     def __init__(self):
 31:         conf=Configuration()
 32:         self.maList=conf.getList()
 33:         self.macNum=conf.getCount()
 34:     def write(self,src):
 35:         srcHash=self.hash(src)
 36:         print "File is",src
 37:         locatedNum=srcHash%self.macNum
 38:         print "Location machine number is",locatedNum
 39:         IPort=self.maList[locatedNum].split(',')
 40:         toIp=IPort[0]
 41:         toPort=IPort[1]
 42:         print "Send to",toIp,toPort
 43:         self.send(src,toIp,toPort)
 44:     def hash(self,src):
 45:         md5=hashlib.md5()
 46:         md5.update(src)
 47:         return int(md5.hexdigest()[-4:],16)
 48:     def send(self,src,ip,port):
 49:         clsoc=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
 50:         clsoc.connect((ip,int(port)))
 51:         fp=open(src,'rb')
 52:         formStr="WRITFILE,%04d"%len(src)
 53:         print "formStr",formStr,len(formStr)
 54:         clsoc.send(formStr)
 55:         resdata=clsoc.recv(1024)
 56:         if resdata.startswith('OK'):
 57:             print "OK"
 58:         print "sending....",src
 59:         clsoc.send(src)
 60:         print "sending data...."
 61:         while True:
 62:             data=fp.read(4096)
 63:             if not data:
 64:                 break
 65:             while len(data)>0:
 66:                 sent=clsoc.send(data)
 67:                 data=data[sent:]
 68:         print "Fished to send ",src
 69:         fp.close()
 70:     def fetch(self,src):
 71:         srcHash=self.hash(src)
 72:         locatedNum=srcHash%self.macNum
 73:         IPort=self.maList[locatedNum].split(',')
 74:         toIp=IPort[0]
 75:         toPort=IPort[1]
 76:         print "fetch from",toIp,toPort
 77:         self.get(src,toIp,toPort)
 78:     def get(self,src,ip,port):
 79:         clsoc=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
 80:         clsoc.connect((ip,int(port)))
 81:         formStr="FETCFILE,%04d"%len(src)
 82:         print "formStr",formStr
 83:         clsoc.send(formStr)
 84:         resdata=clsoc.recv(1024)
 85:         if resdata.startswith('OK'):
 86:             print "OK"
 87:         print "sending....",src
 88:         clsoc.send(src)
 89:         print "fetching data...."
 90:         ffile=src[src.rindex('\\')+1:]
 91:         fp=open(ffile,'wb')
 92:         while True:
 93:             data=clsoc.recv(1024)
 94:             if not data:break
 95:             fp.write(data)
 96:             print "fetching",data[-8:]
 97:         fp.flush()
 98:         fp.close()
 99:         print "finished!",src
 100: if __name__=="__main__":
 101:     mac=Client()
 102:     src="e:\\pics\\tumblr_m0pvvmcIv41r9mp65o1_500.gif"
 103:     mac.write(src)
 104:     mac.fetch(src)

Python线程,网络

最近写了些测试程序,python 的一些网络和线程总结如下:

  • Server/Client
  1. 创建socket 对象
  2. 绑定socket 到指定地址
  3. socket 的listen 方法接受连接
  4. 处理
  5. socket.close 关闭连接
  • 创建监听服务器方法:

创建一个类继承BaseRequestHandler(import SocketServer)

class Myserver(SocketServer.BaseRequestHandler):

这个服务器将在收到一个连接后创建一个线程,线程code 在handle里

def handle(self):

  • 全局变量&类变量

能不用全局变量就不用全局变量是个好习惯

即使在handle 内定义了全局变量 global param 也只是局部变量,该变量在线程死亡后被清理

可以使用类变量达到全局变量的效果