I'm using the Python notebook runtime (Python 3.11) specifically. I tried the magic command %run to run another notebook from within my master notebook, but it doesn't work. I want to split out some commonly used functions into a shared notebook that loads the functions, which I can then call in my main notebook without copying the same code into each notebook. This approach would also make the shared notebook easier to maintain and test, since it uses the notebookutils library, which is only available in the Fabric environment. I experimented with creating Python .whl packages, but this is a lot of work and I would still have to copy the files into every Python notebook, so it's not a great solution, and developing Python libraries in a separate IDE like VS Code is a headache I would rather avoid. Is there an easy way to reuse code across multiple Python notebooks?
I tried using notebookutils.notebook.run but this doesn't work either:
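The attempted call probably looked roughly like the sketch below (the notebook name is made up for illustration). notebookutils only exists inside the Fabric environment, so outside Fabric the import itself fails:

```python
# Hypothetical reconstruction of the attempted call; "NB_Common_Functions"
# is a made-up notebook name. notebookutils is Fabric-only, so outside
# Fabric (or where the call is unsupported) we just record why it failed.
try:
    import notebookutils
    # run(name, timeout_seconds) starts the referenced notebook and waits
    result = notebookutils.notebook.run("NB_Common_Functions", 600)
except Exception as exc:  # broad catch for illustration only
    result = f"notebookutils.notebook.run unavailable here: {exc}"

print(result)
```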
Hi @MangoMagic,
I use the REST APIs to call other notebooks.
I have a function that uses the API to start a notebook, then polls its status until the run finishes, so the caller can continue once the notebook is done.
Here are some code snippets:
import json
import time
from typing import Any, Dict, Tuple

def _handler_notebook(job: Dict[str, Any]) -> Tuple[int, str]:
    ws_id = job.get("workspace_id")
    item_id = job.get("item_id")
    pname = job.get("process_name") or f"job_{job.get('job_id', '?')}"
    if not ws_id or not item_id:
        return 1, "Missing workspace_id/item_id for notebook run."
    params = {"RUN_ID": RUN_ID, "ENVIRONMENT": ENVIRONMENT, "LAYER": LAYER, "PROCESS_NAME": pname}
    print(f"[RUN] NOTEBOOK {pname} ws={ws_id} item={item_id}")
    rc, note = _run_notebook_with_fallback(ws_id, item_id, params)
    print(f"[DONE] NOTEBOOK {pname} status={'success' if rc == 0 else 'failed'}")
    return rc, note

def _run_notebook_with_fallback(ws_id: str, item_id: str, params: Dict[str, Any]) -> Tuple[int, str]:
    ji = _start_job_instance(ws_id, item_id, "RunNotebook", parameters=params)
    final = _poll_job_instance_with_warmup(ws_id, item_id, ji, job_type="RunNotebook")
    st = (final.get("status") or "").lower()
    if st == "completed":
        note = {"jobType": "RunNotebook", "jobInstanceId": ji, "status": st}
        return 0, json.dumps(note, default=str)
    if _is_exec_state_not_found(final):
        print("[FALLBACK] Retrying with jobType=Notebook")
        ji2 = _start_job_instance(ws_id, item_id, "Notebook", parameters=params)
        final2 = _poll_job_instance_with_warmup(ws_id, item_id, ji2, job_type="Notebook")
        st2 = (final2.get("status") or "").lower()
        note = {"jobType": "Notebook", "jobInstanceId": ji2, "status": st2,
                "previousAttempt": {"jobType": "RunNotebook", "jobInstanceId": ji,
                                    "status": st, "failureReason": final.get("failureReason")}}
        return (0 if st2 == "completed" else 1), json.dumps(note, default=str)
    note = {"jobType": "RunNotebook", "jobInstanceId": ji, "status": st,
            "failureReason": final.get("failureReason")}
    return 1, json.dumps(note, default=str)

def _poll_job_instance_with_warmup(ws_id: str, item_id: str, job_instance_id: str,
                                   job_type: str = "RunNotebook") -> dict:
    start = time.time()
    tries = 0
    print(f"[WARMUP] Sleeping {FIRST_POLL_DELAY_SEC}s before first poll "
          f"(jobType={job_type}, jobInstanceId={job_instance_id})")
    time.sleep(FIRST_POLL_DELAY_SEC)
    while True:
        st = _get_job_instance(ws_id, item_id, job_instance_id)
        status = (st.get("status") or "").lower()
        print(f"[POLL] {job_instance_id} status={status}")
        if status in ("completed", "failed", "cancelled", "canceled", "deduped"):
            # A "failed" NotFound shortly after start can be a warm-up artifact;
            # keep polling until the warm-up window expires.
            if (job_type.lower() in ("runnotebook", "notebook")
                    and status == "failed"
                    and _is_exec_state_not_found(st)
                    and (time.time() - start) < EXEC_STATE_WARMUP_SEC):
                backoff = min(POLL_INTERVAL_SEC * max(2, tries + 1), 30)
                print(f"[POLL] transient NotFound during warm-up; retrying in {backoff}s")
                time.sleep(backoff)
                tries += 1
                continue
            return st
        backoff = POLL_INTERVAL_SEC if tries < 3 else min(POLL_INTERVAL_SEC * 2, 30)
        time.sleep(backoff)
        tries += 1
        if time.time() - start > POLL_TIMEOUT_SEC:
            raise TimeoutError(f"Timeout waiting for job {job_instance_id}; last state={st}")
If you found this helpful, consider giving some Kudos. If I answered your question or solved your problem, mark this post as the solution.
Hi,
For your scenario, the simplest way to reuse functions from another notebook in Fabric is to use the %run magic command.
For example, suppose you have a separate logger notebook (NB_Slv_Logger) with multiple functions, like log(). You can call these functions from another notebook (e.g., your Lander notebook) as follows:
- At the very top of your other notebook, add:
%run NB_Slv_Logger
⚠️ Note: it must be the first line in your notebook code, before any other imports or code execution.
- Then, anywhere inside your notebook, simply call the functions from the logger notebook:
log(stage=stage_name, status="Separator", message="=", level="INFO", pipeline_name=pipeline_name)
This approach lets you reuse code across notebooks without duplicating it. It's much simpler than creating Python packages or managing external libraries, especially when your functions depend on Fabric-specific modules like notebookutils.
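As a concrete sketch, the shared notebook (NB_Slv_Logger in the example above) could define something like the following; the function body here is a guess at what such a logger might do, not the actual implementation:

```python
from datetime import datetime, timezone

def log(stage: str, status: str, message: str,
        level: str = "INFO", pipeline_name: str = "") -> str:
    # Build and print one formatted log line; a real implementation
    # might also append to a Lakehouse table or file.
    ts = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    line = f"{ts} [{level}] {pipeline_name}/{stage} {status}: {message}"
    print(line)
    return line
```

After `%run NB_Slv_Logger`, the calling notebook can invoke log(...) directly, because %run executes the referenced notebook's cells in the caller's session scope.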
Hi @MangoMagic
I wanted to check if you had the opportunity to review the information provided. Please feel free to contact us if you have any further questions.
Thank you.
Hi @MangoMagic
May I check if this issue has been resolved? If not, please feel free to contact us if you have any further questions.
Thank you
Hi, use the %run command to invoke another notebook:
%run <notebook name>
https://learn.microsoft.com/en-us/fabric/data-engineering/author-execute-notebook
Hi @MangoMagic
As we haven't heard back from you, we wanted to kindly follow up to check if the suggestions provided by the community members for the issue worked. Please feel free to contact us if you have any further questions.
Thanks and regards