import threadingimport timeimport randomfrom kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers=‘192.168.1.10:9092‘)threads = [] class MyThread(threading.Thread): def __init__(self, threadName, delay): threading.Thread.__init__(self) self.threadName=threadName self.delay=delay def run(self): sendinfo(self.threadName, self.delay) def sendinfo( threadName, delay): count = 0 while count < 5: time.sleep(delay) count += 1 data = "".join(random.sample( [‘a‘, ‘b‘, ‘c‘, ‘d‘, ‘e‘, ‘f‘, ‘g‘, ‘h‘, ‘i‘, ‘j‘, ‘k‘, ‘l‘, ‘m‘, ‘n‘, ‘o‘, ‘p‘, ‘q‘, ‘r‘, ‘s‘, ‘t‘, ‘u‘, ‘v‘, ‘w‘, ‘x‘, ‘y‘, ‘z‘], 10)).replace(" ", "") word=("%s, %s, %s, %s" % (threadName, count, data, time.ctime(time.time()))) producer.send(‘test‘, key=threadName, value=word) print (word) try: t1=MyThread("Thread-1",0) threads.append(t1) t2=MyThread("Thread-2",0) threads.append(t2) t3=MyThread("Thread-3",0) threads.append(t3) for t in threads: t.start() for t in threads: t.join() producer.send(‘test‘, key="Thread-1", value="exit") producer.send(‘test‘, key="Thread-2", value="exit") producer.send(‘test‘, key="Thread-3", value="exit") print ("exit program with 0")except: print ("Error: failed to run producer program")
时间: 2024-11-05 13:43:41