Use transactions for internal signaling

Signed-off-by: Joas Schilling <coding@schilljs.com>
This commit is contained in:
Joas Schilling 2022-02-25 20:55:55 +01:00
parent 41d750b534
commit 7576431bac
No known key found for this signature in database
GPG key ID: 7076EA9751AACDDA

View file

@ -26,12 +26,15 @@ namespace OCA\Talk\Signaling;
use OCA\Talk\Model\Session;
use OCA\Talk\Room;
use OCA\Talk\Service\ParticipantService;
use OCP\AppFramework\Db\TTransactional;
use OCP\AppFramework\Utility\ITimeFactory;
use OCP\DB\QueryBuilder\IQueryBuilder;
use OCP\IDBConnection;
class Messages {
use TTransactional;
/** @var IDBConnection */
protected $db;
@ -57,7 +60,10 @@ class Messages {
$delete->delete('talk_internalsignaling')
->where($delete->expr()->in('recipient', $delete->createNamedParameter($sessionIds, IQueryBuilder::PARAM_STR_ARRAY)))
->orWhere($delete->expr()->in('sender', $delete->createNamedParameter($sessionIds, IQueryBuilder::PARAM_STR_ARRAY)));
$delete->executeStatement();
$this->atomic(function () use ($delete) {
$delete->executeStatement();
}, $this->db);
}
/**
@ -96,14 +102,16 @@ class Messages {
);
$participants = $this->participantService->getParticipantsForAllSessions($room);
foreach ($participants as $participant) {
$session = $participant->getSession();
if ($session instanceof Session) {
$insert->setParameter('sender', $session->getSessionId())
->setParameter('recipient', $session->getSessionId())
->executeStatement();
$this->atomic(function () use ($participants, $insert) {
foreach ($participants as $participant) {
$session = $participant->getSession();
if ($session instanceof Session) {
$insert->setParameter('sender', $session->getSessionId())
->setParameter('recipient', $session->getSessionId())
->executeStatement();
}
}
}
}, $this->db);
}
/**
@ -126,18 +134,22 @@ class Messages {
->from('talk_internalsignaling')
->where($query->expr()->eq('recipient', $query->createNamedParameter($sessionId)))
->andWhere($query->expr()->lte('timestamp', $query->createNamedParameter($time)));
$result = $query->executeQuery();
while ($row = $result->fetch()) {
$messages[] = ['type' => 'message', 'data' => $row['message']];
}
$result->closeCursor();
$delete = $this->db->getQueryBuilder();
$delete->delete('talk_internalsignaling')
->where($delete->expr()->eq('recipient', $delete->createNamedParameter($sessionId)))
->andWhere($delete->expr()->lte('timestamp', $delete->createNamedParameter($time)));
$delete->executeStatement();
$this->atomic(function () use (&$messages, $query, $delete) {
$result = $query->executeQuery();
while ($row = $result->fetch()) {
$messages[] = ['type' => 'message', 'data' => $row['message']];
}
$result->closeCursor();
$delete->executeStatement();
}, $this->db);
return $messages;
}
@ -153,6 +165,9 @@ class Messages {
$delete = $this->db->getQueryBuilder();
$delete->delete('talk_internalsignaling')
->where($delete->expr()->lt('timestamp', $delete->createNamedParameter($time)));
$delete->executeStatement();
$this->atomic(function () use ($delete) {
$delete->executeStatement();
}, $this->db);
}
}