ok0k 发表于 5 天前

mpython 使用smtp发送邮件时ussl库未找到的解决方案

MicroPython新版本中已经将ussl改名为ssl,但不知道为什么官方提供的umail.py没改
把掌控板中的umail替换为以下代码即可:
import usocket
import gc
import ssl# 替换 ussl 为 ssl

DEFAULT_TIMEOUT = 10# sec
LOCAL_DOMAIN = '127.0.0.1'
CMD_EHLO = 'EHLO'
CMD_STARTTLS = 'STARTTLS'
CMD_AUTH = 'AUTH'
CMD_MAIL = 'MAIL'
AUTH_PLAIN = 'PLAIN'
AUTH_LOGIN = 'LOGIN'

# 加密过程
def encrypt_string(message):
    encode_result = ""
    for char in message:
      char_int = ord(char)
      if char.isalpha():# 判断是否为字母
            if 64 < char_int < 78 or 96 < char_int < 110:# 针对其中的部分字母进行加密
                encode_result += "00" + str((char_int + 13) * 2) + "|"
            else:# 对剩下字母进行加密
                encode_result += "01" + str(char_int - 23) + "|"

      elif '\u4e00' <= char <= '\u9fff':# 单个汉字可以这么判断
            encode_result += "02" + str(char_int + 24) + "|"
      else:# 对数字、特殊字符进行加密
            encode_result += "03" + str(char_int) + "|"

    return encode_result

# 解密过程
def decrypt_string(message):
    decode_result = ""

    # 将message转换为list
    message_list = message.split("|")
    message_list.remove("")# 移除list中的空元素

    for i in message_list:
      type_ = i[:2]
      char_number = int(i)
      if type_ == "00":
            char_number = int(char_number / 2 - 13)
      elif type_ == "01":
            char_number = char_number + 23
      elif type_ == "02":
            char_number = char_number - 24
      else:
            char_number = char_number

      decode_result += chr(char_number)

    return decode_result

DEFAULT_EMAIL = 'zhangkongban@163.com'
DEFAULT_PASSWORD = encrypt_string('NTJTTHERKCSJFEXM')

class SMTP:
    def cmd(self, cmd_str):
      sock = self._sock
      sock.write('%s\r\n' % cmd_str)
      resp = []
      next = True
      while next:
            code = sock.read(3)
            if not code:# 添加超时或断开连接检查
                break
            next = sock.read(1) == b'-'
            line = sock.readline().strip()
            if line:
                resp.append(line.decode())
      return int(code), resp

    def __init__(self, host, port, ssl_mode=False, username=None, password=None):
      self.username = username
      addr = usocket.getaddrinfo(host, port)[-1]
      sock = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM)
      sock.settimeout(DEFAULT_TIMEOUT)
      sock.connect(addr)
      
      # 使用 ssl 替代 ussl
      if ssl_mode:
            sock = ssl.wrap_socket(sock)
            code = int(sock.read(3))
            sock.readline()# 消耗剩余响应
      
      self._sock = sock
      
      # 读取初始响应(如果不是 SSL 模式)
      if not ssl_mode:
            code = int(sock.read(3))
            sock.readline()# 消耗剩余响应
            assert code == 220, '无法连接到服务器 %d' % code

      code, resp = self.cmd(CMD_EHLO + ' ' + LOCAL_DOMAIN)
      assert code == 250, 'EHLO 失败 %d' % code
      
      # 支持 STARTTLS
      if not ssl_mode and any(CMD_STARTTLS in r for r in resp):
            code, resp = self.cmd(CMD_STARTTLS)
            assert code == 220, 'STARTTLS 失败 %d, %s' % (code, resp)
            self._sock = ssl.wrap_socket(sock)

      if username and password:
            self.login(username, password)

    def login(self, username, password):
      self.username = username
      code, resp = self.cmd(CMD_EHLO + ' ' + LOCAL_DOMAIN)
      assert code == 250, 'EHLO 错误 %d, %s' % (code, resp)

      auths = None
      for feature in resp:
            if feature[:4].upper() == CMD_AUTH:
                auths = feature.strip('=').upper().split()
      assert auths is not None, "服务器不支持认证方式"

      from ubinascii import b2a_base64 as b64
      if AUTH_PLAIN in auths:
            creds = "\0%s\0%s" % (username, password)
            cren = b64(creds.encode()).decode().strip()
            code, resp = self.cmd('%s %s %s' % (CMD_AUTH, AUTH_PLAIN, cren))
      elif AUTH_LOGIN in auths:
            user_b64 = b64(username.encode()).decode().strip()
            code, resp = self.cmd("%s %s %s" % (CMD_AUTH, AUTH_LOGIN, user_b64))
            assert code == 334, '用户名错误 %d, %s' % (code, resp)
            pass_b64 = b64(password.encode()).decode().strip()
            code, resp = self.cmd(pass_b64)
      else:
            raise Exception("认证方式(%s)不被支持" % ', '.join(auths))

      assert code in (235, 503), '认证错误 %d, %s' % (code, resp)
      return code, resp

    def to(self, addrs, mail_from=None):
      mail_from = self.username if mail_from is None else mail_from
      code, resp = self.cmd('MAIL FROM: <%s>' % mail_from)
      assert code == 250, '发件人拒绝 %d, %s' % (code, resp)

      if isinstance(addrs, str):
            addrs =
      count = 0
      for addr in addrs:
            code, resp = self.cmd('RCPT TO: <%s>' % addr)
            if code not in (250, 251):
                print('%s 拒绝, %s' % (addr, resp))
                count += 1
      assert count != len(addrs), '收件人全部拒绝, %d, %s' % (code, resp)

      code, resp = self.cmd('DATA')
      assert code == 354, '数据拒绝, %d, %s' % (code, resp)
      return code, resp

    def write(self, content):
      if isinstance(content, str):
            content = content.encode('utf-8')
      self._sock.write(content)

    def send(self, content=''):
      if content:
            self.write(content)
      self._sock.write(b'\r\n.\r\n')# 结束标记
      code = int(self._sock.read(3))
      line = self._sock.readline().decode().strip()
      return code, line

    def quit(self):
      try:
            self.cmd("QUIT")
      finally:
            self._sock.close()

def send_email(myusername, mypassword, target_email, SMTP_SERVER, subject, text):
    server_map = {
      1: ('smtp.office365.com', 587),
      2: ('smtp.qq.com', 587),
      3: ('smtp.126.com', 25),
      4: ('smtp.163.com', 25)
    }
   
    if SMTP_SERVER in server_map:
      host, port = server_map
      ssl_mode = (port == 465)# 465端口使用SSL
    else:
      raise ValueError("无效的SMTP服务器选择")

    if myusername == DEFAULT_EMAIL:
      mypassword = decrypt_string(mypassword)

    try:
      gc.collect()
      smtp = SMTP(host, port, ssl_mode=ssl_mode)
      smtp.login(myusername, mypassword)
      smtp.to(target_email)
      
      # 邮件头
      headers = [
            "From: 掌控板 <{}>".format(myusername),
            "To: <{}>".format(target_email),
            "Subject: {}".format(subject),
            ""
      ]
      
      for header in headers:
            smtp.write(header + "\r\n")
      
      smtp.write(text + "\r\n")
      code, resp = smtp.send()
      smtp.quit()
      
      if code == 250:
            print("邮件发送成功!")
      else:
            print("邮件发送失败! 错误代码: {}, 响应: {}".format(code, resp))
    except Exception as e:
      print("邮件发送失败! 错误详情:")
      import sys
      sys.print_exception(e)
      raise e实测有效
页: [1]
查看完整版本: mpython 使用smtp发送邮件时ussl库未找到的解决方案