feat: stop polling when chat relat is present. Process incoming messages from chat relay

Signed-off-by: Dorra Jaouad <dorra.jaoued7@gmail.com>
This commit is contained in:
Dorra Jaouad 2025-11-12 13:51:09 +01:00
parent a3d644d89e
commit adb1652b81
4 changed files with 112 additions and 3 deletions

View file

@ -20,7 +20,8 @@ import { subscribe, unsubscribe } from '@nextcloud/event-bus'
import { computed, inject, onBeforeUnmount, provide, ref, watch } from 'vue'
import { START_LOCATION, useRoute } from 'vue-router'
import { useStore } from 'vuex'
import { CHAT, MESSAGE } from '../constants.ts'
import { CHAT, CONFIG, MESSAGE } from '../constants.ts'
import { getTalkConfig } from '../services/CapabilitiesManager.ts'
import { EventBus } from '../services/EventBus.ts'
import { useChatStore } from '../stores/chat.ts'
import { useChatExtrasStore } from '../stores/chatExtras.ts'
@ -41,6 +42,8 @@ type GetMessagesContext = {
}
const GET_MESSAGES_CONTEXT_KEY: InjectionKey<GetMessagesContext> = Symbol.for('GET_MESSAGES_CONTEXT')
// TOREMOVE in main branch
const experimentalChatRelay = (getTalkConfig('local', 'experiments', 'enabled') ?? 0) & CONFIG.EXPERIMENTAL.CHAT_RELAY
/**
* Check whether caught error is from OCS API
@ -54,6 +57,7 @@ function isAxiosErrorResponse(exception: unknown): exception is AxiosError<strin
let pollingTimeout: NodeJS.Timeout | undefined
let expirationInterval: NodeJS.Timeout | undefined
let pollingErrorTimeout = 1_000
let chatRelaySupported = false
/**
* Composable to provide control logic for fetching messages list
@ -74,6 +78,7 @@ export function useGetMessagesProvider() {
const loadingNewMessages = ref(false)
const isInitialisingMessages = ref(true)
const stopFetchingOldMessages = ref(false)
let chatRelayEnabled = false
/**
* Returns whether the current participant is a participant of current conversation.
@ -144,12 +149,14 @@ export function useGetMessagesProvider() {
}
if (oldToken && oldToken !== newToken) {
store.dispatch('cancelPollNewMessages', { requestId: oldToken })
stopChatRelay()
}
if (newToken && canGetMessages) {
handleStartGettingMessagesPreconditions(newToken)
} else {
store.dispatch('cancelPollNewMessages', { requestId: newToken })
stopChatRelay()
}
/** Remove expired messages when joining a room */
@ -162,6 +169,8 @@ export function useGetMessagesProvider() {
subscribe('networkOnline', handleNetworkOnline)
EventBus.on('route-change', onRouteChange)
EventBus.on('set-context-id-to-bottom', setContextIdToBottom)
EventBus.on('signaling-supported-features', checkChatRelaySupport)
EventBus.on('should-refresh-chat-messages', tryAbortChatRelay)
/** Every 30 seconds we remove expired messages from the store */
expirationInterval = setInterval(() => {
@ -173,8 +182,12 @@ export function useGetMessagesProvider() {
unsubscribe('networkOnline', handleNetworkOnline)
EventBus.off('route-change', onRouteChange)
EventBus.off('set-context-id-to-bottom', setContextIdToBottom)
EventBus.off('signaling-message-received', addMessageFromChatRelay)
EventBus.off('signaling-supported-features', checkChatRelaySupport)
EventBus.off('should-refresh-chat-messages', tryAbortChatRelay)
store.dispatch('cancelPollNewMessages', { requestId: currentToken.value })
stopChatRelay()
clearInterval(pollingTimeout)
clearInterval(expirationInterval)
})
@ -195,6 +208,7 @@ export function useGetMessagesProvider() {
if (currentToken.value) {
console.debug('Canceling message request as we are offline')
store.dispatch('cancelPollNewMessages', { requestId: currentToken.value })
stopChatRelay()
}
}
@ -482,6 +496,10 @@ export function useGetMessagesProvider() {
* @param token token of conversation where a method was called
*/
async function pollNewMessages(token: string) {
if (chatRelayEnabled) {
// Stop polling if chat relay is supported
return
}
// Check that the token has not changed
if (currentToken.value !== token) {
console.debug(`token has changed to ${currentToken.value}, breaking the loop for ${token}`)
@ -499,6 +517,7 @@ export function useGetMessagesProvider() {
requestId: token,
})
debugTimer.end(`${token} | long polling`, 'status 200')
tryChatRelay()
} catch (exception) {
if (Axios.isCancel(exception)) {
debugTimer.end(`${token} | long polling`, 'cancelled')
@ -512,6 +531,7 @@ export function useGetMessagesProvider() {
// This is not an error, so reset error timeout and poll again
pollingErrorTimeout = 1_000
clearTimeout(pollingTimeout)
tryChatRelay({ force: true })
pollingTimeout = setTimeout(() => {
pollNewMessages(token)
}, 500)
@ -539,6 +559,83 @@ export function useGetMessagesProvider() {
}, 500)
}
/**
* Try to start chat relay
*
* @param options
* @param options.force - to skip end reached check when it is guaranteed
*/
function tryChatRelay(options?: { force: boolean }) {
if (chatRelaySupported && (isChatEndReached.value || options?.force)) {
startChatRelay()
}
}
/**
* Check whether chat relay is supported
*
* @param features
*/
function checkChatRelaySupport(features: string[]) {
if (experimentalChatRelay && features.includes('chat-relay')) {
chatRelaySupported = true
tryChatRelay()
} else {
chatRelaySupported = false
}
}
/**
* Initialize chat relay support by stopping polling and listening to chat relay messages
*/
function startChatRelay() {
if (currentToken.value) {
// it might have been set already, ensure we cancel it
store.dispatch('cancelPollNewMessages', { requestId: currentToken.value })
}
chatRelayEnabled = true
EventBus.on('signaling-message-received', addMessageFromChatRelay)
}
/**
* Chat relay sends one message at a time, we update our stores directly
*
* @param payload
* @param payload.token
* @param payload.message
*/
function addMessageFromChatRelay(payload: { token: string, message: ChatMessage }) {
const { token, message } = payload
if (token !== currentToken.value) {
// Guard: Message is for another conversation
// e.g., user switched conversation while messages were in-flight
return
}
chatStore.processChatBlocks(token, [message], { mergeBy: chatStore.getLastKnownId(token) })
store.dispatch('processMessage', { token, message })
}
/**
* Stop chat relay and remove listener
*/
function stopChatRelay() {
chatRelayEnabled = false
EventBus.off('signaling-message-received', addMessageFromChatRelay)
}
/**
* This is needed when something went wrong after starting chat relay
* and the server is no longer sending us messages events
* so we need to abort it to continue getting messages via polling
*/
function tryAbortChatRelay() {
if (chatRelayEnabled && chatRelaySupported) {
stopChatRelay()
pollNewMessages(currentToken.value)
}
}
provide(GET_MESSAGES_CONTEXT_KEY, {
contextMessageId,
loadingOldMessages,

View file

@ -22,6 +22,11 @@ export const CONFIG = {
* to allow join the call without page reload
*/
RECOVER_SESSION: 2,
/**
* Since 22.0.3
* Send chat messages via the High performance-backend / websocket
*/
CHAT_RELAY: 4,
},
} as const

View file

@ -21,6 +21,7 @@ import mitt from 'mitt'
export type Events = {
[key: EventType]: unknown
'audio-player-ended': number
'signaling-message-received': { token: string, message: ChatMessage }
'conversations-received': { singleConversation?: Conversation, fromBrowserStorage?: boolean }
'session-conflict-confirmation': string
'deleted-session-detected': void
@ -53,6 +54,7 @@ export type Events = {
'signaling-users-joined': [StandaloneSignalingJoinSession[]]
'signaling-users-left': [string[]]
'signaling-all-users-changed-in-call-to-disconnected': void
'signaling-supported-features': string[]
'smart-picker-open': void
'switch-to-conversation': { token: string }
'talk:poll-added': { token: string, message: ChatMessage }

View file

@ -1113,6 +1113,7 @@ Signaling.Standalone.prototype.helloResponseReceived = function(data) {
for (i = 0; i < features.length; i++) {
this.features[features[i]] = true
}
this._trigger('supportedFeatures', features)
}
if (!this.settings.helloAuthParams.internal
@ -1430,8 +1431,12 @@ Signaling.Standalone.prototype.processRoomEvent = function(data) {
Signaling.Standalone.prototype.processRoomMessageEvent = function(token, data) {
switch (data.type) {
case 'chat':
// FIXME this is not listened to
EventBus.emit('should-refresh-chat-messages')
if ('comment' in data.chat) {
EventBus.emit('signaling-message-received', { token, message: { ...data.chat.comment, token } })
} else {
// TOREMOVE after HPB next release
EventBus.emit('should-refresh-chat-messages')
}
break
case 'recording':
EventBus.emit('signaling-recording-status-changed', [token, data.recording.status])