天天看点

使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警

前言

如果大家有在使用

混合部署

完全本地化

邮件服务器

(也包含 Linux 等其他平台的邮件服务器),或许很多人会经常遇到和我一样的问题:邮件服务器的公网 IP 被列入Spamhaus和类似的反垃圾邮件联盟的黑名单 导致业务部门发出去的邮件被对方

邮件网关

拦截或直接拒收。

背景

假设你有运维这样的一个场景,这里以

Exchange

平台为例(相信很多企业都是使用该场景):

使用的是混合部署环境,并且:

发信邮件流

是本地 Exchange ⇒O365 Exchange Online;

收信邮件流

是本地邮件网关 ⇒本地 Exchange 服务器⇒O365用户邮箱

引用微软工程师的邮件内容: O365的EOP会参考Spamhaus,connection filter中的 Allow list不能跳过spamhaus的SBL和XBL的检测 ,目前列入的PBL,可以考虑加入连接筛选器的白名单。

Spamhaus是一个公平/权威的三方平台,为组织和企业提供实时的IP阻止列表。Office 365的EOP会实时参照Spamhaus的阻止列表来确定是否需要拒绝从某些地方发进来的邮件。

Spamhaus的阻止列表有好几种,如SBL,XBL.PBL,DBL。SBL列入的是发垃圾邮件的IP地址列表,XBL是因发一些恶意软件或者病毒而被列入的阻止列表,PBL是因为某些组织要求的阻止列表,DBL是基于域名的阻止列表。具体的这些阻止列表信息,您可以访问The Spamhaus Project 查看原因。

因此,当您收到IP被放入Spamhaus的阻止列表,到Spamhaus查询的时候,可以看到这个IP被列在哪个阻止列表中。从而可以知道这个IP为什么会被加到阻止列表。

由于Spamhaus是三方的组织平台,因此,微软没有办法监测或者有提醒功能。而且,考虑到来自阻止IP列表邮件的危险性,为了我们的客户考虑,EOP会在连接层面就断掉来自该IP的连接。邮件并没有进入EOP被O365收下来,因此,您在Exchange online管理中心里设置黑白名单不会生效。

因此,如果管理员不能及时发现 IP 被列入黑名单,你的业务、销售部门此时还在不停的外发邮件。一般在 12-24小时左右,你的邮件公网 IP 会被微软列入黑名单,这个时候即便你在

Exchange Online

上将本地的公网 IP 添加到连接器的白名单,你的用户收发邮件仍然会收到影响。

解决方法

一、监控 Exchange 传输日志

可以通过

ELK

Splunk

这类的工具监控 Exchange 的传输日志,当出现关键词

*550 5.7.1 Message rejected as spam by Content Filtering*

550 5.7.1 Service unavailable;Client host[IP] blocked using Spamhaus

就发出告警通知邮件管理员

上述方法有个缺点,即当你通过该日志发现了问题,说明你的邮件服务器公网 IP 已经被列入黑名单有一段时间了,并且已经影响了你的生产环境。

二、通过Python的第三方Pydnsbl模块进行实时监控

优点 当你的 IP 被列入 Spamhaus 黑名单后,会立刻被查询到,及时通知管理员并进行相应处理,就不会出现长时间未处理,导致微软同步 Spamhaus策略后整个公司的邮件都受到收发影响

官方文档

使用方法:

通过官方介绍,其使用方法非常简单,只要

Python

版本大于等于3.5版本即可。接下来我会基于这个方法做个详细的使用介绍,便于后期参考,也方便和我一样有类似需求的人查阅。

使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警

优化

有了这款工具和使用方法,接下来就要结合使用场景,来实现自动检查、自动触发告警。

由于我本人对 Python 不是很熟悉,只能根据自己的需求编写符合需求的脚本,如有不妥或可以优化、改进的地方,可留言赐教,谢谢!

#!/usr/bin/python3.7
# encoding: utf-8

import urllib.request
import json
import pydnsbl
import ssl
ssl._create_default_https_context = ssl._create_unverified_context


TOKEN_URL = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
# 企业的id,在管理端->"我的企业" 可以看到
# CORP_ID = "CORP_ID"
CORP_ID = "CORP_ID****"
# 某个自建应用的id及secret, 在管理端 -> 企业应用 -> 自建应用, 点进相应应用可以看到
APP_ID = "APP_ID*****"
CORP_SECRET = "**********"

class Wechat(object):
    "send monitor message by wechat"

    def __init__(self):
        self.CORP_ID = CORP_ID
        self.CORP_SECRET = CORP_SECRET
        self.APP_ID = APP_ID
        self.BASEURL = 'https://qyapi.weixin.qq.com/cgi-bin/'
        self.TOKEN_URL = 'gettoken?corpid={0}&corpsecret={1}'.format(
            self.CORP_ID, self.CORP_SECRET)

    # 获取认证 token
    def Get_Token(self):
        try:
            response = urllib.request.urlopen(
                '{0}{1}'.format(self.BASEURL, self.TOKEN_URL))
            access_token = json.loads(
                response.read().decode('utf-8'))['access_token']
            with open('token', 'w') as f:
                f.write(access_token)
        except KeyError:
            raise KeyError
        return access_token

    def checker(self):
        Result = []
        ip_checker = pydnsbl.DNSBLIpChecker()
        Result1 = ip_checker.check('68.128.212.240')  # Exchange Server 01
        Result2 = ip_checker.check('68.128.212.241')  # Exchange Server 02
        Result3 = ip_checker.check('68.128.212.242')  # Exchange Server 03
        Result.append(Result1)
        Result.append(Result2)
        Result.append(Result3)
        # domain_checker = pydnsbl.DNSBLDomainChecker()
        # Result4 = domain_checker.check('luxiu2.com')  # Domain Name 01
        # Result5 = domain_checker.check('videour.com')  # Domain Name 02
        # Result6 = domain_checker.check('jesdoit.com')  # Domain Name 03
        # Result.append(Result4)
        # Result.append(Result5)
        # Result.append(Result6)
        return Result

    # 本地 token
    def Local_Token(self):
        try:
            with open('token', 'r') as f:
                token = f.readline().strip()
                if token == '':
                    token = self.Get_Token()
                    return token
                else:
                    return token
        except IOError:
            token = self.Get_Token()
            return token

    # 获取报警人员名单
    def Get_User(self, dep_id=1, fchild=1):
        #token = self.Get_Token()
        token = self.Local_Token()
        send_url = '{0}user/list?access_token={1}&department_id={2}&fetch_child{3}'.format(
            self.BASEURL, token, dep_id, fchild,)
        respone = urllib.request.urlopen(url=send_url).read()
        stat = json.loads(respone)['userlist']
        user = ''
        for k in stat:
            user += '{0} '.format(k['mobile'])
        mobile = ','.join(user.split())
        with open('user.txt', 'w') as f:
            f.write(mobile)

    # 发送报警信息
    def Send_Message(self, content):
        self.content = {
            # "touser":  "User01|User02|User03",   # 成员, @all及所有人  "UserID1|UserID2|UserID3",//企业微信的唯一userid,非必输
            "touser": 'jasonhuang',
            # "toparty": '1',                          # 部门,@all 及所有部门  "PartyID1|PartyID2",//部门id,非必输,如果输入了就只给指定部门发送消息
            "msgtype": 'text',                       # 消息类型,文本,图片
            "agentid": self.APP_ID,                  # 企业应用 id
            "safe": "0",                             #
            "text": {
                    "content": content                   # 报警内容
            }
        }
        token = self.Local_Token()
        # 构建告警信息,必须是 json 格式
        msg = messages_content = json.dumps(self.content)
        send_url = '{0}message/send?access_token={1}'.format(
            self.BASEURL, token)
        respone = urllib.request.urlopen(
            url=send_url, data=msg.encode("utf-8")).read()
        stat = json.loads(respone.decode())['errcode']
        if stat == 0:
            print('Succesfully Send To Wechat')
        else:
            token = self.Get_Token()
            send_url = '{0}message/send?access_token={1}'.format(
                self.BASEURL, token)
            respone = urllib.request.urlopen(url=send_url, data=msg).read()
            return respone


if __name__ == '__main__':
    msgs = []
    msg = Wechat().checker()
    for i in msg:
        j = str(i)
        if j.find('BLACKLISTED') != -1:
            msgs.append(j)
    if msgs:
        msgsend = str(msgs)
        wechat = Wechat()
        wechat.Send_Message('邮件服务器公网IP状态告警:'+msgsend)
        print('Error'+msgsend)
    else:
        print('All Exchange Server Internet IP Status are Normal')
           

我在脚本第

46-48

行中监控了 3 台服务器的公网 IP 在

Spamhaus

的状态(*[font color="#B22222"]为测试效果,这些 IP 为网上找的垃圾邮件服务器公网 IP[/font]*)

使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警

验证

这里我们可以看到这 3 个 IP 均被提示异常,接下来我们去

Spamhaus

官网手动查询下结果看是否一致。

访问Lookup - Reputation Checker - Spamhaus并输入

68.128.212.240

进行查询,结果如下:

使用Python监控邮件服务器公网IP是否被列入黑名单并及时告警

继续阅读