zhuwq0 committed on
Commit 8111b6b
1 Parent(s): 0f20df9
Files changed (4)
  1. .gitattributes +6 -0
  2. Dockerfile +14 -22
  3. example_fastapi.ipynb +0 -0
  4. phasenet/app.py +17 -134
.gitattributes ADDED
@@ -0,0 +1,6 @@
+ model/190703-214543/loss.log filter=lfs diff=lfs merge=lfs -text
+ model/190703-214543/model_95.ckpt.data-00000-of-00001 filter=lfs diff=lfs merge=lfs -text
+ model/190703-214543/model_95.ckpt.index filter=lfs diff=lfs merge=lfs -text
+ model/190703-214543/model_95.ckpt.meta filter=lfs diff=lfs merge=lfs -text
+ model/190703-214543/checkpoint filter=lfs diff=lfs merge=lfs -text
+ model/190703-214543/config.log filter=lfs diff=lfs merge=lfs -text
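
Note: these attributes tell Git LFS to store the model checkpoint and log files as pointer stubs in the repository, so a plain checkout contains small text pointers rather than the binaries; the RUN git lfs pull step added to the Dockerfile below fetches the actual files at build time. A minimal sketch (hypothetical helper, not part of this commit) for verifying that a checkpoint file was materialized and is not still a pointer stub:

    # Git LFS pointer stubs are small text files that begin with this line;
    # real checkpoint binaries do not.
    LFS_POINTER_PREFIX = b"version https://git-lfs.github.com/spec/v1"

    def assert_lfs_materialized(path: str) -> None:
        with open(path, "rb") as f:
            head = f.read(len(LFS_POINTER_PREFIX))
        if head == LFS_POINTER_PREFIX:
            raise RuntimeError(f"{path} is still a Git LFS pointer; run git lfs pull first")

    # Example against one of the files tracked above
    assert_lfs_materialized("model/190703-214543/model_95.ckpt.index")
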
Dockerfile CHANGED
@@ -1,28 +1,20 @@
- FROM tensorflow/tensorflow
-
- # Create the environment:
- # COPY env.yml /app
- # RUN conda env create --name cs329s --file=env.yml
- # Make RUN commands use the new environment:
- # SHELL ["conda", "run", "-n", "cs329s", "/bin/bash", "-c"]
-
- RUN pip install tqdm obspy pandas
- RUN pip install uvicorn fastapi kafka-python
-
- WORKDIR /opt
-
- # RUN wget https://github.com/AI4EPS/models/releases/download/phasenet-2018/model.tar && tar -xvf model.tar && rm model.tar
-
- # Copy files
- COPY phasenet /opt/phasenet
- # COPY model /opt/model
- RUN wget https://github.com/AI4EPS/models/releases/download/PhaseNet-2018/model.tar && tar -xvf model.tar && rm model.tar
-
- # Expose API port
- EXPOSE 8000
-
- ENV PYTHONUNBUFFERED=1
-
- # Start API server
- #ENTRYPOINT ["conda", "run", "--no-capture-output", "-n", "cs329s", "uvicorn", "--app-dir", "phasenet", "app:app", "--reload", "--port", "8000", "--host", "0.0.0.0"]
- ENTRYPOINT ["uvicorn", "--app-dir", "phasenet", "app:app", "--reload", "--port", "7860", "--host", "0.0.0.0"]
+ FROM python:3.9
+
+ RUN useradd -m -u 1000 user
+ USER user
+ ENV PATH="/home/user/.local/bin:$PATH"
+
+ WORKDIR /app
+
+ COPY --chown=user ./requirements.txt requirements.txt
+ RUN pip install --no-cache-dir --upgrade -r requirements.txt
+
+ # COPY phasenet /app/phasenet
+ # COPY model /app/model
+ # RUN wget https://github.com/AI4EPS/models/releases/download/PhaseNet-2018/model.tar && tar -xvf model.tar && rm model.tar
+
+ # EXPOSE 80
+
+ RUN git lfs pull
+ COPY --chown=user . /app
+ ENTRYPOINT ["uvicorn", "--app-dir", "phasenet", "app:app", "--reload","--host", "0.0.0.0", "--port", "7860"]
 
 
 
 
 
example_fastapi.ipynb ADDED
The diff for this file is too large to render.
 
phasenet/app.py CHANGED
@@ -1,23 +1,21 @@
  import os
- from collections import defaultdict, namedtuple
+ from collections import defaultdict
  from datetime import datetime, timedelta
- from json import dumps
- from typing import Any, AnyStr, Dict, List, NamedTuple, Union, Optional
+ from typing import Any, AnyStr, Dict, List, NamedTuple, Optional, Union

  import numpy as np
- import requests
  import tensorflow as tf
  from fastapi import FastAPI, WebSocket
- from kafka import KafkaProducer
+ from postprocess import extract_picks
  from pydantic import BaseModel
  from scipy.interpolate import interp1d

- from model import ModelConfig, UNet
- from postprocess import extract_picks
+ from model import UNet
+
+ PROJECT_ROOT = os.path.realpath(os.path.join(os.path.dirname(__file__), ".."))

  tf.compat.v1.disable_eager_execution()
  tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)
- PROJECT_ROOT = os.path.realpath(os.path.join(os.path.dirname(__file__), ".."))
  JSONObject = Dict[AnyStr, Any]
  JSONArray = List[Any]
  JSONStructure = Union[JSONArray, JSONObject]
@@ -39,41 +37,6 @@ latest_check_point = tf.train.latest_checkpoint(f"{PROJECT_ROOT}/model/190703-21
  print(f"restoring model {latest_check_point}")
  saver.restore(sess, latest_check_point)

- # GAMMA API Endpoint
- GAMMA_API_URL = "http://gamma-api:8001"
- # GAMMA_API_URL = 'http://localhost:8001'
- # GAMMA_API_URL = "http://gamma.quakeflow.com"
- # GAMMA_API_URL = "http://127.0.0.1:8001"
-
- # Kafak producer
- use_kafka = False
-
- try:
-     print("Connecting to k8s kafka")
-     BROKER_URL = "quakeflow-kafka-headless:9092"
-     # BROKER_URL = "34.83.137.139:9094"
-     producer = KafkaProducer(
-         bootstrap_servers=[BROKER_URL],
-         key_serializer=lambda x: dumps(x).encode("utf-8"),
-         value_serializer=lambda x: dumps(x).encode("utf-8"),
-     )
-     use_kafka = True
-     print("k8s kafka connection success!")
- except BaseException:
-     print("k8s Kafka connection error")
-     try:
-         print("Connecting to local kafka")
-         producer = KafkaProducer(
-             bootstrap_servers=["localhost:9092"],
-             key_serializer=lambda x: dumps(x).encode("utf-8"),
-             value_serializer=lambda x: dumps(x).encode("utf-8"),
-         )
-         use_kafka = True
-         print("local kafka connection success!")
-     except BaseException:
-         print("local Kafka connection error")
- print(f"Kafka status: {use_kafka}")
-

  def normalize_batch(data, window=3000):
      """
@@ -208,10 +171,7 @@ def get_prediction(data, return_preds=False):


  class Data(BaseModel):
-     # id: Union[List[str], str]
-     # timestamp: Union[List[str], str]
-     # vec: Union[List[List[List[float]]], List[List[float]]]
-     id: List[str]
+     id: List[List[str]]
      timestamp: List[Union[str, float, datetime]]
      vec: Union[List[List[List[float]]], List[List[float]]]

@@ -239,6 +199,13 @@ def predict(data: Data):
      return picks


+ @app.post("/predict_prob")
+ def predict(data: Data):
+     picks, preds = get_prediction(data, True)
+
+     return picks, preds.tolist()
+
+
  @app.websocket("/ws")
  async def websocket_endpoint(websocket: WebSocket):
      await websocket.accept()
@@ -251,95 +218,11 @@ async def websocket_endpoint(websocket: WebSocket):
      print("PhaseNet Updating...")


- @app.post("/predict_prob")
- def predict(data: Data):
-     picks, preds = get_prediction(data, True)
-
-     return picks, preds.tolist()
-
-
- @app.post("/predict_phasenet2gamma")
- def predict(data: Data):
-     picks = get_prediction(data)
-
-     # if use_kafka:
-     #     print("Push picks to kafka...")
-     #     for pick in picks:
-     #         producer.send("phasenet_picks", key=pick["id"], value=pick)
-     try:
-         catalog = requests.post(
-             f"{GAMMA_API_URL}/predict", json={"picks": picks, "stations": data.stations, "config": data.config}
-         )
-         print(catalog.json()["catalog"])
-         return catalog.json()
-     except Exception as error:
-         print(error)
-
-     return {}
-
-
- @app.post("/predict_phasenet2gamma2ui")
- def predict(data: Data):
-     picks = get_prediction(data)
-
-     try:
-         catalog = requests.post(
-             f"{GAMMA_API_URL}/predict", json={"picks": picks, "stations": data.stations, "config": data.config}
-         )
-         print(catalog.json()["catalog"])
-         return catalog.json()
-     except Exception as error:
-         print(error)
-
-     if use_kafka:
-         print("Push picks to kafka...")
-         for pick in picks:
-             producer.send("phasenet_picks", key=pick["id"], value=pick)
-         print("Push waveform to kafka...")
-         for id, timestamp, vec in zip(data.id, data.timestamp, data.vec):
-             producer.send("waveform_phasenet", key=id, value={"timestamp": timestamp, "vec": vec, "dt": data.dt})
-
-     return {}
-
-
- @app.post("/predict_stream_phasenet2gamma")
- def predict(data: Data):
-     data = format_data(data)
-     # for i in range(len(data.id)):
-     #     plt.clf()
-     #     plt.subplot(311)
-     #     plt.plot(np.array(data.vec)[i, :, 0])
-     #     plt.subplot(312)
-     #     plt.plot(np.array(data.vec)[i, :, 1])
-     #     plt.subplot(313)
-     #     plt.plot(np.array(data.vec)[i, :, 2])
-     #     plt.savefig(f"{data.id[i]}.png")
-
-     picks = get_prediction(data)
-
-     return_value = {}
-     try:
-         catalog = requests.post(f"{GAMMA_API_URL}/predict_stream", json={"picks": picks})
-         print("GMMA:", catalog.json()["catalog"])
-         return_value = catalog.json()
-     except Exception as error:
-         print(error)
-
-     if use_kafka:
-         print("Push picks to kafka...")
-         for pick in picks:
-             producer.send("phasenet_picks", key=pick["id"], value=pick)
-         print("Push waveform to kafka...")
-         for id, timestamp, vec in zip(data.id, data.timestamp, data.vec):
-             producer.send("waveform_phasenet", key=id, value={"timestamp": timestamp, "vec": vec, "dt": data.dt})
-
-     return return_value
-
-
  @app.get("/healthz")
  def healthz():
      return {"status": "ok"}

+
  @app.get("/")
- def read_root():
-     return {"Hello": "PhaseNet!"}
+ def greet_json():
+     return {"Hello": "PhaseNet!"}
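
Note: after this commit the app keeps the /predict, /predict_prob, /healthz, and / endpoints and serves them on port 7860. A minimal client sketch against the new Data schema (the host, station id, and waveform shape are placeholder assumptions; optional fields outside this diff, such as dt, are not shown):

    import requests

    # Placeholder payload: one station, 3000 samples, 3 components of zeros.
    # "id" is now a nested list (List[List[str]]) per the updated schema.
    payload = {
        "id": [["NC.STA01..HH"]],  # hypothetical station id
        "timestamp": ["2023-01-01T00:00:00.000"],
        "vec": [[[0.0, 0.0, 0.0] for _ in range(3000)]],
    }

    resp = requests.post("http://localhost:7860/predict", json=payload, timeout=60)
    resp.raise_for_status()
    print(resp.json())  # predicted phase picks
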