all: add support for topics and refactor other things

This commit is contained in:
Tulir Asokan
2025-12-06 14:52:39 +02:00
parent 14b3b1fed7
commit d5f87d2ec1
26 changed files with 797 additions and 390 deletions
+44 -29
View File
@@ -29,6 +29,7 @@ import (
"maunium.net/go/mautrix/bridgev2/networkid"
"go.mau.fi/mautrix-telegram/pkg/connector/ids"
"go.mau.fi/mautrix-telegram/pkg/gotd/bin"
"go.mau.fi/mautrix-telegram/pkg/gotd/tg"
"go.mau.fi/mautrix-telegram/pkg/gotd/tgerr"
)
@@ -106,7 +107,7 @@ func (t *TelegramClient) takeoutDialogs(ctx context.Context, takeoutID int64) er
}
if t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor != "" {
var err error
req.OffsetPeer, err = t.inputPeerForPortalID(ctx, t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor)
req.OffsetPeer, _, err = t.inputPeerForPortalID(ctx, t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor)
if err != nil {
return fmt.Errorf("failed to get input peer for pagination: %w", err)
}
@@ -150,7 +151,7 @@ func (t *TelegramClient) takeoutDialogs(ctx context.Context, takeoutID int64) er
return fmt.Errorf("failed to handle dialogs: %w", err)
}
portalKey := t.makePortalKeyFromPeer(dialogs.GetDialogs()[len(dialogs.GetDialogs())-1].GetPeer())
portalKey := t.makePortalKeyFromPeer(dialogs.GetDialogs()[len(dialogs.GetDialogs())-1].GetPeer(), 0)
if t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlCursor == portalKey.ID {
t.userLogin.Metadata.(*UserLoginMetadata).TakeoutDialogCrawlDone = true
@@ -164,7 +165,7 @@ func (t *TelegramClient) takeoutDialogs(ctx context.Context, takeoutID int64) er
return fmt.Errorf("failed to save user login: %w", err)
}
req.OffsetPeer, err = t.inputPeerForPortalID(ctx, portalKey.ID)
req.OffsetPeer, _, err = t.inputPeerForPortalID(ctx, portalKey.ID)
if err != nil {
return fmt.Errorf("failed to get input peer for pagination: %w", err)
}
@@ -206,43 +207,57 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2
}()
}
peer, err := t.inputPeerForPortalID(ctx, fetchParams.Portal.ID)
peer, topicID, err := t.inputPeerForPortalID(ctx, fetchParams.Portal.ID)
if err != nil {
return nil, err
}
req := tg.MessagesGetHistoryRequest{
Peer: peer,
Limit: fetchParams.Count,
}
var minID, offsetID int
if fetchParams.AnchorMessage != nil {
if fetchParams.Forward {
_, req.MinID, err = ids.ParseMessageID(fetchParams.AnchorMessage.ID)
_, minID, err = ids.ParseMessageID(fetchParams.AnchorMessage.ID)
} else {
_, req.OffsetID, err = ids.ParseMessageID(fetchParams.AnchorMessage.ID)
_, offsetID, err = ids.ParseMessageID(fetchParams.AnchorMessage.ID)
}
if err != nil {
return nil, err
}
}
if fetchParams.Portal.Metadata.(*PortalMetadata).IsForumGeneral {
topicID = 1
}
var req bin.Object
if topicID == ids.TopicIDSpaceRoom {
return nil, nil
} else if topicID > 0 {
req = &tg.MessagesGetRepliesRequest{
Peer: peer,
MsgID: topicID,
Limit: fetchParams.Count,
MinID: minID,
OffsetID: offsetID,
}
} else {
req = &tg.MessagesGetHistoryRequest{
Peer: peer,
Limit: fetchParams.Count,
MinID: minID,
OffsetID: offsetID,
}
}
if !fetchParams.Forward {
req = &tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: req}
}
log.Info().Any("req", req).Msg("Fetching messages")
msgs, err := APICallWithUpdates(ctx, t, func() (tg.ModifiedMessagesMessages, error) {
var rawMsgs tg.MessagesMessagesClass
if fetchParams.Forward {
rawMsgs, err = t.client.API().MessagesGetHistory(ctx, &req)
} else {
var messages tg.MessagesMessagesBox
err = t.client.Invoke(ctx,
&tg.InvokeWithTakeoutRequest{TakeoutID: takeoutID, Query: &req},
&messages)
rawMsgs = messages.Messages
}
var box tg.MessagesMessagesBox
err = t.client.Invoke(ctx, req, &box)
if err != nil {
return nil, err
}
msgs, ok := rawMsgs.(tg.ModifiedMessagesMessages)
msgs, ok := box.Messages.(tg.ModifiedMessagesMessages)
if !ok {
return nil, fmt.Errorf("unsupported messages type %T", rawMsgs)
return nil, fmt.Errorf("unsupported messages type %T", box.Messages)
}
return msgs, nil
})
@@ -251,11 +266,7 @@ func (t *TelegramClient) FetchMessages(ctx context.Context, fetchParams bridgev2
}
messages := msgs.GetMessages()
portal, err := t.main.Bridge.GetPortalByKey(ctx, fetchParams.Portal.PortalKey)
if err != nil {
return nil, err
}
portal := fetchParams.Portal
// If the first message is the last read message, mark the chat as read
// during backfill.
@@ -363,7 +374,7 @@ func (c *TelegramClient) GetBackfillMaxBatchCount(ctx context.Context, portal *b
log := zerolog.Ctx(ctx).With().
Str("method", "GetBackfillMaxBatchCount").
Logger()
peerType, _, err := ids.ParsePortalID(portal.ID)
peerType, _, topicID, err := ids.ParsePortalID(portal.ID)
if err != nil {
log.Err(err).Msg("failed to parse portal ID")
return 0
@@ -374,7 +385,11 @@ func (c *TelegramClient) GetBackfillMaxBatchCount(ctx context.Context, portal *b
case ids.PeerTypeChat:
return c.main.Bridge.Config.Backfill.Queue.GetOverride("normal_group")
case ids.PeerTypeChannel:
if portal.Metadata.(*PortalMetadata).IsSuperGroup {
if topicID == ids.TopicIDSpaceRoom {
return 0
} else if topicID > 0 {
return c.main.Bridge.Config.Backfill.Queue.GetOverride("topic", "supergroup")
} else if portal.Metadata.(*PortalMetadata).IsSuperGroup {
return c.main.Bridge.Config.Backfill.Queue.GetOverride("supergroup")
} else {
return c.main.Bridge.Config.Backfill.Queue.GetOverride("channel")