개요
해당 실습의 경우 python-telegram-bot
라이브러리의 버전이 20.1
이상이어야 한다.
20.1
이하인 라이브러리를 사용 중인 경우 정상적인 실습 진행이 불가능하다
python-telegram-bot
에 비동기 로직이 추가되면서 20.1 이하 버전에서 해당 실습 코드 사용 시 아래와 같은 에러를 만나게 될 것이다.
coroutine 'Bot.send_message' was never awaited
라이브러리 설치
$ pip install python-telegram-bot
텍스트 전송하기
텍스트를 전송할 때에는 send_message
메소드를 사용하면 된다.
import telegram
import asyncio
TELEGRAM_TOKEN :str = "텔레그램 봇 토큰"
TELEGRAM_CHAT_ID :int = "텔레그램 채팅 ID"
async def main() -> None:
bot = telegram.Bot(token=TELEGRAM_TOKEN)
message :str = "안녕하세요!"
await bot.send_message(chat_id=TELEGRAM_CHAT_ID, text=message)
asyncio.run(main())
$ python3 main.py
이미지 전송하기
이미지를 전송할 때에는 텍스트를 전송했을 때와 비슷하게 send_photo
메소드를 사용하면 된다.
import telegram
import asyncio
TELEGRAM_TOKEN :str = "텔레그램 봇 토큰"
TELEGRAM_CHAT_ID :int = "텔레그램 채팅 ID"
async def main() -> None:
bot = telegram.Bot(token=TELEGRAM_TOKEN)
image_file_name :str = 'test.png'
with open(image_file_name, 'rb') as fp:
await bot.send_photo(chat_id=TELEGRAM_CHAT_ID, photo=fp)
asyncio.run(main())
$ python3 main.py
HTML 파일 전송하기
HTML 파일을 전송할 때에는 sendDocuemnt
메소드를 사용해보자.
import telegram
import asyncio
TELEGRAM_TOKEN :str = "텔레그램 봇 토큰"
TELEGRAM_CHAT_ID :int = "텔레그램 채팅 ID"
async def main() -> None:
bot = telegram.Bot(token=TELEGRAM_TOKEN)
html_file :str = 'test.html'
await bot.sendDocument(chat_id=TELEGRAM_CHAT_ID, document=html_file)
asyncio.run(main())
$ python3 main.py
주기적으로 텍스트 전송하기
특정 메시지를 3초 간격마다 보내려면 아래와 같이 작성한다.
add_job
함수에는 seconds
말고 minutes
, hours
등의 인자로도 변경할 수 있다.
from telegram.ext import ApplicationBuilder
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from telegram import Bot
import asyncio
import pytz
TELEGRAM_TOKEN :str = "텔레그램 봇 토큰"
TELEGRAM_CHAT_ID :int = "텔레그램 채팅 ID"
# 애플리케이션 빌더 생성
application = ApplicationBuilder().token(TELEGRAM_TOKEN).build()
scheduler = AsyncIOScheduler(timezone=pytz.UTC) # 타임존 설정
# 메시지를 보내는 함수
async def send_message(text: str) -> None:
await application.bot.send_message(chat_id=TELEGRAM_CHAT_ID, text=text)
# 봇 초기화 함수
async def initialize() -> None:
await application.initialize()
await application.start()
await application.updater.start_polling()
# 봇 정지 함수
async def stop() -> None:
await application.updater.stop()
# 메인 함수
async def main() -> None:
await initialize()
await send_message(text='안녕하세요!')
# 스케줄러에 작업 추가 (args를 튜플로 수정)
scheduler.add_job(send_message, trigger='interval', seconds=3, args=('안녕하세요!',))
scheduler.start()
try:
await asyncio.Event().wait() # 무한 대기
except (KeyboardInterrupt, SystemExit):
print("프로그램 종료!")
finally:
await application.updater.stop()
# 이벤트 루프 실행
if __name__ == '__main__':
asyncio.run(main())
$ python main.py
실행하면 3초 간격으로 메시지가 전송된다.