Data Engineering/spark 19

Yarn log 확인

로그 파일 위치 Command 로 확인 1. 특정 application 로그 확인 yarn logs -applicationId 2. 에러로그만 확인 yarn logs -applicationId -log_files stderr 3. container별 로그 확인 # application에서 사용하는 container 전체출력 yarn logs -applicationId -show_application_log_info # 위에 container정보에서 특정 container에 대한 로그 확인 yarn logs -applicationId -containerId [spark application architecture] executor는 항상 실행 중 작업이 없더라도 이 모델의 장점은 빠르다. 빨리 뜨고 빠르게..

kafka 와 spark stream의 데이터 처리 방식

나는 exactly once를 구현했다. 무슨 뜻이냐면... 『데이터는 정확히 한번 처리된다』 당연히 데이터가 여러번 처리되면 안되겠지.. Spark Streaming는 처음에 Receiver를 이용하여 Kafka에 초기에 한번 접속 후 데이터를 받아오는 구조로 설계되었다. 그 후에 복구를 위해 WAL 기능이 추가되었는데 이게 문제가 되는게 Receiver가 데이터를 가져오자마자 WAL에 저장하고 데이터 처리 도중에 죽었다면 Spark Streaming은 기동 시 WAL 에서 데이터를 꺼내서 처리하고 Kafka는 아까 읽었던 부분부터 Read Offset을 다시 수행한다. 데이터가 중복된다는 뜻이지. 이걸 방지하기 위해 Kafka Direct API를 제공하는데 메카니즘은 이렇다. Offset 관리를 일..

spark 성능 개선

SPARK job 성능에 중요한 영향을 미치는 설정 값들에 대해 정리한다. 1. 용어정리 하나의 Spark executor = 하나의 YARN container 라고 보면된다.그리고 하나의 task = 하나의 core = 하나의 vcore 다.하나의 executor가 여러개의 task를 동시에(concurrent) 부릴 수 있다. Driver : 중앙 조정자Executor : 분산 작업 노드 드라이버와 익스큐터는 각각 독립된 자바 프로세스 1. 드라이버 사용자의 main 메소드가 실행되는 프로세스 • 주요 역할 사용자 프로그램을 태스크로 변환하여 클러스터로 전송 1. 연산들의 관계를 DAG(Directed Acyclic Graph) 생성 2. DAG를 물리적인 실행 계획으로 변환 •최적화를 거쳐 여러 개..

머신러닝과 딥러닝

둘다 AI의 범주에 속하는 방법론인데 머신러닝과 딥러닝은 다르다. '사람'이 하느냐, '기계'가 하느냐에 따라 구분된다. 가령 머신러닝은 개발자가 다양한 고양이 사진을 컴퓨터에게 보여주고 ‘이것이 고양이이다.’라고 알려준 다음, 새로운 고양이 사진을 보았을 때 ‘고양이’라고 판단할 수 있도록 하는 것이며, 딥러닝은 스스로가 여러 가지 고양이 사진을 찾아보고 '고양이'에 대해 학습한 다음 새로운 고양이 사진을 보고 '고양이'라고 구분하는 것이다.

업무 범위

Spark는 ML을 지원하는 분석툴도 가능하지만 나는 spark를 big data engineering Tool로서 사용한다. 내가 지금 하는 일은 Data Science가 아니다.Data Engineering 이다. 분석가가 만든 모델을 주기적으로 학습시켜 정확도가 확보된 모델을 자동으로 생성시킨다. 또한 분석가가 필요로 하는 데이터를 확보할 수 있도록 데이터 파이프라인들을 만든다.주로 HBase에서 꺼내다가 전처리를(StandardScaling, Outlier Remove, etc) 한 후 분석가에게 갖다 바친다. 데이터 엔지니어로 시작했다가 지금은 비즈니스적인 문제를 ML/DL을 이용하여 해결하는 역할을 하고 있다.데이터 수집/분석/전처리/학습/모델링/Serving/운영까지 다 하는 Full Sta..

혼잡 인지 모델 검증(Loss 구하기)

현재 진행중인 혼잡 인지 모델 개발 과제에서 Spark으로 구현한 Linear Regression 모델의 성능을 측정하였다. 먼저 CrossValidation Class를 이용하여 교차 검증을 하였다. CrossValidation 을 생성하면 Default 가 로딩된 데이터를 3벌로 나누고 2벌은 Training으로 사용하고 1벌은 Test 용도로 사용된다. (70:30 비율) val crossval = new CrossValidator() .setEstimator(pipeline) .setEvaluator(new RegressionEvaluator().setLabelCol("loading_time")) .setEstimatorParamMaps(paramGrid) Spark.org에 가면 CrossVal..