天天看點

python 解析 crontab配置

接觸 python 一段時間了,最近要用 py 做個監控功能,需要解析 crontab 中的配置資訊。本想偷懶一下,直接百度/谷歌出來,無奈半天沒找着,隻好自己寫一個。實作代碼及使用執行個體如下,望各位路過的大蝦大神不吝賜教,能指點得到更優的處理辦法:

#!/usr/bin/env python

# -*- coding: utf-8 -*-

# Copyright (c) 2013 stephen <[email protected]>

# All rights reserved

"""

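1. Parse the five time fields of a crontab line (minute, hour, day of month, month, weekday) and work out the set of values each field allows.

2. Compare a timestamp against the time fields of one crontab line to decide whether the timestamp falls inside the configured schedule.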

"""

#$Id $

import calendar
import re
import sys
import time

def get_struct_time(time_stamp_int):
    """
    Break an integer timestamp into crontab-style local-time fields.

    Args:
        time_stamp_int: Unix timestamp in seconds, e.g. 1332888820. It is
            converted with time.localtime(), so every field is local time.

    Return:
        list: [minute, hour, day of month, month, weekday]. The weekday uses
        the crontab convention (0 = Sunday ... 6 = Saturday) so it can be
        compared directly with a parsed crontab weekday field.
    """
    st_time = time.localtime(time_stamp_int)
    # struct_time.tm_wday counts Monday as 0 and Sunday as 6, while crontab
    # counts Sunday as 0; shift by one day to line the two up.
    cron_wday = (st_time.tm_wday + 1) % 7
    return [st_time.tm_min, st_time.tm_hour, st_time.tm_mday, st_time.tm_mon, cron_wday]

def get_strptime(time_str, str_format):
    """
    Convert a formatted time string into an integer timestamp.

    Args:
        time_str: time written as text, e.g. '31/Jul/2013:17:46:01'
        str_format: strptime format describing time_str,
            e.g. '%d/%b/%Y:%H:%M:%S'

    Return:
        int: the timestamp in whole seconds. The string is interpreted as
        local time because the conversion goes through time.mktime().
    """
    parsed = time.strptime(time_str, str_format)
    seconds = time.mktime(parsed)
    return int(seconds)

def get_str_time(time_stamp, str_format='%Y%m%d%H%M'):
    """
    Render a timestamp as formatted local-time text.

    Args:
        time_stamp: integer timestamp in seconds, e.g. 1375146861
        str_format: strftime format of the result (a string)

    Return:
        str: the formatted time. With the default format the result is
        year, month, day, hour, minute; 2013-07-09 01:03 gives
        '201307090103'.
    """
    local_parts = time.localtime(time_stamp)
    layout = "%s" % str_format
    return time.strftime(layout, local_parts)

def match_cont(patten, cont):
    """
    Report whether a regular expression matches a piece of text.

    The check uses re.match(), so the pattern is applied at the start of
    the text; a pattern has to carry its own '$' to require a full match.

    Args:
        patten: the regular expression
        cont: the text to test

    Return:
        True when the pattern matches, otherwise False
    """
    return re.match(patten, cont) is not None

def handle_num(val, ranges=(0, 100), res=None):
    """
    Handle a plain number such as '5'.

    Args:
        val: the number as text (or anything int() accepts)
        ranges: (lowest, highest) values allowed, both inclusive
        res: list to append to; a fresh list is created when omitted

    Return:
        list: res with the number appended when it lies inside ranges;
        an out-of-range number is dropped.
    """
    # A default of list() would be created once and shared by every call,
    # so results would leak from one call into the next.
    if res is None:
        res = []
    val = int(val)
    if ranges[0] <= val <= ranges[1]:
        res.append(val)
    return res

def handle_nlist(val, ranges=(0, 100), res=None):
    """
    Handle a comma-separated number list such as '1,2,3,6'.

    Args:
        val: the list as text
        ranges: (lowest, highest) values allowed, both inclusive
        res: list to append to; a fresh list is created when omitted

    Return:
        list: res extended with every listed number that lies inside
        ranges, in the order written; out-of-range numbers are dropped.
    """
    # A default of list() would be created once and shared by every call.
    if res is None:
        res = []
    low, high = ranges[0], ranges[1]
    for piece in val.split(','):
        number = int(piece)
        if low <= number <= high:
            res.append(number)
    return res

def handle_star(val, ranges=(0, 100), res=None):
    """
    Handle the wildcard '*', which stands for every value of the field.

    Args:
        val: the field text; anything other than '*' adds nothing
        ranges: (lowest, highest) values allowed, both inclusive
        res: list to append to; a fresh list is created when omitted

    Return:
        list: res extended with every integer from ranges[0] to ranges[1].
    """
    # A default of list() would be created once and shared by every call.
    if res is None:
        res = []
    if val == '*':
        res.extend(range(ranges[0], ranges[1] + 1))
    return res

def handle_starnum(val, ranges=(0, 100), res=None):
    """
    Handle the wildcard-with-step form such as '*/3'.

    Args:
        val: the field text, '*/<step>'
        ranges: (lowest, highest) values allowed, both inclusive
        res: list to append to; a fresh list is created when omitted

    Return:
        list: res extended with every step-th value of the field, counted
        from ranges[0] (so '*/10' on minutes gives 0, 10, ... 50). A step
        below 1 adds nothing.
    """
    # A default of list() would be created once and shared by every call.
    if res is None:
        res = []
    val_step = int(val.split('/')[1])
    if val_step < 1:
        return res
    # '*' covers the whole field, so stepping starts at the field minimum,
    # not at the step value itself.
    res.extend(range(ranges[0], ranges[1] + 1, val_step))
    return res

def handle_range(val, ranges=(0, 100), res=None):
    """
    Handle an inclusive range such as '8-20'.

    Args:
        val: the field text, '<first>-<last>'
        ranges: (lowest, highest) values allowed, both inclusive
        res: list to append to; a fresh list is created when omitted

    Return:
        list: res extended with every integer of the range that also lies
        inside ranges. Values outside ranges are dropped at both ends, and
        a reversed range (first > last) adds nothing.
    """
    # A default of list() would be created once and shared by every call.
    if res is None:
        res = []
    bounds = val.split('-')
    first = int(bounds[0])
    last = int(bounds[1])
    # Clip to the field limits on both sides so that, for example, '0-3'
    # on a 1-31 field cannot produce the invalid value 0.
    res.extend(range(max(first, ranges[0]), min(last, ranges[1]) + 1))
    return res

def handle_rangedv(val, ranges=(0, 100), res=None):
    """
    Handle a range with a step such as '8-20/3'.

    Args:
        val: the field text, '<first>-<last>/<step>'
        ranges: (lowest, highest) values allowed, both inclusive
        res: list to append to; a fresh list is created when omitted

    Return:
        list: res extended with first, first+step, ... up to last, keeping
        only the values that lie inside ranges. A step below 1 adds
        nothing.
    """
    # A default of list() would be created once and shared by every call.
    if res is None:
        res = []
    pieces = val.split('/')
    bounds = pieces[0].split('-')
    val_start = int(bounds[0])
    val_end = int(bounds[1])
    val_step = int(pieces[1])
    if val_step < 1:
        return res
    upper = min(val_end, ranges[1])
    # Stepping always counts from the written start value; values that fall
    # below the field minimum are dropped rather than shifting the sequence.
    for number in range(val_start, upper + 1, val_step):
        if number >= ranges[0]:
            res.append(number)
    return res

def parse_conf(conf, ranges=(0, 100), res=None):
    """
    Parse one of the five crontab time fields into its allowed values.

    The field is split on commas. Each piece that is not a plain number is
    matched against the PATTEN table and passed to the handler registered
    for it in PATTEN_HANDLER; pieces that match no pattern are ignored.
    The plain numbers are then added as a number list.

    Args:
        conf: text of a single field, e.g. '*/10', '8-20/3,21,22' or '5'
        ranges: (lowest, highest) values the field allows, both inclusive
        res: list to append to; a fresh list is created when omitted

    Return:
        list: the values selected by the field (non-numeric pieces first,
        then the plain numbers). An empty list means nothing valid was
        found.
    """
    # A default of list() would be created once and shared by every call.
    if res is None:
        res = []

    # Drop surrounding whitespace (spaces and tabs alike) before splitting.
    conf_list = conf.strip().split(',')

    number_conf = []
    other_conf = []
    for conf_val in conf_list:
        if match_cont(PATTEN['number'], conf_val):
            # plain numbers are collected and handled together below
            number_conf.append(conf_val)
        else:
            # everything else: wildcard '*', ranges '0-8', steps '0-8/3', ...
            other_conf.append(conf_val)

    for conf_val in other_conf:
        for key, ptn in PATTEN.items():
            if match_cont(ptn, conf_val):
                res = PATTEN_HANDLER[key](val=conf_val, ranges=ranges, res=res)

    if number_conf:
        # One number or several: the list handler treats both the same way.
        res = handle_nlist(val=','.join(number_conf), ranges=ranges, res=res)
    return res

def parse_crontab_time(conf_string):
    """
    Parse the time part of a crontab line.

    Args:
        conf_string: one crontab line. Its first five whitespace-separated
            fields are read as minute, hour, day of month, month, weekday;
            anything after them (the command) is ignored.
            Allowed values: minute 0-59, hour 0-23, day 1-31, month 1-12,
            weekday 0-6 (0 means Sunday).

    Return:
        tuple: (0, crontab_range) on success, where crontab_range is a list
        of five lists holding the values allowed for minute, hour, day,
        month and weekday; (-1, message) when the line has fewer than five
        fields or a field yields no valid value.
    """
    time_limit = ((0, 59), (0, 23), (1, 31), (1, 12), (0, 6))
    conf_length = len(time_limit)

    # split() without an argument copes with tabs and repeated blanks, both
    # of which are common in crontab files.
    clist = conf_string.split()[:conf_length]
    if len(clist) != conf_length:
        return -1, 'config error whith [%s]' % conf_string

    crontab_range = []
    for conf, limit in zip(clist, time_limit):
        res_conf = parse_conf(conf, ranges=limit, res=[])
        if not res_conf:
            return -1, 'config error whith [%s]' % conf_string
        crontab_range.append(res_conf)
    return 0, crontab_range

def time_match_crontab(crontab_time, time_struct):
    """
    Check whether a point in time falls inside a parsed crontab schedule.

    Args:
        crontab_time: allowed values per field, one list for each of
            minute, hour, day, month, weekday
        time_struct: the field values of the time being tested, in the
            same order

    Return:
        tuple: (status code, matched). The status code is always 0;
        matched is True only when every value of time_struct is contained
        in the list at the same position of crontab_time.
    """
    matched = all(
        value in crontab_time[position]
        for position, value in enumerate(time_struct)
    )
    return 0, matched

def close_to_cron(crontab_time, time_struct):
    """
    Find, field by field, the scheduled value closest before a given time.

    Args:
        crontab_time: allowed values per field, one list for each of
            minute, hour, day, month, weekday
        time_struct: the field values of the reference time, in the same
            order

    Return:
        list: a new list with one entry per field of time_struct. Each
        entry is the largest allowed value that does not exceed the
        reference value (the reference value itself when it is allowed).
        When no allowed value is that small, the reference value is kept
        unchanged. time_struct is not modified.
    """
    close_time = []
    for cindex, val_struct in enumerate(time_struct):
        # Only values at or before the reference count as candidates.
        candidates = [val_cron for val_cron in crontab_time[cindex]
                      if val_cron <= val_struct]
        if candidates:
            close_time.append(max(candidates))
        else:
            close_time.append(val_struct)
    return close_time

def cron_time_list(cron_time, year_num=None, limit_start=None, limit_end=None):
    """
    List every point in time a parsed crontab schedule selects in a window.

    Args:
        cron_time: allowed values per field, one list for each of
            minute, hour, day, month, weekday (weekday 0 = Sunday)
        year_num: the year (int) in which time points are generated;
            defaults to the current local year at call time
        limit_start: earliest time to keep, as digits in 'YYYYmmddHHMM'
            order; a shorter prefix such as 'YYYYmm' is compared against
            the same-length prefix of each time point. Defaults to the
            current local time at call time.
        limit_end: latest time to keep, same format as limit_start.
            Defaults to 24 hours after the current local time.

    Return:
        list: time points as 'YYYYmmddHHMM' strings (2013-07-29 18:56 is
        '201307291856'), generated month by month, day by day, hour by
        hour, minute by minute, so ascending field lists give a
        chronological result. Only dates inside year_num are produced.
    """
    # The defaults are resolved on every call; computing them in the
    # signature would freeze them at import time.
    now = time.time()
    if year_num is None:
        year_num = int(time.strftime('%Y', time.localtime(now)))
    if limit_start is None:
        limit_start = time.strftime('%Y%m%d%H%M', time.localtime(now))
    if limit_end is None:
        limit_end = time.strftime('%Y%m%d%H%M', time.localtime(now + 86400))

    len_start, num_start = len(limit_start), int(limit_start)
    len_end, num_end = len(limit_end), int(limit_end)

    # The weekday check is only needed when some weekday is excluded.
    allowed_wdays = set(cron_time[4])
    check_wday = len(allowed_wdays) < 7

    time_points = []
    for month in cron_time[3]:
        month = int(month)
        if not 1 <= month <= 12:
            continue
        days_in_month = calendar.monthrange(year_num, month)[1]
        for day in cron_time[2]:
            day = int(day)
            # skip dates that do not exist, e.g. 30 February or 31 April
            if not 1 <= day <= days_in_month:
                continue
            if check_wday:
                # calendar.weekday() counts Monday as 0; crontab counts
                # Sunday as 0, so shift by one day before comparing.
                cron_wday = (calendar.weekday(year_num, month, day) + 1) % 7
                if cron_wday not in allowed_wdays:
                    continue
            for hour in cron_time[1]:
                for minute in cron_time[0]:
                    time_ymdhm = '%s%02d%02d%02d%02d' % (
                        year_num, month, day, int(hour), int(minute))
                    # drop everything outside the start/end window
                    if int(time_ymdhm[:len_start]) < num_start or \
                            int(time_ymdhm[:len_end]) > num_end:
                        continue
                    time_points.append(time_ymdhm)
    return time_points

# Regular expressions for each way a single crontab time field can be written.
# Raw strings keep the backslashes literal without relying on Python leaving
# unknown escape sequences such as '\*' untouched.
PATTEN = {
    # plain number
    'number': r'^[0-9]+$',
    # number list, e.g. 1,2,3,6
    'num_list': r'^[0-9]+([,][0-9]+)+$',
    # asterisk *
    'star': r'^\*$',
    # asterisk/number, e.g. */3
    'star_num': r'^\*\/[0-9]+$',
    # range, e.g. 8-20
    'range': r'^[0-9]+[\-][0-9]+$',
    # range/step, e.g. 8-20/3
    'range_div': r'^[0-9]+[\-][0-9]+[\/][0-9]+$'
    # range/step combined with a list, e.g. 8-20/3,21,22,34
    #'range_div_list':'^([0-9]+[\-][0-9]+[\/][0-9]+)([,][0-9]+)+$'
    }

# Handler function for each pattern name in PATTEN
PATTEN_HANDLER = dict(
    number=handle_num,
    num_list=handle_nlist,
    star=handle_star,
    star_num=handle_starnum,
    range=handle_range,
    range_div=handle_rangedv,
)

def main():
    """
    Usage example: parse one crontab line and run each helper on it.

    Prints the parsed value ranges, the fields of the current time, whether
    the current time matches the schedule, the closest scheduled values and
    the first upcoming run times. Exits with status -1 when the line cannot
    be parsed.
    """
    # Each print call gets one pre-formatted string, which produces the same
    # output whether print is a statement (Python 2) or a function (Python 3).

    # one crontab line: five time fields followed by the command
    conf_string = '*/10 * * * * (cd /opt/pythonpm/devpapps;' \
                  ' /usr/local/bin/python2.5 data_test.py>>output_error.txt)'

    # the timestamp to test against the schedule
    time_stamp = int(time.time())

    # parse the allowed values of minute, hour, day, month and weekday
    res, desc = parse_crontab_time(conf_string)
    if res != 0:
        print(desc)
        sys.exit(-1)
    cron_time = desc

    print("\nconfig: %s" % conf_string)
    print("\nparse result(range for crontab):")
    print(" minute: %s" % (cron_time[0],))
    print(" hour:  %s" % (cron_time[1],))
    print(" day:  %s" % (cron_time[2],))
    print(" month:  %s" % (cron_time[3],))
    print(" week day: %s" % (cron_time[4],))

    # minute, hour, day, month and weekday of the timestamp
    time_struct = get_struct_time(time_stamp)
    print("\nstruct time(minute hour day month week) for %d : %s" %
          (time_stamp, time_struct))

    # does the timestamp fall inside the schedule?
    match_res = time_match_crontab(cron_time, time_struct)
    print("\nmatching result: %s" % (match_res,))

    # scheduled values closest to the timestamp
    most_close = close_to_cron(cron_time, time_struct)
    print("\nin range of crontab time which is most colse to struct  %s" %
          (most_close,))

    time_list = cron_time_list(cron_time)
    print("\n\n %d times need to tart-up:\n" % len(time_list))
    print("%s ..." % (time_list[:10],))

if __name__ == '__main__':
    # Run the usage example when the file is executed as a script.
    main()