1403 字
7 分钟
Telegram 部署私聊机器人

前言#

一个免费的通过 Cloudflare 配置 Worker 来搭建私聊机器人的方式
项目地址

部署参考

1.Telegram 创建机器人, 获取 Token#

@BotFather 处创建机器人获取到 Token, 并通过 /setjoingroups 来禁止此 Bot 被添加到群组

2.获取一个随机的 UUID#

UUID生成网站 随机获取一个 UUID

3.获取你的 Telegram 用户 id#

@username_to_id_bot 处获取到你的 用户 id

4.登录到 Cloudflare, 创建 Worker#

5.添加 Worker 变量#

ENV_BOT_TOKEN: 值为 步骤1 中获得的 Token
ENV_BOT_SECRET: 值为 步骤2 中获得的 UUID
ENV_ADMIN_UID: 值为 步骤3 中获得的 用户 id

6.创建 kv 数据库, 命名为 nfd, 并绑定到 Worker 上#

7.重新编辑 Worker 代码#

代码参考

// Bot credentials and routing. The ENV_* names are the variables added to the Worker.
const TOKEN = ENV_BOT_TOKEN // Get it from @BotFather
// Path on this Worker that receives Telegram webhook updates.
const WEBHOOK = '/endpoint'
const SECRET = ENV_BOT_SECRET // A-Z, a-z, 0-9, _ and -
const ADMIN_UID = ENV_ADMIN_UID // your user id, get it from https://t.me/username_to_id_bot

// Minimum gap between two periodic notices for the same chat, in milliseconds (one hour).
const NOTIFY_INTERVAL = 3600 * 1000;
// Remote list of flagged user ids; isFraud downloads it and splits it on newlines.
const fraudDb = 'https://raw.githubusercontent.com/LloydAsp/nfd/main/data/fraud.db';
// Text that handleNotify sends to the admin as the periodic notice.
const notificationUrl = 'https://raw.githubusercontent.com/Ctory-Nily/tg-chat-bot/main/data/notification.txt'
// Text that onMessage sends in reply to /start.
const startMsgUrl = 'https://raw.githubusercontent.com/Ctory-Nily/tg-chat-bot/main/data/startMessage.md';

// Gates only the periodic notice in handleNotify; the fraud check there runs regardless.
const enable_notification = false
/**
 * Build the Telegram Bot API URL for `methodName`.
 * When `params` is truthy it is serialised with URLSearchParams and appended as a query string.
 */
function apiUrl (methodName, params = null) {
  const base = `https://api.telegram.org/bot${TOKEN}`
  const suffix = params ? `?${new URLSearchParams(params).toString()}` : ''
  return `${base}/${methodName}${suffix}`
}

/**
 * Call a Telegram Bot API method and resolve with the parsed JSON reply.
 * `body` is passed to fetch() unchanged as its init object.
 */
function requestTelegram(methodName, body, params = null){
  const endpoint = apiUrl(methodName, params)
  const pending = fetch(endpoint, body)
  return pending.then(response => response.json())
}

/**
 * Wrap `body` as a fetch() init object: a POST request whose payload is the JSON form of `body`.
 */
function makeReqBody(body){
  const headers = { 'content-type': 'application/json' }
  const payload = JSON.stringify(body)
  return { method: 'POST', headers, body: payload }
}

/** Call the Bot API `sendMessage` method with `msg` as the JSON payload. */
function sendMessage(msg = {}){
  const init = makeReqBody(msg)
  return requestTelegram('sendMessage', init)
}

/** Call the Bot API `copyMessage` method with `msg` as the JSON payload. */
function copyMessage(msg = {}){
  const init = makeReqBody(msg)
  return requestTelegram('copyMessage', init)
}

/**
 * Call the Bot API `forwardMessage` method with `msg` as the JSON payload.
 *
 * `msg` defaults to an empty object, matching sendMessage and copyMessage, so a
 * bare call still posts a JSON object rather than an undefined body.
 *
 * @param {object} [msg] forwardMessage parameters (chat_id, from_chat_id, message_id, ...)
 * @returns {Promise<object>} parsed JSON reply from the Bot API
 */
function forwardMessage(msg = {}){
  return requestTelegram('forwardMessage', makeReqBody(msg))
}

/**
 * Worker entry point: pick a handler for each incoming request by its URL path.
 */
addEventListener('fetch', event => {
  const url = new URL(event.request.url)
  switch (url.pathname) {
    case WEBHOOK:
      // Updates pushed by Telegram
      event.respondWith(handleWebhook(event))
      break
    case '/registerWebhook':
      event.respondWith(registerWebhook(event, url, WEBHOOK, SECRET))
      break
    case '/unRegisterWebhook':
      event.respondWith(unRegisterWebhook(event))
      break
    default:
      event.respondWith(new Response('No handler for this request'))
  }
})

/**
 * Handle a request to WEBHOOK: check the secret header, then hand the update on.
 * https://core.telegram.org/bots/api#update
 */
async function handleWebhook (event) {
  const { request } = event

  // Reject callers whose secret header does not match SECRET
  const presented = request.headers.get('X-Telegram-Bot-Api-Secret-Token')
  if (presented !== SECRET) {
    return new Response('Unauthorized', { status: 403 })
  }

  // Parse the JSON body before answering
  const update = await request.json()
  // Processing is handed to waitUntil instead of being awaited, so the reply below is not held up by it
  event.waitUntil(onUpdate(update))

  return new Response('Ok')
}

/**
 * Process one incoming Update; only updates carrying a `message` are acted on.
 * https://core.telegram.org/bots/api#update
 */
async function onUpdate (update) {
  const carriesMessage = 'message' in update
  if (!carriesMessage) {
    return
  }
  await onMessage(update.message)
}

/**
 * Handle incoming Message
 * https://core.telegram.org/bots/api#message
 *
 * Dispatch order:
 *  1. `/start`   - reply with the text fetched from startMsgUrl.
 *  2. `/nowtime` - reply with the current time shifted to UTC+8.
 *  3. Messages from the admin chat - run /block, /unblock or /checkblock, or copy
 *     the admin's reply to the guest mapped to the replied-to message.
 *  4. Everything else is treated as a guest message.
 *
 * @param {object} message Telegram Message object
 * @returns {Promise<*>} whatever the selected handler resolves with
 */
async function onMessage (message) {
  if(message.text === '/start'){
    const startMsg = await fetch(startMsgUrl).then(r => r.text())
    // `username` is optional on a Telegram chat. Only prepend the mention when it
    // exists, so /start no longer throws for accounts without a username.
    const username = message.chat.username
    const mention = username ? `@${username} ` : ''
    return sendMessage({
      chat_id:message.chat.id,
      text: mention + startMsg,
      parse_mode: "HTML"
    })
  }

  if(message.text === '/nowtime'){
    const now = new Date();

    // Beijing time (UTC+8): shift the timestamp, then read it back with the UTC getters
    const chinaOffset = 8 * 60 * 60 * 1000; // 8 hours in milliseconds
    const chinaTime = new Date(now.getTime() + chinaOffset);

    // Extract year, month, day, hours, minutes and seconds
    const year = chinaTime.getUTCFullYear();
    const month = String(chinaTime.getUTCMonth() + 1).padStart(2, '0');
    const day = String(chinaTime.getUTCDate()).padStart(2, '0');
    const hours = String(chinaTime.getUTCHours()).padStart(2, '0');
    const minutes = String(chinaTime.getUTCMinutes()).padStart(2, '0');
    const seconds = String(chinaTime.getUTCSeconds()).padStart(2, '0');

    return sendMessage({
      chat_id:message.chat.id,
      text:`${year}年${month}月${day}日 ${hours}:${minutes}:${seconds}`,
    })
  }

  if(message.chat.id.toString() === ADMIN_UID){
    // Every admin action needs a replied-to message; otherwise send the usage hint
    if(!message?.reply_to_message?.chat){
      return sendMessage({
        chat_id:ADMIN_UID,
        text:'使用方法,回复转发的消息,并发送回复消息,或者`/block`、`/unblock`、`/checkblock`等指令'
      })
    }
    if(/^\/block$/.exec(message.text)){
      return handleBlock(message)
    }
    if(/^\/unblock$/.exec(message.text)){
      return handleUnBlock(message)
    }
    if(/^\/checkblock$/.exec(message.text)){
      return checkBlock(message)
    }
    // Plain reply: look up which guest the replied-to message maps to and copy the reply there
    const guestChatId = await nfd.get('msg-map-' + message.reply_to_message.message_id,
                                      { type: "json" })
    return copyMessage({
      chat_id: guestChatId,
      from_chat_id:message.chat.id,
      message_id:message.message_id,
    })
  }
  return handleGuestMessage(message)
}

/**
 * Relay a guest's message to the admin unless that guest is blocked.
 * After a successful forward, store the guest's chat id under
 * `msg-map-<forwarded message id>`, then run the notification step.
 */
async function handleGuestMessage(message){
  const guestId = message.chat.id
  const blockedFlag = await nfd.get('isblocked-' + guestId, { type: "json" })

  if(blockedFlag){
    return sendMessage({ chat_id: guestId, text:'Your are blocked' })
  }

  const forwarded = await forwardMessage({
    chat_id:ADMIN_UID,
    from_chat_id:guestId,
    message_id:message.message_id
  })
  console.log(JSON.stringify(forwarded))

  // Only a successful forward yields a message id the admin can reply to
  if(forwarded.ok){
    const mapKey = 'msg-map-' + forwarded.result.message_id
    await nfd.put(mapKey, guestId)
  }
  return handleNotify(message)
}

/**
 * Send the admin a follow-up notice about the guest who just wrote.
 * A guest found by isFraud always triggers a warning. Otherwise, when
 * enable_notification is on, the text behind notificationUrl is sent at most
 * once per NOTIFY_INTERVAL for that chat.
 */
async function handleNotify(message){
  const guestId = message.chat.id

  // Fraud check comes first and short-circuits everything else
  const flagged = await isFraud(guestId)
  if(flagged){
    return sendMessage({
      chat_id: ADMIN_UID,
      text:`检测到骗子,UID${guestId}`
    })
  }

  if(!enable_notification){
    return
  }

  // Interval-based reminder, tracked per chat under `lastmsg-<chat id>`
  const previous = await nfd.get('lastmsg-' + guestId, { type: "json" })
  const due = !previous || Date.now() - previous > NOTIFY_INTERVAL
  if(!due){
    return
  }
  await nfd.put('lastmsg-' + guestId, Date.now())
  const notice = await fetch(notificationUrl).then(r => r.text())
  return sendMessage({
    chat_id: ADMIN_UID,
    text: notice
  })
}

/**
 * Block the guest whose forwarded message the admin replied to with /block.
 *
 * The guest is looked up under `msg-map-<replied-to message id>` and, unless the
 * lookup fails or points at the admin, `isblocked-<guest id>` is set to true.
 *
 * @param {object} message admin's Telegram message; must carry `reply_to_message`
 * @returns {Promise<object>} result of the confirmation or error message sent to the admin
 */
async function handleBlock(message){
  const guestChatId = await nfd.get('msg-map-' + message.reply_to_message.message_id,
                                      { type: "json" })
  // No mapping for the replied-to message: there is nobody to block, so do not
  // write an `isblocked-null` key or report success.
  if(guestChatId === null || guestChatId === undefined){
    return sendMessage({
      chat_id: ADMIN_UID,
      text:'未找到该消息对应的用户'
    })
  }
  // The mapping is read back JSON-parsed while ADMIN_UID is compared elsewhere
  // against `chat.id.toString()`, i.e. it is a string. Compare string forms so
  // the self-block guard can actually match.
  if(String(guestChatId) === String(ADMIN_UID)){
    return sendMessage({
      chat_id: ADMIN_UID,
      text:'不能屏蔽自己'
    })
  }
  await nfd.put('isblocked-' + guestChatId, true)

  return sendMessage({
    chat_id: ADMIN_UID,
    text: `UID:${guestChatId}屏蔽成功`,
  })
}

/**
 * Lift the block on the guest mapped to the message the admin replied to,
 * by storing `false` under `isblocked-<guest id>`, and confirm to the admin.
 */
async function handleUnBlock(message){
  const repliedId = message.reply_to_message.message_id
  const guestId = await nfd.get('msg-map-' + repliedId, { type: "json" })

  const blockKey = 'isblocked-' + guestId
  await nfd.put(blockKey, false)

  const confirmation = {
    chat_id: ADMIN_UID,
    text:`UID:${guestId}解除屏蔽成功`,
  }
  return sendMessage(confirmation)
}

/**
 * Tell the admin whether the guest mapped to the replied-to message is blocked.
 */
async function checkBlock(message){
  const repliedId = message.reply_to_message.message_id
  const guestId = await nfd.get('msg-map-' + repliedId, { type: "json" })
  const blockedFlag = await nfd.get('isblocked-' + guestId, { type: "json" })

  let verdict = '没有被屏蔽'
  if(blockedFlag){
    verdict = '被屏蔽'
  }

  return sendMessage({
    chat_id: ADMIN_UID,
    text: `UID:${guestId}` + verdict
  })
}

/**
 * Send `text` to `chatId` through sendMessage, with no other parameters set.
 * https://core.telegram.org/bots/api#sendmessage
 */
async function sendPlainText (chatId, text) {
  const payload = { chat_id: chatId, text }
  return sendMessage(payload)
}

/**
 * Point the bot's webhook at this worker: `<protocol>//<hostname><suffix>`, with
 * `secret` passed along as `secret_token`.
 * Responds 'Ok' on success, otherwise with the pretty-printed API reply.
 * https://core.telegram.org/bots/api#setwebhook
 */
async function registerWebhook (event, requestUrl, suffix, secret) {
  const { protocol, hostname } = requestUrl
  const webhookUrl = `${protocol}//${hostname}${suffix}`

  const apiResponse = await fetch(apiUrl('setWebhook', { url: webhookUrl, secret_token: secret }))
  const result = await apiResponse.json()

  const succeeded = 'ok' in result && result.ok
  const text = succeeded ? 'Ok' : JSON.stringify(result, null, 2)
  return new Response(text)
}

/**
 * Remove the webhook by calling setWebhook with an empty url.
 * Responds 'Ok' on success, otherwise with the pretty-printed API reply.
 * https://core.telegram.org/bots/api#setwebhook
 */
async function unRegisterWebhook (event) {
  const apiResponse = await fetch(apiUrl('setWebhook', { url: '' }))
  const result = await apiResponse.json()

  const succeeded = 'ok' in result && result.ok
  const text = succeeded ? 'Ok' : JSON.stringify(result, null, 2)
  return new Response(text)
}

/**
 * Check whether a user id is listed in the remote fraud list at `fraudDb`.
 *
 * The list is fetched on every call and read as one id per line. Each line is
 * trimmed before comparison, so CRLF line endings or stray spaces in the file
 * cannot hide a match.
 *
 * @param {number|string} id user id to look up
 * @returns {Promise<boolean>} true when the id appears in the list
 */
async function isFraud(id){
  const target = id.toString()
  const db = await fetch(fraudDb).then(r => r.text())
  const uids = db
    .split('\n')
    .map(line => line.trim())
    .filter(line => line)
  console.log(JSON.stringify(uids))
  const flag = uids.includes(target)
  console.log(flag)
  return flag
}

8.检测是否可用#

根据你的域名, 在后面添加 /registerWebhook 并访问, 此操作会为机器人注册 Webhook, 返回 Ok 则表示注册成功、机器人可用

https://xxx.workers.dev/registerWebhook 

9.使用方法#

  • 当其他用户给bot发消息,会被转发到bot创建者
  • bot创建者回复普通文字给转发的消息时,会回复到原消息发送者
  • bot创建者对转发的消息回复 /block, /unblock, /checkblock 等命令会执行相关指令,不会回复到原消息发送者

10.欺诈数据源#

  • 文件 fraud.db 为欺诈数据,格式为每行一个uid
  • 可以通过pr扩展本数据,也可以通过提issue方式补充
  • 提供额外欺诈信息时,需要提供一定的消息出处

11.扩展-将 Worker 代码上传到 Github 并在 Cloudflare 上实现自动构建#

① 本地创建项目文件夹并新建 worker.js 文件和 wrangler.toml 文件
② 在 worker.js 文件中写入 Worker 代码
③ 配置 wrangler.toml 指定执行的脚本为 worker.js

# wrangler.toml
name = 'mimi-chat'
main = "worker.js"  # entry file for the Worker
compatibility_date = "2025-03-27"

# KV namespace binding; the Worker code accesses it through the name `nfd`
kv_namespaces = [
  { binding = "nfd", id = "你的kv id" }
]

# Turn on the observability logs setting for this Worker
[observability.logs]
enabled = true

④ 在 Github 上新建仓库,然后将本地的项目文件夹同步到 Github 仓库
⑤ 在之前创建的 Cloudflare Worker 项目中, 连接这个 Github 仓库

⑥ 在这之后如果要更改 Worker 代码, 只要在本地修改好代码后, 上传到 Github, 此时 Cloudflare Worker 就会自动同步并构建

Telegram 部署私聊机器人
https://fuwari.vercel.app/posts/部署教程/tgchatbot/telegram-部署私聊机器人/
作者
Ctory-Nily
发布于
2025-03-14
许可协议
CC BY-NC-SA 4.0