개발자
api 를 만들고 Airflow 를 사용해서 ETL 파이프라인을 만드는 도중에 생긴 문제인데요, simplehttpOperator 로 api 에서 결과를 가져왔고 (성공), 결과값을 transform 하려고 simplehttpOperator 를 xcom_pull 하는 코드를 짰습니다. (코드는 아래와 같습니다.) def _processing_api(ti): if not len(assets): raise ValueError('empty') assets = ti.xcom_pull(task_ids = 'extract_riot_api') 결과는 ValueError 를 리턴했습니다. 이유를 찾으려고 riot api 에서 테스트를 시작했고 riot api 웹에서 얻어지는 리턴값을 airflow UI 의 simplehttpOperator 의 xcom 에서 key - return_value , value - [{}, {}, ...] json 형식의 값을 확인할 수 있었는데, 역시 xcom_pull 을 한 결과가 ValueError 였습니다. extract_data 는 api 에서 json data 를 가져오는 코드이고 _processing_api 함수는 pythonoperator 에서 쓰려고 만든 코드입니다. stackoverflow, airflow docs 등을 3~4 일 정도 본 것 같은데, 해결하지 못했네요.. 비슷한 경험을 해결하셨던 분 계신가요 ..? 어디 물어볼 곳도 없어서.. 여기에 마지막으로 올려봐요. 아시는 분 도움주시면 감사하겠습니다
1extract_data = SimpleHttpOperator(
2 task_id = 'extract_riot_api',
3 http_conn_id = 'riot_api',
4 endpoint = f"lol/league-exp/v4/entries/RANKED_SOLO_5x5/CHALLENGER/I?
5 page=1&api_key={API_KEY}",
6 method = 'GET',
7 response_filter = lambda x: json.loads(x.text),
8 log_response = True,
9 )
10
11def _processing_api(ti):
12 assets = ti.xcom_pull(task_ids = 'extract_riot_api') # cross communication
13 if not len(assets):
14 raise ValueError('empty')
15 print(assets)
16
17 process_api_data = PythonOperator(
18 task_id = 'process_api_data',
19 python_callable = _processing_api,
20 )
답변 1
혹시 task 결과가 xcom 테이블에 들어갔는지 확인이 필요해보입니다! 연동하신 (데이터베이스)에 접속하셔서 xcom 테이블에 해당 task id 로 쿼리 조회를 한번해보시는 것도 좋을 것 같네요
동규
작성자
코드스테이츠 AI AI • 2023년 08월 05일
Airflow ui 에서 simplehttpoperator 의 xcom 에들어기서 확인한 key valie 가 첫번째 사진입니다. 왜 xcom pull이 안될까요
최충은
Community dev • 2023년 08월 07일
제 화면에서는 이미지가 안보이네요... 올려주신 코드가 원본 코드 전체라면 의존성 명시가 없어서 'extract_riot_api' 와 'process_api_data'가 동시에 실행되기 때문에 생기는 문제 일 수 도 있을 것 같네요 extract_riot_api >> process_api_data 와 같이 의존성 명시하시구 한번 더 확인해보시면 될 것 같습니다
지금 가입하면 모든 질문의 답변을 볼 수 있어요!
현직자들의 명쾌한 답변을 얻을 수 있어요.
이미 회원이신가요?
지금 가입하면 모든 질문의 답변을 볼 수 있어요!