Merge pull request #16359 from nextcloud/fix/noid/chat-relay-adjustment

fix: continue chat relay permanently
This commit is contained in:
Dorra 2025-11-24 15:14:56 +01:00 committed by GitHub
commit fd72c5a04e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 98 additions and 59 deletions

View file

@ -57,7 +57,7 @@ function isAxiosErrorResponse(exception: unknown): exception is AxiosError<strin
let pollingTimeout: NodeJS.Timeout | undefined
let expirationInterval: NodeJS.Timeout | undefined
let pollingErrorTimeout = 1_000
let chatRelaySupported = false
let chatRelaySupported: boolean | null = null
/**
* Composable to provide control logic for fetching messages list
@ -78,7 +78,6 @@ export function useGetMessagesProvider() {
const loadingNewMessages = ref(false)
const isInitialisingMessages = ref(true)
const stopFetchingOldMessages = ref(false)
let chatRelayEnabled = false
/**
* Returns whether the current participant is a participant of current conversation.
@ -149,14 +148,13 @@ export function useGetMessagesProvider() {
}
if (oldToken && oldToken !== newToken) {
store.dispatch('cancelPollNewMessages', { requestId: oldToken })
stopChatRelay()
chatRelaySupported = null
}
if (newToken && canGetMessages) {
handleStartGettingMessagesPreconditions(newToken)
} else {
store.dispatch('cancelPollNewMessages', { requestId: newToken })
stopChatRelay()
}
/** Remove expired messages when joining a room */
@ -169,8 +167,11 @@ export function useGetMessagesProvider() {
subscribe('networkOnline', handleNetworkOnline)
EventBus.on('route-change', onRouteChange)
EventBus.on('set-context-id-to-bottom', setContextIdToBottom)
EventBus.on('signaling-supported-features', checkChatRelaySupport)
EventBus.on('should-refresh-chat-messages', tryAbortChatRelay)
if (experimentalChatRelay) {
EventBus.on('signaling-message-received', addMessageFromChatRelay)
EventBus.on('signaling-supported-features', checkChatRelaySupport)
EventBus.on('should-refresh-chat-messages', tryPollNewMessages)
}
/** Every 30 seconds we remove expired messages from the store */
expirationInterval = setInterval(() => {
@ -184,10 +185,9 @@ export function useGetMessagesProvider() {
EventBus.off('set-context-id-to-bottom', setContextIdToBottom)
EventBus.off('signaling-message-received', addMessageFromChatRelay)
EventBus.off('signaling-supported-features', checkChatRelaySupport)
EventBus.off('should-refresh-chat-messages', tryAbortChatRelay)
EventBus.off('should-refresh-chat-messages', tryPollNewMessages)
store.dispatch('cancelPollNewMessages', { requestId: currentToken.value })
stopChatRelay()
clearInterval(pollingTimeout)
clearInterval(expirationInterval)
})
@ -208,7 +208,6 @@ export function useGetMessagesProvider() {
if (currentToken.value) {
console.debug('Canceling message request as we are offline')
store.dispatch('cancelPollNewMessages', { requestId: currentToken.value })
stopChatRelay()
}
}
@ -280,7 +279,7 @@ export function useGetMessagesProvider() {
messageId = nearestContextMessageId
}
if (messageId === firstContextMessageId) {
if (messageId === firstContextMessageId || !chatStore.hasEnoughMessages(token, { messageId, threadId })) {
// message is the first one in the block, try to get some messages above
isInitialisingMessages.value = true
await getOldMessages(token, false, { messageId, threadId })
@ -349,8 +348,22 @@ export function useGetMessagesProvider() {
isInitialisingMessages.value = false
// Once the history is received, starts looking for new messages.
await pollNewMessages(token)
if (!experimentalChatRelay) {
pollNewMessages(token)
} else if (chatRelaySupported !== null) {
// Case: chat relay is confirmed to be supported / not supported from signaling hello message,
// but polling was not immediately triggered (e.g, when received while context request is ongoing)
pollNewMessages(token)
} else {
// Fallback polling in case signaling does not work and we will never receive Hello message
// chatRelaySupported is still null (signaling hello was not received yet)
pollingTimeout = setTimeout(() => {
if (chatRelaySupported) {
return
}
pollNewMessages(token)
}, 30_000)
}
}
/**
@ -496,10 +509,6 @@ export function useGetMessagesProvider() {
* @param token token of conversation where a method was called
*/
async function pollNewMessages(token: string) {
if (chatRelayEnabled) {
// Stop polling if chat relay is supported
return
}
// Check that the token has not changed
if (currentToken.value !== token) {
console.debug(`token has changed to ${currentToken.value}, breaking the loop for ${token}`)
@ -514,10 +523,10 @@ export function useGetMessagesProvider() {
token,
lastKnownMessageId: chatStore.getLastKnownId(token),
requestId: token,
timeout: chatRelaySupported ? 0 : undefined,
})
pollingErrorTimeout = 1_000
debugTimer.end(`${token} | long polling`, 'status 200')
tryChatRelay()
} catch (exception) {
if (Axios.isCancel(exception)) {
debugTimer.end(`${token} | long polling`, 'cancelled')
@ -531,7 +540,9 @@ export function useGetMessagesProvider() {
// This is not an error, so reset error timeout and poll again
pollingErrorTimeout = 1_000
clearTimeout(pollingTimeout)
tryChatRelay({ force: true })
if (chatRelaySupported) {
return
}
pollingTimeout = setTimeout(() => {
pollNewMessages(token)
}, 500)
@ -547,6 +558,9 @@ export function useGetMessagesProvider() {
console.debug('Error happened while getting chat messages. Trying again in %d seconds', pollingErrorTimeout / 1_000, exception)
clearTimeout(pollingTimeout)
if (chatRelaySupported) {
return
}
pollingTimeout = setTimeout(() => {
pollNewMessages(token)
}, pollingErrorTimeout)
@ -554,21 +568,23 @@ export function useGetMessagesProvider() {
}
clearTimeout(pollingTimeout)
if (chatRelaySupported) {
return
}
pollingTimeout = setTimeout(() => {
pollNewMessages(token)
}, 500)
}
/**
* Try to start chat relay
*
* @param options
* @param options.force - to skip end reached check when it is guaranteed
*/
function tryChatRelay(options?: { force: boolean }) {
if (chatRelaySupported && (isChatEndReached.value || options?.force)) {
startChatRelay()
function tryPollNewMessages() {
if (!chatRelaySupported) {
// the event is only relevant when chat relay is supported
return
}
pollNewMessages(currentToken.value)
}
/**
@ -576,25 +592,20 @@ export function useGetMessagesProvider() {
*
* @param features
*/
function checkChatRelaySupport(features: string[]) {
if (experimentalChatRelay && features.includes('chat-relay')) {
async function checkChatRelaySupport(features: string[]) {
if (features.includes('chat-relay')) {
chatRelaySupported = true
tryChatRelay()
} else {
chatRelaySupported = false
}
}
/**
* Initialize chat relay support by stopping polling and listening to chat relay messages
*/
function startChatRelay() {
if (currentToken.value) {
// it might have been set already, ensure we cancel it
store.dispatch('cancelPollNewMessages', { requestId: currentToken.value })
if (!pollingTimeout) {
// Context request is still ongoing
return
}
chatRelayEnabled = true
EventBus.on('signaling-message-received', addMessageFromChatRelay)
// Once the history and Hello signaling message is received, starts looking for new messages.
clearTimeout(pollingTimeout)
await pollNewMessages(currentToken.value)
}
/**
@ -605,6 +616,11 @@ export function useGetMessagesProvider() {
* @param payload.message
*/
function addMessageFromChatRelay(payload: { token: string, message: ChatMessage }) {
if (!chatRelaySupported) {
// chat relay is not supported, ignore the message
return
}
const { token, message } = payload
if (token !== currentToken.value) {
// Guard: Message is for another conversation
@ -616,26 +632,6 @@ export function useGetMessagesProvider() {
store.dispatch('processMessage', { token, message })
}
/**
* Stop chat relay and remove listener
*/
function stopChatRelay() {
chatRelayEnabled = false
EventBus.off('signaling-message-received', addMessageFromChatRelay)
}
/**
* This is needed when something went wrong after starting chat relay
* and the server is no longer sending us messages events
* so we need to abort it to continue getting messages via polling
*/
function tryAbortChatRelay() {
if (chatRelayEnabled && chatRelaySupported) {
stopChatRelay()
pollNewMessages(currentToken.value)
}
}
provide(GET_MESSAGES_CONTEXT_KEY, {
contextMessageId,
loadingOldMessages,

View file

@ -87,12 +87,14 @@ async function fetchMessages({
* @param data.lastKnownMessageId The id of the last message in the store.
* @param data.token The conversation token;
* @param [data.limit] Number of messages to load
* @param data.timeout Timeout duration for long polling
* @param [options] Axios request options
*/
async function pollNewMessages({
token,
lastKnownMessageId,
limit = 100,
timeout,
}: ReceiveMessagesPayload, options?: AxiosRequestConfig): receiveMessagesResponse {
return axios.get(generateOcsUrl('apps/spreed/api/v1/chat/{token}', { token }), {
...options,
@ -103,6 +105,7 @@ async function pollNewMessages({
limit,
includeLastKnown: 0,
markNotificationsAsRead: 0,
timeout,
} as receiveMessagesParams,
})
}

View file

@ -1046,8 +1046,9 @@ const actions = {
* @param {string} data.requestId id to identify request uniquely
* @param {object} data.requestOptions request options;
* @param {number} data.lastKnownMessageId The id of the last message in the store.
* @param data.timeout
*/
async pollNewMessages(context, { token, lastKnownMessageId, requestId, requestOptions }) {
async pollNewMessages(context, { token, lastKnownMessageId, requestId, timeout, requestOptions }) {
const actorStore = useActorStore()
context.dispatch('cancelPollNewMessages', { requestId })
@ -1067,6 +1068,7 @@ const actions = {
token,
lastKnownMessageId,
limit: CHAT.FETCH_LIMIT,
timeout,
}, requestOptions)
context.commit('setCancelPollNewMessages', { requestId })

View file

@ -196,6 +196,40 @@ export const useChatStore = defineStore('chat', () => {
return Math.min(...filterNumericIds(contextBlock))
}
/**
*
* Check whether there are enough messages to render in the selected context, in particular
* when there are other blocks, it means there is likely more history to load
*
* @param token The conversation token
* @param data The data object containing messageId and threadId
* @param data.messageId The message id
* @param data.threadId The thread id
*/
function hasEnoughMessages(
token: string,
{ messageId = 0, threadId = 0 }: GetMessagesListOptions = { messageId: 0, threadId: 0 },
): boolean {
let contextBlock: Set<number>
const numBlocks = (threadId ? threadBlocks[token][threadId]?.length : chatBlocks[token]?.length) ?? 0
if (numBlocks <= 1) {
// If only one block, we cannot assume there is more history to load
return true
}
if (threadId) {
contextBlock = (messageId <= 0)
? threadBlocks[token][threadId][0]
: threadBlocks[token][threadId].find((set) => set.has(messageId)) ?? threadBlocks[token][threadId][0]
} else {
contextBlock = (messageId <= 0)
? chatBlocks[token][0]
: chatBlocks[token].find((set) => set.has(messageId)) ?? chatBlocks[token][0]
}
return contextBlock.size > 10
}
/**
* Returns last known message id, belonging to current context. Defaults to given messageId
*
@ -554,5 +588,6 @@ export const useChatStore = defineStore('chat', () => {
removeMessagesFromChatBlocks,
clearMessagesHistory,
purgeChatStore,
hasEnoughMessages,
}
})

View file

@ -454,6 +454,8 @@ Signaling.Internal.prototype._joinRoomSuccess = function(token, sessionId) {
this.sessionId = sessionId
this._trigger('sessionId', [this.sessionId])
this._startPullingMessages()
// Event needed to inform about chat relay support
this._trigger('supportedFeatures', [])
}
Signaling.Internal.prototype._doLeaveRoom = function(token) {
@ -1113,7 +1115,6 @@ Signaling.Standalone.prototype.helloResponseReceived = function(data) {
for (i = 0; i < features.length; i++) {
this.features[features[i]] = true
}
this._trigger('supportedFeatures', features)
}
if (!this.settings.helloAuthParams.internal
@ -1306,6 +1307,8 @@ Signaling.Standalone.prototype.joinResponseReceived = function(data, token) {
})
this.roomCollection.sort()
}
this._trigger('supportedFeatures', Object.keys(this.features))
}
Signaling.Standalone.prototype._doLeaveRoom = function(token) {