순서가 조금 바뀌었긴 했지만 어쨌든 gRPC 설정을 마칠 무렵 Kakfa 세팅이 되어서 node 와 Kafka 를 연결한 과정에 대하여 작성한다.
1. 디렉토리 구조
필자의 디렉토리 구조는 다음과 같다. 설정 파일을 작성하는 Config, Producer, Consumer 를 나누었고, 환경변수의 경우 .env 를 활용하였다.
chat/
├── src/
│ ├── config/
│ │ ├── kafkaConfig.js ---> Kafka 설정 파일 (Kafka 브로커, 토픽 등 설정)
│ ├── grpc/
│ ├── routes/
│ ├── services/
│ │ ├── KafkaConsumer.js ---> Kafka Consumer 관련 로직
│ │ ├── kafkaProducer.js ---> Kafka Producer 관련 로직
│ │ ├── socketHandler.js ---> Socket.IO와 Kafka 연계 로직
│ ├── utils/
│ ├── server.js ---> 메인 서버 엔트리 포인트
├── public/
├── .env ---> 환경 변수 파일 (Kafka 설정 포함 가능)
├── package.json ---> 프로젝트 종속성 및 스크립트 정의 파일
2. 흐름도
3. 코드 구현
3.1 필요한 라이브러리 설치
Kafka 를 사용하기 위해 라이브러리를 설치했다. 필자의 경우 npm 보다 빠른 bun 을 사용중이지만 bun 이 없는 경우 npm 으로 설치해도 무방하다.
$ bun install kafkajs
3.2 KafkaConfig.js 설정
require ('dotenv').config({ path: '../.env' }); // .env 파일 로드
const { Kafka } = require('kafkajs');
const kafkaBrokers = [process.env.KAFKA_BROKERS];
// Kafka 객체 생성 함수 ( 로그와 모니터링을 쉽게하기 위해 clientID 를 Consumer 와 Producer 에서 나누어서 관리하기 )
const createKafkaInstance = (clientId) => {
return new Kafka({
clientId: clientId, // 역할별 clientId 지정
brokers: kafkaBrokers,
});
};
module.exports = createKafkaInstance;
처음 .env 파일은 path 를 지정하지 않아도 알아서 잘 인식하였는데, 어느순간 경로를 제대로 읽지 못하는 경우가 생겨 경로를 지정해주었다. .env 에서 설정한 Kafka 브로커 주소를 불러온 뒤 객체를 생성하였는데, 이 때 clientID 는 실제 성능에 영향을 미치지 않지만 나중에 로그와 모니터링이 필요한 경우 쉽게 관리하기 위해 Consumer 와 Producer 를 나누어주었다.
3.3 KafkaProducer.js 설정
const { v4: uuidv4 } = require('uuid'); // UUID 생성 라이브러리
const createKafkaInstance = require('../config/kafkaConfig');
const kafka = createKafkaInstance('chat-service-producer'); // Kafka 객체 생성
Kafka 에서 ScyllB 로 채팅 로그를 저장하기 때문에 각 메시지의 고유 ID 를 지정하기 위해 UUID 생성 라이브러리를 사용했다. KafkaConfig 설정을 받아온 뒤에, clientID 를 지정해서 Kafka 객체를 생성한다.
const producer = kafka.producer();
// Producer 연결을 유지하는 함수
const connectProducer = async () => {
try {
await producer.connect();
console.log('Kafka producer connected');
} catch (err) {
console.error('Error in kafkaProducer_connectProducer: ', err);
}
};
// 애플리케이션 종료 시 Kafka Producer 연결 해제
const disconnectProducer = async () => {
try {
await producer.disconnect();
console.log('Kafka Producer disconnected');
} catch (err) {
console.error('Error disconnecting Kafka Producer:', err);
}
};
필자의 경우 Producer 의 connect 와 disconnect 를 따로 분리하였다. 연결을 명시적으로 관리하기 때문에 어플리케이션의 상태를 명확히 관리할 수 있고, 어플리케이션 종료시 명시적으로 해제하기 때문에 리소스 관리에도 용이하다. 또한 연결 중 문제가 발생하였을 경우 에러를 처리할 수 있다는 장점을 가지고 있다.
let retryCount = 0;
const maxRetries = 5; // 최대 재시도 횟수
// Kafka 메시지를 전송하는 함수
const sendMessageToKafka = async (topic, roomId, context, userId) => {
const payload = {
message_id: uuidv4(),
room_id: roomId,
user_id: userId,
context: context,
timestamp: new Date().toISOString(), // 현재 시간
};
const trySendMessage = async () => {
try {
await producer.send({
topic: topic,
messages: [{ key: roomId, value: JSON.stringify(payload) }],
});
console.log('Message sent to Kafka:', payload);
} catch (err) {
console.error('Error in kafkaProducer:', err);
if (retryCount < maxRetries) {
retryCount++;
console.log(`Retrying (${retryCount}/${maxRetries})...`);
setTimeout(trySendMessage, 1000); // 1초 대기 후 재시도
} else {
console.error('Max retry limit reached. Message failed:', payload);
}
}
};
await trySendMessage();
}
module.exports = { connectProducer,sendMessageToKafka, disconnectProducer };
Kafka로 메시지를 전송하는 함수이다. 토픽의 경우 { 개인방 : 단체방 = single : group } 의 형태로 두가지로 나누어서 관리하려고 하였으나, Kafka Connect 를 통해 ScyllaDB 에 데이터 저장시 topic 과 scyllaDB 의 테이블 명이 같아야 오류 없이 작동하기 때문에 토픽은 한가지로 정의하여 관리하기로 하였다.
해당 로직은 Socket.IO 로 부터 메시지를 전달 받은 다음, Kafka 로 발행하는 코드이다. 만약 오류 상황을 대비하기 위해 재시도 로직이 들어가 있으며, 재시도 횟수를 지정하지 않게 될 경우 무한루프에 빠질 수 있기 때문에 최대 5번 시도하는 것으로 설정하였다.
3.4 KafkaConsumer.js 설정
const createKafkaInstance = require('../config/kafkaConfig');
const kafka = createKafkaInstance('chat-service-consumer'); // Consumer 용 clientId 설정
const consumer = kafka.consumer({ groupId: 'chat_service' });
Producer 와 동일하게 설정해주었으며, group_id 의 경우 디렉토리라고 생각하면 된다. 예를 들어, 채팅 서비스의 그룹은 chat_service 이고 채팅 로그를 관리하는 그룹이 chat_backup 일 경우 그룹마다 독립적인 오프셋에서 관리되기 때문에 서로 영향을 주지 않는다. 따라서 메시지 소비 목적에 따라 그룹을 나누어 관리가 가능한 셈이다. 현재 필자의 경우 채팅서비스만 관리하면 되기 때문에 group_id 는 하나로 설정했다.
const consumeMessageFromKafka = async (topic, handleMessage) => {
try {
if (!isConsumerRunning) {
await consumer.connect();
// 특정 토픽에 메시지를 구독, 새로운 메시지만 소비하겠다는 설정. true 일 경우에 모든 메시지를 처음부터 소비한다.
//await consumer.subscribe({ topic, fromBeginning: false });
await consumer.subscribe({topic, fromBeginning: process.env.CONSUME_FROM_BEGINNING === 'true'});
// eachMessage : 메시지가 들어올 때마다 호출되는 콜백함수.
await consumer.run({
eachMessage: async ({topic, partition, message}) => {
try {
// Kafka 메시지는 문자열 형식으로 저장되므로, JSON 으로 파싱.
const parsedMessage = JSON.parse(message.value.toString());
const roomId = parsedMessage.room_id;
await handleMessage(roomId, parsedMessage);
console.log('Message consumed from Kafka:', {roomId, parsedMessage});
} catch (err) {
console.error('Error in KafkaConsumer_eachMessage: ', err);
}
},
});
isConsumerRunning = true; // Consumer 실행 상태 업데이트
} else {
console.log('Kafka Consumer is already running.');
}
} catch (err) {
console.error('Error in KafkaConsumer: ', err);
}
};
현재 필자의 경우 하나의 topic 만 사용하고 있긴 하지만, 추후 변경될 수 있다는 점에서 topic 을 매개변수로 받았다. (아직 사용되지는 않는 중) 만약 Consumer 의 연결이 끊겼다 재접속 될 경우 Consumer 가 이미 작동중이기 때문에 재접속 오류를 방지하기 위한 로직을 추가해주었다.
const disconnectConsumer = async () => {
try {
await consumer.disconnect();
console.log('Kafka consumer disconnected');
} catch (err) {
console.error('Error disconnecting Kafka consumer:', err);
}
};
// 종료 처리
process.on('SIGINT', async () => {
console.log('SIGINT received. Closing Kafka consumer...');
await disconnectConsumer();
process.exit(0);
});
process.on('SIGTERM', async () => {
console.log('SIGTERM received. Closing Kafka consumer...');
await disconnectConsumer();
process.exit(0);
});
module.exports = { consumeMessageFromKafka };
서버 종료시 리소스 누수 방지를 위해 종료하는 메서드도 만들어 주었다.
3.5 socketHandler.js 설정
const { sendMessageToKafka } = require('./kafkaProducer');
const { consumeMessageFromKafka } = require('./KafkaConsumer');
const socketHandler = (io) => {
io.on('connection', (socket) => {
console.log('New user connected:', socket.id);
//방 입장
socket.on('joinRoom', (data) => {
if (typeof data === 'string') {
data = JSON.parse(data); // 문자열을 JSON 객체로 변환
}
const { roomId } = data;
console.log(data);
socket.join(roomId);
console.log(`User ${socket.id} joined room ${data.roomId}`);
})
// 클라이언트에서 메시지 전송
socket.on('sendMessage', async (data) => {
if (typeof data === 'string') {
data = JSON.parse(data);
}
const { roomId, message } = data;
const topic = 'chat_messages'
// kafka 로 메시지 전송
await sendMessageToKafka(topic, roomId, message, socket.id);
console.log(`Message from ${socket.id} sent to room ${roomId}`);
})
// Kafka 에서 메시지 소비 및 브로드캐스트
consumeMessageFromKafka('chat_messages', (roomId, message) => {
io.to(roomId).emit('newMessage', message);
}).catch((err) => {
console.error('Error in SocketHandler_consumeMessageFromKafka_single: ', err);
})
// 연결 해제
socket.on('disconnect', () => {
console.log('User disconnected:', socket.id);
});
})
}
module.exports = socketHandler;
설정한 Producer 와 Consumer 파일을 사용하여 Socket.IO 연결시 작동되어야 할 메소드들을 정의하였다. 테스트 환경에서 Socket.IO 를 사용중에 string 데이터로 넘어오는 문제가 발생하여 JSON 으로 파싱하는 과정을 거쳤다.
3.6 Server.js 설정
const cors = require('cors');
const { connectProducer } = require('./services/kafkaProducer');
const io = new Server(server, {
cors: {
origin: '*', // 모든 도메인 허용 (테스트 환경)
methods: ['GET', 'POST'],
},
});
// Express CORS 설정
app.use(cors({
origin: '*', // 모든 도메인 허용 (테스트용)
methods: ['GET', 'POST'],
allowedHeaders: ['Content-Type'],
}));
const socketHandler = require('./services/socketHandler');
socketHandler(io);
외부 써드파티에서 접근하려고 할 때 CORS 문제를 해결하기 위해 모든 도메인에서 접속할 수 있게 설정하였다. 생성된 Socket.IO 서버를 socketHandler 로 전달하여 이벤트를 처리하게 하였다. 따라서 서버 작동시 미리 설정한 socketHandler.js 로 접근하여 내부에서 처리하도록 하였다.
4. 써드파티 (Hoppscotch) 사용
RESTful-API 를 테스트 할 때에는 Postman 을 주로 사용한다. 신기하게도 Postwoman 이라고 불리는 테스트용 툴이 있고 현재는 Hoppscotch 로 변경되었다. Socket.IO 를 테스트 하기 위해 검색하던 중 찾았는데 꽤나 유용하다.
4.1 테스트
먼저 서버를 실행시킨다.
Hoppscotch 에 접속하여 Socket.IO 로 이동 후 Packge.json 에서 Socket.IO 의 버전을 확인한 뒤, 서버 주소로 접근하면 된다. 필자의 경우 Socket.IO 의 버전은 v4, 주소는 ws://localhost:3000 으로 연결한다.
Socket.IO 에 정상적으로 연결되면 "New user connected : " 와 함께 socket 에 연결된 ID 가 출력되도록 하였다.
다음과 같이 작성하여 보내기를 클릭하면 정상적으로 kafka 가 작동하여 해당 방에 join 되어지고 Consumer 가 연결 된 것을 확인할 수 있다.
마지막으로 해당 방에 메시지를 보내게 되면 모두 정상적으로 넘어가는 것을 확인할 수 있다. 현재 joinRoom 을 통해 해당 Socket 으로 유저가 연결된 상태에서 메시지를 보내게 되면 Producer 와 Consumer 모두 정상적으로 동작하는 것을 확인할 수 있다.
'Project > ST00CK' 카테고리의 다른 글
배포환경에서 Socket.IO 의 연결 끊김 이슈 및 해결방법 (1) | 2025.01.16 |
---|---|
Socket.IO TLS 설정 및 배포 (1) | 2025.01.16 |
node.js 에서 gRPC 설정하기 (0) | 2025.01.08 |
ScyllaDB 연결 및 채팅방 API 구현 (1) | 2025.01.01 |
ScyllaDB ERD 작성, API 명세서 작성 (1차) (0) | 2024.12.31 |