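"""FastAPI service that extracts article text from Bloomberg and Financial Times URLs.

Bloomberg pages are fetched through the ScraperAPI proxy and parsed from the embedded
__NEXT_DATA__ JSON; FT articles are read from the page's JSON-LD structured data.
"""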
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import requests
from bs4 import BeautifulSoup
import json

app = FastAPI()

class ArticleScraper:
    def __init__(self):
        # ScraperAPI proxy credentials used when fetching Bloomberg pages.
        self.scraper_api_key = "24610cfe7680c5a15d77bd32cfd23fc3"
        self.scraper_api_url = "http://api.scraperapi.com/"

    def scrape_bloomberg(self, article_url):
        # Fetch the page through the ScraperAPI proxy rather than hitting Bloomberg directly.
        params = {
            "api_key": self.scraper_api_key,
            "url": article_url,
        }
        response = requests.get(self.scraper_api_url, params=params)
        if response.status_code != 200:
            return f"Failed to retrieve the webpage. Status code: {response.status_code}"

        # Bloomberg embeds the article as JSON in the Next.js __NEXT_DATA__ script tag.
        soup = BeautifulSoup(response.text, 'html.parser')
        script = soup.find('script', {'id': '__NEXT_DATA__'})
        if script is None:
            return "Article content not found in the expected format."

        json_data = json.loads(script.text)
        props = json_data['props']['pageProps']
        contents = props['story']['body']['content']

        # Convert each structured content block to text and join the non-empty pieces.
        article_text = []
        for item in contents:
            text = self.extract_text(item)
            if text:
                article_text.append(text)

        return '\n\n'.join(article_text)

    def scrape_financial_times(self, article_url):
        # Present the request as a Twitter referral and pre-set the consent cookies
        # so FT serves the full page.
        headers = {
            'Referer': 'https://twitter.com'
        }
        cookies = {
            'FTCookieConsentGDPR': 'true',
            'FTAllocation': '00000000-0000-0000-0000-000000000000'
        }
        response = requests.get(article_url, headers=headers, cookies=cookies)

        if response.status_code == 200:
            soup = BeautifulSoup(response.content, 'html.parser')
            # The article body is embedded in the page's JSON-LD structured data.
            article_script = soup.find('script', {'type': 'application/ld+json'})

            if article_script:
                article_data = json.loads(article_script.string)
                return article_data.get('articleBody', '')
            else:
                return "Article content not found in the expected format."
        else:
            return f"Failed to retrieve the webpage. Status code: {response.status_code}"

    def extract_text(self, content_item):
        # Flatten one Bloomberg content block into Markdown-style text.
        if content_item['type'] == 'paragraph':
            text_parts = []
            for item in content_item['content']:
                if item['type'] == 'text':
                    text_parts.append(item['value'])
                elif item['type'] == 'entity':
                    # Entities may carry a link; render linked text as [text](url).
                    if 'link' in item['data'] and item['data']['link']['destination'].get('web'):
                        url = item['data']['link']['destination']['web']
                        text = ' '.join([sub_item['value'] for sub_item in item['content'] if sub_item['type'] == 'text'])
                        text_parts.append(f"[{text}]({url})")
                    else:
                        text_parts.extend([sub_item['value'] for sub_item in item['content'] if sub_item['type'] == 'text'])
                elif item['type'] == 'link':
                    url = item['data']['destination'].get('web', '')
                    text = ' '.join([sub_item['value'] for sub_item in item['content'] if sub_item['type'] == 'text'])
                    if url:
                        text_parts.append(f"[{text}]({url})")
                    else:
                        text_parts.append(text)
            return ' '.join(text_parts)
        elif content_item['type'] == 'entity' and content_item.get('subType') == 'story':
            # Related-story reference: render as a "Read More" link.
            url = content_item['data']['link']['destination'].get('web', '')
            text = ' '.join([sub_item['value'] for sub_item in content_item['content'] if sub_item['type'] == 'text'])
            return f"Read More: [{text}]({url})"
        elif content_item['type'] == 'media' and content_item.get('subType') == 'photo':
            # Photo block: image plus caption and credit.
            photo_data = content_item['data']['photo']
            caption = photo_data.get('caption', '')
            credit = photo_data.get('credit', '')
            src = photo_data.get('src', '')
            alt = photo_data.get('alt', '')
            return f"\n![{alt}]({src})\n*{caption}* {credit}\n"
        elif content_item['type'] == 'media' and content_item.get('subType') == 'chart':
            # Chart block: use the fallback image plus title, subtitle, footnote and source.
            chart_data = content_item['data']['chart']
            attachment = content_item['data']['attachment']
            title = attachment.get('title', '')
            subtitle = attachment.get('subtitle', '')
            source = attachment.get('source', '')
            fallback_image = chart_data.get('fallback', '')
            footnote = attachment.get('footnote', '')
            return f"\n![{title}]({fallback_image})\n**{title}**\n*{subtitle}*\n{footnote}\n{source}\n"
        # Unhandled block types are skipped.
        return ''

    def scrape_article(self, url):
        if 'bloomberg.com' in url:
            return self.scrape_bloomberg(url)
        elif 'ft.com' in url:
            return self.scrape_financial_times(url)
        else:
            return "Unsupported website. Please provide a URL from Bloomberg or Financial Times."

class ArticleRequest(BaseModel):
    url: str

@app.get("/")
async def hello():
    return {"response" : "Greetings from SS!"}

@app.post("/scrape_article/")
async def scrape_article(request: ArticleRequest):
    scraper = ArticleScraper()
    content = scraper.scrape_article(request.url)
    if "Unsupported website" in content:
        raise HTTPException(status_code=400, detail=content)
    return {"content": content}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)
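
# Example request against the running service (a minimal sketch: assumes the app is
# reachable at http://localhost:7860 and that the URL points to a supported article):
#
#     import requests
#
#     resp = requests.post(
#         "http://localhost:7860/scrape_article/",
#         json={"url": "https://www.bloomberg.com/news/articles/..."},
#     )
#     resp.raise_for_status()
#     print(resp.json()["content"])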