система умный дом
Хотели бы иметь собственного бота, который будет помощником, сигнализацией, пультом от всего, а заодно и кроссплатформенным приложением, работающим на всех ваших девайсах?
За последние пол года я написал около полусотни разных ботов для мессенджера Telegram. Пора немного поделиться хотя бы простыми примерами 🙂
Для тех, кто не любит читать, мы подготовили небольшой видеообзор, демонстрирующий возможности связок, описанных ниже.
Осторожно, будет много букаф!
Зачем мне бот от Telegram?
Телеграм, или в народе «телега», «телек», — очень крутой мессенджер! И не только тем, что он безопасный и удобный, а еще и тем, что у него открытое api. Такие программы мы любим. Это значит, что можно объединить все самое важное и критично-необходимое в одном мессенджере. С точки зрения удобства использования, бот — это всего лишь ваш контакт в телефоне, который умеет «кое-что еще». С практической точки зрения — целый сервер домашней автоматизации (Raspberry pi или любой другой МК) можно подключить к управлению и мониторингу через бота телеграма в вашем телефоне. При этом вы не заботитесь о «белой» IP адресации, или транзитном облаке для вашего трафика. Весь транспорт берет на себя Telegram! На устройстве, где будет написан ваш бот, просто достаточно иметь выход в сеть Интернет, но этим дело не ограничивается. С помощью бота можно соединить кучу областей жизнедеятельности и работы: напоминание покормить кота, вывод квартальных отчетов, управление светом на даче и т.д. и т. п. На что хватит фантазии.
Сегодня мы остановимся на небольшом симбиозе контроля и управления. Для примера нам понадобятся:
- датчик температуры и влажности
- датчик контроля протечек
- датчик движения или объема
- реле 220v c 3-5 вольтовой логикой управления
Сразу оговорюсь, что список может быть не полным или вы можете что-то выкинуть из него. Что подключать, решать вам. Как подключить все эти устройства и проверить их минимальную работу, можно посмотреть по ссылкам ниже.
Si7021: raspberry + датчик температуры и влажности
YL-83: raspberry + датчик контроля протечек
Dfrobot ir motion sensor: raspberry pi + датчик движения
Реле 220v: raspberry pi подключение реле
Создание бота Telegram
Естественное условие — наличие телеграма на вашем девайсе (телефон, пк, планшет — без разницы).
Открываем Telegram, идем в контакты и ищем в поисковой строке @botfather. Кликаем на него. Это мастер или менеджер создания и управления всех ваших ботов.
Вводим в строку ввода «/» и нажимаем «/newbot». Далее все по инструкции.
В конечном итоге, после того как вы придумаете имя контакта и уникальное имя и успешно все создадите, у вас появится TOKEN. Никому не говорите его и не пересылайте, вы будете использовать его в качестве ключа авторизации в ваших программах. При желании можете посмотреть остальные доступные команды, все они начинаются со «/». Например, вы можете создать аватарку вашему боту.
И как мне Telegram-бота написать?
Терпение, мой мальчик, и их_теозавры станут нашими! Для начала хотел бы поделиться очень удобным и простым приложением-помощником для работы с API Telegram — Telepot
Также для реализации всех возможностей потребуется Python3.5. Как установить Python3.5 вы можете прочитать в отдельной статье.
После того как Python3.5 установлен, ставим Telepot из pip:
1
|
$ pip3.5 install telepot
|
После установки приложения убедитесь, что >>> import telepot проходит успешно в python3.5
Больше кода богу кода!
В сети видел достаточно примеров ботов, но, если честно, бОльшая часть не красива с точки зрения пользователя. Очень часто людям приходится писать в телефоне команды, а не кликать. Даже если вы видели примеры с кликами, чаще всего они со слешами в начале, которые, мягко говоря, раздражают людей. В данном примере мы попытаемся это исправить. Будем делать вот такие красивые клавиатуры и кнопки со своим содержимым и логикой работы, как на картинке ниже:
В боте будут присутствовать блок управления реле, блок с отображением текущей информации используемых датчиков (влажность/температура/вода/обратная связь от реле) и блок настроек сигнализаций. Последний блок — это такой же блок управления, только не датчиками, а программами или их параметрами. Так же хочу отметить, что будет настроена фильтрация по разрешенным персональным id телеграмма. Данная навеска позволит иметь доступ к боту только вам и вашим близким. Если кратко, то это самые необходимые примеры работы бота в различных проектах. Произвольные клавиатуры могут содержать текст, вызов контакта, локации и еще много чего. «Волшебные серые кнопки» имеют не меньший функционал. Если вы хотите более подробно разобраться с возможностями элементов — обратитесь к официальной документации api.
Поехали. Создаем тело бота:
1
2
|
$ sudo -i
# vim /home/pi/bot.py
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
|
#!/usr/bin/env python3.5
# -*- coding: utf-8 -*-
#project: paradox-sec.ru
import subprocess
from subprocess import Popen, PIPE
import sys,os
import asyncio
import telepot
import telepot.aio
from telepot.namedtuple import ReplyKeyboardMarkup, KeyboardButton, ReplyKeyboardRemove, ForceReply
from telepot.namedtuple import InlineKeyboardMarkup, InlineKeyboardButton
from telepot.namedtuple import InlineQueryResultArticle, InlineQueryResultPhoto, InputTextMessageContent
#Тут должны находиться ваши айдишники (для примера я сделал 2 разрешенных)
#Вы можете запустить бота и увидеть при нажатии меню или /start ваш личный айдишник
#Ваши разрешенные айди нужно прописать в переменных chat_allow, заменив None на айдишники
#chat_allow1=123456789
#chat_allow2=987654321
chat_allow1=None
chat_allow2=None
# Ниже пути расположения скриптов чтения значений датчиков и управление реле.
# Каждый файл — исполняемый питоновский скрипт.
# Необходимо чтобы все файлы были представлены в системе и были исполняемыми.
file_read_temp = ‘/home/pi/si7021_temp.py’
file_read_hum = ‘/home/pi/si7021_hum.py’
file_read_water = ‘/home/pi/water_read.py’
file_read_relay = ‘/home/pi/relay_state.py’
file_relay_on = ‘/home/pi/relay_on.py’
file_relay_off = ‘/home/pi/relay_off.py’
# Блок переменных ниже — блок файлов-индификаторов (просто айдишник), отвечающиx за состояние сигнализации.
# Для них в директории /home/pi создается отдельная директория (/home/pi/alert_state)
# Если файл сигнализации присутствует — значит сигнализация должна слать алерты при срабатывании.
# Если файл отсутствует значит считаем что сигнализация выключена. Необходимо для перезагрузок (обесточевания)
# И других программ которые хотят знать что с сигнализацией
water_id = «/home/pi/alert_state/w_on»
motion_id = «/home/pi/alert_state/m_on»
temper_id = «/home/pi/alert_state/t_on»
# Файл со значением минимального температурного порога срабатывания температурной сигналки
# файл должен существовать и необходимо внести в него значение температуры (целое число)
critical_temp = «/home/pi/alert_state/critical_temp»
# считывание температуры из скрипта для si7021
def temp_read():
proc = Popen([‘%s’ %file_read_temp], shell=True, stdout=PIPE, stderr=PIPE)
proc.wait()
t = proc.stdout.read()
t = float(t)
return t
# считывание влажности из скрипта для si7021
def hum_read():
proc = Popen([‘%s’ %file_read_hum], shell=True, stdout=PIPE, stderr=PIPE)
proc.wait()
H = proc.stdout.read()
H = float(H)
return H
# считывание значения с датчика воды
def water_read():
proc = Popen([‘%s’ %file_read_water], shell=True, stdout=PIPE, stderr=PIPE)
proc.wait()
w = proc.communicate()[0]
w = w.decode(encoding=‘utf-8’)
return w
# считывание состояния пина на котором висит реле
def relay_read():
proc = Popen([‘%s’ %file_read_relay], shell=True, stdout=PIPE, stderr=PIPE)
proc.wait()
r = proc.communicate()[0]
r = int(r)
if r == 1:
r=‘Реле включено’
elif r==0:
r=‘Реле обесточено’
else:
r=‘Ошибка!’
return r
# включение/выключение реле в зависимости от входящего параметра
def relay_execute(state):
if state == ‘on’ and relay_read() == ‘Реле обесточено’:
subprocess.call(«%s» %file_relay_on, shell=True)
text = «включаю реле»
elif state == ‘on’ and relay_read() == ‘Реле включено’:
text = «реле уже под напряжением»
elif state == ‘off’ and relay_read() == ‘Реле включено’:
subprocess.call(«%s» %file_relay_off, shell=True)
text = «отключаю реле»
elif state == ‘off’ and relay_read() == ‘Реле обесточено’:
text = «реле уже обесточено»
else:
print(«Ошибка!»)
return text
# управление сигнализациями alarm: on/off. file_id — айдишник сигнализации (см выше)
def alert_f(alarm, file_id):
#сигнализация уже включена
if alarm == ‘on’ and os.path.exists(file_id):
text = «Сигнализация уже была включена»
#была включена, теперь отключаем
elif alarm == ‘off’ and os.path.exists(file_id):
text = «Отключаю сигнализацию»
subprocess.call(«rm -f %s» %file_id, shell=True)
#уже была выключена, выключать не надо
elif alarm == ‘off’ and os.path.exists(file_id) == False:
text = «Сигнализация уже была отключена»
#выла выключена, теперь включаем
elif alarm == ‘on’ and os.path.exists(file_id) == False:
text = «Активирую сигнализацию»
subprocess.call(«touch %s» %file_id, shell=True)
else:
text = «err»
return text
# текущее сотояние сигнализации. file_id — айдишник сигнализации (см выше)
def alert_info_f(file_id):
if os.path.exists(file_id):
text = «Сигнализация сейчас активна»
else:
text = «Сигнализация сейчас отключена»
return text
# Текущее минимальное значение температуры
# считывание значения с датчика воды
def c_t_read():
proc = Popen([‘cat %s’ %critical_temp], shell=True, stdout=PIPE, stderr=PIPE)
proc.wait()
c_t = proc.communicate()[0]
c_t = c_t.decode(encoding=‘utf-8’)
c_t = «nПорог срабатывания установлен на «+c_t+» градусов»
return c_t
#################################
#Блоки дальше — тело самого бота#
#################################
message_with_inline_keyboard = None
id_write_critical_temper = 0
#эта функция отвечает за текстовые сообщения и «клавиатуру»
async def on_chat_message(msg):
global id_write_critical_temper
content_type, chat_type, chat_id = telepot.glance(msg)
print(‘Chat:’, content_type, chat_type)
print(«id отправителя сообщения: «+str(chat_id))
if chat_id == chat_allow1 or chat_id == chat_allow2:
if content_type != ‘text’:
return
else:
ok=1
command = msg[‘text’].lower()
print(command)
if command == ‘/start’:
markup = ReplyKeyboardMarkup(keyboard=[
[dict(text=‘инфо’)],
[dict(text=‘управление’)],
[dict(text=‘сигнализация’)],
])
await bot.sendMessage(chat_id, ‘чем воспользуешься?’, reply_markup=markup)
elif command == ‘главное меню’:
markup = ReplyKeyboardMarkup(keyboard=[
[dict(text=‘инфо’)],
[dict(text=‘управление’)],
[dict(text=‘сигнализация’)],
])
await bot.sendMessage(chat_id, ‘выбери раздел’, reply_markup=markup)
elif command == u‘инфо’:
markup = ReplyKeyboardMarkup(keyboard=[
[dict(text=‘вода’), dict(text=‘розетка’)],
[dict(text=‘температура’), dict(text=‘влажность’)],
[dict(text=‘главное меню’)],
])
await bot.sendMessage(chat_id, ‘выбери объект’, reply_markup=markup)
elif command == u‘управление’:
markup = InlineKeyboardMarkup(inline_keyboard=[
[dict(text=‘включить’, callback_data=‘relay_on’), dict(text=‘отключить’, callback_data=‘relay_off’)],
[dict(text=‘текущее состояние’, callback_data=‘relay_info’)],
])
global message_with_inline_keyboard
message_with_inline_keyboard = await bot.sendMessage(chat_id, ‘Что сделать с розеткой?’, reply_markup=markup)
elif command == u‘сигнализация’:
markup = ReplyKeyboardMarkup(keyboard=[
[dict(text=‘контроль воды’)],
[dict(text=‘контроль движения’)],
[dict(text=‘контроль температуры’)],
[dict(text=‘главное меню’)],
])
await bot.sendMessage(chat_id, ‘какой раздел необходим?’, reply_markup=markup)
elif command == u‘температура’:
markup = ReplyKeyboardMarkup(keyboard=[
[dict(text=‘вода’), dict(text=‘розетка’)],
[dict(text=‘температура’), dict(text=‘влажность’)],
[dict(text=‘главное меню’)]
])
#считываем значение с датчика температуры
t = str(temp_read())+‘C°’
await bot.sendMessage(chat_id, ‘Текущая температура: %s’ %t, reply_markup=markup)
elif command == u‘влажность’:
markup = ReplyKeyboardMarkup(keyboard=[
[dict(text=‘вода’), dict(text=‘розетка’)],
[dict(text=‘температура’), dict(text=‘влажность’)],
[dict(text=‘главное меню’)]
])
#считываем значение с датчика влажности
h = str(hum_read())+‘%’
await bot.sendMessage(chat_id, ‘Текущая влажность: %s’ %h, reply_markup=markup)
elif command == u‘вода’:
markup = ReplyKeyboardMarkup(keyboard=[
[dict(text=‘вода’), dict(text=‘розетка’)],
[dict(text=‘температура’), dict(text=‘влажность’)],
[dict(text=‘главное меню’)]
])
#считываем значение с датчика воды
w = str(water_read())
await bot.sendMessage(chat_id, ‘Текущее состояние сенсора воды: %s’ %w, reply_markup=markup)
elif command == u‘розетка’:
markup = ReplyKeyboardMarkup(keyboard=[
[dict(text=‘вода’), dict(text=‘розетка’)],
[dict(text=‘температура’), dict(text=‘влажность’)],
[dict(text=‘главное меню’)]
])
#считываем значение с пина, на который подключено реле
R=str(relay_read())
await bot.sendMessage(chat_id, ‘Состояние розетки (реле): %s’ %R, reply_markup=markup)
elif command == u‘контроль воды’:
markup = InlineKeyboardMarkup(inline_keyboard=[
[dict(text=‘включить’, callback_data=‘water_on’), dict(text=‘отключить’, callback_data=‘water_off’)],
[dict(text=‘текущее состояние’, callback_data=‘water_alert_info’)],
])
message_with_inline_keyboard = await bot.sendMessage(chat_id, ‘Опции сигнализации воды:’, reply_markup=markup)
elif command == u‘контроль движения’:
markup = InlineKeyboardMarkup(inline_keyboard=[
[dict(text=‘включить’, callback_data=‘motion_on’), dict(text=‘отключить’, callback_data=‘motion_off’)],
[dict(text=‘текущее состояние’, callback_data=‘motion_alert_info’)],
])
message_with_inline_keyboard = await bot.sendMessage(chat_id, ‘Опции сигнализации движения:’, reply_markup=markup)
elif command == u‘контроль температуры’:
markup = InlineKeyboardMarkup(inline_keyboard=[
[dict(text=‘включить’, callback_data=‘temp_on’), dict(text=‘отключить’, callback_data=‘temp_off’)],
[dict(text=‘порог срабатывания’, callback_data=‘temp_alert_min’)],
[dict(text=‘текущее состояние’, callback_data=‘temp_alert_info’)],
])
message_with_inline_keyboard = await bot.sendMessage(chat_id, ‘Опции сигнализации температуры:’, reply_markup=markup)
else:
if id_write_critical_temper == 1:
#если происходит установка температуры срабатывания
if command.isdigit():
subprocess.call(«echo %s > %s» %(command, critical_temp), shell=True)
markup = ReplyKeyboardMarkup(keyboard=[[KeyboardButton(text=‘главное меню’)]])
await bot.sendMessage(chat_id, str(«Температурный минимум установлен в %s градусов. Ниже этой температуры будут приходить алерты») %command, reply_markup=markup)
id_write_critical_temper = 0
else:
markup = ReplyKeyboardMarkup(keyboard=[[KeyboardButton(text=‘главное меню’)]])
await bot.sendMessage(chat_id, str(«%s — это не целое число. При необходимости пройдите настройку заново. Значение не установлено!») %command, reply_markup=markup)
id_write_critical_temper = 0
else:
#если ввели текст, не соответствующий команде
await bot.sendMessage(chat_id, str(«начните чат с команды /start»))
else:
#если чат айди не соответствует разрешенному
markup_protect = ReplyKeyboardMarkup(keyboard=[[dict(text=‘я очень тугой, еще раз можно?’)]])
await bot.sendMessage(chat_id, ‘Вы не имеете доступа к этому боту! Обратитесь к владельцу за разрешением.’, reply_markup=markup_protect)
return
#эта функция отвечает за «волшебные полупрозрачные кнопки»
async def on_callback_query(msg):
global id_write_critical_temper
query_id, from_id, data = telepot.glance(msg, flavor=‘callback_query’)
print(‘Callback query:’, query_id, data)
id_owner_callback=msg[‘from’][‘id’]
print(«id отправителя запроса: «+str(id_owner_callback))
if id_owner_callback == chat_allow1 or id_owner_callback == chat_allow2:
#управление реле (розеткой)
if data == ‘relay_on’:
R_inf = str(relay_execute(‘on’))
await bot.answerCallbackQuery(query_id, text=‘%s’ %R_inf, show_alert=True)
elif data == ‘relay_off’:
R_inf = str(relay_execute(‘off’))
await bot.answerCallbackQuery(query_id, text=‘%s’ %R_inf, show_alert=True)
elif data == ‘relay_info’:
R=str(relay_read())
await bot.answerCallbackQuery(query_id, text=‘%s’%R, show_alert=True)
#управление сигнализацией воды
elif data == ‘water_on’:
inf = str(alert_f(‘on’, water_id))
await bot.answerCallbackQuery(query_id, text=‘%s’ %inf, show_alert=True)
elif data == ‘water_off’:
inf = str(alert_f(‘off’, water_id))
await bot.answerCallbackQuery(query_id, text=‘%s’ %inf, show_alert=True)
elif data == ‘water_alert_info’:
inf = str(alert_info_f(water_id))
await bot.answerCallbackQuery(query_id, text=‘%s’ %inf, show_alert=True)
#управление сигнализацией движения
elif data == ‘motion_on’:
inf = str(alert_f(‘on’, motion_id))
await bot.answerCallbackQuery(query_id, text=‘%s’ %inf, show_alert=True)
elif data == ‘motion_off’:
inf = str(alert_f(‘off’, motion_id))
await bot.answerCallbackQuery(query_id, text=‘%s’ %inf, show_alert=True)
elif data == ‘motion_alert_info’:
inf = str(alert_info_f(motion_id))
await bot.answerCallbackQuery(query_id, text=‘%s’ %inf, show_alert=True)
#управление сигнализацией температуры
elif data == ‘temp_on’:
inf = str(alert_f(‘on’, temper_id))
await bot.answerCallbackQuery(query_id, text=‘%s’ %inf, show_alert=True)
elif data == ‘temp_off’:
inf = str(alert_f(‘off’, temper_id))
await bot.answerCallbackQuery(query_id, text=‘%s’ %inf, show_alert=True)
elif data == ‘temp_alert_info’:
inf = str(alert_info_f(temper_id))
info_c_t = str(c_t_read())
inf = inf+info_c_t
await bot.answerCallbackQuery(query_id, text=‘%s’ %inf, show_alert=True)
elif data == ‘temp_alert_min’:
id_write_critical_temper = 1
await bot.answerCallbackQuery(query_id, text=‘Установите min порог срабатывания температурной сигнализации. Введите целое число.’, show_alert=True)
else:
next=1
else:
await bot.answerCallbackQuery(query_id, text=‘У вас нет доступа’, show_alert=True)
#В TOKEN должен находиться ваш токен, полученый при создании бота!
#замените значение на свои данные!
TOKEN = «12345678:xxxxx-xyxyxyxyxyxyxyxyxyxyxyxyxyxyx»
bot = telepot.aio.Bot(TOKEN)
loop = asyncio.get_event_loop()
#вызов списка ваших функций для работы с api
loop.create_task(bot.message_loop({‘chat’: on_chat_message,
‘callback_query’: on_callback_query}))
#project: paradox-sec.ru
print(‘Listening …’)
loop.run_forever()
|
Делаем файл исполняемым:
1
|
# chmod 700 /home/pi/bot.py
|
Вам необходимо в теле бота заменить разрешенные айдишники со значением None на свои, а также записать в TOKEN свои данные, полученные при регистрации бота. Как узнать свой id в Telegram?. Айдишник можно узнать, просто запустив бота и нажав на старт. Вы увидите диагностическую информацию с вашим айдишником на экране консоли.
1
2
3
4
5
6
7
8
|
root@raspberrypi:~# /home/pi/bot.py
Listening ...
Chat: text private
id отправителя сообщения: 1XXXXXXXX — это ваш айдишник
главное меню
Chat: text private
id отправителя сообщения: 1XXXXXXXX — это ваш айдишник
сигнализация
|
Но перед тем как запускать бота, убедитесь, что все необходимые директории и файлы из bot.py представлены в системе! (либо перепишите сценарий под себя).
Для того чтобы вы лучше понимали, как и что должно будет работать и зачем все файлы и команды ниже, нарисую маленькую схему. Схема не претендует на звание лучшей, она лишь отображает все в упрощенном виде:
Теперь опишем все более детально.
Директория для состояния сигнализаций и файл с критической температурой:
1
2
|
# mkdir /home/pi/alert_state
# echo 10 > /home/pi/alert_state/critical_temp
|
Сценарий включения реле (signal на 20 pin BCM):
1
|
# vim /home/pi/relay_on.py
|
1
2
3
4
5
6
7
8
9
10
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#project: paradox-sec.ru
import RPi.GPIO as GPIO
gpio_pin_number=20 #на 20 пине (BCM) сигнальный контакт реле
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
GPIO.setup(gpio_pin_number, GPIO.OUT)
GPIO.output(gpio_pin_number, GPIO.HIGH)
|
1
|
# chmod +x /home/pi/relay_on.py
|
Сценарий отключения реле (signal на 20 pin BCM):
1
|
# vim /home/pi/relay_off.py
|
1
2
3
4
5
6
7
8
9
10
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#project: paradox-sec.ru
import RPi.GPIO as GPIO
gpio_pin_number=20 #на 20 пине (BCM) сигнальный контакт реле
GPIO.setmode(GPIO.BCM)
GPIO.setwarnings(False)
GPIO.setup(gpio_pin_number, GPIO.OUT)
GPIO.output(gpio_pin_number, GPIO.LOW)
|
1
|
# chmod +x /home/pi/relay_off.py
|
Текущее состояние реле смотрим через пин, на котором подключен сигнальный провод реле. (Конечно, вам никто не мешает сделать более реальный контроль через опторазвязку):
1
|
# vim /home/pi/relay_state.py
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#project: paradox-sec.ru
import subprocess
from subprocess import Popen, PIPE
pin_number=20 #пин с сигнальным проводом реле
proc = Popen(
«echo %s > /sys/class/gpio/export» % pin_number,
shell=True,
stdout=PIPE, stderr=PIPE
)
proc.wait()
proc = Popen(
«cat /sys/class/gpio/gpio%s/value» % pin_number,
shell=True,
stdout=PIPE, stderr=PIPE
)
proc.wait()
res = proc.communicate()
count = res[0].replace(«n»,«»)
count = int(count)
if count == 0:
print(‘0’).replace(«n»,«»)
else:
print(‘1’).replace(«n»,«»)
|
1
|
# chmod +x /home/pi/relay_state.py
|
Считывание температуры из Si7021 (подключен по i2c):
1
|
# vim /home/pi/si7021_temp.py
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#project: paradox-sec.ru
import smbus
import time
bus = smbus.SMBus(1)
bus.write_byte(0x40, 0xF5)
time.sleep(0.3)
data0 = bus.read_byte(0x40)
data1 = bus.read_byte(0x40)
humidity = ((data0 * 256 + data1) * 125 / 65536.0) — 6
time.sleep(0.3)
bus.write_byte(0x40, 0xF3)
time.sleep(0.3)
data0 = bus.read_byte(0x40)
data1 = bus.read_byte(0x40)
cTemp = ((data0 * 256 + data1) * 175.72 / 65536.0) — 46.85
print «%.2f» %cTemp
|
1
|
# chmod +x /home/pi/si7021_temp.py
|
Считывание влажности из Si7021 (подключен по i2c):
1
|
# vim /home/pi/si7021_hum.py
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#project: paradox-sec.ru
import smbus
import time
bus = smbus.SMBus(1)
bus.write_byte(0x40, 0xF5)
time.sleep(0.3)
data0 = bus.read_byte(0x40)
data1 = bus.read_byte(0x40)
humidity = ((data0 * 256 + data1) * 125 / 65536.0) — 6
time.sleep(0.3)
bus.write_byte(0x40, 0xF3)
time.sleep(0.3)
data0 = bus.read_byte(0x40)
data1 = bus.read_byte(0x40)
cTemp = ((data0 * 256 + data1) * 175.72 / 65536.0) — 46.85
print «%.2f» %humidity
|
1
|
# chmod +x /home/pi/si7021_hum.py
|
Считывание состояния из датчика наличия воды (капель). Подключен на 17pin BCM:
1
|
# vim /home/pi/water_read.py
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#project: paradox-sec.ru
import RPi.GPIO as io
pin=17 #сигнальный провод от датчика воды идет на 17 pin
io.setmode(io.BCM)
io.setup(pin, io.IN) #устанавливаем пин на вход
signal = io.input(pin) #считываем сигнал
if signal == 1:
print «датчик сухой»
elif signal == 0:
print «на датчике обнаружена вода!»
else:
print «ошибка!»
|
1
|
# chmod +x /home/pi/water_read.py
|
Итак, скрипты по управлению реле и считывания данных с датчиков написаны и протестированы, можно потестировать бота. Запускаем. Кликаем. Переписываем chat_allow на разрешенные, например, вписываем в первый свой, во второй — id телеграма жены, нет жены, тогда брата, нет брата, тогда кота 🙂
Протестировали? Красиво? Но это же не все. Во-первых, где программы сигнализаций движения/воды/температуры? Во-вторых, а как мне отправлять сообщения из системы в моего бота? Не тогда, когда я нажимаю на кнопку и получаю callback, а если что-то случится, чтобы бот сам прислал?
Спокойствие, только спокойствие! Сейчас все сделаем.
Оправка сообщений из linux в Telegram
Щас прогреем и поедем (с) Сейчас все напишем.
Для реализации такого сценария нам понадобится все тот же telepot и знание вашего персонального id и токена от бота. Ниже напишу пример скрипта, который будет отправлять входящую переменную $1 через вашего бота на ваш id. Этот сценарий можно будет вызывать любой вашей программой или приложением из системы. Согласитесь, весьма удобно. И где эти вайберы / вотсапы / скайпы ? Правильно, api — отличная вещь.
Ну, за дело:
1
|
# vim /home/pi/telegram_sender.py
|
1
2
3
4
5
6
7
8
9
10
11
|
#!/usr/bin/env python3.5
# -*- coding: utf-8 -*-
#project: paradox-sec.ru
import sys
import telepot
text = sys.argv[1]
chat_id = 123456789 #замените на свой id
TOKEN = «ВСТАВЬТЕ ВАШ ТОКЕН»
bot = telepot.Bot(TOKEN)
bot.sendMessage(chat_id, str(text))
|
Делаем скрипт исполняемым и работающим только под root:
1
|
# chmod 700 /home/pi/telegram_sender.py
|
Проверяем, как это работает. Запускаем сценарий и на вход подаем произвольную строку в кавычках:
1
|
# /home/pi/telegram_sender.py ‘Привет, хозяин! Готов отсылать все, что ты скажешь :)’
|
Радуемся! Дело остается за малым: нужно написать небольшие сигнализации, которые бы отсылали в бота информацию о происшествиях, если таковые случились. Нам потребуется написать циклы, которые будут крутиться всегда и периодически проверять, как дела на наших датчиках.
Пишем свои маленькие «сигнализации»
Общий принцип следующий: вращаем бесконечный цикл, отвечающий за датчик, который будет периодически снимать показания, и если программа обнаружит, что показания проблемные, будет отправлять нам сообщения, скажем, раз в 10 минут (на самом деле лучше так не делать, ибо очень назойливо получать аварийные сообщения, а написать так, чтобы отсылка происходила один раз и потом только тогда, когда состояние снова станет нормальным. Но статья не об этом. Поэтому пример максимально упрощен.
1
|
# vim /home/pi/multialert.py
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#project: paradox-sec.ru
import time,os
import subprocess
from subprocess import Popen, PIPE
import threading
from threading import Thread
t_s = 3 #штатный интервал ожидания
t_s2 = 30 #аварийный интервал ожидания воды
t_s3 = 30 #повторный интервал ожидания движения
t_s4 = 30 #интервал ожидания после того как температура упала ниже критической
# «Айдишники» сигнализаций
# Сигнализация будет включена или выключена в зависимости от наличия айдишника
water_id = ‘/home/pi/alert_state/w_on’
motion_id = ‘/home/pi/alert_state/m_on’
temper_id = ‘/home/pi/alert_state/t_on’
# Далее каждую сигнализацию вращаем в своем классе через трединг.
# Сделано зе тем, чтобы каждый цикл имел свой отдельный процесс (тред)
# со своим таймаутом после срабатывания сигнализации скоростью работы и т.д.
#Класс ВОДЫ
class water_check(threading.Thread):
def run ( self ):
while True:
#ели сигнализация сейчас включена (через телеграм)
if os.path.exists(water_id):
proc = Popen([»’/home/pi/water_read.py»’], shell=True, stdout=PIPE, stderr=PIPE)
proc.wait()
w = proc.communicate()[0]
w = w.decode(encoding=‘utf-8’)
w = w.replace(‘n’,»)
if w == u‘на датчике обнаружена вода!’:
print w
text = «Хозяин! Мы тонем! На датчике вода! Проверь скоре»
subprocess.call([»’/home/pi/telegram_sender.py «%s»»’ %text], shell=True)
time.sleep(t_s2)
elif w == u‘датчик сухой’:
time.sleep(t_s)
else:
print(«ERR»)
time.sleep(t_s)
else:
#если сигналка воды сейчас неактивна
print(‘сигналка воды выключена’)
time.sleep(t_s)
water_check().start()
#Класс ДВИЖЕНИЯ
class motion_check(threading.Thread):
def run ( self ):
while True:
#ели сигнализация движения сейчас включена (через телеграм)
if os.path.exists(motion_id):
proc = Popen([»’/home/pi/motion_read.py»’], shell=True, stdout=PIPE, stderr=PIPE)
proc.wait()
m = proc.communicate()[0]
m = m.decode(encoding=‘utf-8’)
m = m.replace(‘n’,»)
if m == u‘обнаружено движение’:
print m
text = «Хозяин! Кто-то тут ходит! Это ты? Мне страшно..»
subprocess.call([»’/home/pi/telegram_sender.py «%s»»’ %text], shell=True)
time.sleep(t_s3)
else:
print(«ERR»)
time.sleep(t_s)
else:
#если сигналка движения сейчас неактивна
print(‘сигналка движения выключена’)
time.sleep(t_s)
motion_check().start()
#обьявим необходимые ф-ции для температуры:
#считывание минимального градуса
# файл должен быть создан и в нем должно быть целое значение
def criticar_read():
inf = Popen(»’cat /home/pi/alert_state/critical_temp»’, shell=True, stdout = PIPE)
inf.wait()
out = inf.communicate()
t = out[0].replace(«n»,«»)
t = int(t)
return t
# получаем актуальную температуру
# cтрого говоря так делать не надо :)
# Потому что вы обращаетесь к одному девайсу из разных
# мест и шина i2c может быть занят другим запросом.
# в дальнейшем стоит позаботиться о текущей температуре и
# запрашивать ее только из единого места. Сейчас это может делать бот телеграми и этот скрипт
# поэтому есть вероятность того что при одновременном запросе вы получите отсутствие данных
# тем не менее можно сделать минимальную фильтрацию на входе и это частично поможет.
#project: paradox-sec.ru
def temper_inf():
txt = «/home/pi/si7021_temp.py»
exe = Popen(«%s» % txt, shell=True, stdout = PIPE)
exe.wait()
inf = exe.communicate()
inf = inf[0].replace(«n»,«»)
if inf == »:
print(‘нету результата текущей температуры’)
none = 1000
return none
elif inf == ‘None’:
none = 1000
print(‘нету результата текущей температуры’)
return none
else:
t = round(float(inf))
t = int(t)
return t
#Класс ТЕМПЕРАТУРЫ
class temper_check(threading.Thread):
def run ( self ):
while True:
#если сигнализация включена
if os.path.exists(temper_id):
#считываем занчения T и лимита срабатывания
t_now = temper_inf()
t_critical = criticar_read()
#если датчик не ответил (лучше всегда ставить проверки такого рода)
if t_now == 1000:
time.sleep(t_s)
#если считанная температура ниже минимальной
elif t_now < t_critical:
info = «Текущая температура: %s C» %t_now
print(info)
text = «Хозяин! Температура ниже критической! Я замерзаю… Текущая температура: %s C» %t_now
subprocess.call([»’/home/pi/telegram_sender.py «%s»»’ %text], shell=True)
time.sleep(t_s4)
else:
#если считанная температура в норме
time.sleep(t_s)
else:
#если сигналка выключена
time.sleep(t_s)
temper_check().start()
|
Не забываем сделать сценарий исполняемым:
1
|
# chmod +x /home/pi/multialert.py
|
Последний скрипт, который мы не указали и который должен присутствовать, — это скрипт по контролю объемника.
Мы воспользуемся прерываниями библиотеки gpio и будем ждать, когда произойдет движение, и только тогда скажем об этом нашему треду из multialert.py:
1
|
# vim /home/pi/motion_read.py
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#project: paradox-sec.ru
import subprocess
import RPi.GPIO as GPIO
def RCtime (RCpin):
GPIO.setmode(GPIO.BCM)
GPIO.setup(RCpin, GPIO.IN)
# GPIO.wait_for_edge(RCpin,GPIO.FALLING) #можете поиграть с этими строками, оставив 1 нужную
GPIO.wait_for_edge(RCpin,GPIO.RISING) #можете поиграть с этими строками, оставив 1 нужную
# GPIO.wait_for_edge(RCpin,GPIO.BOTH) #можете поиграть с этими строками, оставив 1 нужную
signal = GPIO.input(RCpin)
print «обнаружено движение»
RCtime(16)
|
Добавляем бит исполнения:
1
|
# chmod +x /home/pi/motion_read.py
|
Обьемник подключен на 16 пин. При запуске этого сценария вы получите ответ, что движение есть только тогда, когда ваш обьемник действительно сработает. После этого программа завершится.
Добавление бота Telegram и остальных сценариев в автозагрузку
Знающие люди, конечно, могут написать своих демонов (шутка). Но если вы хотите, чтобы бот и сигнализации всего лишь постоянно работали после загрузки системы, можно добавить все в /etc/rc.local:
1
|
# vim /etc/rc.local
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
#!/bin/sh -e
#
# rc.local
#
# This script is executed at the end of each multiuser runlevel.
# Make sure that the script will «exit 0» on success or any other
# value on error.
#
# In order to enable or disable this script just change the execution
# bits.
#
# By default this script does nothing.
# Print the IP address
_IP=$(hostname -I) || true
if [ «$_IP» ]; then
printf «My IP address is %sn» «$_IP»
fi
#бот телеграма
#paradox-sec.ru
/home/pi/bot.py &
#сигнализации
#paradox-sec.ru
/home/pi/multialert.py &
exit 0
|
Итог
В статье предоставлен весьма подробный туториал, как сделать своего бота, работающего с различными устройствами. Причем выглядит это весьма неплохо, все кликабельно и претендует на жизнь.
Дополнительно описаны процессы создания сигнализаций и простых отсылок сообщений из системы.
Все в совокупности может тянуть на небольшой проект, НО не стоит радоваться раньше времени! Перед тем как пустить это в дело в автономное плавание, программно стоит позаботиться о корректности всех входящих данных, протестировать: а что будет, если у вас нет интернета, а если датчик перегорит , если провод сожрет кот, если… если, если.
Можно считать этот материал лабораторным, но весьма близким к реальному.
Всем упорства в работе, отсутствия коротких замыканий и да прибудет с вами сила 🙂
Подписывайтесь на блог, обещаю потихоньку выкладывать такие же функциональные и полезные статьи.
умный дом это