이전에 https://jeenukchung.tistory.com/62 을 작성하며 gRPC 통신을 간단하게 구현한 적 있다. 이제 공식적으로 알림서비스가 배포되었기 때문에 해당 서비스에 맞는 요청을 보내야 한다. 이번 글에서는 오프라인 유저를 찾아서 gRPC 로 요청하는 로직을 구현한다.


1. 오프라인 유저 찾기 로직

오프라인 유저를 찾기위한 가정을 다음과 같이 하였다.

User 1, 2, 3 이 속해 있는 room A 가 있다고 가정하자. 만약 1 과 2 가 로그인하였을 경우 바로 socket.io 에 연결이 되며 동시에 userId 와 socket.id 가 맵핑을 한다. 유저 1과 2가 room A 에 입장하여 대화를 주고 받을 때, 해당 방에 속해있는 유저를 찾아 현재 대화방에 접속한 유저와 대조하여 오프라인 유저를 찾아낸다. 이때, 3 유저가 오프라인이기 때문에 3 유저에게만 알림을 보낸다. 여기서 2 가 해당 방을 나간 상태에서 1 이 채팅을 보냈을 경우 2 와 3 에게 알림을 보낸다.


2. userId 와 socket.id 의 맵핑 이유

userId 와 socket.id 의 맵핑 이유는 크게 3가지로 들 수 있다. 예를들어 현재 핸드폰과 PC 에 서로 같은 아이디로 로그인이 되어 있을 때, 핸드폰에는 푸시 알림이 와야 하고, PC 에서는 푸시알림 대신 새로운 메시지의 갯수만 증가하게 하기 위해서 사용되어야 한다.

 

2.1 멀티 디바이스 로그인 문제

한 사용자가 여러개의 기기(PC, 모바일 등)에서 같은 계정(userId) 으로 로그인 할 경우 서버가 socket.id 를 관리하지 않으면, 어떤 기기가 접속되어 있는지 알 수 없다. 특정 방에서 한 기기에서 leavRoom 을 한다면, 다른 기기에서도 강제로 나가게 되는 문제가 발생할 수 있다. 이를 해결하기 위해 userId 와 함께 socket.id 를 저장하여, 기기별로 개별적으로 방 관리를 할 수 있도록 한다.

 

2.2 dicsonnect 시, 잘못된 유저 삭제

disconnect 이벤트 발생 시, 어떤 기기에서 연결이 끊겼는지 모른다. userId 만 관리한다면, 한 기기가 disconnect 될 때 모든 기기가 오프라인으로 처리가 된다. 이를 해결하기 위해선 userId 와 socket.id 를 매핑하여, 연결이 끊긴 socket.id 만 삭제해야 한다.

 

2.3 특정 기기에서만 알림을 보내는 기능이 불가능

사용자가 여러 개의 기기에서 접속했을 때, 특정 기기에서만 알림을 받고 싶을 경우 문제가 발생한다. userId 만을 기반으로 메시지 전송을 처리한다면, 어떤 기기가 온라인이고, 어떤 기기가 오프라인인지 알 수 없다. 결과적으로 푸시 알림을 받을 기기와 실시간 메시지를 받을 기기를 구분할 수 없다. 이를 해결하기 위해 userId가 아닌 socket.id 를 기반으로 개별 기기를 관리하여, 어떤 기기에서 알림을 받을지 선택할 수 있어야 한다.


3. 흐름도


4. 소켓 연결시 userId ↔ socket.id 매핑

4.1 userSocketMap 선언

const userSocketMap = new Map(); // User ID와 Socket ID 매핑 관리

 

userSocketMap 은 User ID 를 Key 로, 해당 유저의 Socket ID 목록(Set) 을 값으로 저장하는 Map 객체이다. 한명의 유저가 여러 기기나 브라우저 탭에서 접속할 수 있기 때문에 Set 을 사용하여 중복을 방지하면서 여러개의 socket.id 를 저장한다.

userSocketMap 의 구조 : { userId : Set ( socketId1, socketId2, ...) } 

 

4.2 socketHandler 함수 정의

const socketHandler = (io) => {
io.on('connection', (socket) => {
console.log('New user connected:', socket.id);

Socket.io 의 이벤트 리스너를 설정한다. 새로운 클라이언트가 연결될 때마다 실행된다.

 

4.3 유저 ID 확인

const userId = socket.handshake.query.userId;
if (!userId) {
console.error("커넥션을 위한 유저아이디가 존재하지 않습니다.");
return;
}

클라이언트가 연결될 때, URL 쿼리 파라미터를 가져와 유저를 식별한다. API GET 요청시 파라미터 값을 URL 쿼리로 전송하는 방법과 같다. socket.handshake.query.userId 는 클라이언트에서 Socket 연결시 전달한 유저 ID 의 값이다. 

예시로 클라이언트 연결시 다음과 같은 주소로 연결을 시도한다. ws://localhost:3000?userId=user123

 

4.4 해당 유저가 처음 접속한 경우, 새로운 Set 생성

if (!userSocketMap.has(userId)) {
userSocketMap.set(userId, new Set());
}

userSocketMap 에서 userId 가 존재하지 않을 경우 새로운 Set 을 생성하여 userId 를 Key 로 추가한다.

 

4.5 해당 유저가 있을 경우, 해당 유저의 Set 에 현재 socket.id 추가

userSocketMap.get(userId).add(socket.id);

.get(userId) 를 통해 해당 유저의 Socket ID Set 을 가져온 뒤에 현재 연결된 Socket ID 를 추가한다.

하나의 유저 ID 값으로 여러개의 소켓을 관리하는 경우 userSocketMap 의 형태는 다음과 같다.

userSocketMap = { "user123" : Set { "socket1", "socket2" } , "user456" : Set { "socket3" } }

 

4.6 socket.data.userId 에 유저 ID 저장

socket.data.userId = userId;
console.log(`💡User ${userId} registered with socket ${socket.id}`);

socket.data 는 각 소켓 연결에 대한 데이터를 저장할 수 있는 공간이다. 해당 공간에 저장하게 되면 이후 socket 객체에서 직접 유저 정보를 가지고 올 수 있다.

 

4.7 결과

 


5. socket.join 을 통해 방에 입장

5.1 사용자가 'joinRoom' 이벤트를 발생시키면 실행

socket.on('joinRoom', (data) => {
if (typeof data === 'string') {
data = JSON.parse(data); // 문자열을 JSON 객체로 변환
}
const { roomId } = data;
const userId = socket.data.userId;
socket.join(roomId);

클라이언트에서 joinRoom 이벤트를 보내면 실행된다. 만약 받은 데이터가 문자열이면 JSON 으로 변환해준뒤 클라이언트에서 넘겨준 데이터의 값 중 roomId 를 추출한다. socket.data.userID 는 소켓 연결 시 저장한 userId 를 가지고 온다. 다음 해당 방에 유저의 socket.id 를 추가한다.

 

5.2 roomUserMap 에서 해당 방이 존재하는지 확인하고 없으면 추가

if (!roomUserMap.has(roomId)) {
roomUserMap.set(roomId, new Map());
}

roomUserMap 은 방 ID 를 키로 하고, 해당 방에 속한 유저들을 저장하는 Map 이다.

 

5.3 해당 방에 있는 유저 목록을 가져온 뒤 현재 유저가 방에 존재하는지 확인하고 없으면 추가

const roomUsers = roomUserMap.get(roomId);
if (!roomUsers.has(userId)) {
roomUsers.set(userId, new Set());
}
roomUsers.get(userId).add(socket.id);
console.log(`💡User ${userId} joined room ${roomId}`);
console.log("💡", io.sockets.adapter.rooms.get(roomId));

roomUsers 는 해당 방에 속한 유저들의 userId 를 저장하는 Map 이다. userId 가 존재하지 않으면 새로운 Set 을 생성하여 유저를 추가한다. Set 을 사용하는 이유는 한 명의 유저가 여러개의 소켓으로 연결될 수 있기 때문이다. 후에 roomUsers 에 userId 의 socket.id 를 추가한다. roomUsers 의 형태는 userSocketMap 과 동일하다.

 

5.4 결과


6. sendMessage 이벤트 발생

6.1 메시지 데이터 처리

if (typeof data === 'string') {
data = JSON.parse(data);
}
const { roomId, context } = data;
const topic = 'chat_messages';
const userId = socket.data.userId;
console.log(userId);

클라이언트에서 받은 메시지가 문자열일 경우 객체로 변환한 뒤 roomId 와 context 를 추출한다. Kafka 로 전송을 위한 topic 을 설정하고 현재 메시지를 보낸 유저 ID 를 저장한다.

 

6.2 Kafka 를 활용한 메시지 전송

await sendMessageToKafka(topic, roomId, context, userId);

Kafka 에 메시지를 전송한다. 

 

6.3 현재 방에 속한 유저 목록과 오프라인 유저 판별

const usersInRoom = await getUsersInRoom(roomId);
console.log("💡usersInRoom --------> ", usersInRoom);
const offlineUsers = usersInRoom.filter(userId => {
const isOnline = userSocketMap.has(userId);
const isInRoom = roomUserMap.has(roomId) && roomUserMap.get(roomId).has(userId);
console.log(`💡Checking user ${userId}: isOnline=${isOnline}, isInRoom=${isInRoom}`);
return !isOnline || !isInRoom; // 방에 없거나 완전히 오프라인이면 offlineUsers로 간주
});
console.log("💡offlineUser --------> ", offlineUsers);

먼저 getUsersInRoom 을 통해 ScyllaDB 에서 현재 방에 속해 있는 유저의 목록을 가지고 온다. 다음 현재 방에 있는 유저 목록과 현재 접속된 유저 정보를 비교한다. isOnline 은 userSocketMap 을 통해 유저가 현재 온라인인지 확인한다. isInRoom 은 roomUserMap 을 통해 유저가 해당 방에 속해 있는지 확인한다. 둘 중 하나라도 false 이면 오프라인 유저로 판별하여 offlineUsers 목록에 추가한다. 

유저가 온라인 이지만 해당 방에 속해있지 않은 경우 알림을 보내야 하기 때문에 다음과 같은 로직을 작성하였다. 

 

6.4 오프라인 유저가 없으면 종료, 있을 경우 gRPC 를 통해 오프라인 유저에게 알림 전송

if (offlineUsers.length === 0) {
console.log("💡No offline users to notify.");
return;
}
await Promise.all(
offlineUsers.map(async (userId) => {
try {
await sendNotification(roomId, userId, context);
console.log(`💡Notification sent to user ${userId} in room ${roomId}`);
} catch (error) {
console.error("Error in socketHandler_sendMessage: ", error);
}
})
);

오프라인 유저가 없다면 더 이상 알림을 보낼 필요가 없기 때문에 함수를 종료한다. 만약에 오프라인 유저가 있을 경우 sendNotification 을 통해 gRPC 서버에 요청을 보낸다.

 

6.5 결과


7. 마치며

현재 gRPC 에 오프라인 유저를 판별한 뒤 알림 요청을 보내는 코드 구현을 작성하였다. 다음 글에서는 변경된 방 나가기 기능과 자잘한 오류 수정에 관해서 작성해보도록 할 예정이다.

더보기

전체코드

const socketHandler = (io) => {
io.on('connection', (socket) => {
console.log('New user connected:', socket.id);
// 클라이언트가 연결할 때 자동으로 userId 등록
const userId = socket.handshake.query.userId;
if (!userId) {
console.error("커넥션을 위한 유저아이디가 존재하지 않습니다.");
return;
}
if (!userSocketMap.has(userId)) {
userSocketMap.set(userId, new Set());
}
userSocketMap.get(userId).add(socket.id);
socket.data.userId = userId;
console.log(`💡User ${userId} registered with socket ${socket.id}`);
//방 입장
socket.on('joinRoom', (data) => {
if (typeof data === 'string') {
data = JSON.parse(data); // 문자열을 JSON 객체로 변환
}
const { roomId } = data;
const userId = socket.data.userId;
socket.join(roomId);
if (!roomUserMap.has(roomId)) {
roomUserMap.set(roomId, new Map());
}
const roomUsers = roomUserMap.get(roomId);
if (!roomUsers.has(userId)) {
roomUsers.set(userId, new Set());
}
roomUsers.get(userId).add(socket.id);
console.log(`💡User ${userId} joined room ${roomId}`);
console.log("💡", io.sockets.adapter.rooms.get(roomId));
})
// 클라이언트에서 메시지 전송
socket.on('sendMessage', async (data) => {
if (typeof data === 'string') {
data = JSON.parse(data);
}
const { roomId, context } = data;
const topic = 'chat_messages'
const userId = socket.data.userId;
console.log(userId);
// kafka 로 메시지 전송
await sendMessageToKafka(topic, roomId, context, userId);
const usersInRoom = await getUsersInRoom(roomId); // 방에 속한 유저 목록
console.log("💡usersInRoom --------> ", usersInRoom);
const offlineUsers = usersInRoom.filter(userId => {
const isOnline = userSocketMap.has(userId);
const isInRoom = roomUserMap.has(roomId) && roomUserMap.get(roomId).has(userId);
console.log(`💡Checking user ${userId}: isOnline=${isOnline}, isInRoom=${isInRoom}`);
return !isOnline || !isInRoom; // 방에 없거나 완전히 오프라인이면 offlineUsers 로 간주
});
console.log("💡offlineUser --------> ", offlineUsers);
if (offlineUsers.length === 0) {
console.log("💡No offline users to notify.");
return;
}
await Promise.all(
offlineUsers.map(async (userId) => {
try{
await sendNotification(roomId, userId, context);
console.log(`💡Notification sent to user ${userId} in room ${roomId}`);
} catch (error) {
console.error("Error in socketHandler_sendMessage: ", error);
}
})
);
})
// Kafka 에서 메시지 소비 및 브로드캐스트
consumeMessageFromKafka('chat_messages', (roomId, message) => {
io.to(roomId).emit('newMessage', message);
}).catch((err) => {
console.error('Error in SocketHandler_consumeMessageFromKafka_single: ', err);
})
})
}