read receipts: bridges TG <-> Matrix

Signed-off-by: Sumner Evans <sumner.evans@automattic.com>
This commit is contained in:
Sumner Evans
2024-08-07 12:38:01 -06:00
parent 838f291220
commit a86c2c2544
4 changed files with 131 additions and 20 deletions
+3 -17
View File
@@ -38,13 +38,7 @@ func (t *TelegramClient) getDMChatInfo(ctx context.Context, userID int64) (*brid
}, },
UserInfo: userInfo, UserInfo: userInfo,
}, },
{ {EventSender: t.mySender()},
EventSender: bridgev2.EventSender{
IsFromMe: true,
SenderLogin: t.loginID,
Sender: t.userID,
},
},
} }
} }
return &chatInfo, nil return &chatInfo, nil
@@ -58,16 +52,8 @@ func (t *TelegramClient) getGroupChatInfo(ctx context.Context, fullChat *tg.Mess
chatInfo := bridgev2.ChatInfo{ chatInfo := bridgev2.ChatInfo{
Type: ptr.Ptr(database.RoomTypeGroupDM), // TODO Is this correct for channels? Type: ptr.Ptr(database.RoomTypeGroupDM), // TODO Is this correct for channels?
Members: &bridgev2.ChatMemberList{ Members: &bridgev2.ChatMemberList{
IsFull: true, IsFull: true,
Members: []bridgev2.ChatMember{ Members: []bridgev2.ChatMember{{EventSender: t.mySender()}},
{
EventSender: bridgev2.EventSender{
IsFromMe: true,
SenderLogin: t.loginID,
Sender: t.userID,
},
},
},
}, },
} }
var isBroadcastChannel bool var isBroadcastChannel bool
+17
View File
@@ -142,6 +142,15 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge
dispatcher.OnChannelUserTyping(func(ctx context.Context, e tg.Entities, update *tg.UpdateChannelUserTyping) error { dispatcher.OnChannelUserTyping(func(ctx context.Context, e tg.Entities, update *tg.UpdateChannelUserTyping) error {
return client.handleTyping(ids.PeerTypeChannel.AsPortalKey(update.ChannelID, ""), update.FromID.(*tg.PeerUser).UserID, update.Action) return client.handleTyping(ids.PeerTypeChannel.AsPortalKey(update.ChannelID, ""), update.FromID.(*tg.PeerUser).UserID, update.Action)
}) })
dispatcher.OnReadHistoryOutbox(func(ctx context.Context, e tg.Entities, update *tg.UpdateReadHistoryOutbox) error {
return client.updateReadReceipt(update)
})
dispatcher.OnReadHistoryInbox(func(ctx context.Context, e tg.Entities, update *tg.UpdateReadHistoryInbox) error {
return client.onOwnReadReceipt(ids.MakePortalKey(update.Peer, login.ID), update.MaxID)
})
dispatcher.OnReadChannelInbox(func(ctx context.Context, e tg.Entities, update *tg.UpdateReadChannelInbox) error {
return client.onOwnReadReceipt(ids.PeerTypeChannel.AsPortalKey(update.ChannelID, ""), update.MaxID)
})
client.ScopedStore = tc.Store.GetScopedStore(telegramUserID) client.ScopedStore = tc.Store.GetScopedStore(telegramUserID)
@@ -448,3 +457,11 @@ func (t *TelegramClient) GetCapabilities(ctx context.Context, portal *bridgev2.P
Reactions: true, Reactions: true,
} }
} }
func (t *TelegramClient) mySender() bridgev2.EventSender {
return bridgev2.EventSender{
IsFromMe: true,
SenderLogin: t.loginID,
Sender: t.userID,
}
}
+78 -2
View File
@@ -3,10 +3,12 @@ package connector
import ( import (
"context" "context"
"crypto/sha256" "crypto/sha256"
"errors"
"fmt" "fmt"
"math/rand" "math/rand"
"strconv" "strconv"
"strings" "strings"
"sync"
"time" "time"
"github.com/gotd/td/telegram/message" "github.com/gotd/td/telegram/message"
@@ -396,8 +398,82 @@ func (t *TelegramClient) HandleMatrixReactionRemove(ctx context.Context, msg *br
} }
func (t *TelegramClient) HandleMatrixReadReceipt(ctx context.Context, msg *bridgev2.MatrixReadReceipt) error { func (t *TelegramClient) HandleMatrixReadReceipt(ctx context.Context, msg *bridgev2.MatrixReadReceipt) error {
// TODO peerType, id, parseErr := ids.ParsePortalID(msg.Portal.ID)
return nil if parseErr != nil {
return parseErr
}
inputPeer, parseErr := t.inputPeerForPortalID(ctx, msg.Portal.ID)
if parseErr != nil {
return parseErr
}
var readMentionsErr, readReactionsErr, readMessagesErr error
var wg sync.WaitGroup
// Read mentions
wg.Add(1)
go func() {
defer wg.Done()
_, readMentionsErr = t.client.API().MessagesReadMentions(ctx, &tg.MessagesReadMentionsRequest{
Peer: inputPeer,
})
}()
// Read reactions
wg.Add(1)
go func() {
defer wg.Done()
_, readMentionsErr = t.client.API().MessagesReadReactions(ctx, &tg.MessagesReadReactionsRequest{
Peer: inputPeer,
})
}()
// Read messages
wg.Add(1)
go func() {
defer wg.Done()
message := msg.ExactMessage
if message == nil {
message, readMessagesErr = t.main.Bridge.DB.Message.GetLastPartAtOrBeforeTime(ctx, msg.Portal.PortalKey, time.Now())
if readMessagesErr != nil {
return
}
}
var maxID int
maxID, readMessagesErr = ids.ParseMessageID(message.ID)
if readMessagesErr != nil {
return
}
switch peerType {
case ids.PeerTypeUser, ids.PeerTypeChat:
_, readMessagesErr = t.client.API().MessagesReadHistory(ctx, &tg.MessagesReadHistoryRequest{
Peer: inputPeer,
MaxID: maxID,
})
case ids.PeerTypeChannel:
var accessHash int64
var found bool
accessHash, found, readMessagesErr = t.ScopedStore.GetChannelAccessHash(ctx, t.telegramUserID, id)
if readMessagesErr != nil {
return
} else if !found {
readMessagesErr = fmt.Errorf("channel access hash not found for %d", id)
return
}
_, readMessagesErr = t.client.API().ChannelsReadHistory(ctx, &tg.ChannelsReadHistoryRequest{
Channel: &tg.InputChannel{ChannelID: id, AccessHash: accessHash},
})
default:
readMessagesErr = fmt.Errorf("unknown peer type %s", peerType)
}
}()
// TODO handle sponsored message read receipts
wg.Wait()
return errors.Join(readMentionsErr, readReactionsErr, readMessagesErr)
} }
func (t *TelegramClient) HandleMatrixTyping(ctx context.Context, msg *bridgev2.MatrixTyping) error { func (t *TelegramClient) HandleMatrixTyping(ctx context.Context, msg *bridgev2.MatrixTyping) error {
+33 -1
View File
@@ -174,7 +174,7 @@ func (t *TelegramClient) getEventSender(msg interface {
GetPeerID() tg.PeerClass GetPeerID() tg.PeerClass
}) bridgev2.EventSender { }) bridgev2.EventSender {
if msg.GetOut() { if msg.GetOut() {
return bridgev2.EventSender{IsFromMe: true, SenderLogin: t.loginID, Sender: t.userID} return t.mySender()
} else if f, ok := msg.GetFromID(); ok && f.TypeID() == tg.PeerUserTypeID { } else if f, ok := msg.GetFromID(); ok && f.TypeID() == tg.PeerUserTypeID {
from := f.(*tg.PeerUser) from := f.(*tg.PeerUser)
return bridgev2.EventSender{ return bridgev2.EventSender{
@@ -305,6 +305,7 @@ func (t *TelegramClient) onMessageEdit(ctx context.Context, update IGetMessage)
return nil return nil
} }
func (t *TelegramClient) handleTyping(portal networkid.PortalKey, userID int64, action tg.SendMessageActionClass) error { func (t *TelegramClient) handleTyping(portal networkid.PortalKey, userID int64, action tg.SendMessageActionClass) error {
if userID == t.telegramUserID { if userID == t.telegramUserID {
return nil return nil
@@ -327,6 +328,37 @@ func (t *TelegramClient) handleTyping(portal networkid.PortalKey, userID int64,
return nil return nil
} }
func (t *TelegramClient) updateReadReceipt(update *tg.UpdateReadHistoryOutbox) error {
user, ok := update.Peer.(*tg.PeerUser)
if !ok {
return fmt.Errorf("unsupported peer type %T", update.Peer)
}
t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Receipt{
EventMeta: simplevent.EventMeta{
Type: bridgev2.RemoteEventReadReceipt,
PortalKey: ids.MakePortalKey(update.Peer, t.loginID),
Sender: bridgev2.EventSender{
SenderLogin: ids.MakeUserLoginID(user.UserID),
Sender: ids.MakeUserID(user.UserID),
},
},
LastTarget: ids.MakeMessageID(update.MaxID),
})
return nil
}
func (t *TelegramClient) onOwnReadReceipt(portalKey networkid.PortalKey, maxID int) error {
t.main.Bridge.QueueRemoteEvent(t.userLogin, &simplevent.Receipt{
EventMeta: simplevent.EventMeta{
Type: bridgev2.RemoteEventReadReceipt,
PortalKey: portalKey,
Sender: t.mySender(),
},
LastTarget: ids.MakeMessageID(maxID),
})
return nil
}
func (t *TelegramClient) handleTelegramReactions(ctx context.Context, msg *tg.Message) { func (t *TelegramClient) handleTelegramReactions(ctx context.Context, msg *tg.Message) {
log := zerolog.Ctx(ctx).With(). log := zerolog.Ctx(ctx).With().
Str("handler", "handle_telegram_reactions"). Str("handler", "handle_telegram_reactions").