来具体点
需求:
执行一个程序, 程序一直是运行状态, 这里假设是一个函数
当程序运行 30s 的时候, 需要终止程序, 可以用 python, c, c++ 语言实现
扩展需求:
当 1 分钟以后, 需要重新启动程序
- def process_2():
- # 时间几点执行
- # 执行 process_1
- # 开始监听 -- 计时
- # 当时间超过多少s以后, 强制结束
- #(注意进程里面的任务执行进程是否还存在, 举个例子, 进程的任务是在创建一个进程, 那强制结束的是任务的进程, 而你创建的进程呢?)
- pass
- #!/usr/bin/env python
- # -*- coding=utf-8 -*-
- import sys, time, multiprocessing, datetime, os
- # -----< 自定义 Error>----- #
- class MyCustomError(Exception):
- def __init__(self, msg=None, retcode=4999):
- self.retcode = int(retcode)
- try:
- if not msg:
- msg = "Setting time is less than the current time Error."
- except:
- msg = "Unknown Error!!!"
- Exception.__init__(self, self.retcode, msg)
- # -----< 格式化时间变成时间戳 >----- #
- class FormatDatetime():
- '''
- Use method:
- FormatDatetime.become_timestamp("2018-08-21 13:19:00")
- '''
- @staticmethod
- def become_timestamp(dtdt):
- # 将时间类型转换成时间戳
- if isinstance(dtdt, datetime.datetime):
- timestamp = time.mktime(dtdt.timetuple())
- return timestamp
- elif isinstance(dtdt, str):
- if dtdt.split(" ")[1:]:
- a_datetime = datetime.datetime.strptime(dtdt, "%Y-%m-%d %H:%M:%S")
- timestamp = time.mktime(a_datetime.timetuple())
- else:
- a_datetime = datetime.datetime.strptime(dtdt, "%Y-%m-%d")
- timestamp = time.mktime(a_datetime.timetuple())
- return timestamp
- elif isinstance(dtdt, float):
- return dtdt
- # -----< 计时器 >----- #
- def timer(arg):
- '''
- use method:
- timer(14)
- :param arg: 倒计时时间
- :return: True
- '''
- # 倒计时时间
- back_time = arg
- while back_time != 0:
- sys.stdout.write('\r') ## 意思是打印在清空
- back_time -= 1
- sys.stdout.write("%s" % (int(back_time)))
- sys.stdout.flush()
- time.sleep(1)
- return True
- # -----< 自定义任务函数 >----- #
- class Tasks:
- @staticmethod
- def start():
- ip1 = "42.93.48.16"
- ip2 = "192.168.2.141"
- adjure = """hping3 -c 100000 -d 120 -S -w 64 -p 57511 -a 10.10.10.10 --flood --rand-source {}""".format(ip2)
- os.system(adjure)
- @staticmethod
- def stop():
- adjure = """netstat -anpo | grep hping3 | awk -F"[ /]+"'{print $7}' | xargs -i -t kill -9 {}"""
- os.system(adjure)
- # -----< 任务函数 >----- #
- def slow_worker():
- '''
- 你的自定义任务, 需要执行的代码
- :return:
- ''' print('Starting worker')
- time.sleep(0.1)
- print('Finished worker')
- Tasks.start()
- # -----< 任务计划执行时间 >----- #
- def crontab_time(custom_time):
- # custom_time = "2018-08-21 13:44:00"
- date_time = FormatDatetime.become_timestamp(custom_time)
- now_time = time.time()
- # 余数, 有小数
- remainder_data = int(date_time - now_time)
- if remainder_data <0:
- raise MyCustomError
- else:
- while remainder_data> 0:
- remainder_data -= 1
- time.sleep(1)
- return True
- # -----< 执行函数 >----- #
- def my_crontab(custom_time='', frequency=1, countdown=0):
- '''
- 几点几分执行任务
- 此函数属于业务逻辑, 可以考虑再次封装
- custom_time = "2018-08-21 13:19:00" 时间格式必须是这样
- frequency = 1 # 频次
- countdown = 0 # 倒计时多少 s
- :return:
- '''
- if custom_time:
- crontab_time_status = crontab_time(custom_time)
- if crontab_time_status:
- for i in range(frequency):
- p = multiprocessing.Process(target=slow_worker)
- p.start()
- if countdown:
- status = timer(countdown)
- if status:
- p.terminate()
- Tasks.stop()
- else:
- for i in range(frequency):
- p = multiprocessing.Process(target=slow_worker)
- p.start()
- if countdown:
- status = timer(countdown)
- if status:
- p.terminate()
- Tasks.stop()
- if __name__ == '__main__':
- my_crontab(frequency=1, countdown=50)
- View Code
来源: http://www.bubuko.com/infodetail-2736404.html