双色球组合推荐脚本

本贴最后更新于 1378 天前,其中的信息可能已经沧海桑田

双色球脚本作用

我观察了几期,发现蓝区号码经常是“15”、“05”。所以就想着能不能统计号码抽中次数,然后作为一组号码去购买双色球,这样总比随机来的强一点。

本脚本用 python 语言实现

项目组织

数据库

创建一个叫“double_color_ball”的数据库

再创建三张表

# 创建 open_number_count, 用于存放号码被抽中的次数 CREATE TABLE `open_number_count` ( `id` int(11) NOT NULL AUTO_INCREMENT, `open_number` varchar(2) NOT NULL COMMENT '开奖号码', `red_ball_times` int(11) DEFAULT NULL COMMENT '号码出现在蓝球区的次数', `blue_ball_times` int(11) NOT NULL COMMENT '号码出现在蓝球区的次数', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=199 DEFAULT CHARSET=utf8 # 创建 recommend_records, 用于存放推荐的组合 CREATE TABLE `recommend_records` ( `id` int(11) NOT NULL AUTO_INCREMENT, `open_date` varchar(20) DEFAULT NULL COMMENT '开奖日期', `open_nums` varchar(20) NOT NULL COMMENT '开奖期号', `recommend_nums` varchar(200) NOT NULL COMMENT '推荐组合,格式[(1,2,3,4,5,6),7],[(2,3,4,5,6,7),8]', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8 # 创建 open_records,用于存放往期开奖结果 CREATE TABLE `open_records` ( `id` int(11) NOT NULL AUTO_INCREMENT, `open_date` varchar(20) NOT NULL COMMENT '开奖日期', `open_nums` varchar(20) NOT NULL COMMENT '开奖期号', `red_numbers` varchar(100) NOT NULL COMMENT '开奖的红区数字,通过空格分隔', `blue_number` varchar(2) NOT NULL COMMENT '开奖的蓝区数字', PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=34 DEFAULT CHARSET=utf8

脚本结构

image.png

base: base_function: 类中包含初始化数据库、查找开奖号码、计算推荐组合号码等函数 log_function: 日志文件类 mysql_functions:类中包含 mysql 数据库增删改查函数 requests_functions:类中包含获取开奖号码、调用 “server酱“ 通知等函数 config: base_config:目前只用于存放 server 酱的 token database_config:用于存放连接 mysql 数据库的基础信息 main.py:程序主入口 main.shell:执行shell,到时候配合 crontab 一起使用,定时推荐组合号码 pipfile:用于管理所用到的库

流程

初始化数据库

首先需要知道过往的开奖号码,找到一个 api,输入开奖期号就会返回指定期号到目前最新期号的开奖信息

api 连接

需要再默认拼接上期号,例如“21041”。

获取到的开奖号码存放到 “open_records”中,同时每找到一个红蓝区号码,都需要到 “open_number_count” 表中更新抽中次数。

在每次推荐组合号码之前,都会更新最新的开奖信息到数据库再推荐。

推荐组合号码

目前推荐的组合有两种:

  1. 根据过往开奖,统计红区最常抽中的 6 个号码,蓝区 1 个,作为一组推荐。
  2. 随机生成 6 个红区号码,1 个蓝区号码。

后续可能会加入因子影响随机生成的组合,形成第三种推荐,但是没有很好的头绪,欢迎大家留言交流意见。

开始编码

logs_functions.py

class LogFunctions(object): """ 初始化 log 文件 """ def __init__(self, logger_name): logger = logging.getLogger(logger_name) fmter = logging.Formatter( "%(asctime)s %(filename)s %(funcName)s [line:%(lineno)d]%(levelname)s %(message)s") log_file_name = os.path.join("./logs", "{}.txt".format(logger_name)) hdlr = logging.FileHandler(log_file_name, mode='a') hdlr.setLevel(logging.INFO) hdlr.setFormatter(fmter) logger.addHandler(hdlr) logger.setLevel(logging.INFO) @classmethod def get_logger(cls, logger_name): return logging.getLogger(logger_name)

日志文件目前是存放在项目下的“logs”文件夹中,如果有需要可以自行修改“log_file_name”变量

具体使用方法:

# 生成对象的同时也会自动创建文件 LogFunctions("mysql_functions_logs") # 获取 mysql_functions_logs 文件的对象 self.logger = LogFunctions.get_logger("mysql_functions_logs")

mysql_functions.py

这个类实现了 mysql 的增删改查功能.

需要注意的点:

  1. select 函数中,因为实现了可以自定义返回多少行数据,所以实际的结果中有两个 tuple 嵌套在一起的,大概是这样的:((0,)),实际 0 才是我们需要的数据,在使用时需要注意这一点。
  2. 关于 “mysql_init_database_object”中的 SSH 登陆,需要注意由于端口转发,数据库实际对外端口不再是 3306 了,需要连接上服务器后使用转发的端口
    port=server.local_bind_port
#!/usr/bin/env python3 # -*- coding:utf-8 -*- from sshtunnel import SSHTunnelForwarder from pymysql import connect, DatabaseError, cursors, ProgrammingError, Error, InterfaceError from base.log_functions import LogFunctions import sys class MysqlFunctions(object): def __init__(self, login_way, database_config): """ :param login_way: 数据库登录方式,目前实现密码登录(password),ssh登录(ssh) :param database_config: 数据库登录的配置 """ LogFunctions("mysql_functions_logs") self.logger = LogFunctions.get_logger("mysql_functions_logs") self._db = None self._cursor = None self.mysql_init_database_object(login_way, database_config) def mysql_init_database_object(self, login_way, database_config): """ 初始化数据库的链接,返回数据库对象 :param login_way: 数据库登录方式,目前实现密码登录(password),ssh登录(ssh) :param database_config: 数据库登录的配置 :return: 返回数据库对象 """ if login_way.lower() == "password": self._db = connect(host=database_config.get("db_host"), user=database_config.get("db_user"), password=database_config.get("db_passwd"), database=database_config.get("db_name"), port=database_config.get("db_port")) self._cursor = self._db.cursor() elif login_way.upper() == "SSH": server = SSHTunnelForwarder( # (database_config.get("ssh_host"), database_config.get("ssh_port")), ssh_address_or_host=database_config.get("ssh_host"), ssh_port=database_config.get("ssh_port"), ssh_password=database_config.get("ssh_password"), ssh_pkey=database_config.get("keyfile"), ssh_username=database_config.get("ssh_user"), remote_bind_address=(database_config.get("db_host"), database_config.get("db_port")) ) server.start() self._db = connect( host='127.0.0.1', port=server.local_bind_port, user=database_config.get("db_user"), passwd=database_config.get("db_passwd"), database=database_config.get("db_name"), charset="utf8", cursorclass=cursors.DictCursor) self._cursor = self._db.cursor() else: self.logger.error("The login method is not implemented, the program will be exited") sys.exit() def mysql_get_current_database_name(self): """ 获取当前指定的数据库名称 :return: 数据库名称 """ return self.mysql_execute_sql("select database()") def mysql_close_cursor_connection(self): """ 关闭 cursor 的连接 :return: 无 """ self._cursor.close() self.logger.info("mysql cursor close connection") def mysql_close_mysql_connection(self): """ 关闭数据库连接 注意:当 socket 关闭后,执行什么方法都是引起 error,慎用! :return: 无 """ try: self._db.close() self.logger.info("close mysql socket connection") except Error as e: self.logger.error("mysql connection already closed:{}".format(e)) sys.exit() def mysql_set_database(self, database_name): """ 重新指定数据库 :param database_name: 数据库名称 :return: 无 """ try: self._cursor.execute("use {}".format(database_name)) self._db.commit() self.logger.info("set database:{}".format(database_name)) except ProgrammingError as e: self.logger.error("set database fail, database_name error:{}".format(e)) sys.exit() def mysql_execute_sql(self, sql): """ 执行非增删查改sal语句 :param sql: SQL语句 :return: 返回执行结果 """ self.logger.info("execute other sql:{}".format(sql)) try: self._cursor.execute(sql) data = self._cursor.fetchall() self._db.commit() self.logger.info("execute other sql success!") self.logger.info("execute result:{}".format(data)) return data except DatabaseError as e: self._db.rollback() self.logger.error("execute other sql error:" + str(e)) except InterfaceError as e: self.logger.error("mysql socket maybe close, error information:{}".format(e)) def mysql_insert_data(self, table_name, values, fields=None): """ 插入数据 :param table_name: 表名,不能为空 :param values: 值tuple,不允许为空 :param fields: 字段tuple,可以为空,默认插入全部数据 :return: 无返回 """ # 判断 values 是否为空 if not values: self.logger.error("insert_data(), values is empty") return if fields: insert_sql = "insert into {}{}".format(table_name, str(fields)).replace("'", "") insert_sql = insert_sql + "values{}".format(str(values)) else: insert_sql = "insert into {} values{};".format(table_name, str(values)) self.logger.info("insert into sql:" + insert_sql) try: self._cursor.execute(insert_sql) self._db.commit() self.logger.info("insert into success!") except DatabaseError as e: self._db.rollback() self.logger.error("insert into error:" + str(e)) def mysql_update_data(self, table_name, field, value, where_condition=None): """ 更新单条数据 :param table_name: 表名,不能为空 :param field: 更新字段,需要传入string :param value: 更新的值,需要传入string :param where_condition: 匹配条件,目前仅支持 where 匹配 :return: 无返回 """ if where_condition is None: update_sql = "update {} set {} = {};".format(table_name, field, value) else: update_sql = "update {} set {} = {} where {};".format(table_name, field, value, where_condition) self.logger.info("update sql: {}".format(update_sql)) try: self._cursor.execute(update_sql) self._db.commit() self.logger.info("update sql success!") except DatabaseError as e: self._db.rollback() self.logger.error("update error:" + str(e)) def mysql_select_data(self, table_name, return_num=1, want_get_info="*", where_condition=None, having_condition=None, order_condition=None, limit_condition=None): """ 查询数据,返回数据信息 :param table_name: 表名,不允许为空,需要传入一个string :param return_num: 返回数据多少行,默认一条 :param want_get_info: 想获取的信息,默认为全部"*",需要传入一个tuple :param where_condition: where 匹配条件,默认为空,需要传入一个string :param having_condition: having 匹配条件,默认为空,需要传入一个string :param order_condition: order 匹配条件,默认为空,需要传入一个string :param limit_condition: limit 匹配条件,默认为空,需要传入一个string :return: 返回结果 """ select_sql = "select {} from {}".format(str(want_get_info), table_name) if where_condition: select_sql = select_sql + " where {}".format(str(where_condition)) if having_condition: select_sql = select_sql + " having {}".format(str(having_condition)) if order_condition: select_sql = select_sql + " order by {}".format(str(order_condition)) if limit_condition: select_sql = select_sql + " limit {}".format(str(limit_condition)) select_sql = select_sql + ";" try: # 执行查询 self._cursor.execute(select_sql) # 返回指定数据 return self._cursor.fetchmany(int(return_num)) except DatabaseError as e: self._db.rollback() self.logger.error("select error: {}".format(str(e)))

requests_functions.py

由于当初敲代码没有考虑的很周全,所以把查询开奖信息的功能写到“base_functions”类中,应该放到 request_functions 类中的。

这个类实现了调用 server 酱的服务,可以给「企业微信、微信、钉钉」等发消息,这里贴个连接,有兴趣可以了解一下,很简单的。server 酱

#!/usr/bin/env python3 # -*- coding:utf-8 -*- import requests from config.base_config import STF_PUSH_TOKEN class RequestsFunctions(object): @classmethod def messages_push(cls, message_title, message_text): """ 使用 server 酱发送推荐组合通知,加上定时任务就不用手动运行了。 默认从 base_config 获取 push_token 有多少个就发多少次 :param message_title: 消息的通知 :param message_text: 消息的内容 :return: 无 """ for x in STF_PUSH_TOKEN: url = "https://sctapi.ftqq.com/{}.send".format(x) push_data = { "title": message_title, "desp": message_text } requests.post(url=url, data=push_data)

base_functions.py

这里有几点需要注意的:

  1. 在抓取网页的数据时,推荐使用“bs4.BeautifulSoup”,非常简单易用。由于返回的数据是字符串,所以为了方便直接使用了切片的方式分隔数据。
  2. 函数“init_database”由于是在很不情愿的状态下敲的(误操作把数据库重装了。。。大家一定要养成定时备份的习惯!),所以可能有坑,大家注意一下
  3. 如果在开奖当天执行,有可能网页还没来得及更新数据(开奖号码有了,但是奖金信息没有不全,导致切片失败),所以建议第二天再执行。
#!/usr/bin/env python3 # -*- coding:utf-8 -*- from bs4 import BeautifulSoup from base.mysql_functions import MysqlFunctions from config.database_config import DOCKER_MYSQL from base.requests_functions import RequestsFunctions import requests import re import datetime import random class BASEFUNCTIONS(object): """ 实现基础功能 """ def __init__(self): self.mysql = MysqlFunctions("password", DOCKER_MYSQL) def update_open_records(self): """ 更新开奖记录,需要实现自动补全功能 :return: """ # 先从数据库查出最新的开奖期号 db_open_nums = self.mysql.mysql_select_data("open_records", want_get_info="open_nums", order_condition="id desc") # 把编号加1传给insert_data self.insert_data(int(db_open_nums[0][0]) + 1) def insert_data(self, db_open_nums): """ 存数据到数据库中 :return: """ open_nums = "" open_date = "" red_nums = "" blue_nums = "" url = "https://datachart.500.com/ssq/history/newinc/history.php?start=" + str(db_open_nums) req = requests.get(url=url) # 转码一下 req.encoding = "utf-8" # 找到所有tr标签的数据 find_tr = BeautifulSoup(req.text, features="html.parser").find_all("tr", class_="t_tr1") results = [] for x in range(len(find_tr)): results.append(re.findall(r"\d+", find_tr[x].text)) for y in range(1, len(find_tr) + 1): result = results[-y] try: open_nums = result[0][:5] open_date = result[8][-4:] + "-" + result[9] + "-" + result[10] except IndexError as e: print("有可能网站页面的奖金没有更新,导致切片失败了,等明天再执行吧") exit() temp = 5 for z in range(6): # 找到每个号码 find_number = result[0][temp:temp + 2] red_nums = red_nums + find_number + " " # 更新这个号码出现的次数 # red_ball_show_times格式:(0,) red_ball_show_times = self.mysql.mysql_select_data(table_name="open_number_count", where_condition="open_number = {}".format( find_number), want_get_info="red_ball_times")[0][0] self.mysql.mysql_update_data(table_name="open_number_count", value=red_ball_show_times + 1, where_condition="open_number = {}".format(find_number), field="red_ball_times") temp += 2 blue_nums = result[0][-2:] blue_ball_show_times = self.mysql.mysql_select_data(table_name="open_number_count", where_condition="open_number = {}".format( blue_nums), want_get_info="blue_ball_times")[0][0] self.mysql.mysql_update_data(table_name="open_number_count", value=blue_ball_show_times + 1, where_condition="open_number = {}".format(blue_nums), field="blue_ball_times") # 开始插入数据 records_result = self.mysql.mysql_select_data(table_name="open_records", want_get_info="open_date", where_condition='open_date = "{}"'.format(open_date)) if len(records_result) == 0: self.mysql.mysql_insert_data(table_name="open_records", values=(open_date, open_nums, red_nums, blue_nums), fields=("open_date", "open_nums", "red_numbers", "blue_number")) # 重制下变量 red_nums = "" blue_nums = "" def recommend_numbers(self, recommend_count): """ 推荐6个红球和1个蓝球号码。 第一组:根据记录,找到最常出现的6个红球,1个蓝球 第二~组:完全是随机的。 :param recommend_count: 需要推荐组的数量 :return: 返回一个list,每一组推荐都是一个tuple """ # 从 open_number_count 表通过排序方式找到6个红球号码和一个蓝球号码 recommend_numbers = [] red_balls = [] usual_red_balls = self.mysql.mysql_select_data(table_name="open_number_count", want_get_info="open_number", order_condition="red_ball_times desc", return_num=6) usual_blue_balls = self.mysql.mysql_select_data(table_name="open_number_count", want_get_info="open_number", order_condition="blue_ball_times desc", return_num=1) for x in usual_red_balls: red_balls.append(int(x[0])) # 进行排序 red_balls_sort = self.numbers_sort(red_balls) # 把多次出现的推荐组合添加到数组中 recommend_numbers.append((red_balls_sort, int(usual_blue_balls[0][0]))) # 把随机推荐的组合添加到数组中 recommend_numbers.append(self.get_random_recommend_numbers(recommend_count)) # 把数据插入到数据库 new_open_info = self.calculation_open_date_and_nums() # 为了避免多次执行导致插入多条,这里做个判断,如果数据已存在"open_nums",那么就执行更新,不插入 is_exists = self.mysql.mysql_select_data(table_name="recommend_records", want_get_info="open_nums", where_condition="open_nums = {}".format(new_open_info[1])) if len(is_exists) > 0: self.mysql.mysql_update_data(table_name="recommend_records", field="recommend_nums", value=str(recommend_numbers), where_condition="open_nums = {}".format(new_open_info[1])) else: self.mysql.mysql_insert_data(table_name="recommend_records", fields=("open_date", "open_nums", "recommend_nums"), values=(*new_open_info, str(recommend_numbers))) # 发送一下推荐组合 RequestsFunctions.messages_push(message_title="期号:'{}' 推荐组合".format(new_open_info[1]), message_text=str(recommend_numbers)) return recommend_numbers def get_random_recommend_numbers(self, recommend_count): """ 获取随机推荐的号码 :param recommend_count: 需要推荐组的数量 :return: 返回N组推荐号码,是一个list """ # 红区是6位「1~33」,蓝区是1位「1~16」 random_recommend_list = [] for x in range(int(recommend_count)): red_balls = [] while True: if len(red_balls) == 6: break red_number = random.randint(1, 33) if red_number in red_balls: continue else: red_balls.append(red_number) # 使用冒泡排序把数据整理一下,从大到小排列 self.numbers_sort(red_balls) blue_ball = random.randint(1, 16) random_recommend_list.append((red_balls, blue_ball)) return random_recommend_list def numbers_sort(self, nums: list): """ 给数据排序,使用冒泡排序算法 :param nums: 数据,需要传入一个list :return: 返回排序好的list,从大到小 """ for x in range(len(nums)): for y in range(0, len(nums) - 1): if nums[y] > nums[y + 1]: nums[y], nums[y + 1] = nums[y + 1], nums[y] return nums def calculation_open_date_and_nums(self): """ 计算开奖日期和期号 :return: 返回一个list,0:开奖日期,1:是开奖期号 """ # 由于会先更新数据,再推荐,所以直接取最新的开奖期号和开奖日期 db_last_open_info = \ self.mysql.mysql_select_data(table_name="open_records", want_get_info="open_date, open_nums", order_condition="id desc", return_num=1)[0] # 开奖期号+1即可,开奖日期在星期二、四、日,所以需要获取当前时间,然后再确定+2还是+3 new_nums = str(int(db_last_open_info[1]) + 1) # 获取当前日期 current_weekday = datetime.datetime.now().weekday() + 1 db_last_open_date = datetime.datetime.strptime(db_last_open_info[0], "%Y-%m-%d").date() new_open_date = "" if current_weekday in (1, 2): new_open_date = db_last_open_date + datetime.timedelta(days=+2) elif current_weekday in (3, 4): new_open_date = db_last_open_date + datetime.timedelta(days=+2) elif current_weekday in (5, 6, 7): new_open_date = db_last_open_date + datetime.timedelta(days=+3) return [str(new_open_date), new_nums] def init_database(self, open_num): """ 初始化数据库,插入open_records,open_number_count :param open_num: 开奖的期号 :return: """ # 先给 open_number_count 初始化数据 for num in range(1, 34): self.mysql.mysql_insert_data(table_name="open_number_count", fields=("open_number", "red_ball_times", "blue_ball_times"), values=(str(num), 0, 0)) open_nums = "" open_date = "" red_nums = "" blue_nums = "" url = "https://datachart.500.com/ssq/history/newinc/history.php?start=" + str(open_num) req = requests.get(url=url) # 转码一下 req.encoding = "utf-8" # 找到所有tr标签的数据 find_tr = BeautifulSoup(req.text, features="html.parser").find_all("tr", class_="t_tr1") results = [] for x in range(len(find_tr)): results.append(re.findall(r"\d+", find_tr[x].text)) for y in range(1, len(find_tr) + 1): result = results[-y] try: open_nums = result[0][:5] open_date = result[8][-4:] + "-" + result[9] + "-" + result[10] except IndexError as e: print("有可能网站页面的奖金没有更新,导致切片失败了,等明天再执行吧") exit() temp = 5 for z in range(6): # 找到每个号码 find_number = result[0][temp:temp + 2] red_nums = red_nums + find_number + " " # 更新这个号码出现的次数 # red_ball_show_times格式:(0,) red_ball_show_times = self.mysql.mysql_select_data(table_name="open_number_count", where_condition="open_number = {}".format( find_number), want_get_info="red_ball_times")[0][0] self.mysql.mysql_update_data(table_name="open_number_count", value=red_ball_show_times + 1, where_condition="open_number = {}".format(find_number), field="red_ball_times") temp += 2 blue_nums = result[0][-2:] blue_ball_show_times = self.mysql.mysql_select_data(table_name="open_number_count", where_condition="open_number = {}".format( blue_nums), want_get_info="blue_ball_times")[0][0] self.mysql.mysql_update_data(table_name="open_number_count", value=blue_ball_show_times + 1, where_condition="open_number = {}".format(blue_nums), field="blue_ball_times") # 开始插入数据 records_result = self.mysql.mysql_select_data(table_name="open_records", want_get_info="open_date", where_condition='open_date = "{}"'.format(open_date)) if len(records_result) == 0: self.mysql.mysql_insert_data(table_name="open_records", values=(open_date, open_nums, red_nums, blue_nums), fields=("open_date", "open_nums", "red_numbers", "blue_number")) else: self.mysql.mysql_update_data(table_name="open_records", value=str(blue_nums), field="blue_number", where_condition='open_date = "{}"'.format(open_date)) # 重制下变量 red_nums = "" blue_nums = ""

main.py

不解释了

#!/usr/bin/env python3 # -*- coding:utf-8 -*- from base.base_functions import BASEFUNCTIONS import os if __name__ == '__main__': # 删掉logs文件 logs_path = "./logs/mysql_functions_logs.tx if os.path.exists(logs_path): os.remove(logs_path) bf = BASEFUNCTIONS() bf.update_open_records() print(bf.recommend_numbers(2))

main.shell

这里说两句:

  1. pipenv run 执行时会自动进入虚拟环境,执行后自动退出虚拟环境
  2. 如果把代码上传到代码托管平台,gitlab、gtiee、github 等,切记一定要使用 igitignore 不然会后悔的!!!
  3. pipenv install 类似 "requirements.txt" 但是比他好用多了,推荐使用 pipenv 管理虚拟环境,安装库,管理 python 版本,功能强大,你值得拥有!
#!/bin/bash cd /root/kidcao/double_color_ball # 更新脚本 git checkout master git pull origin master # 进入虚拟环境且安装库 pipenv run pipenv install # 进入虚拟环境且执行脚本 pipenv run python3.9 main.py

添加定时任务

centos 的定时任务需要安装 crontab,所以先安装他

yum install crontabs

crontab 常用参数:

  • -l:查看当前定时任务
  • -e:编辑定时任务

使用 crontab -e,然后在最末位添加:0 18 * * 1,3,5 bash /具体路径/double_color_ball/main.shell,即可

  • Python

    Python 是一种面向对象、直译式电脑编程语言,具有近二十年的发展历史,成熟且稳定。它包含了一组完善而且容易理解的标准库,能够轻松完成很多常见的任务。它的语法简捷和清晰,尽量使用无异义的英语单词,与其它大多数程序设计语言使用大括号不一样,它使用缩进来定义语句块。

    556 引用 • 675 回帖
  • requests
    2 引用 • 3 回帖
  • 双色球
    1 引用 • 1 回帖
  • 脚本
    21 引用 • 164 回帖 • 1 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...
  • kidcao via macOS
    作者

    自顶 doge