- import os
- import smtplib
- import zipfile
- from datetime import datetime, timedelta
- from email import encoders
- from email.header import Header
- from email.mime.base import MIMEBase
- from email.mime.multipart import MIMEMultipart
- from email.mime.text import MIMEText
- import mysql.connector
- import xlwt
- mysql_host = '' # TODO 请设置数据库地址
- mysql_port = 3306 # TODO 请设置数据库端口
- mysql_user = '' # TODO 请设置数据库用户名
- mysql_password = '' # TODO 请设置数据库密码
- class SQL(object):
- def __init__(self, name, params=None, formula=None):
- self._name = name
- self._params = params
- self.formula = formula
- with open(os.path.join('sql', name + '.sql'), 'r', encoding='utf-8') as f:
- self._sql = f.read()
- @property
- def filename(self):
- return self._name
- @property
- def sql(self):
- return self._sql
- @property
- def params(self):
- return self._params
- class SQL1(SQL):
- @property
- def filename(self):
- today = datetime.now().strftime('%Y-%m-%d')
- return today + self._name + '.xls'
- class SQL2(SQL):
- @property
- def filename(self):
- today = datetime.now().strftime('%Y-%m-%d')
- return today + self._name + '.xls'
- @property
- def sql(self):
- today = datetime.now()
- return self._sql % tuple([(today - timedelta(days=i)).strftime('%Y%m%d') for i in range(0, 7)])
- def export(SQL, d='.'):
- conn = mysql.connector.connect(host=mysql_host, port=mysql_port, database='wifi', user=mysql_user,
- password=mysql_password)
- cursor = conn.cursor()
- cursor.execute(SQL.sql, SQL.params)
- book = xlwt.Workbook()
- sheet = book.add_sheet('sheet1')
- for i, val in enumerate(cursor.column_names):
- sheet.write(0, i, val)
- rows = cursor.fetchall()
- for i, row in enumerate(rows):
- r = sheet.row(i + 1)
- for j, v in enumerate(row):
- r.write(j, v)
- if SQL.formula:
- sheet.write(len(rows) + 1, len(cursor.column_names), xlwt.Formula(SQL.formula))
- book.save(os.path.join(d, SQL.filename))
- def export_all(sqls, dir='.'):
- if not sqls or len(sqls) <1:
- return
- for item in sqls:
- try:
- export(item, dir)
- except Exception as e:
- print(e)
- def do_export(d='.'):
- export_all([
- SQL1('SQL 文件 1'),
- SQL2('SQL 文件 2', formula='SUM($C$2:$C$100)'),
- ], d)
- def zip_dir(d):
- f = zipfile.ZipFile(d + '.zip', 'w', zipfile.ZIP_DEFLATED)
- for *_, filenames in os.walk(d):
- for filename in filenames:
- f.write(os.path.join(d, filename))
- f.close()
- def send_email(sender, sender_pass, receiver, filename):
- msg = MIMEMultipart()
- msg['From'] = sender
- msg['To'] = receiver
- msg['Subject'] = Header(filename, 'utf-8').encode()
- # 邮件正文是 MIMEText:
- msg.attach(MIMEText('统计数据 %s, 请查看附件' % filename, 'plain', 'utf-8'))
- # 添加附件就是加上一个 MIMEBase, 从本地读取一个图片:
- with open(filename, 'rb') as f:
- # 设置附件的 MIME 和文件名, 这里是 png 类型:
- mime = MIMEBase('application', 'octet-stream', filename=filename)
- # 加上必要的头信息:
- mime.add_header('Content-Disposition', 'attachment', filename=filename)
- mime.add_header('Content-ID', '<0>')
- mime.add_header('X-Attachment-Id', '0')
- # 把附件的内容读进来:
- mime.set_payload(f.read())
- # 用 Base64 编码:
- encoders.encode_base64(mime)
- # 添加到 MIMEMultipart:
- msg.attach(mime)
- smtp_server = 'smtp.qiye.163.com'
- server = smtplib.SMTP(smtp_server, 25) # SMTP 协议默认端口是 25
- server.login(sender, sender_pass)
- server.sendmail(sender, [receiver], msg.as_string())
- server.quit()
- def export_and_send():
- sender = '' # TODO 请输入发送者邮箱, 例如 mawei@ssgm.net
- sender_password = '' # TODO 请输入发送者邮箱密码
- receiver = '' # TODO 请输入接收者邮箱, 例如 ssgmlihong@ssgm.net
- d = datetime.now().strftime('%Y-%m-%d') + '统计数据'
- if not os.path.exists(d):
- os.mkdir(d)
- do_export(d)
- zip_dir(d)
- send_email(sender, sender_password, receiver, d + '.zip')
- if __name__ == '__main__':
- export_and_send()
来源: https://juejin.im/entry/5b8ce11351882554ae4bd841