这里有新鲜出炉的Python入门,程序狗速度看过来!
Python 是一种面向对象、解释型计算机程序设计语言,由Guido van Rossum于1989年底发明,第一个公开发行版发行于1991年。Python语法简洁而清晰,具有丰富和强大的类库。它常被昵称为胶水语言,它能够把用其他语言制作的各种模块(尤其是C/C++)很轻松地联结在一起。
这篇文章主要介绍了Python实现基于多线程、多用户的FTP服务器与客户端功能,结合完整实例形式分析了Python多线程、多用户FTP服务器端与客户端相关实现技巧与注意事项,需要的朋友可以参考下
本文实例讲述了Python实现基于多线程、多用户的FTP服务器与客户端功能。分享给大家供大家参考,具体如下:
项目介绍:
1. 用户加密认证
2. 允许同时多用户登录
3. 每个用户有自己的家目录 ,且只能访问自己的家目录
4. 对用户进行磁盘配额,每个用户的可用空间不同
5. 允许用户在ftp server上随意切换目录
6. 允许用户查看当前目录下文件
7. 允许上传和下载文件,保证文件一致性
8. 文件传输过程中显示进度条
实现的原理:
服务器端启用端口监听,并对每一连接启用一个线程,对用户登陆密码采用SHA512进行加密并进行匹配,当用户登陆成功后,实例化FTPS,并引导客户端进入主命令模式,
然后实现FTP的上传功能、下载功能、新建目录、删除文件或目录、切换目录等实例化操作,同时对相关上传下载进行进度条显示,服务器端显示下载或上传文件的大小等
客户端与服务器协商建立连接后,进行用户身份登陆,登陆成功接收服务器指令,转入命令输入窗口,同时对put 与 get命令进行判断,实现特定的上传与下载功能
核心代码实现如下:
服务器端
main.py
- #!/usr/bin/env python3.5
- # -*-coding:utf8-*-
- import os,sys,socket,pickle
- BASEDIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- sys.path.append(BASEDIR)
- from conf import setting
- from core import file_handler
- from core import db_handler
- import select,hashlib
- import threading
- def login(username,password):
- """
- FTP登陆验证函数
- :param username:
- :param password:
- :return:
- # testDict ={"username":"jjb","password":"123456","file_dir":"E:\python","file_size":500}
- # file = 'jjb.pkl'
- # fp = open(file,'wb')
- # pickle.dump(testDict,fp)
- # fp.close()
- f = open("jjb.pkl","rb")
- data = pickle.loads(f.read())
- f.close()
- print(data)
- """
- #实例化加密函数
- hash = hashlib.sha512()
- db= db_handler.handler(setting.DATABASE,username)
- if os.path.isfile(db):
- f = open(db,"rb")
- data = pickle.loads(f.read())
- f.close()
- if username == data["name"]:
- hash.update(bytes(data["password"],"utf8"))
- hash_pwd = hash.hexdigest()
- if hash_pwd == password:
- filedir = data["file_dir"]
- filesize = data["file_size"]
- return "True|%s|%s"%(filedir,filesize)
- else:
- return "False||"
- else:
- return "False||"
- else:
- return "False||"
- def process(conn,addr):
- flage = "False"
- # 接收客户端连接请求信息
- info = conn.recv(1000)
- if info.decode() == "connect":
- conn.send(bytes("login","utf8"))
- # 接收用户及密码信息
- while flage =="False":
- user_check =conn.recv(8000)
- # 分割用户名及密码
- username,password = str(user_check.decode()).split("|")
- # 调用登陆验证函数
- login_ack = login(username,password)
- flage,home,size = str(login_ack).split("|")
- # print(flage,home,size)
- # print("user_input:",username,"user_pass:",password)
- if flage =="True":
- # 登陆成功发送登陆确认信息给客户端
- conn.send(bytes("login_ack","utf8"))
- # 实例化FTPserver
- ftp = file_handler.FTPs(username,conn,home,size) # 登陆用户,数据连接,工作目录,磁盘配额
- ftp.run()
- break
- else:
- # 登陆失败,发送给客户端重新验证
- conn.send(bytes("登陆失败!","utf8"))
- def ftp_server():
- '''
- 启动FTP服务器端,开启线程监听
- :return:
- '''
- server = socket.socket()
- server.bind((setting.IP_PORT["host"],setting.IP_PORT["port"]))
- server.listen(10)
- while True:
- r,w,e = select.select([server,], [], [], 1)
- for i,server in enumerate(r):
- conn,addr = server.accept()
- # 创建线程
- t = threading.Thread(target=process, args=(conn, addr))
- # 启动线程
- t.start()
- server.close()
- def run():
- ftp_server()
- if __name__ =="__main__":
- run()
file_handler.py:
- #!/usr/bin/env python3.5
- # -*-coding:utf8-*-
- import os,sys
- BASEDIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- sys.path.append(BASEDIR)
- import re
- from core import db_handler
- from conf import setting
- import pickle
- class FTPs(object):
- '''
- ftp操作命令方法:
- '''
- def __init__(self,username,conn,home,total_size):
- '''
- 初始化参数
- :param username: 操作用户名
- :param conn: sock连接
- :param home: 用户根目录
- :param total_size: 磁盘配额
- :return:
- '''
- self.username = username
- self.conn = conn
- self.root = home
- self.home = self.root
- self.total_size = int(total_size)
- self.cmd_file = None # 文件指令
- self.psize = 4096 # 文件分片
- def getdirsize(self,space):
- '''
- 计算磁盘空间大小
- :return:
- '''
- self.dirsize = 0
- for root,dirs,files in os.walk(space):
- self.dirsize += (sum([os.path.getsize(os.path.join(root,name))for name in files])/1024)
- return int(self.dirsize)
- def put(self):
- '''
- 上传文件
- :return:
- '''
- if self.cmd_file:
- self.user_space = int(self.getdirsize(self.root)/1024)
- # 组合接收字符串
- self.file_root = '%s\\%s'% (self.home,self.cmd_file)
- # # 获取文件名
- self.f =os.path.basename(self.file_root)
- if os.path.isdir(self.home):
- os.chdir(self.home)
- else:
- os.makedirs(self.home)
- os.chdir(self.home)
- try:
- self.conn.send(bytes("f_ack","utf8"))
- self.size = str(self.conn.recv(1024).decode()).split("|")
- if self.size[0]== "fsize":
- self.fss = int(self.size[1])
- self.f_total_size = int(self.user_space + (self.fss/1024/1024))
- if self.f_total_size < self.total_size: # 判断空间是否超额
- self.conn.send(bytes("f_ack_ready","utf8"))
- self.bsize = 0
- print("需要上传文件大小:",self.fss)
- # 打开文件
- f=open(self.f,'wb')
- while self.bsize < self.fss:
- data = self.conn.recv(self.psize)
- self.bsize += len(data)
- f.write(data)
- self.conn.send(bytes("ok","utf8"))
- print("实际已上传文件大小:",self.bsize)
- else:
- self.conn.send(bytes("上传空间不足!无法上传,你当前磁盘配额为%sM"%self.total_size,"utf8"))
- except Exception as ex:
- self.conn.send(bytes(ex,"utf8"))
- else:
- self.conn.send(bytes("请上传文件,文件不能为空","utf8"))
- def get(self):
- '''
- 下载文件
- :return:
- '''
- if self.cmd_file:
- os.chdir(self.home) # 进入用户根目录
- self.file = os.getcwd()+"\\"+ self.cmd_file
- if os.path.isfile(self.file):
- f = open(self.file, 'rb')
- self.fsize = os.path.getsize(self.file) # 获取要发送文件的大小
- self.conn.send(bytes("f_ack_read","utf8"))
- self.conn.recv(1000)
- print("需发送文件大小:",self.fsize)
- self.conn.send(bytes("fsize|%s"%self.fsize,"utf8")) # 发送文件大小及要发送准备完毕指令
- if self.conn.recv(1000).decode() == "f_ack": # 接收对方是否准备就绪
- self.fsize = int(self.fsize)
- self.size = 0
- ack =""
- while self.size < self.fsize and ack !="ok":
- data = f.read(self.fsize) # 一次读取分片大小4096
- self.conn.send(data)
- self.size += len(data)
- print("实际发送文件大小:",self.size)
- ack = self.conn.recv(1000).decode() # 接收客户端是否下载完指令
- self.conn.send(bytes("成功","utf8"))
- else:
- self.conn.send(bytes("接收失败","utf8"))
- else:
- self.conn.send(bytes("文件不存在","utf8"))
- else:
- self.conn.send(bytes("请输入文件名","utf8"))
- def dir(self):
- '''
- 查看文件
- :return:
- '''
- self.current_space =int(self.getdirsize(self.home))
- # 文件列表
- self.li = ""
- # 目录列表
- self.dl = ""
- try:
- os.chdir(self.home)
- except:
- os.makedirs(self.home)
- os.chdir(self.home)
- try:
- if os.listdir(os.getcwd()):
- for self.i in os.listdir(os.getcwd()):
- self.file = os.getcwd()+'\\'+self.i
- if os.path.isfile(self.file):
- # 获取文件大小
- self.fsize = int(os.path.getsize(self.file)/1024)
- if self.fsize < 1:
- self.fsize = 4
- else:
- self.fsize +=4
- self.li += '%s -rw-rw-rw- 占用大小:%skb\r\n'% (self.i,self.fsize)
- else:
- self.dl += '%s\r\n'%self.i
- self.conn.send(bytes("目录:\r\n\r\n%s 文件:\r\n%s\r\n \r\n当前目录空间大小:%skb"%(self.dl,self.li,self.current_space),"utf8"))
- else:
- self.conn.send(bytes("当前目录为:%s"%(self.home),"utf8"))
- except Exception as ex:
- self.conn.send(bytes(ex,"utf8"))
- def cd(self):
- '''
- 进入目录
- :return:
- '''
- if self.cmd_file:
- os.chdir(self.home) # 先进入到工作目录
- self.dir_change = os.path.abspath(os.path.join(self.home,"%s\%s"%(self.home,self.cmd_file)))
- if self.root in self.dir_change:
- try:
- os.chdir(self.dir_change)
- self.home = self.dir_change
- self.conn.send(bytes("当前工作目录为:%s"%self.home,"utf8"))
- except:
- os.makedirs(self.dir_change)
- os.chdir(self.dir_change)
- self.home = self.dir_change
- self.conn.send(bytes("当前工作目录为:%s"%self.home,"utf8"))
- else:
- self.conn.send(bytes("当前工作目录为:%s 更改失败!"%self.home,"utf8"))
- else:
- os.chdir(self.home)
- self.conn.send(bytes("当前工作目录为:%s"%self.home,"utf8"))
- def mkd(self):
- '''
- 创建目录
- :return:
- '''
- if self.cmd_file:
- try:
- os.makedirs(self.cmd_file)
- self.conn.send(bytes("创建目录成功!","utf8"))
- except Exception as ex:
- self.conn.send(bytes("创建目录失败!原因:%s"%ex,"utf8"))
- else:
- self.conn.send(bytes("请输入文件夹名!","utf8"))
- def delete(self):
- '''
- 删除文件
- :return:
- '''
- os.chdir(self.home) # 进入用户根目录
- try:
- self.file = self.home+'\\'+ self.cmd_file
- if os.path.isfile(self.file):
- os.remove(self.cmd_file)
- self.conn.send(bytes("文件:%s删除成功!"%self.cmd_file,"utf8"))
- else:
- os.removedirs(self.cmd_file)
- self.conn.send(bytes("目录删除成功!","utf8"))
- os.chdir(self.root)
- except Exception:
- if os.path.isdir(self.root):
- self.conn.send(bytes("删除失败!","utf8"))
- else:
- os.makedirs(self.root)
- self.conn.send(bytes("删除失败!","utf8"))
- def help(self):
- '''
- FTP帮助信息
- :return:
- '''
- self.conn.send(bytes("""
- FTP服务器操作方法有: put------>上传文件至服务器
- get------>从服务器上下载文件
- dir------>查看服务器文件列表
- cd------->进入指定文件夹
- delete--->删除文件
- mkd ----->创建目录
- help----->帮助信息
- q ------->退出
- ""","utf8"))
- def run(self):
- while True:
- # try:
- # # 接收客户端发来的命令信息
- self.cmd = self.conn.recv(1000)
- self.cmd_action = str(self.cmd.decode())
- # 判断命令是否含有空格
- self.fg = re.search("\s","%s"%self.cmd_action)
- if self.fg:
- self.cmd_action,self.cmd_file = str(self.cmd_action).split(" ")
- else:
- self.cmd_file =None
- # print("cmd_action:",self.cmd_action,"cmd_file:",self.cmd_file)
- if hasattr(FTPs,self.cmd_action):
- func = getattr(self,self.cmd_action)
- func()
- continue
- else:
- self.conn.send(b'command is not found!')
- continue
- # except Exception as ex:
- # print("系统异常:%s"%ex)
客户端
client.py:
- #!/usr/bin/env python3.5
- # -*-coding:utf8-*-
- import sys,os,re
- import socket,hashlib
- BASEDIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- sys.path.append(BASEDIR)
- from core import file_handler
- from conf import setting
- def login():
- hash = hashlib.sha512()
- while True:
- user_input = input("请输入用户名:").strip()
- pass_input = input("请输入密码:").strip()
- if len(user_input) !=0 and len(pass_input) != 0:
- hash.update(bytes(pass_input,"utf8"))
- sha_pwd = hash.hexdigest()
- user = "%s|%s"% (user_input,sha_pwd)
- return user
- break
- def ftp_client():
- sk = socket.socket()
- sk.connect((setting.IP_PORT["host"],setting.IP_PORT["port"]))
- while True:
- flage = False
- sk.send(bytes("connect","utf8"))
- msg = sk.recv(100)
- print("欢迎访问FTP服务器,请根据提示进行操作")
- if msg.decode() == "login":
- while flage == False:
- login_user =login()
- username,password = str(login_user).split("|")
- sk.send(bytes(login_user,"utf8"))
- user_info = sk.recv(1000)
- if user_info.decode() == "login_ack":
- print("登陆成功!")
- flage = True
- break
- print(user_info.decode())
- while flage:
- cmd_action = input("请输入操作命令如:get fy.py or help :").strip()
- if len(cmd_action) == 0:continue
- if cmd_action == "q":
- sys.exit()
- # 判断命令是否含有空格
- fg = re.search("\s","%s"%cmd_action)
- if fg:
- cmd,cmd_file = str(cmd_action).split(" ")
- ftp = file_handler.ftpc(sk,username,cmd_action,setting.DATABASE["local"])
- if hasattr(ftp,cmd):
- func = getattr(ftp,cmd)
- func()
- continue
- else:
- cmd_file =None
- sk.send(bytes(cmd_action,"utf8"))
- rec_msg = sk.recv(8000)
- print(rec_msg.decode())
- if flage == "False":
- sk.send(bytes("connect","utf8"))
- sk.close()
- def run():
- ftp_client()
- if __name__ == "__main__":
- run()
file_handler.py:
- #!/usr/bin/env python3.5
- # -*-coding:utf8-*-
- import sys,os,re
- import socket
- BASEDIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
- sys.path.append(BASEDIR)
- class ftpc(object):
- def __init__(self,sk,username,cmd_action,home):
- self.sk = sk
- self.username = username
- self.cmd_action = cmd_action
- self.home = home
- def put(self):
- '''
- 上传文件
- :return:
- '''
- try:
- os.chdir(self.home)
- except:
- os.makedirs(self.home)
- os.chdir(self.home)
- # 判断命令是否含有空格
- fg = re.search("\s","%s"%self.cmd_action)
- if fg:
- self.cmd,self.cmd_file = str(self.cmd_action).split(" ")
- if os.path.isfile(os.getcwd()+"\\"+self.cmd_file):
- self.sk.send(bytes(self.cmd_action,"utf8")) # 发送动作命令
- rec_msg = self.sk.recv(8000)
- if rec_msg.decode() == "f_ack":
- f = open(self.cmd_file, 'rb')
- self.fsize = os.path.getsize(self.cmd_file) # 获取要发送文件的大小
- self.sk.send(bytes("fsize|%s"%self.fsize,"utf8")) # 发送文件大小
- self.ack = self.sk.recv(1000)
- if self.ack.decode() =="f_ack_ready":
- self.fsize = int(self.fsize)
- self.size = 0
- ack =""
- while self.size < self.fsize and ack !="ok":
- data = f.read(4095) # 一次读取分片大小4095
- self.sk.send(data)
- self.size += len(data)
- count = int(self.size/self.fsize*100)
- print('#'*count,"->",(count),"%")
- ack = self.sk.recv(1000).decode()
- if ack =="ok":
- print("上传成功")
- else:
- print("上传失败")
- else:
- print(self.ack.decode())
- else:
- print("上传文件失败:%s"%rec_msg.decode())
- else:
- print("上传文件失败,请输入正确的文件名!")
- else:
- print("上传文件失败,请输入正确的文件名!")
- def get(self):
- '''
- 下载文件
- :return:
- '''
- try:
- os.chdir(self.home)
- except:
- os.makedirs(self.home)
- os.chdir(self.home)
- # 判断命令是否含有空格
- fg = re.search("\s","%s"%self.cmd_action)
- if fg:
- self.cmd,self.cmd_file = str(self.cmd_action).split(" ")
- else:
- self.cmd_file =None
- self.sk.send(bytes(self.cmd_action,"utf8"))
- rec_msg = self.sk.recv(8000)
- if rec_msg.decode() == "f_ack_read":
- self.rec = self.sk.send(bytes("ok","utf8"))
- self.rec_size = self.sk.recv(2048)
- self.ack_rec= str(self.rec_size.decode()).split("|")
- self.sk.send(bytes("f_ack","utf8"))
- self.ack_s =int(self.ack_rec[1])
- print(self.ack_s)
- self.re_s = 0
- f = open(self.cmd_file,"wb")
- while self.re_s < self.ack_s:
- xx = self.re_s/self.ack_s*100
- data = self.sk.recv(4096)
- self.re_s += len(data)
- # print(data.decode("gbk"))
- f.write(data)
- count = int(xx)
- print('#'*count,"->",(count+1),"%")
- self.sk.send(bytes("ok","utf8"))
- print(self.re_s)
- self.ack_ok = self.sk.recv(1024)
- print("接收文件:%s"%self.ack_ok.decode())
- else:
- print("接收文件失败:%s"%rec_msg.decode())
如下是重要模块进行收藏:
OS模块
获取当前工作目录,即当前python脚本工作的目录路径
- os.getcwd()
改变当前脚本工作目录;相当于shell下cd
- os.chdir("dirname")
返回当前目录: ('.')
- os.curdir
获取当前目录的父目录字符串名:('..')
- os.pardir
可生成多层递归目录
- os.makedirs('dirname1/dirname2')
若目录为空,则删除,并递归到上一级目录,如若也为空,则删除,依此类推
- os.removedirs('dirname1')
生成单级目录;相当于shell中mkdir dirname
- os.mkdir('dirname')
删除单级空目录,若目录不为空则无法删除,报错;相当于shell中rmdir dirname
- os.rmdir('dirname')
列出指定目录下的所有文件和子目录,包括隐藏文件,并以列表方式打印
- os.listdir('dirname')
删除一个文件
- os.remove()
重命名文件/目录
- os.rename("oldname","newname")
获取文件/目录信息
- os.stat('path/filename')
输出操作系统特定的路径分隔符,win下为"\\",Linux下为"/"
- os.sep
输出当前平台使用的行终止符,win下为"\t\n",Linux下为"\n"
- os.linesep
输出用于分割文件路径的字符串
- os.pathsep
输出字符串指示当前使用平台。win->'nt'; Linux->'posix'
- os.name
运行shell命令,直接显示
- os.system("bash command")
获取系统环境变量
- os.environ
返回path规范化的绝对路径
- os.path.abspath(path)
将path分割成目录和文件名二元组返回
- os.path.split(path)
返回path的目录。其实就是os.path.split(path)的第一个元素
- os.path.dirname(path)
返回path最后的文件名。如何path以/或\结尾,那么就会返回空值。即os.path.split(path)的第二个元素
- os.path.basename(path)
如果path存在,返回True;如果path不存在,返回False
- os.path.exists(path)
如果path是绝对路径,返回True
- os.path.isabs(path)
如果path是一个存在的文件,返回True。否则返回False
- os.path.isfile(path)
如果path是一个存在的目录,则返回True。否则返回False
- os.path.isdir(path)
将多个路径组合后返回,第一个绝对路径之前的参数将被忽略
- os.path.join(path1[, path2[, ...]])
返回path所指向的文件或者目录的最后存取时间
- os.path.getatime(path)
返回path所指向的文件或者目录的最后修改时间
- os.path.getmtime(path)
sys模块
命令行参数List,第一个元素是程序本身路径
- sys.argv
退出程序,正常退出时exit(0)
- sys.exit(n)
获取Python解释程序的版本信息
- sys.version
最大的Int值
- sys.maxint
返回模块的搜索路径,初始化时使用PYTHONPATH环境变量的值
- sys.path
返回操作系统平台名称
- sys.platform
- sys.stdout.write('please:')
- val = sys.stdin.readline()[:-1]
re 模块
匹配格式
模式 | 描述 |
---|---|
^ | 匹配字符串的开头 |
$ | 匹配字符串的末尾。 |
. | 匹配任意字符,除了换行符,当re.DOTALL标记被指定时,则可以匹配包括换行符的任意字符。 |
[...] | 用来表示一组字符,单独列出:[amk] 匹配 'a','m'或'k |
[^...] | 不在[]中的字符:[^abc] 匹配除了a,b,c之外的字符。 |
re* | 匹配0个或多个的表达式。 |
re+ | 匹配1个或多个的表达式。 |
re? | 匹配0个或1个由前面的正则表达式定义的片段,非贪婪方式 |
re{ n} | |
re{ n,} | 精确匹配n个前面表达式。 |
re{ n, m} | 匹配 n 到 m 次由前面的正则表达式定义的片段,贪婪方式 |
a| b | 匹配a或b |
(re) | G匹配括号内的表达式,也表示一个组 |
(?imx) | 正则表达式包含三种可选标志:i, m, 或 x 。只影响括号中的区域。 |
(?-imx) | 正则表达式关闭 i, m, 或 x 可选标志。只影响括号中的区域。 |
(?: re) | 类似 (...), 但是不表示一个组 |
(?imx: re) | 在括号中使用i, m, 或 x 可选标志 |
(?-imx: re) | 在括号中不使用i, m, 或 x 可选标志 |
(?#...) | 注释. |
(?= re) | 前向肯定界定符。如果所含正则表达式,以 ... 表示,在当前位置成功匹配时成功,否则失败。但一旦所含表达式已经尝试,匹配引擎根本没有提高;模式的剩余部分还要尝试界定符的右边。 |
(?! re) | 前向否定界定符。与肯定界定符相反;当所含表达式不能在字符串当前位置匹配时成功 |
(?> re) | 匹配的独立模式,省去回溯。 |
\w | 匹配字母数字 |
\W | 匹配非字母数字 |
\s | 匹配任意空白字符,等价于 [\t\n\r\f]. |
\S | 匹配任意非空字符 |
\d | 匹配任意数字,等价于 [0-9]. |
\D | 匹配任意非数字 |
\A | 匹配字符串开始 |
\Z | 匹配字符串结束,如果是存在换行,只匹配到换行前的结束字符串。c |
\z | 匹配字符串结束 |
\G | 匹配最后匹配完成的位置。 |
\b | 匹配一个单词边界,也就是指单词和空格间的位置。例如, 'er\b' 可以匹配"never" 中的 'er',但不能匹配 "verb" 中的 'er'。 |
\B | 匹配非单词边界。'er\B' 能匹配 "verb" 中的 'er',但不能匹配 "never" 中的 'er'。 |
\n, \t, 等. | 匹配一个换行符。匹配一个制表符。等 |
\1...\9 | 匹配第n个分组的子表达式。 |
\10 | 匹配第n个分组的子表达式,如果它经匹配。否则指的是八进制字符码的表达式。 |
正则表达式常用5种操作
# 从头匹配
- re.match(pattern, string)
# 匹配整个字符串,直到找到一个匹配
- re.search(pattern, string)
# 将匹配到的格式当做分割点对字符串分割成列表
- re.split()
- >>>m = re.split("[0-9]", "alex1rain2jack3helen rachel8")
- >>>print(m)
输出:['alex', 'rain', 'jack', 'helen rachel', '']
# 找到所有要匹配的字符并返回列表格式
- re.findall()
- >>>m = re.findall("[0-9]", "alex1rain2jack3helen rachel8")
- >>>print(m)
输出:['1', '2', '3', '8']
# 替换匹配到的字符
- re.sub(pattern, repl, string, count,flag)
- m=re.sub("[0-9]","|", "alex1rain2jack3helen rachel8",count=2 )
- print(m)
输出:alex|rain|jack3helen rachel8
正则表达式实例
字符匹配
实例 | 描述 |
---|---|
python | 匹配 "python". |
字符类
实例 | 描述 |
---|---|
[Pp]ython | 匹配 "Python" 或 "python |
rub[ye] | 匹配 "ruby" 或 "rube |
[aeiou] | 匹配中括号内的任意一个字母 |
[0-9] | 匹配任何数字。类似于 [0123456789] |
[a-z] | 匹配任何小写字母 |
[A-Z] | 匹配任何大写字母 |
[a-zA-Z0-9] | 匹配任何字母及数字 |
[^aeiou] | 除了aeiou字母以外的所有字符 |
[^0-9] | 匹配除了数字外的字符 |
特殊字符类
实例 | 描述 |
---|---|
. | 匹配除 "\n" 之外的任何单个字符。要匹配包括 '\n' 在内的任何字符,请使用象 '[.\n]' 的模式。 |
\d | 匹配一个数字字符。等价于 [0-9]。 |
\D | 匹配一个非数字字符。等价于 [^0-9]。 |
\s | 匹配任何空白字符,包括空格、制表符、换页符等等。等价于 [ \f\n\r\t\v]。 |
\S | 匹配任何非空白字符。等价于 [^ \f\n\r\t\v]。 |
\w | 匹配包括下划线的任何单词字符。等价于'[A-Za-z0-9_]'。 |
\W | 匹配任何非单词字符。等价于 '[^A-Za-z0-9_]'。 |
re.match与re.search的区别
re.match只匹配字符串的开始,如果字符串开始不符合正则表达式,则匹配失败,函数返回None;而re.search匹配整个字符串,直到找到一个匹配。
希望本文所述对大家Python程序设计有所帮助。
来源: http://www.phperz.com/article/17/1022/351017.html