简易版配置文件管理实现

本贴最后更新于 1159 天前,其中的信息可能已经物是人非

导读

配置、证书、密钥文件这类经常会涉及到变更,如果每次都到登录到目标服务器进行操作,要是量特别大,着实不美丽,那这里小易给大家伙带来一个简易的实现方案,供各位参考。

需求分析

配置、密钥文件往往存储了比较机密的信息,安全性要保证它不被轻易的泄漏,第一首要条件必须加认证,标题都说了简易版,那直接上个基本认证够用了,在加上传输层的安全性,那就基本认证配合 https 足够。那么咱们再抛一个问题,用一个账号进行访问所有的文件?显然安全风险还是太高,毕竟,为了便于记忆,咱们有几个是把文件名设置成随机码的对吧,所以还得细划下访问控制,暂定一个账号访问一小批指定的文件吧,咱可以这样定义,某一台机器的所有相关变更的文件给它一个账号统一访问。具体到某台机器上咱就设置个定时任务,拿着这个账号下载对应的文件与原文件做比对,有变更就替换之。

概要设计

咱再来捋一捋,涉及文件变更,得有文件管理吧,涉及多账号,得有账号管理吧,具体某个账号还不能访问它不该访问,那给它设置个访问清单得有吧,所以,思路明晰了。

文件管理

  • 文件上传,通过 Web 页面,或者直接 API 调用,上传限制大小,后缀名白名单,防止上传至任意目录破坏系统文件
  • 文件下载,提供一个接口供消费客户端获取配置文件
  • 文件删除,暂不考虑

账号管理

  • 用户新建,用户已存在的,忽略操作
  • 认证凭证保护,密码不能明文存放,这里仅用于账号认证,采用单向 hash
  • 用户删除,暂不考虑

访问控制

  • 设置两个角色,一个管理员,若干个普通用户,管理员可可进行文件上传、账号管理及设置访问清单,普通用户仅能访问该访问的文件
  • 提供一个接口,供管理员设置普通用户的文件访问清单

代码实现

服务端

#!/usr/bin/env python3 # -*- coding: utf-8 -*- # @Project : tools # @File : conf_server.py # @Software: PyCharm # @Author : 易雾君 # @Email : evling2020@gmail.com # @公众号 : 易雾山庄 # @Site : https://www.evling.tech # @Describe : 家庭基建,生活乐享. # @Time : 2022/1/17 9:38 PM import json import os from flask import Flask, flash, request, redirect, url_for, send_from_directory, jsonify from flask_httpauth import HTTPBasicAuth from werkzeug.utils import secure_filename from werkzeug.security import generate_password_hash, check_password_hash UPLOAD_FOLDER = './uploads' DATA_JSON = './data.json' ALLOWED_EXTENSIONS = {'txt', 'conf', 'sh', 'crt', 'key', 'json'} app = Flask(__name__) app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER app.config['MAX_CONTENT_LENGTH'] = 16 * 1000 * 1000 # 限制文件大小为 16M app.add_url_rule( "/uploads/<name>", endpoint="download_file", build_only=True ) auth = HTTPBasicAuth() users = { 'admin': { 'password': generate_password_hash('admin'), 'privileged': True, 'files': [] }, } def save_data(): with open(DATA_JSON, 'w', encoding='utf-8') as f: json.dump(users, f, ensure_ascii=False, indent=4) f.close() if not os.path.exists(UPLOAD_FOLDER): os.mkdir(UPLOAD_FOLDER) if os.path.exists(DATA_JSON): with open(DATA_JSON, 'r') as f: users = json.load(f) f.close() else: save_data() def is_admin(username): return users.get(username).get('privileged') def can_access(username, filename): return True if filename in users.get(username).get('files') else False def allowed_file(filename): return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS @auth.verify_password def verify_password(username, password): if username in users and check_password_hash(users.get(username).get('password'), password): return username @app.route('/users/add', methods=['POST']) @auth.login_required def add_user(): if not is_admin(auth.current_user()): return jsonify({'msg': 'You can not access this page!'}) if request.method == 'POST': username = request.form['username'] password = request.form['password'] if username not in users: users[username] = {'password': generate_password_hash(password), 'privileged': False, 'files': []} save_data() return jsonify({'msg': 'Add user succ'}) else: return jsonify({'msg': 'User: {} may be exist'.format(username)}) @app.route('/users/passwd', methods=['POST']) @auth.login_required def passwd(): if not is_admin(auth.current_user()): return jsonify({'msg': 'You can not access this page!'}) if request.method == 'POST': username = request.form['username'] password = request.form['password'] if username in users: users[username]['password'] = generate_password_hash(password) save_data() return jsonify({'msg': 'Change {}\'s password succ'.format(username)}) else: return jsonify({'msg': 'User: {} not exist!'.format(username)}) @app.route('/users/authorize', methods=['POST']) @auth.login_required def authorize(): if not is_admin(auth.current_user()): return jsonify({'msg': 'You can not access this page!'}) if request.method == 'POST': data = request.get_json() username = data['username'] files = data['files'] if username in users: succ_list = [] fail_list = [] for file in files: if not os.path.exists(os.path.join(UPLOAD_FOLDER, file)): fail_list.append(file) else: succ_list.append(file) if succ_list != []: for file in succ_list: if file not in users[username]['files']: users[username]['files'].append(file) save_data() return jsonify({'msg': 'authorize {} succ'.format(username), 'succ_list': succ_list, 'fail_list': fail_list}) else: return jsonify({'msg': 'No file authorized to {}'.format(username), 'succ_list': succ_list, 'fail_list': fail_list}) else: return jsonify({'msg': 'User: {} not exist!'.format(username)}) @app.route('/uploads/<path:filename>', methods=['GET']) @auth.login_required def download_file(filename): if can_access(auth.current_user(), filename): return send_from_directory(app.config["UPLOAD_FOLDER"], filename) else: return jsonify({'msg': 'You can not do this operate!'}) @app.route('/', methods=['GET', 'POST']) @auth.login_required def upload_file(): if not is_admin(auth.current_user()): return jsonify({'msg': 'You can not access this page!'}) if request.method == 'POST': # check if the post request has the file part if 'file' not in request.files: flash('No file part') return redirect(request.url) file = request.files['file'] # If the user does not select a file, the browser submits an # empty file without a filename. if file.filename == '': flash('No selected file') return redirect(request.url) if file and allowed_file(file.filename): filename = secure_filename(file.filename) file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename)) return redirect(url_for('download_file', name=filename)) return ''' <!doctype html> <title>Upload new File</title> <h1>Upload new File</h1> <form method=post enctype=multipart/form-data> <input type=file name=file> <input type=submit value=Upload> </form> ''' if __name__ == '__main__': app.run(port=5001)

API 梳理

  • /users/add
# 新建用户,仅管理员有权限访问 auth=('admin', 'admin') # 默认密码为admin res = requests.post(url=base_url + "users/add", data={'username': 'your_username', 'password': 'your_password'}, auth=auth) print(res.status_code) print(res.json())
  • /users/passwd
# 修改用户密码,仅管理员有权限访问 auth=('admin', 'admin') # 默认密码为admin res=requests.post(url=base_url+"users/passwd",data=post_data,auth=auth) print(res.status_code) print(res.json())
  • /users/authorize
# 授权文件访问,仅管理员有权限访问 auth=('admin', 'admin') # 默认密码为admin res = requests.post(url=base_url + "users/authorize", json={'username': 'your_username', 'files': 'your_files_list'}, auth=auth) print(res.status_code) print(res.json())
  • /
# 文件上传,仅管理员有权限访问 auth=('admin', 'admin') # 默认密码为admin res = requests.post(base_url, files={'file': open('example.txt', 'rb')} ,auth=auth) print(res.content)
  • /uploads/
# 文件下载,仅普通用户有权限访问 auth=('admin', 'admin') # 默认密码为admin res = requests.get(base_url + "uploads/example.txt", auth=auth) with open('example_local.txt', 'w') as f: f.write(res.content) f.close()

运维端维护示例脚本

假定服务端部署的地址为 https://conf.evling.tech,在客户端脚本所在目录创建一个目录 ./local_data,指定好相应的文件名,与脚本中的账号文件映射信息要匹配,如下图所示,主要是第一比较麻烦,后边更新就直接在本地当前目录结构进行变更即可,因为上传接口是通过覆盖的方式替换服务端文件的。
ScreenShot20220119at10.44.44PM.png

#!/usr/bin/env python3 # -*- coding: utf-8 -*- # @Project : tools # @File : conf_mgr.py # @Software: PyCharm # @Author : 易雾君 # @Email : evling2020@gmail.com # @公众号 : 易雾山庄 # @Site : https://www.evling.tech # @Describe : 家庭基建,生活乐享. # @Time : 2022/1/18 8:29 PM import os.path import requests # 初始化数据,一组账号密码授权访问文件列表里的文件 data = [ {'username': 'www', 'password': '37KC8N4', 'files': ['www.evling.tech.pem', 'www.evling.tech.key']}, {'username': 'pve-lab', 'password': '6JASVTM', 'files': ['pve-lab.evling.tech.pem', 'pve-lab.evling.tech.key']}, {'username': 'conf', 'password': 'etePT1A', 'files': ['conf.evling.tech.pem', 'conf.evling.tech.key']}, {'username': 'ivre', 'password': 'Ta9qF21', 'files': ['ivre.evling.tech.pem', 'ivre.evling.tech.key']}, {'username': 'dataease', 'password': '83k87', 'files': ['dataease.evling.tech.pem', 'dataease.evling.tech.key']}, {'username': 'grafana', 'password': 'vdF92m', 'files': ['grafana.evling.tech.pem', 'grafana.evling.tech.key']}, {'username': 'proxypool', 'password': '654P7', 'files': ['proxypool.evling.tech.pem', 'proxypool.evling.tech.key']}, {'username': 'pve-nas', 'password': 'bi14D7', 'files': ['pve-nas.evling.tech.pem', 'pve-nas.evling.tech.key']}, {'username': 'jellyfin', 'password': 'R1JFcb', 'files': ['jellyfin.evling.tech.pem', 'jellyfin.evling.tech.key']}, {'username': 'onlyoffice', 'password': 'HnU6', 'files': ['onlyoffice.evling.tech.pem', 'onlyoffice.evling.tech.key']}, {'username': 'vnet', 'password': '7kEWLx5', 'files': ['vnet.evling.tech.pem', 'vnet.evling.tech.key']}, {'username': 'elk', 'password': 'Hn4z83', 'files': ['elk.evling.tech.pem', 'elk.evling.tech.key']}, {'username': 'static', 'password': 'PU35UG', 'files': ['static.evling.tech.pem', 'static.evling.tech.key']}, {'username': 'gitlab', 'password': 'Q2v7cZ', 'files': ['gitlab.evling.tech.pem', 'gitlab.evling.tech.key']}, {'username': 'kodbox', 'password': 'Gu2R3e', 'files': ['kodbox.evling.tech.pem', 'kodbox.evling.tech.key']}, {'username': 'cloud', 'password': '6zj4ST', 'files': ['cloud.evling.tech.pem', 'cloud.evling.tech.key']}, {'username': 'wechat', 'password': 'eUXt', 'files': ['wechat.evling.tech.pem', 'wechat.evling.tech.key']}, {'username': 'pve-prod', 'password': '4VZTQG', 'files': ['pve-prod.evling.tech.pem', 'pve-prod.evling.tech.key']}, {'username': 'ichat', 'password': '3S655', 'files': ['ichat.evling.tech.pem', 'ichat.evling.tech.key']}, {'username': 'crawlab', 'password': 'r125SG', 'files': ['crawlab.evling.tech.pem', 'crawlab.evling.tech.key']}, {'username': 'jms', 'password': 'scMmP8', 'files': ['jms.evling.tech.pem', 'jms.evling.tech.key']}, ] # 基础变量 local_dir = './local_data' base_url='https://conf.evling.tech/' auth=('admin', 'your_admin_password') for each_item in data: # 上传文件 for file in each_item['files']: res = requests.post(base_url, files={'file': open(os.path.join(local_dir, file), 'rb')} ,auth=auth) print(res.content) # 新建用户 res = requests.post(url=base_url + "users/add", data={'username': each_item['username'], 'password': each_item['password']}, auth=auth) print(res.status_code) print(res.json()) # 授权文件访问 res = requests.post(url=base_url + "users/authorize", json={'username': each_item['username'], 'files': each_item['files']}, auth=auth) print(res.status_code) print(res.json())

消费客户端

消费客户端直接拿到上边给定的账号进行无脑下载即可,实现思路是,crontab 定时任务,将服务端的指定文件下载到本地的一个临时文件路径,用 diff 比对原文件与临时文件,不同即进行文件权限拷贝,并替换。wget 命令中记得密码改成你自己的,访问密码不建议以 bash 传参的形式,示例脚本:

#!/bin/bash # ScriptName: conf_client.sh echo "usage: -$0 file1 file2 url" file1=$1 file2=$2 url=$3 wget --user=www --password=pzgE8e $url --no-check-certificate -O $file2 if [ -f $file1 ] && [ -f $file2 ]; then diff $file1 $file2 > /dev/null if [ $? != 0 ]; then echo "Different!" chmod --reference=$file1 $file2 mv $file2 $file1 else echo "Same!" fi else echo "$file1 or $file2 does not exist, please check filename." fi

crontab 配置任务

# 每天中午12点同步一下 0 12 * * * /bin/bash/bash /data/tools/conf_client.sh /etc/letsencrypt/certs/www.evling.tech.pem /tmp/www.evling.tech.pem https://conf.evling.tech/uploads/www.evling.tech.pem

下期

  • 基于 openldap 实现账号统一管理

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...