Python: the schedule module for scheduled tasks
2019-02-25 16:15:10 Source: cnblogs (博客园)
1. Installation
pip install schedule
2. Documentation
https://schedule.readthedocs.io/en/stable/faq.html#how-to-execute-jobs-in-parallel
3. Usage demo from the official site
import schedule
import time

def job():
    print("I'm working...")

schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at("10:30").do(job)
schedule.every(5).to(10).minutes.do(job)
schedule.every().monday.do(job)
schedule.every().wednesday.at("13:15").do(job)
schedule.every().minute.at(":17").do(job)

while True:
    schedule.run_pending()
    time.sleep(1)
4. My own schedule demo
import sys
import time
import schedule
import os
import logging

# Create the log directory on first run.
if not os.path.exists('/var/log/video_download/'):
    os.makedirs('/var/log/video_download')

# Root logger writing DEBUG and above to a dated log file.
log = logging.getLogger()
log.setLevel(logging.DEBUG)
fmt = logging.Formatter("%(asctime)s %(pathname)s %(filename)s %(funcName)s %(lineno)s %(levelname)s - %(message)s",
                        "%Y-%m-%d %H:%M:%S")
stream_handler = logging.FileHandler(
    '/var/log/video_download/debug-%s.log' % (time.strftime('%Y-%m-%d', time.localtime(time.time()))))
stream_handler.setLevel(logging.DEBUG)
stream_handler.setFormatter(fmt)
log.addHandler(stream_handler)

def handler():
    print("this is handler")

def main():
    # Expect exactly one argument: "hour".
    if len(sys.argv) != 2:
        print('python video_data.py hour')
        sys.exit()
    param = sys.argv[1]
    if param == 'hour':
        log.debug("enter main")
        schedule.every().day.at("00:00").do(handler)
        schedule.every().hour.do(handler)
        while True:
            schedule.run_pending()
            time.sleep(1)
    else:
        print("python video_data.py hour")
        sys.exit()

if __name__ == "__main__":
    main()
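5. Extensions:
Running jobs in parallel
(1) By default, schedule runs all jobs sequentially. The reason is that it is hard to find a parallel execution model that makes everyone happy. To run jobs in parallel, start each job in its own thread: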
import threading
import time
import schedule

def job():
    print("I'm running on thread %s" % threading.current_thread())

def run_threaded(job_func):
    # Start the job in a new thread so the scheduler loop is not blocked.
    job_thread = threading.Thread(target=job_func)
    job_thread.start()

schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)
schedule.every(10).seconds.do(run_threaded, job)

while 1:
    schedule.run_pending()
    time.sleep(1)
(2) If you need to control the number of threads, use a queue and let worker threads consume the jobs
import queue  # Python 3 name; the module was called Queue in Python 2
import time
import threading
import schedule

def job():
    print("I'm working")

def worker_main():
    # Pull job functions off the queue and run them one at a time.
    while 1:
        job_func = jobqueue.get()
        job_func()
        jobqueue.task_done()

jobqueue = queue.Queue()

# The scheduler only enqueues the job; the worker thread executes it.
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)
schedule.every(10).seconds.do(jobqueue.put, job)

worker_thread = threading.Thread(target=worker_main)
worker_thread.start()

while 1:
    schedule.run_pending()
    time.sleep(1)
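(3) Handling exceptions: schedule does not catch exceptions raised inside a job, so wrap the job in a decorator that catches them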
import functools
import traceback
import schedule

def catch_exceptions(cancel_on_failure=False):
    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                return job_func(*args, **kwargs)
            except Exception:
                # Print the traceback instead of letting it stop the scheduler loop.
                print(traceback.format_exc())
                if cancel_on_failure:
                    # Returning CancelJob removes the job from the scheduler.
                    return schedule.CancelJob
        return wrapper
    return catch_exceptions_decorator

@catch_exceptions(cancel_on_failure=True)
def bad_task():
    return 1 / 0

schedule.every(5).minutes.do(bad_task)
(4) Run a job only once
def job_that_executes_once():
    # Do some work ...
    return schedule.CancelJob

schedule.every().day.at('22:30').do(job_that_executes_once)
(5) Cancel several jobs at once
def greet(name):
    print('Hello {}'.format(name))

schedule.every().day.do(greet, 'Andrea').tag('daily-tasks', 'friend')
schedule.every().hour.do(greet, 'John').tag('hourly-tasks', 'friend')
schedule.every().hour.do(greet, 'Monica').tag('hourly-tasks', 'customer')
schedule.every().day.do(greet, 'Derek').tag('daily-tasks', 'guest')

# Removes every job tagged 'daily-tasks' (Andrea and Derek).
schedule.clear('daily-tasks')
(6) Adding logging to jobs
import functools
import time
import schedule

# Decorator that logs when a job starts and when it completes.
def with_logging(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        print('LOG: Running job "%s"' % func.__name__)
        result = func(*args, **kwargs)
        print('LOG: Job "%s" completed' % func.__name__)
        return result
    return wrapper

@with_logging
def job():
    print('Hello, World.')

schedule.every(3).seconds.do(job)

while 1:
    schedule.run_pending()
    time.sleep(1)
(7) Run a job at random intervals
def my_job():
    # This job will execute every 5 to 10 seconds.
    print('Foo')

schedule.every(5).to(10).seconds.do(my_job)
(8) Passing arguments to a job function
def greet(name):
    print('Hello', name)

# Extra arguments given to do() are passed on to the job function.
schedule.every(2).seconds.do(greet, name='Alice')
schedule.every(4).seconds.do(greet, name='Bob')
Original article: https://www.cnblogs.com/kakawith/p/10432526.html