05 Logstash
05 Logstash
1. Note
1. Note
ES 진영에서 제공 하는 애플리케이션
구성 역할 핵심 기능 특징 Filebeat 로그 수집 에이전트 서버 로그 파일 읽어서 전달 매우 가볍고 단순, 거의 가공 없음 Logstash 데이터 처리 파이프라인 수집된 로그 가공/필터링/변환 후 전달 무겁지만 강력한 데이터 처리 기능 Elasticsearch 데이터 저장/검색 엔진 대용량 데이터 저장, 검색, 집계 빠른 검색이 핵심 Kibana 시각화 도구 검색 데이터 기반 대시보드/그래프 제공 로그 분석 화면 구성 - ELK (Elasticsearch / Logstash / Kibana)
- 분리해서 고려할점
- 이 부분에서 인지할 부분은 카비나가 필수는 아님
- 카비나는 데이터를 시각화 해주는 용도로 사용함.
- Logstash는 그냥 파이프라인임
- 필요에 따라서 그냥 옮기는 용도로 사용할수도 있기는 함.
2. Logstash
1. Logstash
- 다양한 시스템에서 발생하는 데이터를 모아 처리하는 데이터 파이프라인 도구
- 수집한 데이터는 그대로 전달되지 않고, 필터를 통해 파싱·정제·형식 변환 과정을 거침
- 이 과정에서 비정형 로그는 검색 가능한 구조화된 데이터로 바뀜
- 최종적으로 Elasticsearch, Kafka 같은 외부 시스템으로 전달되어 저장·분석에 사용
- Elasticsearch 진영이 제공하는 데이터 수집·가공용 별도 애플리케이션
2. 특징
1. 다양한 데이터 입력 지원
- Logstash는 거의 모든 형태의 데이터 소스를 받을 수 있음
- 파일 로그, TCP/UDP 스트림, HTTP 요청, Kafka 메시지 등 다양한 입력을 처리할 수 있음
- 특정 시스템에 종속되지 않고 “데이터 수집 허브” 역할을 함.
2. 파이프라인 구조 (Input → Filter → Output)
- 데이터 처리 흐름이 Input → Filter → Output으로 명확하게 분리됨
- Input에서 데이터를 받고 Filter에서 가공한 뒤 Output으로 전달함
- 단계별 구조 덕분에 설정이 직관적이고 유지보수가 쉬움
3. 강력한 데이터 가공 기능
- Filter 단계에서 로그를 자유롭게 변환할 수 있음
- grok으로 문자열 로그를 구조화된 필드로 분해 가능
- date, mutate 등으로 타입 변환, 필드 수정, 정규화 수행 가능
- 비정형 로그를 검색 가능한 데이터로 바꾸는 핵심 역할 수행
4. 플러그인 기반 구조
- Input / Filter / Output 모두 플러그인 형태로 구성됨
- 필요한 기능을 조합해서 사용하는 구조
- Kafka, Elasticsearch, CSV, JDBC 등 다양한 플러그인 제공
- 확장성이 높아 다양한 환경에 적용 가능
5. 비정형 데이터 처리에 최적화
- 형식이 일정하지 않은 로그 데이터 처리에 강점이 있음
- 단순 JSON뿐 아니라 자유 텍스트 로그도 처리 가능
- 패턴 기반 파싱을 통해 구조화된 데이터로 변환
6. 외부 시스템 연동 중심 구조
- 자체 저장 기능은 없음
- 처리된 데이터를 Elasticsearch, Kafka, Redis, 파일 등으로 전달함
- 데이터 “저장”보다 “전처리 및 전달”에 집중된 구조
7. 독립 실행형 애플리케이션
- Elasticsearch 내부 기능이 아니라 별도 프로세스로 실행됨
- Java 기반으로 동작하는 독립 서버 프로그램
- ES와 HTTP 기반으로 통신하는 구조
8. 상대적으로 무거운 처리 구조
- JVM 기반이라 메모리/CPU 사용량이 높은 편
- 단순 로그 수집에는 과한 경우가 있음
- 복잡한 변환이 필요할 때 사용하는 것이 일반적
3. 안정성 관련
1. Persistent Queue
- Logstash 내부에 있는 디스크 기반 큐
- Input → 바로 Output으로 보내지 않음
- 중간에 디스크에 저장해두고 처리
- 장애가 나도 데이터가 남아 있음
흐름
1 2 3 4 5
Input → PQ(디스크 저장) → Filter → Output # Logstash가 죽어도 데이터 유지됨 # Output이 느려도 쌓아둘 수 있음 # 재시작 후 이어서 처리 가능
2. Dead Letter Queue
- 처리 중 완전히 실패한 이벤트를 따로 저장
- 데이터 버리지 않음
- 별도 파일로 저장됨
- 나중에 재처리 가능
- 운영자가 분석 가능
- 들어가는 케이스
- Elasticsearch mapping 오류
- 필드 타입 불일치
- index 존재 문제
- 파싱 실패 후 복구 불가
4. Logstash 아키텍처
1. 전체 흐름
1
2
3
4
5
6
7
8
9
10
11
# 일반적인 Logstash 아키텍처 표현
┌─────────┐ ┌─────────┐ ┌─────────┐
│ Input │→ │ Filter │ → │ Output │
└─────────┘ └─────────┘ └─────────┘
(수집) (변환) (전송)
# 실제 Logstash 아키텍처
┌─────────┐ ┌───────┐ ┌─────────┐ ┌───────┐ ┌─────────┐
│ Input │ → │ Codec │ → │ Filter │ → │ Codec │ → │ Output │
└─────────┘ └───────┘ └─────────┘ └───────┘ └─────────┘
(수집) (디코딩) (변환) (인코딩) (전송)
2. Input (데이터 수신)
- 여러 플러그인으로 부터 데이터를 수집함.
- 주요 Input Plugin
- File : 로컬 파일에서 로그 데이터 수집
- Beats : Filebeat와 같은 로그 수집기를 통해 데이터 수집
- HTTP : HTTP 요청을 통해 데이터 수집
- JDBC : 데이터베이스에서 데이터 수집
- Kafka : Apache Kafka를 통해 실시간 데이터 스트리밍
3. Filter 단계 (가공)
- 데이터를 변환, 정제 및 가공함.
- 주요 플러그인
- grok : 정규식을 사용하여 로그를 분석하고 구조화된 데이터로 변환
- mutate : 데이터 필드를 수정, 추가, 삭제 (데이터 가공)
- json : JSON 데이터를 파싱하여 필드로 변환
- date : 날짜 데이터를 표준 형식으로 변환
- geoip : IP 주소를 기반으로 위치 정보를 추가
4. Output 단계 (전송)
- Output은 가공된 데이터를 다양한 대상으로 전송함.
- 주요 플러그인
- ElasticSearch : 데이터를 Elasticsearch로 전송하여 검색 및 분석 가능
- File : 변환된 데이터를 파일로 저장
- Stdout : 터미널에 데이터 출력 (디버깅용)
- Kafka : Apache Kafka로 데이터 전달
5. Codec
- 입력 시 raw 바이트를 이벤트로 디코딩, 출력 시 이벤트를 형식 있는 바이트로 인코딩.
- 규격이 전부 다르기때문에 그거에 맞게 조정함
4. Logstash 구축
1. 초기형
1. 디렉토리 구조
1
2
3
4
5
6
project-root/
├── docker-compose.yml
├── pipeline/
│ └── logstash.conf
└── logs/
└── app.log
2. Logstash 생성
- logstash 생성 yaml
1 2 3 4 5 6 7 8 9 10 11
services: logstash: image: docker.elastic.co/logstash/logstash:8.5.0 container_name: logstash environment: - "LS_JAVA_OPTS=-Xms256m -Xmx256m" volumes: - ./pipeline:/usr/share/logstash/pipeline # logstash 설정 mount # - ./logs:/logs # 로그 파일 mount - Volumes 경로 설정
- 내 PC는 dokcer-compose.yaml 실행 한 디렉토리가 기준
- 비교
내 PC(project-root 기준) 컨테이너 내부 역할 ./pipeline/usr/share/logstash/pipelineLogstash 설정(conf) ./logs/logs읽을 로그 파일
3. conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
input {
file {
path => "/logs/*.log"
# 처음부터 읽기
start_position => "beginning"
# 재시작 시 다시 읽기 (테스트용)
sincedb_path => "/dev/null"
}
}
filter {
}
output {
stdout {
codec => rubydebug
}
}
2. logstash 로그를 ELS가 읽음
1. docker Copmse.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
services:
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.5.0
container_name: elasticsearch
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ports:
- "9200:9200"
logstash:
image: docker.elastic.co/logstash/logstash:8.5.0
container_name: logstash
volumes:
- ./pipeline:/usr/share/logstash/pipeline
- ./logs:/logs # 여기에 특정 AP가 로그를 떨궈줘야함.
depends_on:
- elasticsearch
2. logstash.conf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
input {
file {
path => "/logs/*.log"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "app-log"
}
stdout {
codec => rubydebug
}
}
This post is licensed under CC BY 4.0 by the author.