编辑:对代码进行了一些更改,回答了我之前提出的一些问题,但现在我有新问题
我对线程和并行编程非常陌生,所以如果我做了非常规的事情,请原谅我。
我有两个线程,我试图让它们相互通信。其中一个比另一个短,所以我想要的是,当较短的一个结束时,较长的一个也会结束。sentinel
通过传递和扫描某个对象(这里我使用)来标记任一线程的完成,我在这方面取得了部分成功,但我对此有几个问题。这是我正在运行的示例代码:
from threading import Thread
import time
from queue import Queue
sentinel = object()
q = Queue()
q.put(0)
def say_hello(subject, q):
print("starting hello")
for i in range(5):
time.sleep(2)
print(f"\nhello {subject}! iter:{i}")
data = q.get()
if data is sentinel:
break
else:
q.put(data)
print(f"from hello: {q.queue}")
if i == 4:
print("hello finished because of iter")
q.put(sentinel)
else:
print("hello finished because of sentinel from foo")
q.put(sentinel)
def foo(q):
print("starting foo")
for j in range(2):
time.sleep(1)
print(f"\nfoo iter:{j}")
data = q.get()
if data is sentinel:
break
else:
q.put(data)
print(f"from foo: {q.queue}")
if j == 1:
print("foo finished because of iter")
q.put(sentinel)
else:
print("foo finished because of sentinel from hello")
q.put(sentinel)
def t():
print("t:0\n")
for k in range(1,6):
time.sleep(1)
print(f"\nt:{k}")
def run():
time_thread = Thread(target = t)
time_thread.start()
hello_thread = Thread(target = say_hello, args = ["lem", q])
hello_thread.start()
foo_thread = Thread(target = foo, args = [q])
foo_thread.start()
time_thread.join()
hello_thread.join()
foo_thread.join()
print("Done")
run()
在这种情况下,foo
应该先结束,say_hello
然后将结束,这就是从输出中看到的。
这是输出:
t:0
starting hello
starting foo
t:1
foo iter:0
from foo: deque([0])
hello lem! iter:0
t:2
foo iter:1
from foo: deque([0])
foo finished because of iter
from hello: deque([<object object at 0x000001B9CF908EA0>, 0])
t:3
t:4
hello lem! iter:1
hello finished because of sentinel from foo
t:5
Done
现在我的问题是:
- 这是这样做的正确方法吗?是否有一种更简单、更干净、更传统的方法来完成同样的事情?
- 输出似乎有点不稳定,每次我重新运行它时它们都会发生轻微的变化
- 线程之间是否共享同名的局部变量?我之前运行了所有循环
i
,看起来它几乎影响了所有线程 - 似乎
say_hello
并没有完全按照我想要的时间戳运行。那里t:1
不应该有任何输出,say_hello
但确实有。更奇怪的是,这是在's 部分完成from hello: deque...
后才打印的。我一直怀疑他们应该像往常foo
一样成对打印foo
- 有没有一种方法可以让两个线程使用 来查看队列,看看是否有任何内容,如果没有,则跳过它而不会让系统引发错误,而不是
0
在队列的开头包含 a ?.get_nowait()
非常感谢任何和所有的输入。谢谢!
相反,我会问:“这是正确的做法吗?” 您正在强制线程轮流。你的主线程
0
有效地放入队列中的是一根会说话的棍子。“hello”线程和“foo”线程来回传递它,并且它们都不允许做任何有趣的事情,除非它们持有它。同时,在其他两个线程都完成之前,您的主线程不允许执行任何操作。你的程序决不允许多个线程同时做任何有趣的事情。您不妨在一个线程中完成所有事情。代码会更简单(可能出错的地方更少),如果效率很重要,那么它会更高效。
您可能想要使用“会说话的棒”架构的原因之一是,如果两个线程表现为协程,其中一个线程的进度与另一个线程相关,但它们的内部状态进度彼此不同。协程比其他一些替代表示形式更容易阅读,因为任何一个协程的状态都隐含在其执行上下文中,而不是显式编码在数据对象中。这更像是我们在初学者时被教导如何理解单线程程序。
尽管如此,我在您的代码中没有看到这种“不同步”的依赖关系。你的两个例程只是彼此步调一致地进行,直到其中一个告诉另一个,“我完成了!”
当两个线程被允许彼此同时运行时(你的线程做了一点),那么就不能保证它们执行各种操作的精确顺序。
对 Python 函数的任何给定调用的局部变量实际上与对同一函数的任何其他调用中的局部变量不同。
但是,您确定它们确实是局部变量吗?我对 Jupyter Notebook 一无所知,但听起来您正在使用REPL。如果您在 REPL 环境中创建一个名为的顶级变量
i
,然后调用一个引用为自由变量的函数i
,则该变量在该函数调用中不是本地i
变量。该函数将使用顶级变量。如果你的函数在使用之前进行分配 ,那么应该为每个函数调用创建一个本地变量,但就像我说的,我不了解 Jupyter,所以我只是问一个问题:“你确定它们真的是本地的吗? ”
i
i
i
我不会更深入地研究这件事,但在上面,我撒了一点谎。在将“说话棒”返回给另一个线程后,您的“hello”线程和“foo”线程实际上都做了一些事情。这意味着它们在部分时间内并发运行,并且就像我上面所说的那样,无法保证并发线程执行操作的顺序。
这就是所谓的轮询——持续检查某些外部事件,而不是进行“休眠”直到事件发生的系统调用。在某些情况下,这是一种有用的技术,但在与其他应用程序共享桌面、服务器或移动计算平台的应用程序代码中,它通常不受欢迎,因为它会消耗 CPU 周期,并使用过多的电力(如果您不这样做)带有盲目调用的轮询循环
sleep
),否则响应不足(如果您确实使用sleep
调用。)