Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement LivyOperatorAsync #178

Merged
merged 32 commits into from
Apr 8, 2022
Merged
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
c3aecc4
Add apache livy requirements in setup.cfg
sunank200 Mar 31, 2022
e725c8d
Add LivyOperatorAsync, LivyTrigger and LivyHookAsync
sunank200 Mar 31, 2022
3c62273
Add example DAG for LivyOperatorAsync and __init__.py
sunank200 Mar 31, 2022
1324b50
Propagate the log from triggerer to worker
sunank200 Mar 31, 2022
78c2da5
Add __init__.py in folder structure
sunank200 Mar 31, 2022
0f23c82
Propagate the log in case of error from triggerer to worker
sunank200 Apr 1, 2022
d887c6d
Change sleep to asyncio.sleep
sunank200 Apr 1, 2022
e9566bf
Add request_mocks in tests in setup.cfg
sunank200 Apr 1, 2022
8d0d354
Add the tests for hooks, operators and triggerer
sunank200 Apr 1, 2022
3ef53ac
Fix the typo in test for triggers
sunank200 Apr 1, 2022
71eebbe
Add missing log lines and change the test accordingly
sunank200 Apr 2, 2022
c924300
Add test for error scenarios for hooks
sunank200 Apr 4, 2022
644dc92
Remove comment for apache foundation license
sunank200 Apr 4, 2022
057ddb8
Add the comment for module name in trigger
sunank200 Apr 4, 2022
04c8937
Add extra_option example in docstring
sunank200 Apr 4, 2022
64a3792
Remove the licenses for Apache Software Foundation
sunank200 Apr 5, 2022
132fa19
Rewrite the relevant docstrings in tests
sunank200 Apr 5, 2022
54da2a1
Add the correct typehint as per docstring and remove dict from logs
sunank200 Apr 5, 2022
24092a3
Add getter to dictionary object
sunank200 Apr 5, 2022
f1f7fe1
Remove the redundant logic from the trigger
sunank200 Apr 5, 2022
57986db
Add relavent docstring for poll interval
sunank200 Apr 5, 2022
8fab5a5
Add relavent docstrings for polling_interval in trigger
sunank200 Apr 5, 2022
18c4719
Add example for spark_params in docstrings
sunank200 Apr 5, 2022
dd0e58f
remove request_mock
sunank200 Apr 5, 2022
8fdfea4
Fix mypy errors
sunank200 Apr 5, 2022
e14f0f2
Update setup.cfg
sunank200 Apr 5, 2022
88b1875
Merge branch 'livy-operator-async' of https://github.com/astronomer/a…
sunank200 Apr 5, 2022
a045201
Update astronomer/providers/apache/livy/hooks/livy.py
sunank200 Apr 5, 2022
cc5ada8
Fix the docstrings in build_post_batch_body in the hooks
sunank200 Apr 5, 2022
7b25ec1
Merge branch 'livy-operator-async' of https://github.com/astronomer/a…
sunank200 Apr 5, 2022
0ddab2a
Remove doc string from private method
sunank200 Apr 5, 2022
62f2d60
remove the liscense and add tag to example dag
sunank200 Apr 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Empty file.
Empty file.
56 changes: 56 additions & 0 deletions astronomer/providers/apache/livy/example_dags/example_livy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Licensed to the Apache Software Foundation (ASF) under one
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""
This is an example DAG which uses the LivyOperatorAsync.
The tasks below trigger the computation of pi on the Spark instance
using the Java and Python executables provided in the example library.
"""
import os
from datetime import datetime

from airflow import DAG

from astronomer.providers.apache.livy.operators.livy import LivyOperatorAsync

LIVY_JAVA_FILE = os.environ.get("LIVY_JAVA_FILE", "/spark-examples.jar")
LIVY_PYTHON_FILE = os.environ.get("LIVY_PYTHON_FILE", "/user/hadoop/pi.py")

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This DAG still needs work to be self sufficient. We might want to use existing operators and boto3 api to

  • spin up the EMR cluster along with Livy, and Spark
  • create the SSH tunnel
  • execute job using this async livy operator
  • terminate EMR cluster

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I am working on this right now.

with DAG(
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
dag_id="example_livy_operator",
default_args={"args": [10]},
schedule_interval="@daily",
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
start_date=datetime(2021, 1, 1),
catchup=False,
) as dag:
sunank200 marked this conversation as resolved.
Show resolved Hide resolved

# [START create_livy]
livy_java_task = LivyOperatorAsync(
task_id="pi_java_task",
file=LIVY_JAVA_FILE,
num_executors=1,
conf={
"spark.shuffle.compress": "false",
},
class_name="org.apache.spark.examples.SparkPi",
polling_interval=0,
sunank200 marked this conversation as resolved.
Show resolved Hide resolved
)

livy_python_task = LivyOperatorAsync(task_id="pi_python_task", file=LIVY_PYTHON_FILE, polling_interval=30)

livy_java_task >> livy_python_task
# [END create_livy]
Empty file.
Loading