client,gotd: refactor connection event handling
This might cause regressions if the onSession handler was load bearing
This commit is contained in:
+48
-29
@@ -17,6 +17,7 @@
|
|||||||
package connector
|
package connector
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"cmp"
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
@@ -269,8 +270,8 @@ func NewTelegramClient(ctx context.Context, tc *TelegramConnector, login *bridge
|
|||||||
Logger: zaplog,
|
Logger: zaplog,
|
||||||
UpdateHandler: client.updatesManager,
|
UpdateHandler: client.updatesManager,
|
||||||
OnDead: client.onDead,
|
OnDead: client.onDead,
|
||||||
OnSession: client.onConnectionStateChange("session"),
|
OnSession: client.onSession,
|
||||||
OnConnected: client.onConnectionStateChange("connected"),
|
OnConnected: client.onConnected,
|
||||||
PingCallback: client.onPing,
|
PingCallback: client.onPing,
|
||||||
OnAuthError: client.onAuthError,
|
OnAuthError: client.onAuthError,
|
||||||
PingTimeout: time.Duration(tc.Config.Ping.TimeoutSeconds) * time.Second,
|
PingTimeout: time.Duration(tc.Config.Ping.TimeoutSeconds) * time.Second,
|
||||||
@@ -426,41 +427,59 @@ func (t *TelegramClient) sendBadCredentialsOrUnknownError(err error) {
|
|||||||
|
|
||||||
func (t *TelegramClient) onPing() {
|
func (t *TelegramClient) onPing() {
|
||||||
if t.userLogin.BridgeState.GetPrev().StateEvent == status.StateConnected {
|
if t.userLogin.BridgeState.GetPrev().StateEvent == status.StateConnected {
|
||||||
t.main.Bridge.Log.Trace().Msg("Got ping, not checking connectivity because we are already connected")
|
return
|
||||||
|
}
|
||||||
|
ctx := t.userLogin.Log.WithContext(t.main.Bridge.BackgroundCtx)
|
||||||
|
t.userLogin.Log.Debug().Msg("Got ping while not connected, checking auth")
|
||||||
|
|
||||||
|
me, err := t.client.Self(ctx)
|
||||||
|
if auth.IsUnauthorized(err) {
|
||||||
|
t.onAuthError(fmt.Errorf("not logged in"))
|
||||||
|
} else if errors.Is(err, syscall.EPIPE) {
|
||||||
|
// This is a pipe error, try disconnecting which will force the
|
||||||
|
// updatesManager to fail and cause the client to reconnect.
|
||||||
|
t.userLogin.BridgeState.Send(status.BridgeState{
|
||||||
|
StateEvent: status.StateTransientDisconnect,
|
||||||
|
Error: "pipe-error",
|
||||||
|
Message: humanise.Error(err),
|
||||||
|
})
|
||||||
|
} else if err != nil {
|
||||||
|
t.sendBadCredentialsOrUnknownError(err)
|
||||||
} else {
|
} else {
|
||||||
t.onConnectionStateChange("ping while not connected")
|
t.onConnected(me)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TelegramClient) onConnectionStateChange(reason string) func() {
|
func userToRemoteProfile(self *tg.User) (profile status.RemoteProfile, name string) {
|
||||||
return func() {
|
profile.Name = util.FormatFullName(self.FirstName, self.LastName, self.Deleted, self.ID)
|
||||||
log := t.main.Bridge.Log.With().
|
profile.Phone = "+" + strings.TrimPrefix(self.Phone, "+")
|
||||||
Str("component", "telegram_client").
|
profile.Username = self.Username
|
||||||
Str("user_login_id", string(t.userLogin.ID)).
|
if self.Username == "" && len(self.Usernames) > 0 {
|
||||||
Str("reason", reason).
|
profile.Username = self.Usernames[0].Username
|
||||||
Logger()
|
}
|
||||||
log.Info().Msg("Connection state changed")
|
name = cmp.Or(profile.Username, profile.Phone, profile.Name)
|
||||||
ctx := log.WithContext(context.Background())
|
return
|
||||||
|
}
|
||||||
|
|
||||||
authStatus, err := t.client.Auth().Status(ctx)
|
func (t *TelegramClient) onConnected(self *tg.User) {
|
||||||
|
// TODO update ghost info?
|
||||||
|
newProfile, newName := userToRemoteProfile(self)
|
||||||
|
// TODO fill avatar from ghost or something?
|
||||||
|
newProfile.Avatar = t.userLogin.RemoteProfile.Avatar
|
||||||
|
newProfile.AvatarFile = t.userLogin.RemoteProfile.AvatarFile
|
||||||
|
if t.userLogin.RemoteProfile != newProfile || t.userLogin.RemoteName != newName {
|
||||||
|
t.userLogin.RemoteProfile = newProfile
|
||||||
|
t.userLogin.RemoteName = newName
|
||||||
|
err := t.userLogin.Save(t.main.Bridge.BackgroundCtx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, syscall.EPIPE) {
|
t.userLogin.Log.Err(err).Msg("Failed to save user login after profile update")
|
||||||
// This is a pipe error, try disconnecting which will force the
|
|
||||||
// updatesManager to fail and cause the client to reconnect.
|
|
||||||
t.userLogin.BridgeState.Send(status.BridgeState{
|
|
||||||
StateEvent: status.StateTransientDisconnect,
|
|
||||||
Error: "pipe-error",
|
|
||||||
Message: humanise.Error(err),
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
t.sendBadCredentialsOrUnknownError(err)
|
|
||||||
}
|
|
||||||
} else if authStatus.Authorized {
|
|
||||||
t.userLogin.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnected})
|
|
||||||
} else {
|
|
||||||
t.onAuthError(fmt.Errorf("not logged in"))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
t.userLogin.BridgeState.Send(status.BridgeState{StateEvent: status.StateConnected})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *TelegramClient) onSession() {
|
||||||
|
t.userLogin.Log.Debug().Msg("Got session created event")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TelegramClient) onAuthError(err error) {
|
func (t *TelegramClient) onAuthError(err error) {
|
||||||
|
|||||||
+2
-23
@@ -20,16 +20,13 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"maunium.net/go/mautrix/bridgev2"
|
"maunium.net/go/mautrix/bridgev2"
|
||||||
"maunium.net/go/mautrix/bridgev2/database"
|
"maunium.net/go/mautrix/bridgev2/database"
|
||||||
"maunium.net/go/mautrix/bridgev2/status"
|
|
||||||
|
|
||||||
"go.mau.fi/mautrix-telegram/pkg/connector/ids"
|
"go.mau.fi/mautrix-telegram/pkg/connector/ids"
|
||||||
"go.mau.fi/mautrix-telegram/pkg/connector/util"
|
|
||||||
"go.mau.fi/mautrix-telegram/pkg/gotd/tg"
|
"go.mau.fi/mautrix-telegram/pkg/gotd/tg"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -131,25 +128,7 @@ func finalizeLogin(ctx context.Context, user *bridgev2.User, authorization *tg.A
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
fullName := util.FormatFullName(me.FirstName, me.LastName, me.Deleted, me.ID)
|
ul.RemoteProfile, ul.RemoteName = userToRemoteProfile(me)
|
||||||
username := me.Username
|
|
||||||
if username == "" && len(me.Usernames) > 0 {
|
|
||||||
username = me.Usernames[0].Username
|
|
||||||
}
|
|
||||||
normalizedPhone := "+" + strings.TrimPrefix(me.Phone, "+")
|
|
||||||
remoteName := username
|
|
||||||
if remoteName == "" {
|
|
||||||
remoteName = normalizedPhone
|
|
||||||
}
|
|
||||||
if remoteName == "" {
|
|
||||||
remoteName = fullName
|
|
||||||
}
|
|
||||||
ul.RemoteName = remoteName
|
|
||||||
ul.RemoteProfile = status.RemoteProfile{
|
|
||||||
Phone: me.Phone,
|
|
||||||
Username: username,
|
|
||||||
Name: fullName,
|
|
||||||
}
|
|
||||||
err = ul.Save(ctx)
|
err = ul.Save(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to save login: %w", err)
|
return nil, fmt.Errorf("failed to save login: %w", err)
|
||||||
@@ -158,7 +137,7 @@ func finalizeLogin(ctx context.Context, user *bridgev2.User, authorization *tg.A
|
|||||||
return &bridgev2.LoginStep{
|
return &bridgev2.LoginStep{
|
||||||
Type: bridgev2.LoginStepTypeComplete,
|
Type: bridgev2.LoginStepTypeComplete,
|
||||||
StepID: LoginStepIDComplete,
|
StepID: LoginStepIDComplete,
|
||||||
Instructions: fmt.Sprintf("Successfully logged in as %d / +%s (%s)", me.ID, me.Phone, remoteName),
|
Instructions: fmt.Sprintf("Successfully logged in as %s (`%d`)", ul.RemoteName, me.ID),
|
||||||
CompleteParams: &bridgev2.LoginCompleteParams{
|
CompleteParams: &bridgev2.LoginCompleteParams{
|
||||||
UserLoginID: ul.ID,
|
UserLoginID: ul.ID,
|
||||||
UserLogin: ul,
|
UserLogin: ul,
|
||||||
|
|||||||
@@ -64,7 +64,7 @@ type Client struct {
|
|||||||
resolver dcs.Resolver // immutable
|
resolver dcs.Resolver // immutable
|
||||||
onDead func() // immutable
|
onDead func() // immutable
|
||||||
onAuthError func(error) // immutable
|
onAuthError func(error) // immutable
|
||||||
onConnected func() // immutable
|
onConnected func(*tg.User) // immutable
|
||||||
newConnBackoff func() backoff.BackOff // immutable
|
newConnBackoff func() backoff.BackOff // immutable
|
||||||
defaultMode manager.ConnMode // immutable
|
defaultMode manager.ConnMode // immutable
|
||||||
|
|
||||||
|
|||||||
@@ -43,7 +43,7 @@ func (c *Client) runUntilRestart(ctx context.Context) error {
|
|||||||
|
|
||||||
c.log.Info("Got self", zap.String("username", self.Username))
|
c.log.Info("Got self", zap.String("username", self.Username))
|
||||||
if c.onConnected != nil {
|
if c.onConnected != nil {
|
||||||
c.onConnected()
|
c.onConnected(self)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -56,7 +56,7 @@ type Options struct {
|
|||||||
OnAuthError func(error)
|
OnAuthError func(error)
|
||||||
// OnConnected will be called when the connection has been established and
|
// OnConnected will be called when the connection has been established and
|
||||||
// the user has been fetched successfully.
|
// the user has been fetched successfully.
|
||||||
OnConnected func()
|
OnConnected func(*tg.User)
|
||||||
// MigrationTimeout configures migration timeout.
|
// MigrationTimeout configures migration timeout.
|
||||||
MigrationTimeout time.Duration
|
MigrationTimeout time.Duration
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user