서버를 키고 postman 에서 socet.io 를 통해 연결할 때 , user123 과 user456 을 동시에 실행시키면 오류가 발생하였다. 이를 해결하는 과정을 작성한다.
1. 문제 분석
Error in KafkaConsumer: KafkaJSNonRetriableError: Cannot subscribe to topic while consumer is running
at Object.subscribe (/Users/jeenukchung/Documents/ST00CK/node_modules/kafkajs/src/consumer/index.js:136:13)
at consumeMessageFromKafka (/Users/jeenukchung/Documents/ST00CK/src/services/KafkaConsumer.js:14:28)
at process.processTicksAndRejections (node:internal/process/task_queues:105:5) {
retriable: false,
helpUrl: undefined,
[cause]: undefined
}
해당 오류 코드를 보게 되면 Kafka Consumer 가 이미 실행 중인데, 또 다시 subscribe() 를 호출하면서 발생한 문제이다.
2. 변경 전 KafkaConsumer.js
const consumeMessageFromKafka = async (topic, handleMessage) => {
try {
if (!isConsumerRunning) {
await consumer.connect();
// 특정 토픽에 메시지를 구독, 새로운 메시지만 소비하겠다는 설정. true 일 경우에 모든 메시지를 처음부터 소비한다.
//await consumer.subscribe({ topic, fromBeginning: false });
await consumer.subscribe({topic, fromBeginning: process.env.CONSUME_FROM_BEGINNING === 'true'});
// eachMessage : 메시지가 들어올 때마다 호출되는 콜백함수.
await consumer.run({
eachMessage: async ({topic, partition, message}) => {
try {
// Kafka 메시지는 문자열 형식으로 저장되므로, JSON 으로 파싱.
const parsedMessage = JSON.parse(message.value.toString());
const roomId = parsedMessage.room_id;
await handleMessage(roomId, parsedMessage);
console.log('Message consumed from Kafka:', {roomId, parsedMessage});
} catch (err) {
console.error('Error in KafkaConsumer_eachMessage: ', err);
}
},
});
isConsumerRunning = true; // Consumer 실행 상태 업데이트
} else {
console.log('Kafka Consumer is already running.');
} catch (err) {
console.error('Error in KafkaConsumer: ', err);
}
};
필자는 server.js 가 실행되고 socket.io 에 연결될 때 kafkaConsumer 가 동작되도록 설계하였다. 따라서 이 때 isConsumerRunning 을 통해 중복 실행되는 것을 방지하였다. 하지만 해당 플래그는 여러 요청이 빠르게 들어올 경우 subscribe() 가 중복 호출 될 가능성이 존재한다. 플래그가 true 가 되기 전에 또 다른 연결이 들어오면서 subscribe() 를 다시 호출하는 문제가 발생한 것이다.
현재 코드는 server.js → socketHandler.js → KafkaConsumer 순으로 동작한다. KafkaConsumer 의 실행 여부를 KafkaConsumer 에서 확인한다. 따라서 socketHandler.js 에서 먼저 중복 방지를 확인한 후 KafkaConsumer 로 넘겨주도록 변경해야 한다.
3. 변경 후 KafkaConsumer.js 와 socketHandler.js
const consumeMessageFromKafka = async (topic, handleMessage) => {
try {
if(isConsumerRunning) {
console.log("Kafka Consumer is already running.");
return;
}
console.log("📡 Connecting Kafka Consumer...");
await consumer.connect();
await consumer.subscribe({topic, fromBeginning: process.env.CONSUME_FROM_BEGINNING === 'true'});
isConsumerRunning = true;
await consumer.run({
eachMessage: async ({topic, partition, message}) => {
try {
const parsedMessage = JSON.parse(message.value.toString());
const roomId = parsedMessage.room_id;
await handleMessage(roomId, parsedMessage);
console.log('📩 Message consumed from Kafka:', {roomId, parsedMessage});
} catch (err) {
console.error('❌ Error in KafkaConsumer_eachMessage: ', err);
}
}
});
} catch (err) {
console.error('Error in KafkaConsumer: ', err);
}
};
const { consumeMessageFromKafka } = require('../services/KafkaConsumer');
let isKafkaInitialized = false; // Kafka 중복 실행 방지
const socketHandler = (io) => {
io.on('connection', (socket) => {
console.log('New user connected:', socket.id);
if (!isKafkaInitialized) { // Kafka Consumer가 한 번만 실행되도록 설정
console.log("Initializing Kafka Consumer...");
isKafkaInitialized = true;
}
});
};
module.exports = socketHandler;
- 서버가 처음 시작될 때 Kafka Consumer 를 실행한다.
- isKafkaInitialized 가 true 로 변경됨에 따라 이후 소켓이 연결되더라고 if 문이 실행되지 않는다.
- Kafka Consumer 는 run() 이 실행된 상태에서 계속 메시지를 소비한다.
4. 결과

서버 실행 후 user123 과 user456 을 동시에 접속시도하였을 때 오류 없이 정상적으로 작동 되는 것을 확인했다. 로그를 보면 user123 과 user456 이 socket 에 연결된 이후 Consumer 에 접근 되는 것을 확인할 수 있다.
'Project > ST00CK' 카테고리의 다른 글
gRPC 와 REST API 서버 동시에 사용하기 (0) | 2025.03.04 |
---|---|
Redis 를 활용한 오프라인 유저 찾기 로직 (0) | 2025.02.27 |
socket.io 방 나가기 기능 및 연결 해제 처리 (0) | 2025.02.25 |
gRPC로 오프라인 유저를 찾기 위한 로직 구현 (0) | 2025.02.24 |
Jenkins Pipeline 설정 및 K8S 배포를 위한 deployment 작성 (3) (1) | 2025.02.21 |
서버를 키고 postman 에서 socet.io 를 통해 연결할 때 , user123 과 user456 을 동시에 실행시키면 오류가 발생하였다. 이를 해결하는 과정을 작성한다.
1. 문제 분석
Error in KafkaConsumer: KafkaJSNonRetriableError: Cannot subscribe to topic while consumer is running
at Object.subscribe (/Users/jeenukchung/Documents/ST00CK/node_modules/kafkajs/src/consumer/index.js:136:13)
at consumeMessageFromKafka (/Users/jeenukchung/Documents/ST00CK/src/services/KafkaConsumer.js:14:28)
at process.processTicksAndRejections (node:internal/process/task_queues:105:5) {
retriable: false,
helpUrl: undefined,
[cause]: undefined
}
해당 오류 코드를 보게 되면 Kafka Consumer 가 이미 실행 중인데, 또 다시 subscribe() 를 호출하면서 발생한 문제이다.
2. 변경 전 KafkaConsumer.js
const consumeMessageFromKafka = async (topic, handleMessage) => {
try {
if (!isConsumerRunning) {
await consumer.connect();
// 특정 토픽에 메시지를 구독, 새로운 메시지만 소비하겠다는 설정. true 일 경우에 모든 메시지를 처음부터 소비한다.
//await consumer.subscribe({ topic, fromBeginning: false });
await consumer.subscribe({topic, fromBeginning: process.env.CONSUME_FROM_BEGINNING === 'true'});
// eachMessage : 메시지가 들어올 때마다 호출되는 콜백함수.
await consumer.run({
eachMessage: async ({topic, partition, message}) => {
try {
// Kafka 메시지는 문자열 형식으로 저장되므로, JSON 으로 파싱.
const parsedMessage = JSON.parse(message.value.toString());
const roomId = parsedMessage.room_id;
await handleMessage(roomId, parsedMessage);
console.log('Message consumed from Kafka:', {roomId, parsedMessage});
} catch (err) {
console.error('Error in KafkaConsumer_eachMessage: ', err);
}
},
});
isConsumerRunning = true; // Consumer 실행 상태 업데이트
} else {
console.log('Kafka Consumer is already running.');
} catch (err) {
console.error('Error in KafkaConsumer: ', err);
}
};
필자는 server.js 가 실행되고 socket.io 에 연결될 때 kafkaConsumer 가 동작되도록 설계하였다. 따라서 이 때 isConsumerRunning 을 통해 중복 실행되는 것을 방지하였다. 하지만 해당 플래그는 여러 요청이 빠르게 들어올 경우 subscribe() 가 중복 호출 될 가능성이 존재한다. 플래그가 true 가 되기 전에 또 다른 연결이 들어오면서 subscribe() 를 다시 호출하는 문제가 발생한 것이다.
현재 코드는 server.js → socketHandler.js → KafkaConsumer 순으로 동작한다. KafkaConsumer 의 실행 여부를 KafkaConsumer 에서 확인한다. 따라서 socketHandler.js 에서 먼저 중복 방지를 확인한 후 KafkaConsumer 로 넘겨주도록 변경해야 한다.
3. 변경 후 KafkaConsumer.js 와 socketHandler.js
const consumeMessageFromKafka = async (topic, handleMessage) => {
try {
if(isConsumerRunning) {
console.log("Kafka Consumer is already running.");
return;
}
console.log("📡 Connecting Kafka Consumer...");
await consumer.connect();
await consumer.subscribe({topic, fromBeginning: process.env.CONSUME_FROM_BEGINNING === 'true'});
isConsumerRunning = true;
await consumer.run({
eachMessage: async ({topic, partition, message}) => {
try {
const parsedMessage = JSON.parse(message.value.toString());
const roomId = parsedMessage.room_id;
await handleMessage(roomId, parsedMessage);
console.log('📩 Message consumed from Kafka:', {roomId, parsedMessage});
} catch (err) {
console.error('❌ Error in KafkaConsumer_eachMessage: ', err);
}
}
});
} catch (err) {
console.error('Error in KafkaConsumer: ', err);
}
};
const { consumeMessageFromKafka } = require('../services/KafkaConsumer');
let isKafkaInitialized = false; // Kafka 중복 실행 방지
const socketHandler = (io) => {
io.on('connection', (socket) => {
console.log('New user connected:', socket.id);
if (!isKafkaInitialized) { // Kafka Consumer가 한 번만 실행되도록 설정
console.log("Initializing Kafka Consumer...");
isKafkaInitialized = true;
}
});
};
module.exports = socketHandler;
- 서버가 처음 시작될 때 Kafka Consumer 를 실행한다.
- isKafkaInitialized 가 true 로 변경됨에 따라 이후 소켓이 연결되더라고 if 문이 실행되지 않는다.
- Kafka Consumer 는 run() 이 실행된 상태에서 계속 메시지를 소비한다.
4. 결과

서버 실행 후 user123 과 user456 을 동시에 접속시도하였을 때 오류 없이 정상적으로 작동 되는 것을 확인했다. 로그를 보면 user123 과 user456 이 socket 에 연결된 이후 Consumer 에 접근 되는 것을 확인할 수 있다.
'Project > ST00CK' 카테고리의 다른 글
gRPC 와 REST API 서버 동시에 사용하기 (0) | 2025.03.04 |
---|---|
Redis 를 활용한 오프라인 유저 찾기 로직 (0) | 2025.02.27 |
socket.io 방 나가기 기능 및 연결 해제 처리 (0) | 2025.02.25 |
gRPC로 오프라인 유저를 찾기 위한 로직 구현 (0) | 2025.02.24 |
Jenkins Pipeline 설정 및 K8S 배포를 위한 deployment 작성 (3) (1) | 2025.02.21 |