Add deduplication with hashes. Fixes #5
This commit is contained in:
+64
-3
@@ -13,9 +13,9 @@
|
||||
//
|
||||
// You should have received a copy of the GNU General Public License
|
||||
// along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
const md5 = require("md5")
|
||||
const TelegramPeer = require("./telegram-peer")
|
||||
const formatter = require("./formatter")
|
||||
const chalk = require("chalk")
|
||||
|
||||
/**
|
||||
* Portal represents a portal from a Matrix room to a Telegram chat.
|
||||
@@ -28,6 +28,10 @@ class Portal {
|
||||
this.roomID = roomID
|
||||
this.peer = peer
|
||||
this.accessHashes = new Map()
|
||||
// deduplicate duplication caused by telegram-mtproto bugs
|
||||
this.lastMessageIDs = new Map()
|
||||
// deduplicate duplication caused by multiple users
|
||||
this.messageHashes = []
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -230,6 +234,14 @@ class Portal {
|
||||
await this.createMatrixRoom(evt.source, { invite: [evt.source.matrixUser.userID] })
|
||||
return
|
||||
}
|
||||
if (evt.id) {
|
||||
const last = this.lastMessageIDs.get(evt.source.userID)
|
||||
if (last && evt.id <= last) {
|
||||
this.app.debug(`Received old/duplicate message with ID ${evt.id} (latest ID: ${last})`)
|
||||
return
|
||||
}
|
||||
}
|
||||
this.lastMessageIDs.set(evt.source.userID, evt.id)
|
||||
let matrixUser, telegramUser
|
||||
switch (evt.action._) {
|
||||
case "messageActionChatCreate":
|
||||
@@ -330,6 +342,33 @@ class Portal {
|
||||
return false
|
||||
}
|
||||
|
||||
hash(evt) {
|
||||
// Including the timestamp in the hash prevents reliably deduplicating multiuser-caused duplicates of
|
||||
// Matrix-sent messages.
|
||||
let base = (evt.text || evt.caption) + /*evt.date +*/ evt.from + evt.fwdFrom + evt.to.id
|
||||
if (evt.geo) {
|
||||
base += evt.geo.lat
|
||||
base += evt.geo.long
|
||||
} else if (evt.document) {
|
||||
base += evt.document.id
|
||||
} else if (evt.photo) {
|
||||
base += evt.photo.id
|
||||
}
|
||||
return md5(base)
|
||||
}
|
||||
|
||||
deduplicate(evt) {
|
||||
const hashed = this.hash(evt)
|
||||
if (this.messageHashes.includes(hashed)) {
|
||||
return true
|
||||
}
|
||||
this.messageHashes.unshift(hashed)
|
||||
if (this.messageHashes.length > 20) {
|
||||
this.messageHashes.length = 20
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle a Telegram service message event.
|
||||
*
|
||||
@@ -347,8 +386,6 @@ class Portal {
|
||||
* @param {messageMediaGeo} [evt.geo] The Telegram {@link https://tjhorner.com/tl-schema/constructor/messageMediaGeo Location} attached to the message.
|
||||
*/
|
||||
async handleTelegramMessage(evt) {
|
||||
const a = Object.assign({}, evt)
|
||||
delete a.source
|
||||
if (!this.isMatrixRoomCreated()) {
|
||||
try {
|
||||
const result = await this.createMatrixRoom(evt.source, { invite: [evt.source.matrixUser.userID] })
|
||||
@@ -362,6 +399,10 @@ class Portal {
|
||||
}
|
||||
}
|
||||
|
||||
if (this.deduplicate(evt)) {
|
||||
return
|
||||
}
|
||||
|
||||
const sender = await this.app.getTelegramUser(evt.from)
|
||||
try {
|
||||
await sender.intent.sendTyping(this.roomID, false)
|
||||
@@ -424,6 +465,16 @@ class Portal {
|
||||
evt.content.formatted_body || evt.content.body,
|
||||
evt.content.format === "org.matrix.custom.html",
|
||||
this.app)
|
||||
// evt.text || evt.caption) + evt.date + evt.from + evt.fwdFrom + evt.to.id)
|
||||
this.deduplicate({
|
||||
text: message,
|
||||
date: Math.round(Date.now() / 1000),
|
||||
from: sender.telegramPuppet.userID,
|
||||
fwdFrom: 0,
|
||||
to: {
|
||||
id: this.peer.id,
|
||||
},
|
||||
})
|
||||
await sender.telegramPuppet.sendMessage(this.peer, message, entities)
|
||||
break
|
||||
case "m.video":
|
||||
@@ -440,6 +491,16 @@ class Portal {
|
||||
break
|
||||
case "m.location":
|
||||
const [, lat, long] = /geo:([-]?[0-9]+\.[0-9]+)+,([-]?[0-9]+\.[0-9]+)/.exec()
|
||||
this.deduplicate({
|
||||
text: message,
|
||||
date: Math.round(Date.now() / 1000),
|
||||
from: sender.telegramPuppet.userID,
|
||||
fwdFrom: 0,
|
||||
to: {
|
||||
id: this.peer.id,
|
||||
},
|
||||
geo: {lat, long },
|
||||
})
|
||||
await sender.telegramPuppet.sendMedia(this.peer, {
|
||||
_: "inputMediaGeoPoint",
|
||||
geo_point: {
|
||||
|
||||
@@ -351,13 +351,6 @@ class TelegramPuppet {
|
||||
if (update._ === "messageService" && update.action._ === "messageActionChannelMigrateFrom") {
|
||||
return
|
||||
}
|
||||
if (update.id) {
|
||||
if (update.id <= this.lastID) {
|
||||
this.app.debug(`Received old/duplicate message with ID ${update.id} (latest ID: ${this.lastID})`)
|
||||
return
|
||||
}
|
||||
this.lastID = update.id
|
||||
}
|
||||
|
||||
portal = await this.app.getPortalByPeer(to)
|
||||
if (update._ === "messageService") {
|
||||
@@ -372,6 +365,8 @@ class TelegramPuppet {
|
||||
await portal.handleTelegramMessage({
|
||||
from,
|
||||
to,
|
||||
id: update.id,
|
||||
date: update.date,
|
||||
fwdFrom: update.fwd_from ? update.fwd_from.from_id : 0,
|
||||
source: this,
|
||||
text: update.message,
|
||||
@@ -391,6 +386,8 @@ class TelegramPuppet {
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
|
||||
async handleUpdate(data) {
|
||||
if (!data.update || data.update._ !== "updateUserStatus") {
|
||||
this.app.debug("green", "Raw event for", this.userID, JSON.stringify(data, "", " "))
|
||||
@@ -427,6 +424,10 @@ class TelegramPuppet {
|
||||
date: this.date,
|
||||
qts: -1,
|
||||
})
|
||||
if (dat._ === "updates.differenceEmpty") {
|
||||
this.date = dat.date
|
||||
break
|
||||
}
|
||||
this.app.debug("magenta", `updates.getDifference: ${JSON.stringify(dat, "", " ")}`)
|
||||
// TODO use dat.users and dat.chats
|
||||
this.pts = dat.state.pts
|
||||
|
||||
Reference in New Issue
Block a user