WoothAmwar commited on
Commit
022c2a8
·
1 Parent(s): def662e

Rought Flight MCP Server application

Browse files
Files changed (2) hide show
  1. app.py +347 -22
  2. requirements.txt +2 -1
app.py CHANGED
@@ -1,38 +1,363 @@
 
 
 
 
1
  import json
 
2
  import gradio as gr
3
- from textblob import TextBlob
4
 
5
- def sentiment_analysis(text: str) -> str:
 
 
 
6
  """
7
- Analyze the sentiment of the given text.
8
 
9
  Args:
10
- text (str): The text to analyze
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
11
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
12
  Returns:
13
- str: A JSON string containing polarity, subjectivity, and assessment
14
  """
15
- blob = TextBlob(text)
16
- sentiment = blob.sentiment
17
 
18
- result = {
19
- "polarity": round(sentiment.polarity, 2), # -1 (negative) to 1 (positive)
20
- "subjectivity": round(sentiment.subjectivity, 2), # 0 (objective) to 1 (subjective)
21
- "assessment": "positive" if sentiment.polarity > 0 else "negative" if sentiment.polarity < 0 else "neutral"
 
 
 
22
  }
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
23
 
24
- return json.dumps(result)
 
 
 
 
 
 
25
 
26
- # Create the Gradio interface
27
- demo = gr.Interface(
28
- fn=sentiment_analysis,
29
- inputs=gr.Textbox(placeholder="Enter text to analyze..."),
30
- outputs=gr.Textbox(), # Changed from gr.JSON() to gr.Textbox()
31
- title="Text Sentiment Analysis",
32
- description="Analyze the sentiment of text using TextBlob",
33
- api_name="sentiment_analysis"
34
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
35
 
36
  # Launch the interface and MCP server
37
  if __name__ == "__main__":
38
- demo.launch(mcp_server=True)
 
1
+ import os
2
+ from dotenv import load_dotenv
3
+ import requests
4
+ import re
5
  import json
6
+ from datetime import date
7
  import gradio as gr
8
+ from forex_python.converter import CurrencyRates
9
 
10
+ # Load environment variables from .env.local
11
+ load_dotenv(dotenv_path='.env.local')
12
+
13
+ def get_access_token() -> str|None:
14
  """
15
+ Get the Access Token required to make requests on the Amadeus API
16
 
17
  Args:
18
+
19
+ Returns:
20
+ str | None: A string containing the Access Token, or None if an error occurred.
21
+ """
22
+ api_key = os.environ.get("AMADEUS_API")
23
+ api_secret = os.environ.get("AMADEUS_SECRET")
24
+
25
+ # Check if env variables loaded
26
+ if not api_key or not api_secret:
27
+ print("Error: AMADEUS_API or AMADEUS_SECRET not found in .env.local")
28
+ return None
29
+
30
+ # Getting the Access Token to use for the API
31
+ token_url = "https://test.api.amadeus.com/v1/security/oauth2/token"
32
+ headers = {
33
+ "Content-Type": "application/x-www-form-urlencoded"
34
+ }
35
+ data = {
36
+ "grant_type": "client_credentials",
37
+ "client_id": api_key,
38
+ "client_secret": api_secret
39
+ }
40
+
41
+ try:
42
+ response = requests.post(token_url, headers=headers, data=data)
43
+ response.raise_for_status()
44
+
45
+ token_data = response.json()
46
+
47
+ # Check the 'state' field in the response JSON
48
+ if token_data.get('state') != 'approved':
49
+ print(f"Authorization not approved. State: {token_data.get('state')}")
50
+ return None
51
+
52
+ print("Authorization approved. Token received.")
53
+ return token_data.get("access_token")
54
+
55
+ except requests.exceptions.RequestException as e:
56
+ print(f"An error occurred during token request: {e}")
57
+ return None
58
+ except KeyError:
59
+ print("Error: 'access_token' not found in the response.")
60
+ return None
61
+
62
+ GLOBAL_TOKEN = get_access_token()
63
+
64
+ def get_date(year, month, day):
65
+ """
66
+ Returns date specified in ISO 8601
67
 
68
+ Args:
69
+ year (int): The year of the date.
70
+ month (int): The month of the date.
71
+ day (int): The day of the date.
72
+
73
+ Returns:
74
+ str: A string representing the date in ISO 8601 format.
75
+
76
+ """
77
+ return date(year, month, day).isoformat()
78
+
79
+
80
+ def get_flight_offers(departure_year: int, departure_month: int, departure_day: int, departureLocationCode: str, arrivalLocationCode: str) -> dict|None:
81
+ """
82
+ Obtain information about the flights offered between two locations from
83
+ various airlines like Frontier Airlines or United Airlines, but critically
84
+ does not contain flights from American Airlines or Delta Airlines.
85
+
86
+ Args:
87
+ departure_year (int): The year of the departure date.
88
+ departure_month (int): The month of the departure date.
89
+ departure_day (int): The day of the departure date.
90
+ departureLocationCode (str): The IATA code of the departure location.
91
+ arrivalLocationCode (str): The IATA code of the arrival location
92
+
93
  Returns:
94
+ dict: A JSON dictionary containing the flight offers and information about them.
95
  """
 
 
96
 
97
+ if not GLOBAL_TOKEN:
98
+ print("Cannot get flight offers without a valid access token.")
99
+ return
100
+
101
+ offer_url = "https://test.api.amadeus.com/v2/shopping/flight-offers" # Note: v2 for flight offers
102
+ headers = {
103
+ "Authorization": f"Bearer {GLOBAL_TOKEN}" # Correctly format the Authorization header
104
  }
105
+ # For GET requests, parameters are passed in 'params', not 'data'
106
+ params = {
107
+ "originLocationCode":departureLocationCode,
108
+ "destinationLocationCode":arrivalLocationCode,
109
+ "departureDate":get_date(departure_year, departure_month, departure_day),
110
+ "adults":1
111
+ }
112
+
113
+ try:
114
+ response = requests.get(offer_url, headers=headers, params=params)
115
+ response.raise_for_status()
116
+
117
+ # if write_to_file:
118
+ # with open("api_output.json", "w") as file:
119
+ # json.dump(response.json(), file, indent=2)
120
+
121
+ return response.json()
122
+ except requests.exceptions.RequestException as e:
123
+ print(f"An error occurred during flight offer request: {e}")
124
+
125
+
126
+ def path_is_direct(flight_path: dict) -> bool:
127
+ """
128
+ Determines if a path is a direct flight
129
+
130
+ Args:
131
+ flight_path (dict): A dictionary representing the flight path.
132
+
133
+ Returns:
134
+ bool: True if the path is a direct flight, False otherwise.
135
+ """
136
+ return len(list(flight_path.keys())) == 1
137
+
138
+
139
+ def parse_duration_to_minutes(duration_str: str) -> int:
140
+ """
141
+ Parses an ISO 8601 duration string (e.g., 'PT2H8M') and returns the total duration in minutes.
142
+
143
+ Args:
144
+ duration_str (str): The ISO 8601 duration string.
145
+
146
+ Returns:
147
+ int: The total duration in minutes.
148
+ """
149
+ if not duration_str.startswith('PT'):
150
+ return 0
151
+
152
+ hours = 0
153
+ minutes = 0
154
+
155
+ hours_match = re.search(r'(\d+)H', duration_str)
156
+ if hours_match:
157
+ hours = int(hours_match.group(1))
158
+
159
+ minutes_match = re.search(r'(\d+)M', duration_str)
160
+ if minutes_match:
161
+ minutes = int(minutes_match.group(1))
162
+
163
+ return (hours * 60) + minutes
164
+
165
+
166
+ def get_carrier_name(carrier_dict: dict[str, str], carrier_code: str) -> str:
167
+ """
168
+ From an IATA carrier code, returns the full name of the airline carrier
169
+
170
+ Args:
171
+ carrier_dict (dict[str, str]): A dictionary with keys of the IATA code
172
+ and values of the full airline carrier name.
173
+ carrier_code (str): The IATA code of the airline carrier.
174
+
175
+ Returns:
176
+ str: The full name of the airline carrier.
177
+ """
178
+ return carrier_dict.get(carrier_code, "Unknown")
179
+
180
+
181
+ def get_flight_paths(offers_json: dict) -> list[dict]:
182
+ """
183
+ Parses flight offers and returns a list of dictionaries,
184
+ each representing the path of a flight schedule and
185
+ info about the flight path.
186
+
187
+ Args:
188
+ offers_json (dict): A JSON dictionary containing the flight offers.
189
+
190
+ Returns:
191
+ list[dict]: A list of dictionaries representing the flight paths and info about the flights.
192
+ Each dictionary contains info on if the flight is a direct flight,
193
+ the flight price, currency, duration in minutes, airline carrier,
194
+ and flight path, all separated by id.
195
+ """
196
+ if not offers_json or 'data' not in offers_json:
197
+ return []
198
+
199
+ all_paths = []
200
+ for offer in offers_json['data']:
201
+ # Itineraries are a list, we'll process the first one.
202
+ if not offer.get('itineraries') or not offer.get('id'):
203
+ continue
204
+ itinerary = offer['itineraries'][0]
205
+ path = {}
206
+ path['id'] = offer['id']
207
+ path['price'] = offer['price']['total']
208
+ path['currency'] = offer['price']['currency']
209
+ path['duration_in_minutes'] = parse_duration_to_minutes(itinerary.get('duration', ''))
210
+ path['airline_carrier'] = get_carrier_name(
211
+ offers_json["dictionaries"]["carriers"],
212
+ offer['validatingAirlineCodes'][0]
213
+ )
214
+ path['flight_paths'] = {}
215
+
216
+ for segment in itinerary['segments']:
217
+ departure_code = segment['departure']['iataCode']
218
+ arrival_code = segment['arrival']['iataCode']
219
+ path['flight_paths'][departure_code] = arrival_code
220
+
221
+ path['is_direct_flight'] = path_is_direct(path['flight_paths'])
222
+ all_paths.append(path)
223
+ return all_paths
224
+
225
+ def sort_flight_offers(unsorted_flight_offers_json: str, sort_by: str) -> list[dict]:
226
+ """
227
+ Sorts a list of flight offer dictionaries by a specified key.
228
+
229
+ Args:
230
+ flight_offers_json (str): A JSON string representing the list of flight offers from get_flight_paths().
231
+ sort_by (str): The string of the key to sort by. Can be 'price' or 'duration_in_minutes'.
232
+
233
+ Returns:
234
+ list[dict]: A new list of flight offers sorted by the specified key.
235
+
236
+ Raises:
237
+ ValueError: If sort_by is not 'price' or 'duration_in_minutes'.
238
+ """
239
+ flight_offers = json.loads(unsorted_flight_offers_json)
240
+ if sort_by not in ['price', 'duration_in_minutes']:
241
+ raise ValueError("Can only sort by 'price' or 'duration_in_minutes'.")
242
+
243
+ return sorted(flight_offers, key=lambda x: x[sort_by])
244
+
245
+ def remove_non_direct(all_flight_offers_json: str) -> list[dict]:
246
+ """
247
+ Returns a list of the flight offers that are direct flights.
248
+
249
+ Args:
250
+ all_flight_offers_json (str): A JSON string representing the list of flight offers from get_flight_paths().
251
+
252
+ Returns:
253
+ list[dict]: A new list of flight offers that are all direct flights.
254
+ """
255
+ if all_flight_offers_json is None:
256
+ return []
257
+ all_flight_offers = json.loads(all_flight_offers_json)
258
+ return [offer for offer in all_flight_offers if offer.get('is_direct_flight')]
259
+
260
+ def convert_eur_to_usd(price_in_eur: float) -> float:
261
+ """
262
+ Converts a price from Euros to US Dollars using the current exchange rate.
263
+
264
+ Args:
265
+ price_in_eur (float): The price in Euros.
266
+
267
+ Returns:
268
+ float: The price in US Dollars.
269
+ """
270
+ c = CurrencyRates()
271
+ exchange_rate = c.get_rate('EUR', 'USD')
272
+ price_in_usd = price_in_eur * exchange_rate
273
+ return price_in_usd
274
+
275
+ def print_paths():
276
+ global GLOBAL_TOKEN
277
+ # GLOBAL_TOKEN = get_access_token()
278
+ GLOBAL_TOKEN = None
279
+ if GLOBAL_TOKEN:
280
+ offers_json = get_flight_offers(2025, 9, 18, "CLT", "ORD")
281
+ else:
282
+ print("Using saved info")
283
+ with open("api_output.json", "r") as f:
284
+ offers_json = json.loads(f.read())
285
+
286
+ if offers_json:
287
+ flight_paths = get_flight_paths(offers_json)
288
+ for i, path in enumerate(flight_paths, 1):
289
+ print(f"Offer {i} Path: {path}")
290
+
291
+ def get_flight_offers_and_paths(
292
+ departure_year: int,
293
+ departure_month: int,
294
+ departure_day: int,
295
+ departureLocationCode: str,
296
+ arrivalLocationCode: str
297
+ ) -> list[dict] | None:
298
+ """
299
+ A wrapper function that first gets flight offers and then immediately
300
+ parses them to get the flight paths.
301
+
302
+ Args:
303
+ departure_year (int): The year of the departure date.
304
+ departure_month (int): The month of the departure date.
305
+ departure_day (int): The day of the departure date.
306
+ departureLocationCode (str): The IATA code of the departure location.
307
+ arrivalLocationCode (str): The IATA code of the arrival location
308
+
309
+ Returns:
310
+ list[dict]: A list of dictionaries representing the flight paths and info about the flights.
311
+ Each dictionary contains info on if the flight is a direct flight,
312
+ the flight price, currency, duration in minutes, airline carrier,
313
+ and flight path, all separated by id.
314
+ """
315
 
316
+ offers_json = get_flight_offers(
317
+ departure_year, departure_month, departure_day,
318
+ departureLocationCode, arrivalLocationCode
319
+ )
320
+ if offers_json:
321
+ return get_flight_paths(offers_json)
322
+ return None
323
 
324
+ with gr.Blocks() as demo:
325
+ gr.Markdown("# Amadeus Flight Offers API")
326
+ with gr.Tab("Get Flight Offers"):
327
+ with gr.Row():
328
+ dep_year = gr.Number(label="Departure Year", value=2025)
329
+ dep_month = gr.Number(label="Departure Month", value=9)
330
+ dep_day = gr.Number(label="Departure Day", value=18)
331
+ with gr.Row():
332
+ dep_loc = gr.Textbox(label="Departure Location Code", value="CLT")
333
+ arr_loc = gr.Textbox(label="Arrival Location Code", value="ORD")
334
+ get_offers_btn = gr.Button("Get Offers")
335
+ offers_output = gr.JSON(label="Flight Offers")
336
+ get_offers_btn.click(
337
+ fn=get_flight_offers_and_paths,
338
+ inputs=[dep_year, dep_month, dep_day, dep_loc, arr_loc],
339
+ outputs=offers_output,
340
+ api_name="get_flight_offers_and_paths"
341
+ )
342
+ with gr.Tab("Sort Flight Offers"):
343
+ sort_input_offers = gr.Textbox(label="Flight Offers (JSON from 'Get Flight Offers' tab)")
344
+ sort_by = gr.Radio(['price', 'duration_in_minutes'], label="Sort By", value='price')
345
+ sort_button = gr.Button("Sort Offers")
346
+ sorted_offers_output = gr.JSON(label="Sorted Flight Offers")
347
+ sort_button.click(fn=sort_flight_offers, inputs=[sort_input_offers, sort_by], outputs=sorted_offers_output, api_name="sort_flight_offers")
348
+ with gr.Tab("Filter Direct Flight Offers"):
349
+ filter_input_offers = gr.Textbox(label="Flight Offers (JSON from 'Get Flight Offers' tab)")
350
+ filter_button = gr.Button("Filter Offers")
351
+ filter_offers_output = gr.JSON(label="Filtered Flight Offers")
352
+ sort_button.click(fn=remove_non_direct, inputs=[filter_input_offers], outputs=filter_offers_output, api_name="remove_non_direct")
353
+ with gr.Tab("Convert EUR to USD"):
354
+ eur_input = gr.Number(label="Price in EUR")
355
+ convert_button = gr.Button("Convert to USD")
356
+ usd_output = gr.Number(label="Price in USD")
357
+ convert_button.click(
358
+ fn=convert_eur_to_usd, inputs=[eur_input], outputs=usd_output, api_name="convert_eur_to_usd"
359
+ )
360
 
361
  # Launch the interface and MCP server
362
  if __name__ == "__main__":
363
+ demo.launch(mcp_server=True)
requirements.txt CHANGED
@@ -1,2 +1,3 @@
1
  gradio[mcp]
2
- textblob
 
 
1
  gradio[mcp]
2
+ textblob
3
+ forex-python