mpython 使用smtp发送邮件时ussl库未找到的解决方案
MicroPython新版本中已经将ussl改名为ssl,但不知道为什么官方提供的umail.py没改把掌控板中的umail替换为以下代码即可:
import usocket
import gc
import ssl# 替换 ussl 为 ssl
DEFAULT_TIMEOUT = 10# sec
LOCAL_DOMAIN = '127.0.0.1'
CMD_EHLO = 'EHLO'
CMD_STARTTLS = 'STARTTLS'
CMD_AUTH = 'AUTH'
CMD_MAIL = 'MAIL'
AUTH_PLAIN = 'PLAIN'
AUTH_LOGIN = 'LOGIN'
# 加密过程
def encrypt_string(message):
encode_result = ""
for char in message:
char_int = ord(char)
if char.isalpha():# 判断是否为字母
if 64 < char_int < 78 or 96 < char_int < 110:# 针对其中的部分字母进行加密
encode_result += "00" + str((char_int + 13) * 2) + "|"
else:# 对剩下字母进行加密
encode_result += "01" + str(char_int - 23) + "|"
elif '\u4e00' <= char <= '\u9fff':# 单个汉字可以这么判断
encode_result += "02" + str(char_int + 24) + "|"
else:# 对数字、特殊字符进行加密
encode_result += "03" + str(char_int) + "|"
return encode_result
# 解密过程
def decrypt_string(message):
decode_result = ""
# 将message转换为list
message_list = message.split("|")
message_list.remove("")# 移除list中的空元素
for i in message_list:
type_ = i[:2]
char_number = int(i)
if type_ == "00":
char_number = int(char_number / 2 - 13)
elif type_ == "01":
char_number = char_number + 23
elif type_ == "02":
char_number = char_number - 24
else:
char_number = char_number
decode_result += chr(char_number)
return decode_result
DEFAULT_EMAIL = 'zhangkongban@163.com'
DEFAULT_PASSWORD = encrypt_string('NTJTTHERKCSJFEXM')
class SMTP:
def cmd(self, cmd_str):
sock = self._sock
sock.write('%s\r\n' % cmd_str)
resp = []
next = True
while next:
code = sock.read(3)
if not code:# 添加超时或断开连接检查
break
next = sock.read(1) == b'-'
line = sock.readline().strip()
if line:
resp.append(line.decode())
return int(code), resp
def __init__(self, host, port, ssl_mode=False, username=None, password=None):
self.username = username
addr = usocket.getaddrinfo(host, port)[-1]
sock = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM)
sock.settimeout(DEFAULT_TIMEOUT)
sock.connect(addr)
# 使用 ssl 替代 ussl
if ssl_mode:
sock = ssl.wrap_socket(sock)
code = int(sock.read(3))
sock.readline()# 消耗剩余响应
self._sock = sock
# 读取初始响应(如果不是 SSL 模式)
if not ssl_mode:
code = int(sock.read(3))
sock.readline()# 消耗剩余响应
assert code == 220, '无法连接到服务器 %d' % code
code, resp = self.cmd(CMD_EHLO + ' ' + LOCAL_DOMAIN)
assert code == 250, 'EHLO 失败 %d' % code
# 支持 STARTTLS
if not ssl_mode and any(CMD_STARTTLS in r for r in resp):
code, resp = self.cmd(CMD_STARTTLS)
assert code == 220, 'STARTTLS 失败 %d, %s' % (code, resp)
self._sock = ssl.wrap_socket(sock)
if username and password:
self.login(username, password)
def login(self, username, password):
self.username = username
code, resp = self.cmd(CMD_EHLO + ' ' + LOCAL_DOMAIN)
assert code == 250, 'EHLO 错误 %d, %s' % (code, resp)
auths = None
for feature in resp:
if feature[:4].upper() == CMD_AUTH:
auths = feature.strip('=').upper().split()
assert auths is not None, "服务器不支持认证方式"
from ubinascii import b2a_base64 as b64
if AUTH_PLAIN in auths:
creds = "\0%s\0%s" % (username, password)
cren = b64(creds.encode()).decode().strip()
code, resp = self.cmd('%s %s %s' % (CMD_AUTH, AUTH_PLAIN, cren))
elif AUTH_LOGIN in auths:
user_b64 = b64(username.encode()).decode().strip()
code, resp = self.cmd("%s %s %s" % (CMD_AUTH, AUTH_LOGIN, user_b64))
assert code == 334, '用户名错误 %d, %s' % (code, resp)
pass_b64 = b64(password.encode()).decode().strip()
code, resp = self.cmd(pass_b64)
else:
raise Exception("认证方式(%s)不被支持" % ', '.join(auths))
assert code in (235, 503), '认证错误 %d, %s' % (code, resp)
return code, resp
def to(self, addrs, mail_from=None):
mail_from = self.username if mail_from is None else mail_from
code, resp = self.cmd('MAIL FROM: <%s>' % mail_from)
assert code == 250, '发件人拒绝 %d, %s' % (code, resp)
if isinstance(addrs, str):
addrs =
count = 0
for addr in addrs:
code, resp = self.cmd('RCPT TO: <%s>' % addr)
if code not in (250, 251):
print('%s 拒绝, %s' % (addr, resp))
count += 1
assert count != len(addrs), '收件人全部拒绝, %d, %s' % (code, resp)
code, resp = self.cmd('DATA')
assert code == 354, '数据拒绝, %d, %s' % (code, resp)
return code, resp
def write(self, content):
if isinstance(content, str):
content = content.encode('utf-8')
self._sock.write(content)
def send(self, content=''):
if content:
self.write(content)
self._sock.write(b'\r\n.\r\n')# 结束标记
code = int(self._sock.read(3))
line = self._sock.readline().decode().strip()
return code, line
def quit(self):
try:
self.cmd("QUIT")
finally:
self._sock.close()
def send_email(myusername, mypassword, target_email, SMTP_SERVER, subject, text):
server_map = {
1: ('smtp.office365.com', 587),
2: ('smtp.qq.com', 587),
3: ('smtp.126.com', 25),
4: ('smtp.163.com', 25)
}
if SMTP_SERVER in server_map:
host, port = server_map
ssl_mode = (port == 465)# 465端口使用SSL
else:
raise ValueError("无效的SMTP服务器选择")
if myusername == DEFAULT_EMAIL:
mypassword = decrypt_string(mypassword)
try:
gc.collect()
smtp = SMTP(host, port, ssl_mode=ssl_mode)
smtp.login(myusername, mypassword)
smtp.to(target_email)
# 邮件头
headers = [
"From: 掌控板 <{}>".format(myusername),
"To: <{}>".format(target_email),
"Subject: {}".format(subject),
""
]
for header in headers:
smtp.write(header + "\r\n")
smtp.write(text + "\r\n")
code, resp = smtp.send()
smtp.quit()
if code == 250:
print("邮件发送成功!")
else:
print("邮件发送失败! 错误代码: {}, 响应: {}".format(code, resp))
except Exception as e:
print("邮件发送失败! 错误详情:")
import sys
sys.print_exception(e)
raise e实测有效
页:
[1]