這次再來給大家分享一波我工作中用到的幾個(gè)腳本,主要分為:Python和Shell兩個(gè)部分。
Python 腳本部分實(shí)例:企業(yè)微信告警、FTP 客戶端、SSH 客戶端、Saltstack 客戶端、vCenter 客戶端、獲取域名 ssl 證書過期時(shí)間、發(fā)送今天的天氣預(yù)報(bào)以及未來的天氣趨勢圖;
Shell 腳本部分實(shí)例:SVN 完整備份、Zabbix 監(jiān)控用戶密碼過期、構(gòu)建本地 YUM 以及上篇文章中有讀者的需求(負(fù)載高時(shí),查出占用比較高的進(jìn)程腳本并存儲(chǔ)或推送通知);
篇幅有些長,還請大家耐心翻到文末,畢竟有彩蛋。
Python 腳本部分
企業(yè)微信告警
此腳本通過企業(yè)微信應(yīng)用,進(jìn)行微信告警,可用于 Zabbix 監(jiān)控。
# -*- coding: utf-8 -*-
import requests
import json
class DLF:
def __init__(self, corpid, corpsecret):
self.url = \"https://qyapi.weixin.qq.com/cgi-bin\"
self.corpid = corpid
self.corpsecret = corpsecret
self._token = self._get_token
def _get_token(self):
\'\'\'
獲取企業(yè)微信API接口的access_token
:return:
\'\'\'
token_url = self.url \"/gettoken?corpid=%s&corpsecret=%s\" %(self.corpid, self.corpsecret)
try:
res = requests.get(token_url).json
token = res[\'access_token\']
return token
except Exception as e:
return str(e)
def _get_media_id(self, file_obj):
get_media_url = self.url \"/media/upload?access_token={}&type=file\".format(self._token)
data = {\"media\": file_obj}
try:
res = requests.post(url=get_media_url, files=data)
media_id = res.json[\'media_id\']
return media_id
except Exception as e:
return str(e)
def send_text(self, agentid, content, touser=None, toparty=None):
send_msg_url = self.url \"/message/send?access_token=%s\" % (self._token)
send_data = {
\"touser\": touser,
\"toparty\": toparty,
\"msgtype\": \"text\",
\"agentid\": agentid,
\"text\": {
\"content\": content
}
}
try:
res = requests.post(send_msg_url, data=json.dumps(send_data))
except Exception as e:
return str(e)
def send_image(self, agentid, file_obj, touser=None, toparty=None):
media_id = self._get_media_id(file_obj)
send_msg_url = self.url \"/message/send?access_token=%s\" % (self._token)
send_data = {
\"touser\": touser,
\"toparty\": toparty,
\"msgtype\": \"image\",
\"agentid\": agentid,
\"image\": {
\"media_id\": media_id
}
}
try:
res = requests.post(send_msg_url, data=json.dumps(send_data))
except Exception as e:
return str(e)
ftp 客戶端
通過 ftplib 模塊操作 ftp 服務(wù)器,進(jìn)行上傳下載等操作。
# -*- coding: utf-8 -*-
from ftplib import FTP
from os import path
import copy
class FTPClient:
def __init__(self, host, user, passwd, port=21):
self.host = host
self.user = user
self.passwd = passwd
self.port = port
self.res = {\'status\': True, \'msg\': None}
self._ftp = None
self._login
def _login(self):
\'\'\'
登錄FTP服務(wù)器
:return: 連接或登錄出現(xiàn)異常時(shí)返回錯(cuò)誤信息
\'\'\'
try:
self._ftp = FTP
self._ftp.connect(self.host, self.port, timeout=30)
self._ftp.login(self.user, self.passwd)
except Exception as e:
return e
def upload(self, localpath, remotepath=None):
\'\'\'
上傳ftp文件
:param localpath: local file path
:param remotepath: remote file path
:return:
\'\'\'
if not localpath: return \'Please select a local file. \'
# 讀取本地文件
# fp = open(localpath, \'rb\')
# 如果未傳遞遠(yuǎn)程文件路徑,則上傳到當(dāng)前目錄,文件名稱同本地文件
if not remotepath:
remotepath = path.basename(localpath)
# 上傳文件
self._ftp.storbinary(\'STOR \' remotepath, localpath)
# fp.close
def download(self, remotepath, localpath=None):
\'\'\'
localpath
:param localpath: local file path
:param remotepath: remote file path
:return:
\'\'\'
if not remotepath: return \'Please select a remote file. \'
# 如果未傳遞本地文件路徑,則下載到當(dāng)前目錄,文件名稱同遠(yuǎn)程文件
if not localpath:
localpath = path.basename(remotepath)
# 如果localpath是目錄的話就和remotepath的basename拼接
if path.isdir(localpath):
localpath = path.join(localpath, path.basename(remotepath))
# 寫入本地文件
fp = open(localpath, \'wb\')
# 下載文件
self._ftp.retrbinary(\'RETR \' remotepath, fp.write)
fp.close
def nlst(self, dir=\'/\'):
\'\'\'
查看目錄下的內(nèi)容
:return: 以列表形式返回目錄下的所有內(nèi)容
\'\'\'
files_list = self._ftp.nlst(dir)
return files_list
def rmd(self, dir=None):
\'\'\'
刪除目錄
:param dir: 目錄名稱
:return: 執(zhí)行結(jié)果
\'\'\'
if not dir: return \'Please input dirname\'
res = copy.deepcopy(self.res)
try:
del_d = self._ftp.rmd(dir)
res[\'msg\'] = del_d
except Exception as e:
res[\'status\'] = False
res[\'msg\'] = str(e)
return res
def mkd(self, dir=None):
\'\'\'
創(chuàng)建目錄
:param dir: 目錄名稱
:return: 執(zhí)行結(jié)果
\'\'\'
if not dir: return \'Please input dirname\'
res = copy.deepcopy(self.res)
try:
mkd_d = self._ftp.mkd(dir)
res[\'msg\'] = mkd_d
except Exception as e:
res[\'status\'] = False
res[\'msg\'] = str(e)
return res
def del_file(self, filename=None):
\'\'\'
刪除文件
:param filename: 文件名稱
:return: 執(zhí)行結(jié)果
\'\'\'
if not filename: return \'Please input filename\'
res = copy.deepcopy(self.res)
try:
del_f = self._ftp.delete(filename)
res[\'msg\'] = del_f
except Exception as e:
res[\'status\'] = False
res[\'msg\'] = str(e)
return res
def get_file_size(self, filenames=[]):
\'\'\'
獲取文件大小,單位是字節(jié)
判斷文件類型
:param filename: 文件名稱
:return: 執(zhí)行結(jié)果
\'\'\'
if not filenames: return {\'msg\': \'This is an empty directory\'}
res_l =
for file in filenames:
res_d = {}
# 如果是目錄或者文件不存在就會(huì)報(bào)錯(cuò)
try:
size = self._ftp.size(file)
type = \'f\'
except:
# 如果是路徑的話size顯示 - , file末尾加/ (/dir/)
size = \'-\'
type = \'d\'
file = file \'/\'
res_d[\'filename\'] = file
res_d[\'size\'] = size
res_d[\'type\'] = type
res_l.append(res_d)
return res_l
def rename(self, old_name=None, new_name=None):
\'\'\'
重命名
:param old_name: 舊的文件或者目錄名稱
:param new_name: 新的文件或者目錄名稱
:return: 執(zhí)行結(jié)果
\'\'\'
if not old_name or not new_name: return \'Please input old_name and new_name\'
res = copy.deepcopy(self.res)
try:
rename_f = self._ftp.rename(old_name, new_name)
res[\'msg\'] = rename_f
except Exception as e:
res[\'status\'] = False
res[\'msg\'] = str(e)
return res
def close(self):
\'\'\'
退出ftp連接
:return:
\'\'\'
try:
# 向服務(wù)器發(fā)送quit命令
self._ftp.quit
except Exception:
return \'No response from server\'
finally:
# 客戶端單方面關(guān)閉連接
self._ftp.close
SSH 客戶端
此腳本僅用于通過 key連接,如需要密碼連接,簡單修改下即可。
# -*- coding: utf-8 -*-
import paramiko
class SSHClient:
def __init__(self, host, port, user, pkey):
self.ssh_host = host
self.ssh_port = port
self.ssh_user = user
self.private_key = paramiko.RSAKey.from_private_key_file(pkey)
self.ssh = None
self._connect
def _connect(self):
self.ssh = paramiko.SSHClient
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy)
try:
self.ssh.connect(hostname=self.ssh_host, port=self.ssh_port, username=self.ssh_user, pkey=self.private_key, timeout=10)
except:
return \'ssh connect fail\'
def execute_command(self, command):
stdin, stdout, stderr = self.ssh.exec_command(command)
out = stdout.read
err = stderr.read
return out, err
def close(self):
self.ssh.close
Saltstack 客戶端
通過 api 對 Saltstack 服務(wù)端進(jìn)行操作,執(zhí)行命令。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import requests
import json
import copy
class SaltApi:
\"\"\"
定義salt api接口的類
初始化獲得token
\"\"\"
def __init__(self):
self.url = \"http://172.85.10.21:8000/\"
self.username = \"saltapi\"
self.password = \"saltapi\"
self.headers = {\"Content-type\": \"application/json\"}
self.params = {\'client\': \'local\', \'fun\': None, \'tgt\': None, \'arg\': None}
self.login_url = self.url \"login\"
self.login_params = {\'username\': self.username, \'password\': self.password, \'eauth\': \'pam\'}
self.token = self.get_data(self.login_url, self.login_params)[\'token\']
self.headers[\'X-Auth-Token\'] = self.token
def get_data(self, url, params):
\'\'\'
請求url獲取數(shù)據(jù)
:param url: 請求的url地址
:param params: 傳遞給url的參數(shù)
:return: 請求的結(jié)果
\'\'\'
send_data = json.dumps(params)
request = requests.post(url, data=send_data, headers=self.headers)
response = request.json
result = dict(response)
return result[\'return\'][0]
def get_auth_keys(self):
\'\'\'
獲取所有已經(jīng)認(rèn)證的key
:return:
\'\'\'
data = copy.deepcopy(self.params)
data[\'client\'] = \'wheel\'
data[\'fun\'] = \'key.list_all\'
result = self.get_data(self.url, data)
try:
return result[\'data\'][\'return\'][\'minions\']
except Exception as e:
return str(e)
def get_grains(self, tgt, arg=\'id\'):
\"\"\"
獲取系統(tǒng)基礎(chǔ)信息
:tgt: 目標(biāo)主機(jī)
:return:
\"\"\"
data = copy.deepcopy(self.params)
if tgt:
data[\'tgt\'] = tgt
else:
data[\'tgt\'] = \'*\'
data[\'fun\'] = \'grains.item\'
data[\'arg\'] = arg
result = self.get_data(self.url, data)
return result
def execute_command(self, tgt, fun=\'cmd.run\', arg=None, tgt_type=\'list\', salt_async=False):
\"\"\"
執(zhí)行saltstack 模塊命令,類似于salt \'*\' cmd.run \'command\'
:param tgt: 目標(biāo)主機(jī)
:param fun: 模塊方法 可為空
:param arg: 傳遞參數(shù) 可為空
:return: 執(zhí)行結(jié)果
\"\"\"
data = copy.deepcopy(self.params)
if not tgt: return {\'status\': False, \'msg\': \'target host not exist\'}
if not arg:
data.pop(\'arg\')
else:
data[\'arg\'] = arg
if tgt != \'*\':
data[\'tgt_type\'] = tgt_type
if salt_async: data[\'client\'] = \'local_async\'
data[\'fun\'] = fun
data[\'tgt\'] = tgt
result = self.get_data(self.url, data)
return result
def jobs(self, fun=\'detail\', jid=None):
\"\"\"
任務(wù)
:param fun: active, detail
:param jod: Job ID
:return: 任務(wù)執(zhí)行結(jié)果
\"\"\"
data = {\'client\': \'runner\'}
data[\'fun\'] = fun
if fun == \'detail\':
if not jid: return {\'success\': False, \'msg\': \'job id is none\'}
data[\'fun\'] = \'jobs.lookup_jid\'
data[\'jid\'] = jid
else:
return {\'success\': False, \'msg\': \'fun is active or detail\'}
result = self.get_data(self.url, data)
return result
vCenter 客戶端
通過官方 SDK 對 vCenter 進(jìn)行日常操作,此腳本是我用于 cmdb 平臺(tái)的,自動(dòng)獲取主機(jī)信息,存入數(shù)據(jù)庫。
from pyVim.connect import SmartConnect, Disconnect, SmartConnectNoSSL
from pyVmomi import vim
from asset import models
import atexit
class Vmware:
def __init__(self, ip, user, password, port, idc, vcenter_id):
self.ip = ip
self.user = user
self.password = password
self.port = port
self.idc_id = idc
self.vcenter_id = vcenter_id
def get_obj(self, content, vimtype, name=None):
\'\'\'
列表返回,name 可以指定匹配的對象
\'\'\'
container = content.viewManager.CreateContainerView(content.rootFolder, vimtype, True)
obj = [ view for view in container.view ]
return obj
def get_esxi_info(self):
# 宿主機(jī)信息
esxi_host = {}
res = {\"connect_status\": True, \"msg\": None}
try:
# connect this thing
si = SmartConnectNoSSL(host=self.ip, user=self.user, pwd=self.password, port=self.port, connectionPoolTimeout=60)
except Exception as e:
res[\'connect_status\'] = False
try:
res[\'msg\'] = (\"%s Caught vmodl fault : \" e.msg) % (self.ip)
except Exception as e:
res[\'msg\'] = \'%s: connection error\' % (self.ip)
return res
# disconnect this thing
atexit.register(Disconnect, si)
content = si.RetrieveContent
esxi_obj = self.get_obj(content, [vim.HostSystem])
for esxi in esxi_obj:
esxi_host[esxi.name] = {}
esxi_host[esxi.name][\'idc_id\'] = self.idc_id
esxi_host[esxi.name][\'vcenter_id\'] = self.vcenter_id
esxi_host[esxi.name][\'server_ip\'] = esxi.name
esxi_host[esxi.name][\'manufacturer\'] = esxi.summary.hardware.vendor
esxi_host[esxi.name][\'server_model\'] = esxi.summary.hardware.model
for i in esxi.summary.hardware.otherIdentifyingInfo:
if isinstance(i, vim.host.SystemIdentificationInfo):
esxi_host[esxi.name][\'server_sn\'] = i.identifierValue
# 系統(tǒng)名稱
esxi_host[esxi.name][\'system_name\'] = esxi.summary.config.product.fullName
# cpu總核數(shù)
esxi_cpu_total = esxi.summary.hardware.numCpuThreads
# 內(nèi)存總量 GB
esxi_memory_total = esxi.summary.hardware.memorySize / 1024 / 1024 /1024
# 獲取硬盤總量 GB
esxi_disk_total =0
for ds in esxi.datastore:
esxi_disk_total = ds.summary.capacity / 1024 / 1024 /1024
# 默認(rèn)配置4核8G100G,根據(jù)這個(gè)配置計(jì)算剩余可分配虛擬機(jī)
default_configure = {
\'cpu\': 4,
\'memory\': 8,
\'disk\':100
}
esxi_host[esxi.name][\'vm_host\'] =
vm_usage_total_cpu =0
vm_usage_total_memory =0
vm_usage_total_disk =0
# 虛擬機(jī)信息
for vm in esxi.vm:
host_info = {}
host_info[\'vm_name\'] = vm.name
host_info[\'power_status\'] = vm.runtime.powerState
host_info[\'cpu_total_kernel\'] = str(vm.config.hardware.numCPU) \'核\'
host_info[\'memory_total\'] = str(vm.config.hardware.memoryMB) \'MB\'
host_info[\'system_info\'] = vm.config.guestFullName
disk_info = \'\'
disk_total =0
for d in vm.config.hardware.device:
if isinstance(d, vim.vm.device.VirtualDisk):
disk_total = d.capacityInKB / 1024 /1024
disk_info = d.deviceInfo.label \": \" str((d.capacityInKB) / 1024 / 1024) \' GB\' \',\'
host_info[\'disk_info\'] = disk_info
esxi_host[esxi.name][\'vm_host\'].append(host_info)
# 計(jì)算當(dāng)前宿主機(jī)可用容量:總量 - 已分配的
if host_info[\'power_status\'] == \'poweredOn\':
vm_usage_total_cpu = vm.config.hardware.numCPU
vm_usage_total_disk = disk_total
vm_usage_total_memory = (vm.config.hardware.memoryMB / 1024)
esxi_cpu_free = esxi_cpu_total - vm_usage_total_cpu
esxi_memory_free = esxi_memory_total - vm_usage_total_memory
esxi_disk_free = esxi_disk_total - vm_usage_total_disk
esxi_host[esxi.name][\'cpu_info\'] = \'Total: %d核, Free: %d核\' % (esxi_cpu_total, esxi_cpu_free)
esxi_host[esxi.name][\'memory_info\'] = \'Total: %dGB, Free: %dGB\' % (esxi_memory_total, esxi_memory_free)
esxi_host[esxi.name][\'disk_info\'] = \'Total: %dGB, Free: %dGB\' % (esxi_disk_total, esxi_disk_free)
# 計(jì)算cpu 內(nèi)存 磁盤按照默認(rèn)資源分配的最小值,即為當(dāng)前可分配資源
if esxi_cpu_free < 4 or esxi_memory_free < 8 or esxi_disk_free < 100:
free_allocation_vm_host =0
else:
free_allocation_vm_host = int(min(
[
esxi_cpu_free / default_configure[\'cpu\'],
esxi_memory_free / default_configure[\'memory\'],
esxi_disk_free / default_configure[\'disk\']
]
))
esxi_host[esxi.name][\'free_allocation_vm_host\'] = free_allocation_vm_host
esxi_host[\'connect_status\'] = True
return esxi_host
def write_to_db(self):
esxi_host = self.get_esxi_info
# 連接失敗
if not esxi_host[\'connect_status\']:
return esxi_host
del esxi_host[\'connect_status\']
for machine_ip in esxi_host:
# 物理機(jī)信息
esxi_host_dict = esxi_host[machine_ip]
# 虛擬機(jī)信息
virtual_host = esxi_host[machine_ip][\'vm_host\']
del esxi_host[machine_ip][\'vm_host\']
obj = models.EsxiHost.objects.create(**esxi_host_dict)
obj.save
for host_info in virtual_host:
host_info[\'management_host_id\'] = obj.id
obj2 = models.virtualHost.objects.create(**host_info)
obj2.save
獲取域名 ssl 證書過期時(shí)間
用于 zabbix 告警
import re
import sys
import time
import subprocess
from datetime import datetime
from io import StringIO
def main(domain):
f = StringIO
comm = f\"curl -Ivs https://{domain} --connect-timeout 10\"
result = subprocess.getstatusoutput(comm)
f.write(result[1])
try:
m = re.search(\'start date: (.*?)n.*?expire date: (.*?)n.*?common name: (.*?)n.*?issuer: CN=(.*?)n\', f.getvalue, re.S)
start_date = m.group(1)
expire_date = m.group(2)
common_name = m.group(3)
issuer = m.group(4)
except Exception as e:
return 999999999
# time 字符串轉(zhuǎn)時(shí)間數(shù)組
start_date = time.strptime(start_date, \"%b %d %H:%M:%S %Y GMT\")
start_date_st = time.strftime(\"%Y-%m-%d %H:%M:%S\", start_date)
# datetime 字符串轉(zhuǎn)時(shí)間數(shù)組
expire_date = datetime.strptime(expire_date, \"%b %d %H:%M:%S %Y GMT\")
expire_date_st = datetime.strftime(expire_date,\"%Y-%m-%d %H:%M:%S\")
# 剩余天數(shù)
remaining = (expire_date-datetime.now).days
return remaining
if __name__ == \"__main__\":
domain = sys.argv[1]
remaining_days = main(domain)
print(remaining_days)
發(fā)送今天的天氣預(yù)報(bào)以及未來的天氣趨勢圖
此腳本用于給老婆大人發(fā)送今天的天氣預(yù)報(bào)以及未來的天氣趨勢圖,現(xiàn)在微信把網(wǎng)頁端禁止了,沒法發(fā)送到微信了,我是通過企業(yè)微信進(jìn)行通知的,需要把你老婆大人拉到企業(yè)微信,無興趣的小伙伴跳過即可。
# -*- coding: utf-8 -*-
import requests
import json
import datetime
def weather(city):
url = \"http://wthrcdn.etouch.cn/weather_mini?city=%s\" % city
try:
data = requests.get(url).json[\'data\']
city = data[\'city\']
ganmao = data[\'ganmao\']
today_weather = data[\'forecast\'][0]
res = \"老婆今天是{}n今天天氣概況n城市: {:<10}n時(shí)間: {:<10}n高溫: {:<10}n低溫: {:<10}n風(fēng)力: {:<10}n風(fēng)向: {:<10}n天氣: {:<10}nn稍后會(huì)發(fā)送近期溫度趨勢圖,請注意查看。
\".format(
ganmao,
city,
datetime.datetime.now.strftime(\'%Y-%m-%d\'),
today_weather[\'high\'].split[1],
today_weather[\'low\'].split[1],
today_weather[\'fengli\'].split(\'[\')[2].split(\']\')[0],
today_weather[\'fengxiang\'],today_weather[\'type\'],
)
return {\"source_data\": data, \"res\": res}
except Exception as e:
return str(e)
```
獲取天氣預(yù)報(bào)趨勢圖
```python
# -*- coding: utf-8 -*-
import matplotlib.pyplot as plt
import re
import datetime
def Future_weather_states(forecast, save_path, day_num=5):
\'\'\'
展示未來的天氣預(yù)報(bào)趨勢圖
:param forecast: 天氣預(yù)報(bào)預(yù)測的數(shù)據(jù)
:param day_num: 未來幾天
:return: 趨勢圖
\'\'\'
future_forecast = forecast
dict={}
for i in range(day_num):
data =
date = future_forecast[i][\"date\"]
date = int(re.findall(\"d \",date)[0])
data.append(int(re.findall(\"d \", future_forecast[i][\"high\"])[0]))
data.append(int(re.findall(\"d \", future_forecast[i][\"low\"])[0]))
data.append(future_forecast[i][\"type\"])
dict[date] = data
data_list = sorted(dict.items)
date=
high_temperature =
low_temperature =
for each in data_list:
date.append(each[0])
high_temperature.append(each[1][0])
low_temperature.append(each[1][1])
fig = plt.plot(date,high_temperature,\"r\",date,low_temperature,\"b\")
current_date = datetime.datetime.now.strftime(\'%Y-%m\')
plt.rcParams[\'font.sans-serif\'] = [\'SimHei\']
plt.rcParams[\'axes.unicode_minus\'] = False
plt.xlabel(current_date)
plt.ylabel(\"℃\")
plt.legend([\"高溫\", \"低溫\"])
plt.xticks(date)
plt.title(\"最近幾天溫度變化趨勢\")
plt.savefig(save_path)
```
發(fā)送到企業(yè)微信
```python
# -*- coding: utf-8 -*-
import requests
import json
class DLF:
def __init__(self, corpid, corpsecret):
self.url = \"https://qyapi.weixin.qq.com/cgi-bin\"
self.corpid = corpid
self.corpsecret = corpsecret
self._token = self._get_token
def _get_token(self):
\'\'\'
獲取企業(yè)微信API接口的access_token
:return:
\'\'\'
token_url = self.url \"/gettoken?corpid=%s&corpsecret=%s\" %(self.corpid, self.corpsecret)
try:
res = requests.get(token_url).json
token = res[\'access_token\']
return token
except Exception as e:
return str(e)
def _get_media_id(self, file_obj):
get_media_url = self.url \"/media/upload?access_token={}&type=file\".format(self._token)
data = {\"media\": file_obj}
try:
res = requests.post(url=get_media_url, files=data)
media_id = res.json[\'media_id\']
return media_id
except Exception as e:
return str(e)
def send_text(self, agentid, content, touser=None, toparty=None):
send_msg_url = self.url \"/message/send?access_token=%s\" % (self._token)
send_data = {
\"touser\": touser,
\"toparty\": toparty,
\"msgtype\": \"text\",
\"agentid\": agentid,
\"text\": {
\"content\": content
}
}
try:
res = requests.post(send_msg_url, data=json.dumps(send_data))
except Exception as e:
return str(e)
def send_image(self, agentid, file_obj, touser=None, toparty=None):
media_id = self._get_media_id(file_obj)
send_msg_url = self.url \"/message/send?access_token=%s\" % (self._token)
send_data = {
\"touser\": touser,
\"toparty\": toparty,
\"msgtype\": \"image\",
\"agentid\": agentid,
\"image\": {
\"media_id\": media_id
}
}
try:
res = requests.post(send_msg_url, data=json.dumps(send_data))
except Exception as e:
return str(e)
main腳本
# -*- coding: utf-8 -*-
from plugins.weather_forecast import weather
from plugins.trend_chart import Future_weather_states
from plugins.send_wechat import DLF
import os
# 企業(yè)微信相關(guān)信息
corpid = \"xxx\"
corpsecret = \"xxx\"
agentid = \"xxx\"
# 天氣預(yù)報(bào)趨勢圖保存路徑
_path = os.path.dirname(os.path.abspath(__file__))
save_path = os.path.join(_path ,\'./tmp/weather_forecast.jpg\')
# 獲取天氣預(yù)報(bào)信息
content = weather(\"大興\")
# 發(fā)送文字消息
dlf = DLF(corpid, corpsecret)
dlf.send_text(agentid=agentid, content=content[\'res\'], toparty=\'1\')
# 生成天氣預(yù)報(bào)趨勢圖
Future_weather_states(content[\'source_data\'][\'forecast\'], save_path)
# 發(fā)送圖片消息
file_obj = open(save_path, \'rb\')
dlf.send_image(agentid=agentid, toparty=\'1\', file_obj=file_obj)
Shell 腳本部分
SVN 完整備份
通過 hotcopy進(jìn)行 SVN 完整備份,備份保留 7 天。
#!/bin/bash
# Filename : svn_backup_repos.sh
# Date : 2020/12/14
# Author : JakeTian
# Email : JakeTian@***.com
# Crontab : 59 23 * * * /bin/bash $BASE_PATH/svn_backup_repos.sh >/dev/ 2>&1
# Notes : 將腳本加入crontab中,每天定時(shí)執(zhí)行
# Description: SVN完全備份
set -e
SRC_PATH=\"/opt/svndata\"
DST_PATH=\"/data/svnbackup\"
LOG_FILE=\"$DST_PATH/logs/svn_backup.log\"
SVN_BACKUP_C=\"/bin/svnadmin hotcopy\"
SVN_LOOK_C=\"/bin/svnlook youngest\"
TODAY=$(date \'%F\')
cd $SRC_PATH
ALL_REPOS=$(find ./ -maxdepth 1 -type d ! -name \'httpd\' -a ! -name \'bak\' | tr -d \'./\')
# 創(chuàng)建備份目錄,備份腳本日志目錄
test -d $DST_PATH || mkdir -p $DST_PATH
test -d $DST_PATH/logs || mkdir $DST_PATH/logs
test -d $DST_PATH/$TODAY || mkdir $DST_PATH/$TODAY
# 備份repos文件
for repo in $ALL_REPOS
do
$SVN_BACKUP_C $SRC_PATH/$repo $DST_PATH/$TODAY/$repo
# 判斷備份是否完成
if $SVN_LOOK_C $DST_PATH/$TODAY/$repo;then
echo \"$TODAY: $repo Backup Success\" >> $LOG_FILE
else
echo \"$TODAY: $repo Backup Fail\" >> $LOG_FILE
fi
done
# # 備份用戶密碼文件和權(quán)限文件
cp -p authz access.conf $DST_PATH/$TODAY
# 日志文件轉(zhuǎn)儲(chǔ)
mv $LOG_FILE $LOG_FILE-$TODAY
# 刪除七天前的備份
seven_days_ago=$(date -d \"7 days ago\" \'%F\')
rm -rf $DST_PATH/$seven_days_ago
zabbix 監(jiān)控用戶密碼過期
用于 Zabbix 監(jiān)控 Linux 系統(tǒng)用戶(shell 為 /bin/bash 和 /bin/sh)密碼過期,密碼有效期剩余 7 天觸發(fā)加自動(dòng)發(fā)現(xiàn)用戶。
#!/bin/bash
diskarray=(`awk -F\':\' \'$NF ~ //bin/bash/||//bin/sh/{print $1}\' /etc/passwd`)
length=${#diskarray[@]}
printf \"{n\"
printf \'t\'\"\"data\":[\"
for ((i=0;i<$length;i ))
do
printf \'ntt{\'
printf \"\"{#USER_NAME}\":\"${diskarray[$i]}\"}\"
if [ $i -lt $[$length-1] ];then
printf \',\'
fi
done
printf \"nt]n\"
printf \"}n\"
檢查用戶密碼過期
#!/bin/bash
export LANG=en_US.UTF-8
SEVEN_DAYS_AGO=$(date -d \'-7 day\' \'%s\')
user=\"$1\"
# 將Sep 09, 2018格式的時(shí)間轉(zhuǎn)換成unix時(shí)間
expires_date=$(sudo chage -l $user | awk -F\':\' \'/Password expires/{print $NF}\' | sed -n \'s/^ //p\')
if [[ \"$expires_date\" != \"never\" ]];then
expires_date=$(date -d \"$expires_date\" \'%s\')
if [ \"$expires_date\" -le \"$SEVEN_DAYS_AGO\" ];then
echo \"1\"
else
echo \"0\"
fi
else
echo \"0\"
fi
構(gòu)建本地YUM
通過 rsync 的方式同步 yum,通過 nginx 只做 http yum 站點(diǎn);
但是 centos6 的鏡像最近都不能用了,國內(nèi)貌似都禁用了,如果找到合適的自行更換地址。
#!/bin/bash
# 更新yum鏡像
RsyncCommand=\"rsync -rvutH -P --delete --delete-after --delay-updates --bwlimit=1000\"
DIR=\"/app/yumData\"
LogDir=\"$DIR/logs\"
Centos6Base=\"$DIR/Centos6/x86_64/Base\"
Centos7Base=\"$DIR/Centos7/x86_64/Base\"
Centos6Epel=\"$DIR/Centos6/x86_64/Epel\"
Centos7Epel=\"$DIR/Centos7/x86_64/Epel\"
Centos6Salt=\"$DIR/Centos6/x86_64/Salt\"
Centos7Salt=\"$DIR/Centos7/x86_64/Salt\"
Centos6Update=\"$DIR/Centos6/x86_64/Update\"
Centos7Update=\"$DIR/Centos7/x86_64/Update\"
Centos6Docker=\"$DIR/Centos6/x86_64/Docker\"
Centos7Docker=\"$DIR/Centos7/x86_64/Docker\"
Centos6Mysql5_7=\"$DIR/Centos6/x86_64/Mysql/Mysql5.7\"
Centos7Mysql5_7=\"$DIR/Centos7/x86_64/Mysql/Mysql5.7\"
Centos6Mysql8_0=\"$DIR/Centos6/x86_64/Mysql/Mysql8.0\"
Centos7Mysql8_0=\"$DIR/Centos7/x86_64/Mysql/Mysql8.0\"
MirrorDomain=\"rsync://rsync.mirrors.ustc.edu.cn\"
# 目錄不存在就創(chuàng)建
check_dir{
for dir in $*
do
test -d $dir || mkdir -p $dir
done
}
# 檢查rsync同步結(jié)果
check_rsync_status{
if [ $? -eq 0 ];then
echo \"rsync success\" >> $1
else
echo \"rsync fail\" >> $1
fi
}
check_dir $DIR $LogDir $Centos6Base $Centos7Base $Centos6Epel $Centos7Epel $Centos6Salt $Centos7Salt $Centos6Update $Centos7Update $Centos6Docker $Centos7Docker $Centos6Mysql5_7 $Centos7Mysql5_7 $Centos6Mysql8_0 $Centos7Mysql8_0
# Base yumrepo
#$RsyncCommand \"$MirrorDomain\"/repo/centos/6/os/x86_64/ $Centos6Base >> \"$LogDir/centos6Base.log\" 2>&1
# check_rsync_status \"$LogDir/centos6Base.log\"
$RsyncCommand \"$MirrorDomain\"/repo/centos/7/os/x86_64/ $Centos7Base >> \"$LogDir/centos7Base.log\" 2>&1
check_rsync_status \"$LogDir/centos7Base.log\"
# Epel yumrepo
# $RsyncCommand \"$MirrorDomain\"/repo/epel/6/x86_64/ $Centos6Epel >> \"$LogDir/centos6Epel.log\" 2>&1
# check_rsync_status \"$LogDir/centos6Epel.log\"
$RsyncCommand \"$MirrorDomain\"/repo/epel/7/x86_64/ $Centos7Epel >> \"$LogDir/centos7Epel.log\" 2>&1
check_rsync_status \"$LogDir/centos7Epel.log\"
# SaltStack yumrepo
# $RsyncCommand \"$MirrorDomain\"/repo/salt/yum/redhat/6/x86_64/ $Centos6Salt >> \"$LogDir/centos6Salt.log\" 2>&1
# ln -s $Centos6Salt/archive/$(ls $Centos6Salt/archive | tail -1) $Centos6Salt/latest
# check_rsync_status \"$LogDir/centos6Salt.log\"
$RsyncComman \"$MirrorDomain\"/repo/salt/yum/redhat/7/x86_64/ $Centos7Salt >> \"$LogDir/centos7Salt.log\" 2>&1
check_rsync_status \"$LogDir/centos7Salt.log\"
# ln -s $Centos7Salt/archive/$(ls $Centos7Salt/archive | tail -1) $Centos7Salt/latest
# Docker yumrepo
$RsyncCommand \"$MirrorDomain\"/repo/docker-ce/linux/centos/7/x86_64/stable/ $Centos7Docker >> \"$LogDir/centos7Docker.log\" 2>&1
check_rsync_status \"$LogDir/centos7Docker.log\"
# centos update yumrepo
# $RsyncCommand \"$MirrorDomain\"/repo/centos/6/updates/x86_64/ $Centos6Update >> \"$LogDir/centos6Update.log\" 2>&1
# check_rsync_status \"$LogDir/centos6Update.log\"
$RsyncCommand \"$MirrorDomain\"/repo/centos/7/updates/x86_64/ $Centos7Update >> \"$LogDir/centos7Update.log\" 2>&1
check_rsync_status \"$LogDir/centos7Update.log\"
# mysql 5.7 yumrepo
# $RsyncCommand \"$MirrorDomain\"/repo/mysql-repo/yum/mysql-5.7-community/el/6/x86_64/ \"$Centos6Mysql5_7\" >> \"$LogDir/centos6Mysql5.7.log\" 2>&1
# check_rsync_status \"$LogDir/centos6Mysql5.7.log\"
$RsyncCommand \"$MirrorDomain\"/repo/mysql-repo/yum/mysql-5.7-community/el/7/x86_64/ \"$Centos7Mysql5_7\" >> \"$LogDir/centos7Mysql5.7.log\" 2>&1
check_rsync_status \"$LogDir/centos7Mysql5.7.log\"
# mysql 8.0 yumrepo
# $RsyncCommand \"$MirrorDomain\"/repo/mysql-repo/yum/mysql-8.0-community/el/6/x86_64/ \"$Centos6Mysql8_0\" >> \"$LogDir/centos6Mysql8.0.log\" 2>&1
# check_rsync_status \"$LogDir/centos6Mysql8.0.log\"
$RsyncCommand \"$MirrorDomain\"/repo/mysql-repo/yum/mysql-8.0-community/el/7/x86_64/ \"$Centos7Mysql8_0\" >> \"$LogDir/centos7Mysql8.0.log\" 2>&1
check_rsync_status \"$LogDir/centos7Mysql8.0.log\"
讀者需求解答
負(fù)載高時(shí),查出占用比較高的進(jìn)程腳本并存儲(chǔ)或推送通知
這部分內(nèi)容是上篇 Shell 腳本實(shí)例中底部讀者留言的需求,如下:
#!/bin/bash
# 物理cpu個(gè)數(shù)
physical_cpu_count=$(egrep \'physical id\' /proc/cpuinfo | sort | uniq | wc -l)
# 單個(gè)物理cpu核數(shù)
physical_cpu_cores=$(egrep \'cpu cores\' /proc/cpuinfo | uniq | awk \'{print $NF}\')
# 總核數(shù)
total_cpu_cores=$((physical_cpu_count*physical_cpu_cores))
# 分別是一分鐘、五分鐘、十五分鐘負(fù)載的閾值,其中有一項(xiàng)超過閾值才會(huì)觸發(fā)
one_min_load_threshold=\"$total_cpu_cores\"
five_min_load_threshold=$(awk \'BEGIN {print \'\"$total_cpu_cores\"\' * \"0.8\"}\')
fifteen_min_load_threshold=$(awk \'BEGIN {print \'\"$total_cpu_cores\"\' * \"0.7\"}\')
# 分別是分鐘、五分鐘、十五分鐘負(fù)載平均值
one_min_load=$(uptime | awk \'{print $(NF-2)}\' | tr -d \',\')
five_min_load=$(uptime | awk \'{print $(NF-1)}\' | tr -d \',\')
fifteen_min_load=$(uptime | awk \'{print $NF}\' | tr -d \',\')
# 獲取當(dāng)前cpu 內(nèi)存 磁盤io信息,并寫入日志文件
# 如果需要發(fā)送消息或者調(diào)用其他,請自行編寫函數(shù)即可
get_info{
log_dir=\"cpu_high_script_log\"
test -d \"$log_dir\" || mkdir \"$log_dir\"
ps -eo user,pid,%cpu,stat,time,command --sort -%cpu | head -10 > \"$log_dir\"/cpu_top10.log
ps -eo user,pid,%mem,rss,vsz,stat,time,command --sort -%mem | head -10 > \"$log_dir\"/mem_top10.log
iostat -dx 1 10 > \"$log_dir\"/disk_io_10.log
}
export -f get_info
echo \"$one_min_load $one_min_load_threshold $five_min_load $five_min_load_threshold $fifteen_min_load $fifteen_min_load_threshold\" |
awk \'{ if ($1>=$2 || $3>=$4 || $5>=$6) system(\"get_info\") }\'
以上,就是今天分享的全部內(nèi)容了。
希望大家通過這些案例能夠?qū)W以致用,結(jié)合自身的實(shí)際場景進(jìn)行運(yùn)用,從而提高自己的工作效率。
版權(quán)聲明:本文內(nèi)容由互聯(lián)網(wǎng)用戶自發(fā)貢獻(xiàn),該文觀點(diǎn)僅代表作者本人。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如發(fā)現(xiàn)本站有涉嫌抄襲侵權(quán)/違法違規(guī)的內(nèi)容, 請發(fā)送郵件至 舉報(bào),一經(jīng)查實(shí),本站將立刻刪除。